diff --git a/Makefile b/Makefile index f84d9b8..e85a557 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ lint: golangci-lint run build: - go build -o bin/routewatch cmd/routewatch/main.go + CGO_ENABLED=1 go build -o bin/routewatch cmd/routewatch/main.go clean: rm -rf bin/ diff --git a/internal/database/database.go b/internal/database/database.go index d73c9eb..8b361a5 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -340,75 +340,6 @@ func (d *Database) RecordPeering(fromASNID, toASNID string, timestamp time.Time) return nil } -// UpdateLiveRoute updates the live routing table for an announcement -func (d *Database) UpdateLiveRoute( - prefixID, originASNID uuid.UUID, - peerASN int, - nextHop string, - timestamp time.Time, -) error { - // Use SQLite's UPSERT capability to avoid the SELECT+UPDATE/INSERT pattern - // This reduces the number of queries and improves performance - // Note: We removed the WHERE clause from ON CONFLICT UPDATE because - // if we're updating, we want to update regardless of withdrawn_at status - err := d.exec(` - INSERT INTO live_routes (id, prefix_id, origin_asn_id, peer_asn, next_hop, announced_at, withdrawn_at) - VALUES (?, ?, ?, ?, ?, ?, NULL) - ON CONFLICT(prefix_id, origin_asn_id, peer_asn) DO UPDATE SET - next_hop = excluded.next_hop, - announced_at = excluded.announced_at, - withdrawn_at = NULL`, - generateUUID().String(), prefixID.String(), originASNID.String(), - peerASN, nextHop, timestamp) - - return err -} - -// WithdrawLiveRoute marks a route as withdrawn in the live routing table -func (d *Database) WithdrawLiveRoute(prefixID uuid.UUID, peerASN int, timestamp time.Time) error { - err := d.exec(` - UPDATE live_routes - SET withdrawn_at = ? - WHERE prefix_id = ? AND peer_asn = ? AND withdrawn_at IS NULL`, - timestamp, prefixID.String(), peerASN) - - return err -} - -// GetActiveLiveRoutes returns all currently active routes (not withdrawn) -func (d *Database) GetActiveLiveRoutes() ([]LiveRoute, error) { - rows, err := d.query(` - SELECT id, prefix_id, origin_asn_id, peer_asn, next_hop, announced_at - FROM live_routes - WHERE withdrawn_at IS NULL - ORDER BY announced_at DESC`) - if err != nil { - return nil, err - } - defer func() { - _ = rows.Close() - }() - - var routes []LiveRoute - for rows.Next() { - var route LiveRoute - var idStr, prefixIDStr, originASNIDStr string - err := rows.Scan(&idStr, &prefixIDStr, &originASNIDStr, - &route.PeerASN, &route.NextHop, &route.AnnouncedAt) - if err != nil { - return nil, err - } - - route.ID, _ = uuid.Parse(idStr) - route.PrefixID, _ = uuid.Parse(prefixIDStr) - route.OriginASNID, _ = uuid.Parse(originASNIDStr) - - routes = append(routes, route) - } - - return routes, rows.Err() -} - // UpdatePeer updates or creates a BGP peer record func (d *Database) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error { tx, err := d.beginTx() @@ -495,13 +426,6 @@ func (d *Database) GetStats() (Stats, error) { return stats, err } - // Count live routes - d.logger.Info("Counting live routes") - err = d.queryRow("SELECT COUNT(*) FROM live_routes WHERE withdrawn_at IS NULL").Scan(&stats.LiveRoutes) - if err != nil { - return stats, err - } - d.logger.Info("Stats collection complete") return stats, nil diff --git a/internal/database/interface.go b/internal/database/interface.go index ef933de..bd1117f 100644 --- a/internal/database/interface.go +++ b/internal/database/interface.go @@ -2,8 +2,6 @@ package database import ( "time" - - "github.com/google/uuid" ) // Stats contains database statistics @@ -13,7 +11,6 @@ type Stats struct { IPv4Prefixes int IPv6Prefixes int Peerings int - LiveRoutes int } // Store defines the interface for database operations @@ -30,11 +27,6 @@ type Store interface { // Peering operations RecordPeering(fromASNID, toASNID string, timestamp time.Time) error - // Live route operations - UpdateLiveRoute(prefixID, originASNID uuid.UUID, peerASN int, nextHop string, timestamp time.Time) error - WithdrawLiveRoute(prefixID uuid.UUID, peerASN int, timestamp time.Time) error - GetActiveLiveRoutes() ([]LiveRoute, error) - // Statistics GetStats() (Stats, error) diff --git a/internal/database/models.go b/internal/database/models.go index b9777f7..89b9051 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -43,15 +43,3 @@ type ASNPeering struct { FirstSeen time.Time `json:"first_seen"` LastSeen time.Time `json:"last_seen"` } - -// LiveRoute represents the current state of a route in the live routing table -type LiveRoute struct { - ID uuid.UUID `json:"id"` - PrefixID uuid.UUID `json:"prefix_id"` - OriginASNID uuid.UUID `json:"origin_asn_id"` - PeerASN int `json:"peer_asn"` - Path string `json:"path"` - NextHop string `json:"next_hop"` - AnnouncedAt time.Time `json:"announced_at"` - WithdrawnAt *time.Time `json:"withdrawn_at"` -} diff --git a/internal/database/schema.sql b/internal/database/schema.sql index eccf426..6b6941c 100644 --- a/internal/database/schema.sql +++ b/internal/database/schema.sql @@ -48,20 +48,6 @@ CREATE TABLE IF NOT EXISTS bgp_peers ( last_message_type TEXT ); --- Live routing table: current state of announced routes -CREATE TABLE IF NOT EXISTS live_routes ( - id TEXT PRIMARY KEY, - prefix_id TEXT NOT NULL, - origin_asn_id TEXT NOT NULL, - peer_asn INTEGER NOT NULL, - next_hop TEXT, - announced_at DATETIME NOT NULL, - withdrawn_at DATETIME, - FOREIGN KEY (prefix_id) REFERENCES prefixes(id), - FOREIGN KEY (origin_asn_id) REFERENCES asns(id), - UNIQUE(prefix_id, origin_asn_id, peer_asn) -); - CREATE INDEX IF NOT EXISTS idx_prefixes_ip_version ON prefixes(ip_version); CREATE INDEX IF NOT EXISTS idx_prefixes_version_prefix ON prefixes(ip_version, prefix); CREATE INDEX IF NOT EXISTS idx_announcements_timestamp ON announcements(timestamp); @@ -71,43 +57,6 @@ CREATE INDEX IF NOT EXISTS idx_asn_peerings_from_asn ON asn_peerings(from_asn_id CREATE INDEX IF NOT EXISTS idx_asn_peerings_to_asn ON asn_peerings(to_asn_id); CREATE INDEX IF NOT EXISTS idx_asn_peerings_lookup ON asn_peerings(from_asn_id, to_asn_id); --- Indexes for live routes table -CREATE INDEX IF NOT EXISTS idx_live_routes_active - ON live_routes(prefix_id, origin_asn_id) - WHERE withdrawn_at IS NULL; - -CREATE INDEX IF NOT EXISTS idx_live_routes_origin - ON live_routes(origin_asn_id) - WHERE withdrawn_at IS NULL; - -CREATE INDEX IF NOT EXISTS idx_live_routes_prefix - ON live_routes(prefix_id) - WHERE withdrawn_at IS NULL; - --- Critical index for the most common query pattern -CREATE INDEX IF NOT EXISTS idx_live_routes_lookup - ON live_routes(prefix_id, origin_asn_id, peer_asn) - WHERE withdrawn_at IS NULL; - --- Index for withdrawal updates by prefix and peer -CREATE INDEX IF NOT EXISTS idx_live_routes_withdraw - ON live_routes(prefix_id, peer_asn) - WHERE withdrawn_at IS NULL; - --- Covering index for SELECT id queries (includes id in index) -CREATE INDEX IF NOT EXISTS idx_live_routes_covering - ON live_routes(prefix_id, origin_asn_id, peer_asn, id) - WHERE withdrawn_at IS NULL; - --- Index for UPDATE by id operations -CREATE INDEX IF NOT EXISTS idx_live_routes_id - ON live_routes(id); - --- Index for stats queries -CREATE INDEX IF NOT EXISTS idx_live_routes_stats - ON live_routes(withdrawn_at) - WHERE withdrawn_at IS NULL; - -- Additional indexes for prefixes table CREATE INDEX IF NOT EXISTS idx_prefixes_prefix ON prefixes(prefix); diff --git a/internal/database/slowquery.go b/internal/database/slowquery.go index 9c4af75..8ebc4cd 100644 --- a/internal/database/slowquery.go +++ b/internal/database/slowquery.go @@ -26,6 +26,7 @@ func (d *Database) queryRow(query string, args ...interface{}) *sql.Row { } // query wraps Query with slow query logging +// nolint:unused // kept for future use to ensure all queries go through slow query logging func (d *Database) query(query string, args ...interface{}) (*sql.Rows, error) { start := time.Now() defer logSlowQuery(d.logger, query, start) diff --git a/internal/routewatch/app.go b/internal/routewatch/app.go index 0d3bae1..7a95214 100644 --- a/internal/routewatch/app.go +++ b/internal/routewatch/app.go @@ -11,6 +11,7 @@ import ( "git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/metrics" + "git.eeqj.de/sneak/routewatch/internal/routingtable" "git.eeqj.de/sneak/routewatch/internal/server" "git.eeqj.de/sneak/routewatch/internal/streamer" @@ -33,30 +34,33 @@ func NewConfig() Config { type Dependencies struct { fx.In - DB database.Store - Streamer *streamer.Streamer - Server *server.Server - Logger *slog.Logger - Config Config `optional:"true"` + DB database.Store + RoutingTable *routingtable.RoutingTable + Streamer *streamer.Streamer + Server *server.Server + Logger *slog.Logger + Config Config `optional:"true"` } // RouteWatch represents the main application instance type RouteWatch struct { - db database.Store - streamer *streamer.Streamer - server *server.Server - logger *slog.Logger - maxRuntime time.Duration + db database.Store + routingTable *routingtable.RoutingTable + streamer *streamer.Streamer + server *server.Server + logger *slog.Logger + maxRuntime time.Duration } // New creates a new RouteWatch instance func New(deps Dependencies) *RouteWatch { return &RouteWatch{ - db: deps.DB, - streamer: deps.Streamer, - server: deps.Server, - logger: deps.Logger, - maxRuntime: deps.Config.MaxRuntime, + db: deps.DB, + routingTable: deps.RoutingTable, + streamer: deps.Streamer, + server: deps.Server, + logger: deps.Logger, + maxRuntime: deps.Config.MaxRuntime, } } @@ -76,6 +80,10 @@ func (rw *RouteWatch) Run(ctx context.Context) error { dbHandler := NewDatabaseHandler(rw.db, rw.logger) rw.streamer.RegisterHandler(dbHandler) + // Register routing table handler to maintain in-memory routing table + rtHandler := NewRoutingTableHandler(rw.routingTable, rw.logger) + rw.streamer.RegisterHandler(rtHandler) + // Register peer tracking handler to track all peers peerHandler := NewPeerHandler(rw.db, rw.logger) rw.streamer.RegisterHandler(peerHandler) @@ -154,8 +162,12 @@ func getModule() fx.Option { }, fx.As(new(database.Store)), ), + routingtable.New, streamer.New, - server.New, + fx.Annotate( + server.New, + fx.ParamTags(``, ``, ``, ``), + ), New, ), ) diff --git a/internal/routewatch/app_integration_test.go b/internal/routewatch/app_integration_test.go index 59b79e3..fe2c1d8 100644 --- a/internal/routewatch/app_integration_test.go +++ b/internal/routewatch/app_integration_test.go @@ -9,6 +9,7 @@ import ( "git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/metrics" + "git.eeqj.de/sneak/routewatch/internal/routingtable" "git.eeqj.de/sneak/routewatch/internal/server" "git.eeqj.de/sneak/routewatch/internal/streamer" "github.com/google/uuid" @@ -129,35 +130,6 @@ func (m *mockStore) RecordPeering(fromASNID, toASNID string, _ time.Time) error return nil } -// UpdateLiveRoute mock implementation -func (m *mockStore) UpdateLiveRoute(prefixID, originASNID uuid.UUID, peerASN int, _ string, _ time.Time) error { - m.mu.Lock() - defer m.mu.Unlock() - - key := prefixID.String() + "_" + originASNID.String() + "_" + string(rune(peerASN)) - if !m.Routes[key] { - m.Routes[key] = true - m.RouteCount++ - } - - return nil -} - -// WithdrawLiveRoute mock implementation -func (m *mockStore) WithdrawLiveRoute(_ uuid.UUID, _ int, _ time.Time) error { - m.mu.Lock() - defer m.mu.Unlock() - - m.WithdrawalCount++ - - return nil -} - -// GetActiveLiveRoutes mock implementation -func (m *mockStore) GetActiveLiveRoutes() ([]database.LiveRoute, error) { - return []database.LiveRoute{}, nil -} - // UpdatePeer mock implementation func (m *mockStore) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error { // Simple mock - just return nil @@ -180,7 +152,6 @@ func (m *mockStore) GetStats() (database.Stats, error) { IPv4Prefixes: m.IPv4Prefixes, IPv6Prefixes: m.IPv6Prefixes, Peerings: m.PeeringCount, - LiveRoutes: m.RouteCount, }, nil } @@ -197,15 +168,19 @@ func TestRouteWatchLiveFeed(t *testing.T) { // Create streamer s := streamer.New(logger, metricsTracker) + // Create routing table + rt := routingtable.New() + // Create server - srv := server.New(mockDB, s, logger) + srv := server.New(mockDB, rt, s, logger) // Create RouteWatch with 5 second limit deps := Dependencies{ - DB: mockDB, - Streamer: s, - Server: srv, - Logger: logger, + DB: mockDB, + RoutingTable: rt, + Streamer: s, + Server: srv, + Logger: logger, Config: Config{ MaxRuntime: 5 * time.Second, }, @@ -242,8 +217,4 @@ func TestRouteWatchLiveFeed(t *testing.T) { } t.Logf("Recorded %d AS peering relationships in 5 seconds", stats.Peerings) - if stats.LiveRoutes == 0 { - t.Error("Expected to have some active routes") - } - t.Logf("Active routes: %d", stats.LiveRoutes) } diff --git a/internal/routewatch/dbhandler.go b/internal/routewatch/dbhandler.go index 9e29281..ad090a3 100644 --- a/internal/routewatch/dbhandler.go +++ b/internal/routewatch/dbhandler.go @@ -2,7 +2,6 @@ package routewatch import ( "log/slog" - "strconv" "git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/ristypes" @@ -33,14 +32,6 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) { // Use the pre-parsed timestamp timestamp := msg.ParsedTimestamp - // Parse peer ASN - peerASN, err := strconv.Atoi(msg.PeerASN) - if err != nil { - h.logger.Error("Failed to parse peer ASN", "peer_asn", msg.PeerASN, "error", err) - - return - } - // Get origin ASN from path (last element) var originASN int if len(msg.Path) > 0 { @@ -51,7 +42,7 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) { for _, announcement := range msg.Announcements { for _, prefix := range announcement.Prefixes { // Get or create prefix - p, err := h.db.GetOrCreatePrefix(prefix, timestamp) + _, err := h.db.GetOrCreatePrefix(prefix, timestamp) if err != nil { h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err) @@ -59,30 +50,13 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) { } // Get or create origin ASN - asn, err := h.db.GetOrCreateASN(originASN, timestamp) + _, err = h.db.GetOrCreateASN(originASN, timestamp) if err != nil { h.logger.Error("Failed to get/create ASN", "asn", originASN, "error", err) continue } - // Update live route - err = h.db.UpdateLiveRoute( - p.ID, - asn.ID, - peerASN, - announcement.NextHop, - timestamp, - ) - if err != nil { - h.logger.Error("Failed to update live route", - "prefix", prefix, - "origin_asn", originASN, - "peer_asn", peerASN, - "error", err, - ) - } - // TODO: Record the announcement in the announcements table // Process AS path to update peerings if len(msg.Path) > 1 { @@ -122,23 +96,13 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) { // Process withdrawals for _, prefix := range msg.Withdrawals { // Get prefix - p, err := h.db.GetOrCreatePrefix(prefix, timestamp) + _, err := h.db.GetOrCreatePrefix(prefix, timestamp) if err != nil { h.logger.Error("Failed to get prefix for withdrawal", "prefix", prefix, "error", err) continue } - // Withdraw the route - err = h.db.WithdrawLiveRoute(p.ID, peerASN, timestamp) - if err != nil { - h.logger.Error("Failed to withdraw route", - "prefix", prefix, - "peer_asn", peerASN, - "error", err, - ) - } - - // TODO: Record the withdrawal in the withdrawals table + // TODO: Record the withdrawal in the announcements table as a withdrawal } } diff --git a/internal/routewatch/routingtablehandler.go b/internal/routewatch/routingtablehandler.go new file mode 100644 index 0000000..f1c679b --- /dev/null +++ b/internal/routewatch/routingtablehandler.go @@ -0,0 +1,133 @@ +package routewatch + +import ( + "log/slog" + "strconv" + + "git.eeqj.de/sneak/routewatch/internal/ristypes" + "git.eeqj.de/sneak/routewatch/internal/routingtable" + "github.com/google/uuid" +) + +// RoutingTableHandler handles BGP messages and updates the in-memory routing table +type RoutingTableHandler struct { + rt *routingtable.RoutingTable + logger *slog.Logger +} + +// NewRoutingTableHandler creates a new routing table handler +func NewRoutingTableHandler(rt *routingtable.RoutingTable, logger *slog.Logger) *RoutingTableHandler { + return &RoutingTableHandler{ + rt: rt, + logger: logger, + } +} + +// WantsMessage returns true if this handler wants to process messages of the given type +func (h *RoutingTableHandler) WantsMessage(messageType string) bool { + // We only care about UPDATE messages for the routing table + return messageType == "UPDATE" +} + +// HandleMessage processes a RIS message and updates the routing table +func (h *RoutingTableHandler) HandleMessage(msg *ristypes.RISMessage) { + // Use the pre-parsed timestamp + timestamp := msg.ParsedTimestamp + + // Parse peer ASN + peerASN, err := strconv.Atoi(msg.PeerASN) + if err != nil { + h.logger.Error("Failed to parse peer ASN", "peer_asn", msg.PeerASN, "error", err) + + return + } + + // Get origin ASN from path (last element) + var originASN int + if len(msg.Path) > 0 { + originASN = msg.Path[len(msg.Path)-1] + } + + // Process announcements + for _, announcement := range msg.Announcements { + for _, prefix := range announcement.Prefixes { + // Generate deterministic UUIDs based on the prefix and origin ASN + // This ensures consistency across restarts + prefixID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(prefix)) + originASNID := uuid.NewSHA1(uuid.NameSpaceOID, []byte(strconv.Itoa(originASN))) + + // Create route for the routing table + route := &routingtable.Route{ + PrefixID: prefixID, + Prefix: prefix, + OriginASNID: originASNID, + OriginASN: originASN, + PeerASN: peerASN, + ASPath: msg.Path, + NextHop: announcement.NextHop, + AnnouncedAt: timestamp, + } + + // Add route to routing table + h.rt.AddRoute(route) + + h.logger.Debug("Added route to routing table", + "prefix", prefix, + "origin_asn", originASN, + "peer_asn", peerASN, + "path", msg.Path, + ) + } + } + + // Process withdrawals + for _, prefix := range msg.Withdrawals { + // Generate deterministic UUID for the prefix + prefixID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(prefix)) + + // Withdraw all routes for this prefix from this peer + count := h.rt.WithdrawRoutesByPrefixAndPeer(prefixID, peerASN) + + h.logger.Debug("Withdrew routes from routing table", + "prefix", prefix, + "peer_asn", peerASN, + "routes_removed", count, + ) + } +} + +// GetRoutingTableStats returns statistics about the routing table +func (h *RoutingTableHandler) GetRoutingTableStats() map[string]int { + return h.rt.Stats() +} + +// GetActiveRouteCount returns the number of active routes +func (h *RoutingTableHandler) GetActiveRouteCount() int { + return h.rt.Size() +} + +// GetRoutesByPrefix returns all routes for a specific prefix +func (h *RoutingTableHandler) GetRoutesByPrefix(prefixID uuid.UUID) []*routingtable.Route { + return h.rt.GetRoutesByPrefix(prefixID) +} + +// GetRoutesByOriginASN returns all routes originated by a specific ASN +func (h *RoutingTableHandler) GetRoutesByOriginASN(originASNID uuid.UUID) []*routingtable.Route { + return h.rt.GetRoutesByOriginASN(originASNID) +} + +// GetRoutesByPeerASN returns all routes received from a specific peer ASN +func (h *RoutingTableHandler) GetRoutesByPeerASN(peerASN int) []*routingtable.Route { + return h.rt.GetRoutesByPeerASN(peerASN) +} + +// GetAllRoutes returns all active routes +func (h *RoutingTableHandler) GetAllRoutes() []*routingtable.Route { + return h.rt.GetAllRoutes() +} + +// ClearRoutingTable clears all routes from the routing table +func (h *RoutingTableHandler) ClearRoutingTable() { + h.rt.Clear() + h.logger.Info("Cleared routing table") +} diff --git a/internal/routingtable/routingtable.go b/internal/routingtable/routingtable.go new file mode 100644 index 0000000..aad6cb1 --- /dev/null +++ b/internal/routingtable/routingtable.go @@ -0,0 +1,299 @@ +// Package routingtable provides a thread-safe in-memory representation of the DFZ routing table. +package routingtable + +import ( + "fmt" + "sync" + "time" + + "github.com/google/uuid" +) + +// Route represents a single route entry in the routing table +type Route struct { + PrefixID uuid.UUID `json:"prefix_id"` + Prefix string `json:"prefix"` // The actual prefix string (e.g., "10.0.0.0/8") + OriginASNID uuid.UUID `json:"origin_asn_id"` + OriginASN int `json:"origin_asn"` // The actual ASN number + PeerASN int `json:"peer_asn"` + ASPath []int `json:"as_path"` // Full AS path + NextHop string `json:"next_hop"` + AnnouncedAt time.Time `json:"announced_at"` +} + +// RouteKey uniquely identifies a route in the table +type RouteKey struct { + PrefixID uuid.UUID + OriginASNID uuid.UUID + PeerASN int +} + +// RoutingTable is a thread-safe in-memory routing table +type RoutingTable struct { + mu sync.RWMutex + routes map[RouteKey]*Route + + // Secondary indexes for efficient lookups + byPrefix map[uuid.UUID]map[RouteKey]*Route // Routes indexed by prefix ID + byOriginASN map[uuid.UUID]map[RouteKey]*Route // Routes indexed by origin ASN ID + byPeerASN map[int]map[RouteKey]*Route // Routes indexed by peer ASN +} + +// New creates a new empty routing table +func New() *RoutingTable { + return &RoutingTable{ + routes: make(map[RouteKey]*Route), + byPrefix: make(map[uuid.UUID]map[RouteKey]*Route), + byOriginASN: make(map[uuid.UUID]map[RouteKey]*Route), + byPeerASN: make(map[int]map[RouteKey]*Route), + } +} + +// AddRoute adds or updates a route in the routing table +func (rt *RoutingTable) AddRoute(route *Route) { + rt.mu.Lock() + defer rt.mu.Unlock() + + key := RouteKey{ + PrefixID: route.PrefixID, + OriginASNID: route.OriginASNID, + PeerASN: route.PeerASN, + } + + // If route already exists, remove it from indexes first + if existingRoute, exists := rt.routes[key]; exists { + rt.removeFromIndexes(key, existingRoute) + } + + // Add to main map + rt.routes[key] = route + + // Update indexes + rt.addToIndexes(key, route) +} + +// RemoveRoute removes a route from the routing table +func (rt *RoutingTable) RemoveRoute(prefixID, originASNID uuid.UUID, peerASN int) bool { + rt.mu.Lock() + defer rt.mu.Unlock() + + key := RouteKey{ + PrefixID: prefixID, + OriginASNID: originASNID, + PeerASN: peerASN, + } + + route, exists := rt.routes[key] + if !exists { + return false + } + + // Remove from indexes + rt.removeFromIndexes(key, route) + + // Remove from main map + delete(rt.routes, key) + + return true +} + +// WithdrawRoutesByPrefixAndPeer removes all routes for a specific prefix from a specific peer +func (rt *RoutingTable) WithdrawRoutesByPrefixAndPeer(prefixID uuid.UUID, peerASN int) int { + rt.mu.Lock() + defer rt.mu.Unlock() + + count := 0 + + // Find all routes for this prefix + if prefixRoutes, exists := rt.byPrefix[prefixID]; exists { + // Collect keys to delete (can't delete while iterating) + var keysToDelete []RouteKey + for key, route := range prefixRoutes { + if route.PeerASN == peerASN { + keysToDelete = append(keysToDelete, key) + } + } + + // Delete the routes + for _, key := range keysToDelete { + if route, exists := rt.routes[key]; exists { + rt.removeFromIndexes(key, route) + delete(rt.routes, key) + count++ + } + } + } + + return count +} + +// GetRoute retrieves a specific route +func (rt *RoutingTable) GetRoute(prefixID, originASNID uuid.UUID, peerASN int) (*Route, bool) { + rt.mu.RLock() + defer rt.mu.RUnlock() + + key := RouteKey{ + PrefixID: prefixID, + OriginASNID: originASNID, + PeerASN: peerASN, + } + + route, exists := rt.routes[key] + if !exists { + return nil, false + } + + // Return a copy to prevent external modification + routeCopy := *route + + return &routeCopy, true +} + +// GetRoutesByPrefix returns all routes for a specific prefix +func (rt *RoutingTable) GetRoutesByPrefix(prefixID uuid.UUID) []*Route { + rt.mu.RLock() + defer rt.mu.RUnlock() + + routes := make([]*Route, 0) + if prefixRoutes, exists := rt.byPrefix[prefixID]; exists { + for _, route := range prefixRoutes { + routeCopy := *route + routes = append(routes, &routeCopy) + } + } + + return routes +} + +// GetRoutesByOriginASN returns all routes originated by a specific ASN +func (rt *RoutingTable) GetRoutesByOriginASN(originASNID uuid.UUID) []*Route { + rt.mu.RLock() + defer rt.mu.RUnlock() + + routes := make([]*Route, 0) + if asnRoutes, exists := rt.byOriginASN[originASNID]; exists { + for _, route := range asnRoutes { + routeCopy := *route + routes = append(routes, &routeCopy) + } + } + + return routes +} + +// GetRoutesByPeerASN returns all routes received from a specific peer ASN +func (rt *RoutingTable) GetRoutesByPeerASN(peerASN int) []*Route { + rt.mu.RLock() + defer rt.mu.RUnlock() + + routes := make([]*Route, 0) + if peerRoutes, exists := rt.byPeerASN[peerASN]; exists { + for _, route := range peerRoutes { + routeCopy := *route + routes = append(routes, &routeCopy) + } + } + + return routes +} + +// GetAllRoutes returns all active routes in the routing table +func (rt *RoutingTable) GetAllRoutes() []*Route { + rt.mu.RLock() + defer rt.mu.RUnlock() + + routes := make([]*Route, 0, len(rt.routes)) + for _, route := range rt.routes { + routeCopy := *route + routes = append(routes, &routeCopy) + } + + return routes +} + +// Size returns the total number of routes in the table +func (rt *RoutingTable) Size() int { + rt.mu.RLock() + defer rt.mu.RUnlock() + + return len(rt.routes) +} + +// Stats returns statistics about the routing table +func (rt *RoutingTable) Stats() map[string]int { + rt.mu.RLock() + defer rt.mu.RUnlock() + + stats := map[string]int{ + "total_routes": len(rt.routes), + "unique_prefixes": len(rt.byPrefix), + "unique_origins": len(rt.byOriginASN), + "unique_peers": len(rt.byPeerASN), + } + + return stats +} + +// Clear removes all routes from the routing table +func (rt *RoutingTable) Clear() { + rt.mu.Lock() + defer rt.mu.Unlock() + + rt.routes = make(map[RouteKey]*Route) + rt.byPrefix = make(map[uuid.UUID]map[RouteKey]*Route) + rt.byOriginASN = make(map[uuid.UUID]map[RouteKey]*Route) + rt.byPeerASN = make(map[int]map[RouteKey]*Route) +} + +// Helper methods for index management + +func (rt *RoutingTable) addToIndexes(key RouteKey, route *Route) { + // Add to prefix index + if rt.byPrefix[route.PrefixID] == nil { + rt.byPrefix[route.PrefixID] = make(map[RouteKey]*Route) + } + rt.byPrefix[route.PrefixID][key] = route + + // Add to origin ASN index + if rt.byOriginASN[route.OriginASNID] == nil { + rt.byOriginASN[route.OriginASNID] = make(map[RouteKey]*Route) + } + rt.byOriginASN[route.OriginASNID][key] = route + + // Add to peer ASN index + if rt.byPeerASN[route.PeerASN] == nil { + rt.byPeerASN[route.PeerASN] = make(map[RouteKey]*Route) + } + rt.byPeerASN[route.PeerASN][key] = route +} + +func (rt *RoutingTable) removeFromIndexes(key RouteKey, route *Route) { + // Remove from prefix index + if prefixRoutes, exists := rt.byPrefix[route.PrefixID]; exists { + delete(prefixRoutes, key) + if len(prefixRoutes) == 0 { + delete(rt.byPrefix, route.PrefixID) + } + } + + // Remove from origin ASN index + if asnRoutes, exists := rt.byOriginASN[route.OriginASNID]; exists { + delete(asnRoutes, key) + if len(asnRoutes) == 0 { + delete(rt.byOriginASN, route.OriginASNID) + } + } + + // Remove from peer ASN index + if peerRoutes, exists := rt.byPeerASN[route.PeerASN]; exists { + delete(peerRoutes, key) + if len(peerRoutes) == 0 { + delete(rt.byPeerASN, route.PeerASN) + } + } +} + +// String returns a string representation of the route key +func (k RouteKey) String() string { + return fmt.Sprintf("%s/%s/%d", k.PrefixID, k.OriginASNID, k.PeerASN) +} diff --git a/internal/routingtable/routingtable_test.go b/internal/routingtable/routingtable_test.go new file mode 100644 index 0000000..a3bb48e --- /dev/null +++ b/internal/routingtable/routingtable_test.go @@ -0,0 +1,219 @@ +package routingtable + +import ( + "sync" + "testing" + "time" + + "github.com/google/uuid" +) + +func TestRoutingTable(t *testing.T) { + rt := New() + + // Test data + prefixID1 := uuid.New() + prefixID2 := uuid.New() + originASNID1 := uuid.New() + originASNID2 := uuid.New() + + route1 := &Route{ + PrefixID: prefixID1, + Prefix: "10.0.0.0/8", + OriginASNID: originASNID1, + OriginASN: 64512, + PeerASN: 64513, + ASPath: []int{64513, 64512}, + NextHop: "192.168.1.1", + AnnouncedAt: time.Now(), + } + + route2 := &Route{ + PrefixID: prefixID2, + Prefix: "192.168.0.0/16", + OriginASNID: originASNID2, + OriginASN: 64514, + PeerASN: 64513, + ASPath: []int{64513, 64514}, + NextHop: "192.168.1.1", + AnnouncedAt: time.Now(), + } + + // Test AddRoute + rt.AddRoute(route1) + rt.AddRoute(route2) + + if rt.Size() != 2 { + t.Errorf("Expected 2 routes, got %d", rt.Size()) + } + + // Test GetRoute + retrievedRoute, exists := rt.GetRoute(prefixID1, originASNID1, 64513) + if !exists { + t.Error("Route 1 should exist") + } + if retrievedRoute.Prefix != "10.0.0.0/8" { + t.Errorf("Expected prefix 10.0.0.0/8, got %s", retrievedRoute.Prefix) + } + + // Test GetRoutesByPrefix + prefixRoutes := rt.GetRoutesByPrefix(prefixID1) + if len(prefixRoutes) != 1 { + t.Errorf("Expected 1 route for prefix, got %d", len(prefixRoutes)) + } + + // Test GetRoutesByPeerASN + peerRoutes := rt.GetRoutesByPeerASN(64513) + if len(peerRoutes) != 2 { + t.Errorf("Expected 2 routes from peer 64513, got %d", len(peerRoutes)) + } + + // Test RemoveRoute + removed := rt.RemoveRoute(prefixID1, originASNID1, 64513) + if !removed { + t.Error("Route should have been removed") + } + if rt.Size() != 1 { + t.Errorf("Expected 1 route after removal, got %d", rt.Size()) + } + + // Test WithdrawRoutesByPrefixAndPeer + // Add the route back first + rt.AddRoute(route1) + + // Add another route for the same prefix from the same peer + route3 := &Route{ + PrefixID: prefixID1, + Prefix: "10.0.0.0/8", + OriginASNID: originASNID2, // Different origin + OriginASN: 64515, + PeerASN: 64513, + ASPath: []int{64513, 64515}, + NextHop: "192.168.1.1", + AnnouncedAt: time.Now(), + } + rt.AddRoute(route3) + + count := rt.WithdrawRoutesByPrefixAndPeer(prefixID1, 64513) + if count != 2 { + t.Errorf("Expected to withdraw 2 routes, withdrew %d", count) + } + + // Should only have route2 left + if rt.Size() != 1 { + t.Errorf("Expected 1 route after withdrawal, got %d", rt.Size()) + } + + // Test Stats + stats := rt.Stats() + if stats["total_routes"] != 1 { + t.Errorf("Expected 1 total route in stats, got %d", stats["total_routes"]) + } + + // Test Clear + rt.Clear() + if rt.Size() != 0 { + t.Errorf("Expected 0 routes after clear, got %d", rt.Size()) + } +} + +func TestRoutingTableConcurrency(t *testing.T) { + rt := New() + + // Test concurrent access + var wg sync.WaitGroup + numGoroutines := 10 + numOperations := 100 + + // Start multiple goroutines that add/remove routes + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + + for j := 0; j < numOperations; j++ { + prefixID := uuid.New() + originASNID := uuid.New() + + route := &Route{ + PrefixID: prefixID, + Prefix: "10.0.0.0/8", + OriginASNID: originASNID, + OriginASN: 64512 + id, + PeerASN: 64500, + ASPath: []int{64500, 64512 + id}, + NextHop: "192.168.1.1", + AnnouncedAt: time.Now(), + } + + // Add route + rt.AddRoute(route) + + // Try to get it + _, _ = rt.GetRoute(prefixID, originASNID, 64500) + + // Get stats + _ = rt.Stats() + + // Remove it + rt.RemoveRoute(prefixID, originASNID, 64500) + } + }(i) + } + + wg.Wait() + + // Table should be empty after all operations + if rt.Size() != 0 { + t.Errorf("Expected empty table after concurrent operations, got %d routes", rt.Size()) + } +} + +func TestRouteUpdate(t *testing.T) { + rt := New() + + prefixID := uuid.New() + originASNID := uuid.New() + + route1 := &Route{ + PrefixID: prefixID, + Prefix: "10.0.0.0/8", + OriginASNID: originASNID, + OriginASN: 64512, + PeerASN: 64513, + ASPath: []int{64513, 64512}, + NextHop: "192.168.1.1", + AnnouncedAt: time.Now(), + } + + // Add initial route + rt.AddRoute(route1) + + // Update the same route with new next hop + route2 := &Route{ + PrefixID: prefixID, + Prefix: "10.0.0.0/8", + OriginASNID: originASNID, + OriginASN: 64512, + PeerASN: 64513, + ASPath: []int{64513, 64512}, + NextHop: "192.168.1.2", // Changed + AnnouncedAt: time.Now().Add(1 * time.Minute), + } + + rt.AddRoute(route2) + + // Should still have only 1 route + if rt.Size() != 1 { + t.Errorf("Expected 1 route after update, got %d", rt.Size()) + } + + // Check that the route was updated + retrievedRoute, exists := rt.GetRoute(prefixID, originASNID, 64513) + if !exists { + t.Error("Route should exist after update") + } + if retrievedRoute.NextHop != "192.168.1.2" { + t.Errorf("Expected updated next hop 192.168.1.2, got %s", retrievedRoute.NextHop) + } +} diff --git a/internal/server/server.go b/internal/server/server.go index a6623d2..929c728 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -10,6 +10,7 @@ import ( "time" "git.eeqj.de/sneak/routewatch/internal/database" + "git.eeqj.de/sneak/routewatch/internal/routingtable" "git.eeqj.de/sneak/routewatch/internal/streamer" "git.eeqj.de/sneak/routewatch/internal/templates" "github.com/go-chi/chi/v5" @@ -18,19 +19,21 @@ import ( // Server provides HTTP endpoints for status monitoring type Server struct { - router *chi.Mux - db database.Store - streamer *streamer.Streamer - logger *slog.Logger - srv *http.Server + router *chi.Mux + db database.Store + routingTable *routingtable.RoutingTable + streamer *streamer.Streamer + logger *slog.Logger + srv *http.Server } // New creates a new HTTP server -func New(db database.Store, streamer *streamer.Streamer, logger *slog.Logger) *Server { +func New(db database.Store, rt *routingtable.RoutingTable, streamer *streamer.Streamer, logger *slog.Logger) *Server { s := &Server{ - db: db, - streamer: streamer, - logger: logger, + db: db, + routingTable: rt, + streamer: streamer, + logger: logger, } s.setupRoutes() @@ -200,7 +203,7 @@ func (s *Server) handleStatusJSON() http.HandlerFunc { IPv4Prefixes: dbStats.IPv4Prefixes, IPv6Prefixes: dbStats.IPv6Prefixes, Peerings: dbStats.Peerings, - LiveRoutes: dbStats.LiveRoutes, + LiveRoutes: s.routingTable.Size(), } w.Header().Set("Content-Type", "application/json") @@ -300,7 +303,7 @@ func (s *Server) handleStats() http.HandlerFunc { IPv4Prefixes: dbStats.IPv4Prefixes, IPv6Prefixes: dbStats.IPv6Prefixes, Peerings: dbStats.Peerings, - LiveRoutes: dbStats.LiveRoutes, + LiveRoutes: s.routingTable.Size(), } w.Header().Set("Content-Type", "application/json") diff --git a/internal/streamer/streamer.go b/internal/streamer/streamer.go index 027d896..6f0e662 100644 --- a/internal/streamer/streamer.go +++ b/internal/streamer/streamer.go @@ -25,7 +25,7 @@ const ( metricsLogInterval = 10 * time.Second bytesPerKB = 1024 bytesPerMB = 1024 * 1024 - maxConcurrentHandlers = 100 // Maximum number of concurrent message handlers + maxConcurrentHandlers = 200 // Maximum number of concurrent message handlers ) // MessageHandler is an interface for handling RIS messages @@ -141,16 +141,26 @@ func (s *Streamer) logMetrics() { const bitsPerMegabit = 1000000 droppedMessages := atomic.LoadUint64(&s.droppedMessages) - s.logger.Info("Stream statistics", - "uptime", uptime, - "total_messages", metrics.TotalMessages, - "total_bytes", metrics.TotalBytes, - "total_mb", fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB), - "messages_per_sec", fmt.Sprintf("%.2f", metrics.MessagesPerSec), - "bits_per_sec", fmt.Sprintf("%.0f", metrics.BitsPerSec), - "mbps", fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit), - "dropped_messages", droppedMessages, - "active_handlers", len(s.semaphore), + s.logger.Info( + "Stream statistics", + "uptime", + uptime, + "total_messages", + metrics.TotalMessages, + "total_bytes", + metrics.TotalBytes, + "total_mb", + fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB), + "messages_per_sec", + fmt.Sprintf("%.2f", metrics.MessagesPerSec), + "bits_per_sec", + fmt.Sprintf("%.0f", metrics.BitsPerSec), + "mbps", + fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit), + "dropped_messages", + droppedMessages, + "active_handlers", + len(s.semaphore), ) } @@ -262,7 +272,8 @@ func (s *Streamer) stream(ctx context.Context) error { msg := wrapper.Data // Parse the timestamp - msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0).UTC() + msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0). + UTC() // Process based on message type switch msg.Type { @@ -294,7 +305,12 @@ func (s *Streamer) stream(ctx context.Context) error { msg.Type, string(rawLine), ) - panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type)) + panic( + fmt.Sprintf( + "Unknown RIS message type: %s", + msg.Type, + ), + ) } // Call handlers synchronously within this goroutine @@ -309,7 +325,13 @@ func (s *Streamer) stream(ctx context.Context) error { // Semaphore is full, drop the message dropped := atomic.AddUint64(&s.droppedMessages, 1) if dropped%1000 == 0 { // Log every 1000 dropped messages - s.logger.Warn("Dropping messages due to overload", "total_dropped", dropped, "max_handlers", maxConcurrentHandlers) + s.logger.Warn( + "Dropping messages due to overload", + "total_dropped", + dropped, + "max_handlers", + maxConcurrentHandlers, + ) } } }