diff --git a/internal/streamer/streamer.go b/internal/streamer/streamer.go index 2b2f061..027d896 100644 --- a/internal/streamer/streamer.go +++ b/internal/streamer/streamer.go @@ -11,6 +11,7 @@ import ( "net/http" "os" "sync" + "sync/atomic" "time" "git.eeqj.de/sneak/routewatch/internal/metrics" @@ -18,12 +19,13 @@ import ( ) const ( - risLiveURL = "https://ris-live.ripe.net/v1/stream/?format=json" - metricsWindowSize = 60 // seconds for rolling average - metricsUpdateRate = time.Second - metricsLogInterval = 10 * time.Second - bytesPerKB = 1024 - bytesPerMB = 1024 * 1024 + risLiveURL = "https://ris-live.ripe.net/v1/stream/?format=json" + metricsWindowSize = 60 // seconds for rolling average + metricsUpdateRate = time.Second + metricsLogInterval = 10 * time.Second + bytesPerKB = 1024 + bytesPerMB = 1024 * 1024 + maxConcurrentHandlers = 100 // Maximum number of concurrent message handlers ) // MessageHandler is an interface for handling RIS messages @@ -40,14 +42,16 @@ type RawMessageHandler func(line string) // Streamer handles streaming BGP updates from RIS Live type Streamer struct { - logger *slog.Logger - client *http.Client - handlers []MessageHandler - rawHandler RawMessageHandler - mu sync.RWMutex - cancel context.CancelFunc - running bool - metrics *metrics.Tracker + logger *slog.Logger + client *http.Client + handlers []MessageHandler + rawHandler RawMessageHandler + mu sync.RWMutex + cancel context.CancelFunc + running bool + metrics *metrics.Tracker + semaphore chan struct{} // Limits concurrent message processing + droppedMessages uint64 // Atomic counter for dropped messages } // New creates a new RIS streamer @@ -57,8 +61,9 @@ func New(logger *slog.Logger, metrics *metrics.Tracker) *Streamer { client: &http.Client{ Timeout: 0, // No timeout for streaming }, - handlers: make([]MessageHandler, 0), - metrics: metrics, + handlers: make([]MessageHandler, 0), + metrics: metrics, + semaphore: make(chan struct{}, maxConcurrentHandlers), } } @@ -124,12 +129,18 @@ func (s *Streamer) GetMetrics() metrics.StreamMetrics { return s.metrics.GetStreamMetrics() } +// GetDroppedMessages returns the total number of dropped messages +func (s *Streamer) GetDroppedMessages() uint64 { + return atomic.LoadUint64(&s.droppedMessages) +} + // logMetrics logs the current streaming statistics func (s *Streamer) logMetrics() { metrics := s.metrics.GetStreamMetrics() uptime := time.Since(metrics.ConnectedSince) const bitsPerMegabit = 1000000 + droppedMessages := atomic.LoadUint64(&s.droppedMessages) s.logger.Info("Stream statistics", "uptime", uptime, "total_messages", metrics.TotalMessages, @@ -138,6 +149,8 @@ func (s *Streamer) logMetrics() { "messages_per_sec", fmt.Sprintf("%.2f", metrics.MessagesPerSec), "bits_per_sec", fmt.Sprintf("%.0f", metrics.BitsPerSec), "mbps", fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit), + "dropped_messages", droppedMessages, + "active_handlers", len(s.semaphore), ) } @@ -219,76 +232,86 @@ func (s *Streamer) stream(ctx context.Context) error { copy(handlers, s.handlers) s.mu.RUnlock() - // Spawn goroutine to parse and process the message - go func(rawLine []byte, messageHandlers []MessageHandler) { + // Try to acquire semaphore, drop message if at capacity + select { + case s.semaphore <- struct{}{}: + // Successfully acquired semaphore, process message + go func(rawLine []byte, messageHandlers []MessageHandler) { + defer func() { <-s.semaphore }() // Release semaphore when done - // Parse the outer wrapper first - var wrapper ristypes.RISLiveMessage - if err := json.Unmarshal(rawLine, &wrapper); err != nil { - // Output the raw line and panic on parse failure - fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err) - fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(rawLine)) - panic(fmt.Sprintf("JSON parse error: %v", err)) - } - - // Check if it's a ris_message wrapper - if wrapper.Type != "ris_message" { - s.logger.Error("Unexpected wrapper type", - "type", wrapper.Type, - "line", string(rawLine), - ) - - return - } - - // Get the actual message - msg := wrapper.Data - - // Parse the timestamp - msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0).UTC() - - // Process based on message type - switch msg.Type { - case "UPDATE": - // Process BGP UPDATE messages - // Will be handled by registered handlers - case "RIS_PEER_STATE": - // RIS peer state messages - silently ignore - case "KEEPALIVE": - // BGP keepalive messages - silently process - case "OPEN": - // BGP open messages - s.logger.Info("BGP session opened", - "peer", msg.Peer, - "peer_asn", msg.PeerASN, - ) - case "NOTIFICATION": - // BGP notification messages (errors) - s.logger.Warn("BGP notification", - "peer", msg.Peer, - "peer_asn", msg.PeerASN, - ) - case "STATE": - // Peer state changes - silently ignore - default: - fmt.Fprintf( - os.Stderr, - "UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n", - msg.Type, - string(rawLine), - ) - panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type)) - } - - // Spawn goroutine for each handler callback that wants this message type - for _, handler := range messageHandlers { - if handler.WantsMessage(msg.Type) { - go func(h MessageHandler) { - h.HandleMessage(&msg) - }(handler) + // Parse the outer wrapper first + var wrapper ristypes.RISLiveMessage + if err := json.Unmarshal(rawLine, &wrapper); err != nil { + // Output the raw line and panic on parse failure + fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err) + fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(rawLine)) + panic(fmt.Sprintf("JSON parse error: %v", err)) } + + // Check if it's a ris_message wrapper + if wrapper.Type != "ris_message" { + s.logger.Error("Unexpected wrapper type", + "type", wrapper.Type, + "line", string(rawLine), + ) + + return + } + + // Get the actual message + msg := wrapper.Data + + // Parse the timestamp + msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0).UTC() + + // Process based on message type + switch msg.Type { + case "UPDATE": + // Process BGP UPDATE messages + // Will be handled by registered handlers + case "RIS_PEER_STATE": + // RIS peer state messages - silently ignore + case "KEEPALIVE": + // BGP keepalive messages - silently process + case "OPEN": + // BGP open messages + s.logger.Info("BGP session opened", + "peer", msg.Peer, + "peer_asn", msg.PeerASN, + ) + case "NOTIFICATION": + // BGP notification messages (errors) + s.logger.Warn("BGP notification", + "peer", msg.Peer, + "peer_asn", msg.PeerASN, + ) + case "STATE": + // Peer state changes - silently ignore + default: + fmt.Fprintf( + os.Stderr, + "UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n", + msg.Type, + string(rawLine), + ) + panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type)) + } + + // Call handlers synchronously within this goroutine + // This prevents unbounded goroutine growth at the handler level + for _, handler := range messageHandlers { + if handler.WantsMessage(msg.Type) { + handler.HandleMessage(&msg) + } + } + }(append([]byte(nil), line...), handlers) // Copy the line to avoid data races + default: + // Semaphore is full, drop the message + dropped := atomic.AddUint64(&s.droppedMessages, 1) + if dropped%1000 == 0 { // Log every 1000 dropped messages + s.logger.Warn("Dropping messages due to overload", "total_dropped", dropped, "max_handlers", maxConcurrentHandlers) } - }(append([]byte(nil), line...), handlers) // Copy the line to avoid data races + } } if err := scanner.Err(); err != nil {