// Package streamer implements an HTTP client that connects to the RIPE RIS Live streaming API, // parses BGP UPDATE messages from the JSON stream, and dispatches them to registered handlers. package streamer import ( "bufio" "context" "encoding/json" "fmt" "log/slog" "net/http" "os" "sync" "time" "git.eeqj.de/sneak/routewatch/internal/metrics" "git.eeqj.de/sneak/routewatch/internal/ristypes" ) const ( risLiveURL = "https://ris-live.ripe.net/v1/stream/?format=json" metricsWindowSize = 60 // seconds for rolling average metricsUpdateRate = time.Second metricsLogInterval = 10 * time.Second bytesPerKB = 1024 bytesPerMB = 1024 * 1024 ) // MessageHandler is an interface for handling RIS messages type MessageHandler interface { // WantsMessage returns true if this handler wants to process messages of the given type WantsMessage(messageType string) bool // HandleMessage processes a RIS message HandleMessage(msg *ristypes.RISMessage) } // RawMessageHandler is a callback for handling raw JSON lines from the stream type RawMessageHandler func(line string) // Streamer handles streaming BGP updates from RIS Live type Streamer struct { logger *slog.Logger client *http.Client handlers []MessageHandler rawHandler RawMessageHandler mu sync.RWMutex cancel context.CancelFunc running bool metrics *metrics.Tracker } // New creates a new RIS streamer func New(logger *slog.Logger, metrics *metrics.Tracker) *Streamer { return &Streamer{ logger: logger, client: &http.Client{ Timeout: 0, // No timeout for streaming }, handlers: make([]MessageHandler, 0), metrics: metrics, } } // RegisterHandler adds a callback for message processing func (s *Streamer) RegisterHandler(handler MessageHandler) { s.mu.Lock() defer s.mu.Unlock() s.handlers = append(s.handlers, handler) } // RegisterRawHandler sets a callback for raw message lines func (s *Streamer) RegisterRawHandler(handler RawMessageHandler) { s.mu.Lock() defer s.mu.Unlock() s.rawHandler = handler } // Start begins streaming in a goroutine func (s *Streamer) Start() error { s.mu.Lock() defer s.mu.Unlock() if s.running { return fmt.Errorf("streamer already running") } ctx, cancel := context.WithCancel(context.Background()) s.cancel = cancel s.running = true go func() { if err := s.stream(ctx); err != nil { s.logger.Error("Streaming error", "error", err) } s.mu.Lock() s.running = false s.mu.Unlock() }() return nil } // Stop halts the streaming func (s *Streamer) Stop() { s.mu.Lock() if s.cancel != nil { s.cancel() } s.mu.Unlock() s.metrics.SetConnected(false) } // IsRunning returns whether the streamer is currently active func (s *Streamer) IsRunning() bool { s.mu.RLock() defer s.mu.RUnlock() return s.running } // GetMetrics returns current streaming metrics func (s *Streamer) GetMetrics() metrics.StreamMetrics { return s.metrics.GetStreamMetrics() } // logMetrics logs the current streaming statistics func (s *Streamer) logMetrics() { metrics := s.metrics.GetStreamMetrics() uptime := time.Since(metrics.ConnectedSince) const bitsPerMegabit = 1000000 s.logger.Info("Stream statistics", "uptime", uptime, "total_messages", metrics.TotalMessages, "total_bytes", metrics.TotalBytes, "total_mb", fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB), "messages_per_sec", fmt.Sprintf("%.2f", metrics.MessagesPerSec), "bits_per_sec", fmt.Sprintf("%.0f", metrics.BitsPerSec), "mbps", fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit), ) } // updateMetrics updates the metrics counters and rates func (s *Streamer) updateMetrics(messageBytes int) { s.metrics.RecordMessage(int64(messageBytes)) } func (s *Streamer) stream(ctx context.Context) error { req, err := http.NewRequestWithContext(ctx, "GET", risLiveURL, nil) if err != nil { return fmt.Errorf("failed to create request: %w", err) } resp, err := s.client.Do(req) if err != nil { return fmt.Errorf("failed to connect to RIS Live: %w", err) } defer func() { if err := resp.Body.Close(); err != nil { s.logger.Error("Failed to close response body", "error", err) } }() if resp.StatusCode != http.StatusOK { return fmt.Errorf("unexpected status code: %d", resp.StatusCode) } s.logger.Info("Connected to RIS Live stream") s.metrics.SetConnected(true) // Start metrics logging goroutine metricsTicker := time.NewTicker(metricsLogInterval) defer metricsTicker.Stop() go func() { for { select { case <-metricsTicker.C: s.logMetrics() case <-ctx.Done(): return } } }() scanner := bufio.NewScanner(resp.Body) for scanner.Scan() { select { case <-ctx.Done(): s.logger.Info("Stream stopped by context") return ctx.Err() default: } line := scanner.Bytes() if len(line) == 0 { continue } // Update metrics with message size s.updateMetrics(len(line)) // Call raw handler if registered s.mu.RLock() rawHandler := s.rawHandler s.mu.RUnlock() if rawHandler != nil { // Call raw handler synchronously to preserve order rawHandler(string(line)) } // Get current handlers s.mu.RLock() handlers := make([]MessageHandler, len(s.handlers)) copy(handlers, s.handlers) s.mu.RUnlock() // Spawn goroutine to parse and process the message go func(rawLine []byte, messageHandlers []MessageHandler) { // Parse the outer wrapper first var wrapper ristypes.RISLiveMessage if err := json.Unmarshal(rawLine, &wrapper); err != nil { // Output the raw line and panic on parse failure fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err) fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(rawLine)) panic(fmt.Sprintf("JSON parse error: %v", err)) } // Check if it's a ris_message wrapper if wrapper.Type != "ris_message" { s.logger.Error("Unexpected wrapper type", "type", wrapper.Type, "line", string(rawLine), ) return } // Get the actual message msg := wrapper.Data // Parse the timestamp msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0).UTC() // Process based on message type switch msg.Type { case "UPDATE": // Process BGP UPDATE messages // Will be handled by registered handlers case "RIS_PEER_STATE": // RIS peer state messages - silently ignore case "KEEPALIVE": // BGP keepalive messages - silently process case "OPEN": // BGP open messages s.logger.Info("BGP session opened", "peer", msg.Peer, "peer_asn", msg.PeerASN, ) case "NOTIFICATION": // BGP notification messages (errors) s.logger.Warn("BGP notification", "peer", msg.Peer, "peer_asn", msg.PeerASN, ) case "STATE": // Peer state changes - silently ignore default: fmt.Fprintf( os.Stderr, "UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n", msg.Type, string(rawLine), ) panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type)) } // Spawn goroutine for each handler callback that wants this message type for _, handler := range messageHandlers { if handler.WantsMessage(msg.Type) { go func(h MessageHandler) { h.HandleMessage(&msg) }(handler) } } }(append([]byte(nil), line...), handlers) // Copy the line to avoid data races } if err := scanner.Err(); err != nil { return fmt.Errorf("scanner error: %w", err) } return nil }