// Package streamer implements an HTTP client that connects to the RIPE RIS Live streaming API, // parses BGP UPDATE messages from the JSON stream, and dispatches them to registered handlers. package streamer import ( "bufio" "context" "encoding/json" "fmt" "math" "math/rand" "net/http" "sync" "sync/atomic" "time" "git.eeqj.de/sneak/routewatch/internal/logger" "git.eeqj.de/sneak/routewatch/internal/metrics" "git.eeqj.de/sneak/routewatch/internal/ristypes" ) const ( risLiveURL = "https://ris-live.ripe.net/v1/stream/?format=json&" + "client=https%3A%2F%2Fgit.eeqj.de%2Fsneak%2Froutewatch" metricsWindowSize = 60 // seconds for rolling average metricsUpdateRate = time.Second minBackoffDelay = 5 * time.Second maxBackoffDelay = 320 * time.Second metricsLogInterval = 10 * time.Second bytesPerKB = 1024 bytesPerMB = 1024 * 1024 maxConcurrentHandlers = 800 // Maximum number of concurrent message handlers // Backpressure constants backpressureThreshold = 0.5 // Start dropping at 50% queue utilization backpressureSlope = 2.0 // Slope for linear drop probability increase ) // MessageHandler is an interface for handling RIS messages type MessageHandler interface { // WantsMessage returns true if this handler wants to process messages of the given type WantsMessage(messageType string) bool // HandleMessage processes a RIS message HandleMessage(msg *ristypes.RISMessage) // QueueCapacity returns the desired queue capacity for this handler // Handlers that process quickly can have larger queues QueueCapacity() int } // RawMessageHandler is a callback for handling raw JSON lines from the stream type RawMessageHandler func(line string) // handlerMetrics tracks performance metrics for a handler type handlerMetrics struct { processedCount uint64 // Total messages processed droppedCount uint64 // Total messages dropped totalTime time.Duration // Total processing time (for average calculation) minTime time.Duration // Minimum processing time maxTime time.Duration // Maximum processing time queueHighWaterMark int // Maximum queue length seen mu sync.Mutex // Protects the metrics } // handlerInfo wraps a handler with its queue and metrics type handlerInfo struct { handler MessageHandler queue chan *ristypes.RISMessage metrics handlerMetrics } // Streamer handles streaming BGP updates from RIS Live type Streamer struct { logger *logger.Logger client *http.Client handlers []*handlerInfo rawHandler RawMessageHandler mu sync.RWMutex cancel context.CancelFunc running bool metrics *metrics.Tracker totalDropped uint64 // Total dropped messages across all handlers random *rand.Rand // Random number generator for backpressure drops } // New creates a new RIS streamer func New(logger *logger.Logger, metrics *metrics.Tracker) *Streamer { return &Streamer{ logger: logger, client: &http.Client{ Timeout: 0, // No timeout for streaming }, handlers: make([]*handlerInfo, 0), metrics: metrics, //nolint:gosec // Non-cryptographic randomness is fine for backpressure random: rand.New(rand.NewSource(time.Now().UnixNano())), } } // RegisterHandler adds a callback for message processing func (s *Streamer) RegisterHandler(handler MessageHandler) { s.mu.Lock() defer s.mu.Unlock() // Create handler info with its own queue based on capacity info := &handlerInfo{ handler: handler, queue: make(chan *ristypes.RISMessage, handler.QueueCapacity()), metrics: handlerMetrics{ minTime: time.Duration(math.MaxInt64), // Initialize to max so first value sets the floor }, } s.handlers = append(s.handlers, info) // If we're already running, start a worker for this handler if s.running { go s.runHandlerWorker(info) } } // RegisterRawHandler sets a callback for raw message lines func (s *Streamer) RegisterRawHandler(handler RawMessageHandler) { s.mu.Lock() defer s.mu.Unlock() s.rawHandler = handler } // Start begins streaming in a goroutine func (s *Streamer) Start() error { s.mu.Lock() defer s.mu.Unlock() if s.running { return fmt.Errorf("streamer already running") } ctx, cancel := context.WithCancel(context.Background()) s.cancel = cancel s.running = true // Start workers for each handler for _, info := range s.handlers { go s.runHandlerWorker(info) } go func() { s.streamWithReconnect(ctx) s.mu.Lock() s.running = false s.mu.Unlock() }() return nil } // Stop halts the streaming func (s *Streamer) Stop() { s.mu.Lock() if s.cancel != nil { s.cancel() } // Close all handler queues to signal workers to stop for _, info := range s.handlers { close(info.queue) } s.running = false s.mu.Unlock() s.metrics.SetConnected(false) } // runHandlerWorker processes messages for a specific handler func (s *Streamer) runHandlerWorker(info *handlerInfo) { for msg := range info.queue { start := time.Now() info.handler.HandleMessage(msg) elapsed := time.Since(start) // Update metrics info.metrics.mu.Lock() info.metrics.processedCount++ info.metrics.totalTime += elapsed // Update min time if elapsed < info.metrics.minTime { info.metrics.minTime = elapsed } // Update max time if elapsed > info.metrics.maxTime { info.metrics.maxTime = elapsed } info.metrics.mu.Unlock() } } // IsRunning returns whether the streamer is currently active func (s *Streamer) IsRunning() bool { s.mu.RLock() defer s.mu.RUnlock() return s.running } // GetMetrics returns current streaming metrics func (s *Streamer) GetMetrics() metrics.StreamMetrics { return s.metrics.GetStreamMetrics() } // GetMetricsTracker returns the metrics tracker instance func (s *Streamer) GetMetricsTracker() *metrics.Tracker { return s.metrics } // HandlerStats represents metrics for a single handler type HandlerStats struct { Name string QueueLength int QueueCapacity int QueueHighWaterMark int ProcessedCount uint64 DroppedCount uint64 AvgProcessTime time.Duration MinProcessTime time.Duration MaxProcessTime time.Duration } // GetHandlerStats returns current handler statistics func (s *Streamer) GetHandlerStats() []HandlerStats { s.mu.RLock() defer s.mu.RUnlock() stats := make([]HandlerStats, 0, len(s.handlers)) for _, info := range s.handlers { info.metrics.mu.Lock() hs := HandlerStats{ Name: fmt.Sprintf("%T", info.handler), QueueLength: len(info.queue), QueueCapacity: cap(info.queue), QueueHighWaterMark: info.metrics.queueHighWaterMark, ProcessedCount: info.metrics.processedCount, DroppedCount: info.metrics.droppedCount, MinProcessTime: info.metrics.minTime, MaxProcessTime: info.metrics.maxTime, } // Calculate average time if info.metrics.processedCount > 0 { processedCount := info.metrics.processedCount const maxInt64 = 1<<63 - 1 if processedCount > maxInt64 { processedCount = maxInt64 } //nolint:gosec // processedCount is explicitly bounded above hs.AvgProcessTime = info.metrics.totalTime / time.Duration(processedCount) } info.metrics.mu.Unlock() stats = append(stats, hs) } return stats } // GetDroppedMessages returns the total number of dropped messages func (s *Streamer) GetDroppedMessages() uint64 { return atomic.LoadUint64(&s.totalDropped) } // logMetrics logs the current streaming statistics func (s *Streamer) logMetrics() { metrics := s.metrics.GetStreamMetrics() uptime := time.Since(metrics.ConnectedSince) const bitsPerMegabit = 1000000 totalDropped := atomic.LoadUint64(&s.totalDropped) s.logger.Info( "Stream statistics", "uptime", uptime, "total_messages", metrics.TotalMessages, "total_bytes", metrics.TotalBytes, "total_mb", fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB), "messages_per_sec", fmt.Sprintf("%.2f", metrics.MessagesPerSec), "bits_per_sec", fmt.Sprintf("%.0f", metrics.BitsPerSec), "mbps", fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit), "total_dropped", totalDropped, ) // Log per-handler statistics s.mu.RLock() for i, info := range s.handlers { info.metrics.mu.Lock() if info.metrics.processedCount > 0 { // Safe conversion: processedCount is bounded by maxInt64 processedCount := info.metrics.processedCount const maxInt64 = 1<<63 - 1 if processedCount > maxInt64 { processedCount = maxInt64 } //nolint:gosec // processedCount is explicitly bounded above avgTime := info.metrics.totalTime / time.Duration(processedCount) s.logger.Info( "Handler statistics", "handler", fmt.Sprintf("%T", info.handler), "index", i, "queue_len", len(info.queue), "queue_cap", cap(info.queue), "processed", info.metrics.processedCount, "dropped", info.metrics.droppedCount, "avg_time", avgTime, "min_time", info.metrics.minTime, "max_time", info.metrics.maxTime, ) } info.metrics.mu.Unlock() } s.mu.RUnlock() } // updateMetrics updates the metrics counters and rates func (s *Streamer) updateMetrics(messageBytes int) { s.metrics.RecordMessage(int64(messageBytes)) } // streamWithReconnect handles streaming with automatic reconnection and exponential backoff func (s *Streamer) streamWithReconnect(ctx context.Context) { backoffDelay := minBackoffDelay consecutiveFailures := 0 for { select { case <-ctx.Done(): s.logger.Info("Stream context cancelled, stopping reconnection attempts") return default: } // Attempt to stream startTime := time.Now() err := s.stream(ctx) streamDuration := time.Since(startTime) if err == nil { // Clean exit (context cancelled) return } // Log the error s.logger.Error("Stream disconnected", "error", err, "consecutive_failures", consecutiveFailures+1, "stream_duration", streamDuration) s.metrics.SetConnected(false) // Check if context is cancelled if ctx.Err() != nil { return } // If we streamed for more than 30 seconds, reset the backoff // This indicates we had a successful connection that received data if streamDuration > 30*time.Second { s.logger.Info("Resetting backoff delay due to successful connection", "stream_duration", streamDuration) backoffDelay = minBackoffDelay consecutiveFailures = 0 } else { // Increment consecutive failures consecutiveFailures++ } // Wait with exponential backoff s.logger.Info("Waiting before reconnection attempt", "delay_seconds", backoffDelay.Seconds(), "consecutive_failures", consecutiveFailures) select { case <-ctx.Done(): return case <-time.After(backoffDelay): // Double the backoff delay for next time, up to max backoffDelay *= 2 if backoffDelay > maxBackoffDelay { backoffDelay = maxBackoffDelay } } } } func (s *Streamer) stream(ctx context.Context) error { req, err := http.NewRequestWithContext(ctx, "GET", risLiveURL, nil) if err != nil { return fmt.Errorf("failed to create request: %w", err) } resp, err := s.client.Do(req) if err != nil { return fmt.Errorf("failed to connect to RIS Live: %w", err) } defer func() { if err := resp.Body.Close(); err != nil { s.logger.Error("Failed to close response body", "error", err) } }() if resp.StatusCode != http.StatusOK { return fmt.Errorf("unexpected status code: %d", resp.StatusCode) } s.logger.Info("Connected to RIS Live stream") s.metrics.SetConnected(true) // Start metrics logging goroutine metricsTicker := time.NewTicker(metricsLogInterval) defer metricsTicker.Stop() go func() { for { select { case <-metricsTicker.C: s.logMetrics() case <-ctx.Done(): return } } }() scanner := bufio.NewScanner(resp.Body) for scanner.Scan() { select { case <-ctx.Done(): s.logger.Info("Stream stopped by context") return ctx.Err() default: } line := scanner.Bytes() if len(line) == 0 { continue } // Update metrics with message size s.updateMetrics(len(line)) // Call raw handler if registered s.mu.RLock() rawHandler := s.rawHandler s.mu.RUnlock() if rawHandler != nil { // Call raw handler synchronously to preserve order rawHandler(string(line)) } // Parse the message first var wrapper ristypes.RISLiveMessage if err := json.Unmarshal(line, &wrapper); err != nil { // Log the error and return to trigger reconnection s.logger.Error("Failed to parse JSON", "error", err, "line", string(line), "line_length", len(line)) return fmt.Errorf("JSON parse error: %w", err) } // Check if it's a ris_message wrapper if wrapper.Type != "ris_message" { s.logger.Error("Unexpected wrapper type", "type", wrapper.Type, "line", string(line), ) continue } // Get the actual message msg := wrapper.Data // Parse the timestamp msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0).UTC() // Process based on message type switch msg.Type { case "UPDATE": // Process BGP UPDATE messages // Will be dispatched to handlers case "RIS_PEER_STATE": // RIS peer state messages - silently ignore continue case "KEEPALIVE": // BGP keepalive messages - silently process continue case "OPEN": // BGP open messages s.logger.Info("BGP session opened", "peer", msg.Peer, "peer_asn", msg.PeerASN, ) continue case "NOTIFICATION": // BGP notification messages (errors) s.logger.Warn("BGP notification", "peer", msg.Peer, "peer_asn", msg.PeerASN, ) continue case "STATE": // Peer state changes - silently ignore continue default: s.logger.Error("Unknown message type", "type", msg.Type, "line", string(line), ) panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type)) } // Dispatch to interested handlers s.mu.RLock() for _, info := range s.handlers { if !info.handler.WantsMessage(msg.Type) { continue } // Check if we should drop due to backpressure if s.shouldDropForBackpressure(info) { atomic.AddUint64(&info.metrics.droppedCount, 1) atomic.AddUint64(&s.totalDropped, 1) continue } // Try to queue the message select { case info.queue <- &msg: // Message queued successfully // Update high water mark if needed queueLen := len(info.queue) info.metrics.mu.Lock() if queueLen > info.metrics.queueHighWaterMark { info.metrics.queueHighWaterMark = queueLen } info.metrics.mu.Unlock() default: // Queue is full, drop the message atomic.AddUint64(&info.metrics.droppedCount, 1) atomic.AddUint64(&s.totalDropped, 1) } } s.mu.RUnlock() } if err := scanner.Err(); err != nil { return fmt.Errorf("scanner error: %w", err) } return nil } // shouldDropForBackpressure determines if a message should be dropped based on queue utilization func (s *Streamer) shouldDropForBackpressure(info *handlerInfo) bool { // Calculate queue utilization queueLen := len(info.queue) queueCap := cap(info.queue) utilization := float64(queueLen) / float64(queueCap) // No drops below threshold if utilization < backpressureThreshold { return false } // Calculate drop probability (0.0 at threshold, 1.0 at 100% full) dropProbability := (utilization - backpressureThreshold) * backpressureSlope if dropProbability > 1.0 { dropProbability = 1.0 } // Random drop based on probability return s.random.Float64() < dropProbability }