// Package routingtable provides a thread-safe in-memory representation of the DFZ routing table. package routingtable import ( "compress/gzip" "encoding/json" "fmt" "log/slog" "os" "path/filepath" "strings" "sync" "sync/atomic" "time" "git.eeqj.de/sneak/routewatch/internal/config" "github.com/google/uuid" ) const ( // routeStalenessThreshold is how old a route can be before we consider it stale // Using 30 minutes as a conservative value for snapshot loading routeStalenessThreshold = 30 * time.Minute // snapshotFilename is the name of the snapshot file snapshotFilename = "routingtable.json.gz" ) // Route represents a single route entry in the routing table type Route struct { PrefixID uuid.UUID `json:"prefix_id"` Prefix string `json:"prefix"` // The actual prefix string (e.g., "10.0.0.0/8") OriginASNID uuid.UUID `json:"origin_asn_id"` OriginASN int `json:"origin_asn"` // The actual ASN number PeerASN int `json:"peer_asn"` ASPath []int `json:"as_path"` // Full AS path NextHop string `json:"next_hop"` AnnouncedAt time.Time `json:"announced_at"` AddedAt time.Time `json:"added_at"` // When we added this route to our table } // RouteKey uniquely identifies a route in the table type RouteKey struct { PrefixID uuid.UUID OriginASNID uuid.UUID PeerASN int } // RoutingTable is a thread-safe in-memory routing table type RoutingTable struct { mu sync.RWMutex routes map[RouteKey]*Route // Secondary indexes for efficient lookups byPrefix map[uuid.UUID]map[RouteKey]*Route // Routes indexed by prefix ID byOriginASN map[uuid.UUID]map[RouteKey]*Route // Routes indexed by origin ASN ID byPeerASN map[int]map[RouteKey]*Route // Routes indexed by peer ASN // Metrics tracking ipv4Routes int ipv6Routes int ipv4Updates uint64 // Updates counter for rate calculation ipv6Updates uint64 // Updates counter for rate calculation lastMetricsReset time.Time // Configuration snapshotDir string } // New creates a new routing table, loading from snapshot if available func New(cfg *config.Config, logger *slog.Logger) *RoutingTable { rt := &RoutingTable{ routes: make(map[RouteKey]*Route), byPrefix: make(map[uuid.UUID]map[RouteKey]*Route), byOriginASN: make(map[uuid.UUID]map[RouteKey]*Route), byPeerASN: make(map[int]map[RouteKey]*Route), lastMetricsReset: time.Now(), snapshotDir: cfg.GetStateDir(), } // Try to load from snapshot if err := rt.loadFromSnapshot(logger); err != nil { logger.Warn("Failed to load routing table from snapshot", "error", err) } return rt } // AddRoute adds or updates a route in the routing table func (rt *RoutingTable) AddRoute(route *Route) { rt.mu.Lock() defer rt.mu.Unlock() key := RouteKey{ PrefixID: route.PrefixID, OriginASNID: route.OriginASNID, PeerASN: route.PeerASN, } // If route already exists, remove it from indexes first if existingRoute, exists := rt.routes[key]; exists { rt.removeFromIndexes(key, existingRoute) // Decrement counter for existing route if isIPv6(existingRoute.Prefix) { rt.ipv6Routes-- } else { rt.ipv4Routes-- } } // Set AddedAt if not already set if route.AddedAt.IsZero() { route.AddedAt = time.Now().UTC() } // Add to main map rt.routes[key] = route // Update indexes rt.addToIndexes(key, route) // Update metrics if isIPv6(route.Prefix) { rt.ipv6Routes++ atomic.AddUint64(&rt.ipv6Updates, 1) } else { rt.ipv4Routes++ atomic.AddUint64(&rt.ipv4Updates, 1) } } // RemoveRoute removes a route from the routing table func (rt *RoutingTable) RemoveRoute(prefixID, originASNID uuid.UUID, peerASN int) bool { rt.mu.Lock() defer rt.mu.Unlock() key := RouteKey{ PrefixID: prefixID, OriginASNID: originASNID, PeerASN: peerASN, } route, exists := rt.routes[key] if !exists { return false } // Remove from indexes rt.removeFromIndexes(key, route) // Remove from main map delete(rt.routes, key) // Update metrics if isIPv6(route.Prefix) { rt.ipv6Routes-- atomic.AddUint64(&rt.ipv6Updates, 1) } else { rt.ipv4Routes-- atomic.AddUint64(&rt.ipv4Updates, 1) } return true } // WithdrawRoutesByPrefixAndPeer removes all routes for a specific prefix from a specific peer func (rt *RoutingTable) WithdrawRoutesByPrefixAndPeer(prefixID uuid.UUID, peerASN int) int { rt.mu.Lock() defer rt.mu.Unlock() prefixRoutes, exists := rt.byPrefix[prefixID] if !exists { return 0 } // Collect keys to delete (can't delete while iterating) var keysToDelete []RouteKey for key, route := range prefixRoutes { if route.PeerASN == peerASN { keysToDelete = append(keysToDelete, key) } } // Delete the routes count := 0 for _, key := range keysToDelete { route, exists := rt.routes[key] if !exists { continue } rt.removeFromIndexes(key, route) delete(rt.routes, key) count++ // Update metrics if isIPv6(route.Prefix) { rt.ipv6Routes-- atomic.AddUint64(&rt.ipv6Updates, 1) } else { rt.ipv4Routes-- atomic.AddUint64(&rt.ipv4Updates, 1) } } return count } // GetRoute retrieves a specific route func (rt *RoutingTable) GetRoute(prefixID, originASNID uuid.UUID, peerASN int) (*Route, bool) { rt.mu.RLock() defer rt.mu.RUnlock() key := RouteKey{ PrefixID: prefixID, OriginASNID: originASNID, PeerASN: peerASN, } route, exists := rt.routes[key] if !exists { return nil, false } // Return a copy to prevent external modification routeCopy := *route return &routeCopy, true } // GetRoutesByPrefix returns all routes for a specific prefix func (rt *RoutingTable) GetRoutesByPrefix(prefixID uuid.UUID) []*Route { rt.mu.RLock() defer rt.mu.RUnlock() routes := make([]*Route, 0) if prefixRoutes, exists := rt.byPrefix[prefixID]; exists { for _, route := range prefixRoutes { routeCopy := *route routes = append(routes, &routeCopy) } } return routes } // GetRoutesByOriginASN returns all routes originated by a specific ASN func (rt *RoutingTable) GetRoutesByOriginASN(originASNID uuid.UUID) []*Route { rt.mu.RLock() defer rt.mu.RUnlock() routes := make([]*Route, 0) if asnRoutes, exists := rt.byOriginASN[originASNID]; exists { for _, route := range asnRoutes { routeCopy := *route routes = append(routes, &routeCopy) } } return routes } // GetRoutesByPeerASN returns all routes received from a specific peer ASN func (rt *RoutingTable) GetRoutesByPeerASN(peerASN int) []*Route { rt.mu.RLock() defer rt.mu.RUnlock() routes := make([]*Route, 0) if peerRoutes, exists := rt.byPeerASN[peerASN]; exists { for _, route := range peerRoutes { routeCopy := *route routes = append(routes, &routeCopy) } } return routes } // GetAllRoutes returns all active routes in the routing table func (rt *RoutingTable) GetAllRoutes() []*Route { rt.mu.RLock() defer rt.mu.RUnlock() routes := make([]*Route, 0, len(rt.routes)) for _, route := range rt.routes { routeCopy := *route routes = append(routes, &routeCopy) } return routes } // Size returns the total number of routes in the table func (rt *RoutingTable) Size() int { rt.mu.RLock() defer rt.mu.RUnlock() return len(rt.routes) } // Stats returns statistics about the routing table func (rt *RoutingTable) Stats() map[string]int { rt.mu.RLock() defer rt.mu.RUnlock() stats := map[string]int{ "total_routes": len(rt.routes), "unique_prefixes": len(rt.byPrefix), "unique_origins": len(rt.byOriginASN), "unique_peers": len(rt.byPeerASN), } return stats } // DetailedStats contains detailed routing table statistics type DetailedStats struct { IPv4Routes int IPv6Routes int IPv4UpdatesRate float64 IPv6UpdatesRate float64 TotalRoutes int UniquePrefixes int UniqueOrigins int UniquePeers int } // GetDetailedStats returns detailed statistics including IPv4/IPv6 breakdown and update rates func (rt *RoutingTable) GetDetailedStats() DetailedStats { rt.mu.Lock() defer rt.mu.Unlock() // Calculate update rates elapsed := time.Since(rt.lastMetricsReset).Seconds() ipv4Updates := atomic.LoadUint64(&rt.ipv4Updates) ipv6Updates := atomic.LoadUint64(&rt.ipv6Updates) stats := DetailedStats{ IPv4Routes: rt.ipv4Routes, IPv6Routes: rt.ipv6Routes, IPv4UpdatesRate: float64(ipv4Updates) / elapsed, IPv6UpdatesRate: float64(ipv6Updates) / elapsed, TotalRoutes: len(rt.routes), UniquePrefixes: len(rt.byPrefix), UniqueOrigins: len(rt.byOriginASN), UniquePeers: len(rt.byPeerASN), } // Reset counters for next period atomic.StoreUint64(&rt.ipv4Updates, 0) atomic.StoreUint64(&rt.ipv6Updates, 0) rt.lastMetricsReset = time.Now() return stats } // Clear removes all routes from the routing table func (rt *RoutingTable) Clear() { rt.mu.Lock() defer rt.mu.Unlock() rt.routes = make(map[RouteKey]*Route) rt.byPrefix = make(map[uuid.UUID]map[RouteKey]*Route) rt.byOriginASN = make(map[uuid.UUID]map[RouteKey]*Route) rt.byPeerASN = make(map[int]map[RouteKey]*Route) rt.ipv4Routes = 0 rt.ipv6Routes = 0 atomic.StoreUint64(&rt.ipv4Updates, 0) atomic.StoreUint64(&rt.ipv6Updates, 0) rt.lastMetricsReset = time.Now() } // RLock acquires a read lock on the routing table // This is exposed for the snapshotter to use func (rt *RoutingTable) RLock() { rt.mu.RLock() } // RUnlock releases a read lock on the routing table // This is exposed for the snapshotter to use func (rt *RoutingTable) RUnlock() { rt.mu.RUnlock() } // GetAllRoutesUnsafe returns all routes without copying // IMPORTANT: Caller must hold RLock before calling this method func (rt *RoutingTable) GetAllRoutesUnsafe() []*Route { routes := make([]*Route, 0, len(rt.routes)) for _, route := range rt.routes { routes = append(routes, route) } return routes } // Helper methods for index management func (rt *RoutingTable) addToIndexes(key RouteKey, route *Route) { // Add to prefix index if rt.byPrefix[route.PrefixID] == nil { rt.byPrefix[route.PrefixID] = make(map[RouteKey]*Route) } rt.byPrefix[route.PrefixID][key] = route // Add to origin ASN index if rt.byOriginASN[route.OriginASNID] == nil { rt.byOriginASN[route.OriginASNID] = make(map[RouteKey]*Route) } rt.byOriginASN[route.OriginASNID][key] = route // Add to peer ASN index if rt.byPeerASN[route.PeerASN] == nil { rt.byPeerASN[route.PeerASN] = make(map[RouteKey]*Route) } rt.byPeerASN[route.PeerASN][key] = route } func (rt *RoutingTable) removeFromIndexes(key RouteKey, route *Route) { // Remove from prefix index if prefixRoutes, exists := rt.byPrefix[route.PrefixID]; exists { delete(prefixRoutes, key) if len(prefixRoutes) == 0 { delete(rt.byPrefix, route.PrefixID) } } // Remove from origin ASN index if asnRoutes, exists := rt.byOriginASN[route.OriginASNID]; exists { delete(asnRoutes, key) if len(asnRoutes) == 0 { delete(rt.byOriginASN, route.OriginASNID) } } // Remove from peer ASN index if peerRoutes, exists := rt.byPeerASN[route.PeerASN]; exists { delete(peerRoutes, key) if len(peerRoutes) == 0 { delete(rt.byPeerASN, route.PeerASN) } } } // String returns a string representation of the route key func (k RouteKey) String() string { return fmt.Sprintf("%s/%s/%d", k.PrefixID, k.OriginASNID, k.PeerASN) } // isIPv6 returns true if the prefix is an IPv6 address func isIPv6(prefix string) bool { return strings.Contains(prefix, ":") } // loadFromSnapshot attempts to load the routing table from a snapshot file func (rt *RoutingTable) loadFromSnapshot(logger *slog.Logger) error { // If no snapshot directory specified, nothing to load if rt.snapshotDir == "" { return nil } snapshotPath := filepath.Join(rt.snapshotDir, snapshotFilename) // Check if snapshot file exists if _, err := os.Stat(snapshotPath); os.IsNotExist(err) { // No snapshot file exists, this is normal - start with empty routing table return nil } // Open the snapshot file file, err := os.Open(filepath.Clean(snapshotPath)) if err != nil { return fmt.Errorf("failed to open snapshot file: %w", err) } defer func() { _ = file.Close() }() // Create gzip reader gzReader, err := gzip.NewReader(file) if err != nil { return fmt.Errorf("failed to create gzip reader: %w", err) } defer func() { _ = gzReader.Close() }() // Decode the snapshot var snapshot struct { Timestamp time.Time `json:"timestamp"` Stats DetailedStats `json:"stats"` Routes []*Route `json:"routes"` } decoder := json.NewDecoder(gzReader) if err := decoder.Decode(&snapshot); err != nil { return fmt.Errorf("failed to decode snapshot: %w", err) } // Calculate staleness cutoff time now := time.Now().UTC() cutoffTime := now.Add(-routeStalenessThreshold) // Load non-stale routes loadedCount := 0 staleCount := 0 for _, route := range snapshot.Routes { // Check if route is stale based on AddedAt time if route.AddedAt.Before(cutoffTime) { staleCount++ continue } // Add the route (this will update counters and indexes) rt.AddRoute(route) loadedCount++ } logger.Info("Loaded routing table from snapshot", "snapshot_time", snapshot.Timestamp, "loaded_routes", loadedCount, "stale_routes", staleCount, "total_routes_in_snapshot", len(snapshot.Routes), ) return nil }