package routewatch

import (
	"encoding/json"
	"sync"
	"time"

	"git.eeqj.de/sneak/routewatch/internal/database"
	"git.eeqj.de/sneak/routewatch/internal/logger"
	"git.eeqj.de/sneak/routewatch/internal/ristypes"
)

const (
	// peeringHandlerQueueSize defines the buffer capacity for the peering
	// handler's message queue. This should be large enough to handle bursts
	// of BGP UPDATE messages without blocking.
	peeringHandlerQueueSize = 100000

	// minPathLengthForPeering specifies the minimum number of ASNs required
	// in a BGP AS path to extract peering relationships. A path with fewer
	// than 2 ASNs cannot contain any peering information.
	minPathLengthForPeering = 2

	// pathExpirationTime determines how long AS paths are kept in memory
	// before being eligible for pruning. Paths older than this are removed
	// to prevent unbounded memory growth.
	pathExpirationTime = 30 * time.Minute

	// peeringProcessInterval controls how frequently the handler processes
	// accumulated AS paths and extracts peering relationships to store
	// in the database.
	peeringProcessInterval = 30 * time.Second

	// pathPruneInterval determines how often the handler checks for and
	// removes expired AS paths from memory.
	pathPruneInterval = 5 * time.Minute
)

// PeeringHandler processes BGP UPDATE messages to extract and track
// AS peering relationships. It accumulates AS paths in memory and
// periodically processes them to extract unique peering pairs, which
// are then stored in the database. The handler implements the Handler
// interface for integration with the message processing pipeline.
type PeeringHandler struct {
	db     database.Store
	logger *logger.Logger

	// In-memory AS path tracking
	mu      sync.RWMutex
	asPaths map[string]time.Time // key is JSON-encoded AS path

	stopCh chan struct{}
}

// NewPeeringHandler creates and initializes a new PeeringHandler with the
// provided database store and logger. It starts two background goroutines:
// one for periodic processing of accumulated AS paths into peering records,
// and one for pruning expired paths from memory. The handler begins
// processing immediately upon creation.
func NewPeeringHandler(db database.Store, logger *logger.Logger) *PeeringHandler {
	h := &PeeringHandler{
		db:      db,
		logger:  logger,
		asPaths: make(map[string]time.Time),
		stopCh:  make(chan struct{}),
	}

	// Start the periodic processing goroutines
	go h.processLoop()
	go h.pruneLoop()

	return h
}

// WantsMessage reports whether the handler should receive messages of the
// given type. PeeringHandler only processes UPDATE messages, as these contain
// the AS path information needed to extract peering relationships.
func (h *PeeringHandler) WantsMessage(messageType string) bool {
	// We only care about UPDATE messages that have AS paths
	return messageType == "UPDATE"
}

// QueueCapacity returns the buffer size for the handler's message queue.
// This value is used by the message dispatcher to allocate the channel
// buffer when registering the handler.
func (h *PeeringHandler) QueueCapacity() int {
	return peeringHandlerQueueSize
}

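// The two methods above exist for the message dispatcher described in their
// doc comments: the dispatcher allocates a buffered channel of
// QueueCapacity() messages per handler and consults WantsMessage before
// enqueueing. A rough, non-authoritative sketch of that registration side
// (the real dispatcher lives elsewhere; msgType stands in for whatever type
// string it extracts from the decoded RIS message):
//
//	queue := make(chan *ristypes.RISMessage, handler.QueueCapacity())
//	for msg := range incoming {
//		if handler.WantsMessage(msgType) {
//			queue <- msg
//		}
//	}
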
// HandleMessage processes a BGP UPDATE message by storing its AS path
// in memory for later batch processing. Messages with AS paths shorter
// than minPathLengthForPeering are ignored as they cannot contain valid
// peering information.
func (h *PeeringHandler) HandleMessage(msg *ristypes.RISMessage) {
	// Skip if no AS path or only one AS
	if len(msg.Path) < minPathLengthForPeering {
		return
	}

	timestamp := msg.ParsedTimestamp

	// Encode AS path as JSON for use as map key
	pathJSON, err := json.Marshal(msg.Path)
	if err != nil {
		h.logger.Error("Failed to encode AS path", "error", err)

		return
	}

	h.mu.Lock()
	h.asPaths[string(pathJSON)] = timestamp
	h.mu.Unlock()
}

// processLoop runs periodically to process AS paths into peerings
func (h *PeeringHandler) processLoop() {
	ticker := time.NewTicker(peeringProcessInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			h.processPeerings()
		case <-h.stopCh:
			// Final processing
			h.processPeerings()

			return
		}
	}
}

// pruneLoop runs periodically to remove old AS paths
func (h *PeeringHandler) pruneLoop() {
	ticker := time.NewTicker(pathPruneInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			h.prunePaths()
		case <-h.stopCh:
			return
		}
	}
}

// prunePaths removes AS paths older than pathExpirationTime
func (h *PeeringHandler) prunePaths() {
	cutoff := time.Now().Add(-pathExpirationTime)

	var removed int

	h.mu.Lock()
	for pathKey, timestamp := range h.asPaths {
		if timestamp.Before(cutoff) {
			delete(h.asPaths, pathKey)
			removed++
		}
	}
	pathCount := len(h.asPaths)
	h.mu.Unlock()

	if removed > 0 {
		h.logger.Debug("Pruned old AS paths", "removed", removed, "remaining", pathCount)
	}
}

// ProcessPeeringsNow triggers immediate processing of all accumulated AS
// paths into peering records. This bypasses the normal periodic processing
// schedule and is primarily intended for testing purposes.
func (h *PeeringHandler) ProcessPeeringsNow() {
	h.processPeerings()
}

// processPeerings extracts peerings from AS paths and writes to database
func (h *PeeringHandler) processPeerings() {
	// Take a snapshot of current AS paths
	h.mu.RLock()
	pathsCopy := make(map[string]time.Time, len(h.asPaths))
	for k, v := range h.asPaths {
		pathsCopy[k] = v
	}
	h.mu.RUnlock()

	if len(pathsCopy) == 0 {
		return
	}

	// Extract unique peerings from AS paths
	type peeringKey struct {
		low, high int
	}

	peerings := make(map[peeringKey]time.Time)

	for pathJSON, timestamp := range pathsCopy {
		var path []int
		if err := json.Unmarshal([]byte(pathJSON), &path); err != nil {
			h.logger.Error("Failed to decode AS path", "error", err)

			continue
		}

		// Extract peerings from adjacent ASN pairs in the path
		for i := range len(path) - 1 {
			asn1 := path[i]
			asn2 := path[i+1]

			// Skip invalid ASNs
			if asn1 <= 0 || asn2 <= 0 || asn1 == asn2 {
				continue
			}

			// Normalize: lower AS number first
			low, high := asn1, asn2
			if low > high {
				low, high = high, low
			}

			key := peeringKey{low: low, high: high}

			// Update timestamp if this is newer
			if existing, ok := peerings[key]; !ok || timestamp.After(existing) {
				peerings[key] = timestamp
			}
		}
	}

	// Record peerings in database
	start := time.Now()
	successCount := 0

	for key, ts := range peerings {
		err := h.db.RecordPeering(key.low, key.high, ts)
		if err != nil {
			h.logger.Error("Failed to record peering",
				"as_a", key.low,
				"as_b", key.high,
				"error", err,
			)
		} else {
			successCount++
		}
	}

	h.logger.Info("Processed AS peerings",
		"paths", len(pathsCopy),
		"unique_peerings", len(peerings),
		"success", successCount,
		"duration", time.Since(start),
	)
}

// Stop gracefully shuts down the handler by signaling the background
// goroutines to stop and performing a final synchronous processing of
// any remaining AS paths. This ensures no peering data is lost during
// shutdown.
func (h *PeeringHandler) Stop() {
	close(h.stopCh)

	// Process any remaining peerings synchronously
	h.processPeerings()
}
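
// examplePeeringHandlerLifecycle is a non-authoritative usage sketch, not part
// of the handler's API: it shows the intended call sequence (construct, filter
// by message type, hand off UPDATE messages, shut down). The incoming channel
// and the literal "UPDATE" string stand in for whatever the real dispatcher
// provides; it would normally pass each message's actual type string.
func examplePeeringHandlerLifecycle(
	db database.Store,
	log *logger.Logger,
	incoming <-chan *ristypes.RISMessage,
) {
	h := NewPeeringHandler(db, log)
	// Stop signals the background goroutines and flushes any remaining paths.
	defer h.Stop()

	for msg := range incoming {
		if h.WantsMessage("UPDATE") {
			h.HandleMessage(msg)
		}
	}

	// Tests can force a flush instead of waiting for peeringProcessInterval.
	h.ProcessPeeringsNow()
}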