diff --git a/internal/routewatch/dbhandler.go b/internal/routewatch/dbhandler.go
index ad090a3..ca41a71 100644
--- a/internal/routewatch/dbhandler.go
+++ b/internal/routewatch/dbhandler.go
@@ -7,6 +7,11 @@ import (
 	"git.eeqj.de/sneak/routewatch/internal/ristypes"
 )
 
+const (
+	// databaseHandlerQueueSize is the queue capacity for database operations
+	databaseHandlerQueueSize = 100
+)
+
 // DatabaseHandler handles BGP messages and stores them in the database
 type DatabaseHandler struct {
 	db database.Store
@@ -27,6 +32,12 @@ func (h *DatabaseHandler) WantsMessage(messageType string) bool {
 	return messageType == "UPDATE"
 }
 
+// QueueCapacity returns the desired queue capacity for this handler
+func (h *DatabaseHandler) QueueCapacity() int {
+	// Database operations are slow, so use a smaller queue
+	return databaseHandlerQueueSize
+}
+
 // HandleMessage processes a RIS message and updates the database
 func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
 	// Use the pre-parsed timestamp
diff --git a/internal/routewatch/peerhandler.go b/internal/routewatch/peerhandler.go
index 1f7e7b8..a004383 100644
--- a/internal/routewatch/peerhandler.go
+++ b/internal/routewatch/peerhandler.go
@@ -8,6 +8,11 @@ import (
 	"git.eeqj.de/sneak/routewatch/internal/ristypes"
 )
 
+const (
+	// peerHandlerQueueSize is the queue capacity for peer tracking operations
+	peerHandlerQueueSize = 500
+)
+
 // PeerHandler tracks BGP peers from all message types
 type PeerHandler struct {
 	db database.Store
@@ -27,6 +32,12 @@ func (h *PeerHandler) WantsMessage(_ string) bool {
 	return true
 }
 
+// QueueCapacity returns the desired queue capacity for this handler
+func (h *PeerHandler) QueueCapacity() int {
+	// Peer tracking is lightweight but involves database ops, use moderate queue
+	return peerHandlerQueueSize
+}
+
 // HandleMessage processes a message to track peer information
 func (h *PeerHandler) HandleMessage(msg *ristypes.RISMessage) {
 	// Parse peer ASN from string
diff --git a/internal/routewatch/routingtablehandler.go b/internal/routewatch/routingtablehandler.go
index 61f9fe4..fbbbaef 100644
--- a/internal/routewatch/routingtablehandler.go
+++ b/internal/routewatch/routingtablehandler.go
@@ -9,6 +9,11 @@ import (
 	"github.com/google/uuid"
 )
 
+const (
+	// routingTableHandlerQueueSize is the queue capacity for in-memory routing table operations
+	routingTableHandlerQueueSize = 10000
+)
+
 // RoutingTableHandler handles BGP messages and updates the in-memory routing table
 type RoutingTableHandler struct {
 	rt *routingtable.RoutingTable
@@ -29,6 +34,12 @@ func (h *RoutingTableHandler) WantsMessage(messageType string) bool {
 	return messageType == "UPDATE"
 }
 
+// QueueCapacity returns the desired queue capacity for this handler
+func (h *RoutingTableHandler) QueueCapacity() int {
+	// In-memory operations are very fast, so use a large queue
+	return routingTableHandlerQueueSize
+}
+
 // HandleMessage processes a RIS message and updates the routing table
 func (h *RoutingTableHandler) HandleMessage(msg *ristypes.RISMessage) {
 	// Use the pre-parsed timestamp
diff --git a/internal/streamer/streamer.go b/internal/streamer/streamer.go
index 6f0e662..8698af0 100644
--- a/internal/streamer/streamer.go
+++ b/internal/streamer/streamer.go
@@ -25,7 +25,7 @@ const (
 	metricsLogInterval = 10 * time.Second
 	bytesPerKB         = 1024
 	bytesPerMB         = 1024 * 1024
-	maxConcurrentHandlers = 200 // Maximum number of concurrent message handlers
+	maxConcurrentHandlers = 800 // Maximum number of concurrent message handlers
 )
 
 // MessageHandler is an interface for handling RIS messages
@@ -35,23 +35,43 @@ type MessageHandler interface {
 	// HandleMessage processes a RIS message
 	HandleMessage(msg *ristypes.RISMessage)
+
+	// QueueCapacity returns the desired queue capacity for this handler
+	// Handlers that process quickly can have larger queues
+	QueueCapacity() int
 }
 
 // RawMessageHandler is a callback for handling raw JSON lines from the stream
 type RawMessageHandler func(line string)
 
+// handlerMetrics tracks performance metrics for a handler
+type handlerMetrics struct {
+	processedCount uint64        // Total messages processed
+	droppedCount   uint64        // Total messages dropped
+	totalTime      time.Duration // Total processing time (for average calculation)
+	minTime        time.Duration // Minimum processing time
+	maxTime        time.Duration // Maximum processing time
+	mu             sync.Mutex    // Protects the metrics
+}
+
+// handlerInfo wraps a handler with its queue and metrics
+type handlerInfo struct {
+	handler MessageHandler
+	queue   chan *ristypes.RISMessage
+	metrics handlerMetrics
+}
+
 // Streamer handles streaming BGP updates from RIS Live
 type Streamer struct {
-	logger          *slog.Logger
-	client          *http.Client
-	handlers        []MessageHandler
-	rawHandler      RawMessageHandler
-	mu              sync.RWMutex
-	cancel          context.CancelFunc
-	running         bool
-	metrics         *metrics.Tracker
-	semaphore       chan struct{} // Limits concurrent message processing
-	droppedMessages uint64        // Atomic counter for dropped messages
+	logger       *slog.Logger
+	client       *http.Client
+	handlers     []*handlerInfo
+	rawHandler   RawMessageHandler
+	mu           sync.RWMutex
+	cancel       context.CancelFunc
+	running      bool
+	metrics      *metrics.Tracker
+	totalDropped uint64 // Total dropped messages across all handlers
 }
 
 // New creates a new RIS streamer
@@ -61,9 +81,8 @@ func New(logger *slog.Logger, metrics *metrics.Tracker) *Streamer {
 		client: &http.Client{
 			Timeout: 0, // No timeout for streaming
 		},
-		handlers:  make([]MessageHandler, 0),
-		metrics:   metrics,
-		semaphore: make(chan struct{}, maxConcurrentHandlers),
+		handlers: make([]*handlerInfo, 0),
+		metrics:  metrics,
 	}
 }
 
@@ -71,7 +90,19 @@
 func (s *Streamer) RegisterHandler(handler MessageHandler) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	s.handlers = append(s.handlers, handler)
+
+	// Create handler info with its own queue based on capacity
+	info := &handlerInfo{
+		handler: handler,
+		queue:   make(chan *ristypes.RISMessage, handler.QueueCapacity()),
+	}
+
+	s.handlers = append(s.handlers, info)
+
+	// If we're already running, start a worker for this handler
+	if s.running {
+		go s.runHandlerWorker(info)
+	}
 }
 
 // RegisterRawHandler sets a callback for raw message lines
@@ -94,6 +125,11 @@ func (s *Streamer) Start() error {
 	s.cancel = cancel
 	s.running = true
+	// Start workers for each handler
+	for _, info := range s.handlers {
+		go s.runHandlerWorker(info)
+	}
+
 	go func() {
 		if err := s.stream(ctx); err != nil {
 			s.logger.Error("Streaming error", "error", err)
 		}
@@ -112,10 +148,40 @@ func (s *Streamer) Stop() {
 	if s.cancel != nil {
 		s.cancel()
 	}
+	// Close all handler queues to signal workers to stop
+	for _, info := range s.handlers {
+		close(info.queue)
+	}
+
 	s.running = false
 	s.mu.Unlock()
 	s.metrics.SetConnected(false)
 }
 
+// runHandlerWorker processes messages for a specific handler
+func (s *Streamer) runHandlerWorker(info *handlerInfo) {
+	for msg := range info.queue {
+		start := time.Now()
+		info.handler.HandleMessage(msg)
+		elapsed := time.Since(start)
+
+		// Update metrics
+		info.metrics.mu.Lock()
+		info.metrics.processedCount++
+		info.metrics.totalTime += elapsed
+
+		// Update min time
+		if info.metrics.minTime == 0 || elapsed < info.metrics.minTime {
+			info.metrics.minTime = elapsed
+		}
+
+		// Update max time
+		if elapsed > info.metrics.maxTime {
+			info.metrics.maxTime = elapsed
+		}
+		info.metrics.mu.Unlock()
+	}
+}
+
 // IsRunning returns whether the streamer is currently active
 func (s *Streamer) IsRunning() bool {
 	s.mu.RLock()
@@ -131,7 +197,7 @@
 // GetDroppedMessages returns the total number of dropped messages
 func (s *Streamer) GetDroppedMessages() uint64 {
-	return atomic.LoadUint64(&s.droppedMessages)
+	return atomic.LoadUint64(&s.totalDropped)
 }
 
 // logMetrics logs the current streaming statistics
@@ -140,7 +206,8 @@ func (s *Streamer) logMetrics() {
 	uptime := time.Since(metrics.ConnectedSince)
 	const bitsPerMegabit = 1000000
 
-	droppedMessages := atomic.LoadUint64(&s.droppedMessages)
+	totalDropped := atomic.LoadUint64(&s.totalDropped)
+
 	s.logger.Info(
 		"Stream statistics",
 		"uptime",
@@ -157,11 +224,39 @@
 		fmt.Sprintf("%.0f", metrics.BitsPerSec),
 		"mbps",
 		fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit),
-		"dropped_messages",
-		droppedMessages,
-		"active_handlers",
-		len(s.semaphore),
+		"total_dropped",
+		totalDropped,
 	)
+
+	// Log per-handler statistics
+	s.mu.RLock()
+	for i, info := range s.handlers {
+		info.metrics.mu.Lock()
+		if info.metrics.processedCount > 0 {
+			// Safe conversion: processedCount is bounded by maxInt64
+			processedCount := info.metrics.processedCount
+			const maxInt64 = 1<<63 - 1
+			if processedCount > maxInt64 {
+				processedCount = maxInt64
+			}
+			//nolint:gosec // processedCount is explicitly bounded above
+			avgTime := info.metrics.totalTime / time.Duration(processedCount)
+			s.logger.Info(
+				"Handler statistics",
+				"handler", fmt.Sprintf("%T", info.handler),
+				"index", i,
+				"queue_len", len(info.queue),
+				"queue_cap", cap(info.queue),
+				"processed", info.metrics.processedCount,
+				"dropped", info.metrics.droppedCount,
+				"avg_time", avgTime,
+				"min_time", info.metrics.minTime,
+				"max_time", info.metrics.maxTime,
+			)
+		}
+		info.metrics.mu.Unlock()
+	}
+	s.mu.RUnlock()
 }
 
 // updateMetrics updates the metrics counters and rates
@@ -236,104 +331,91 @@ func (s *Streamer) stream(ctx context.Context) error {
 			rawHandler(string(line))
 		}
 
-		// Get current handlers
-		s.mu.RLock()
-		handlers := make([]MessageHandler, len(s.handlers))
-		copy(handlers, s.handlers)
-		s.mu.RUnlock()
+		// Parse the message first
+		var wrapper ristypes.RISLiveMessage
+		if err := json.Unmarshal(line, &wrapper); err != nil {
+			// Output the raw line and panic on parse failure
+			fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err)
+			fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(line))
+			panic(fmt.Sprintf("JSON parse error: %v", err))
+		}
-		// Try to acquire semaphore, drop message if at capacity
-		select {
-		case s.semaphore <- struct{}{}:
-			// Successfully acquired semaphore, process message
-			go func(rawLine []byte, messageHandlers []MessageHandler) {
-				defer func() { <-s.semaphore }() // Release semaphore when done
+		// Check if it's a ris_message wrapper
+		if wrapper.Type != "ris_message" {
+			s.logger.Error("Unexpected wrapper type",
+				"type", wrapper.Type,
+				"line", string(line),
+			)
-				// Parse the outer wrapper first
-				var wrapper ristypes.RISLiveMessage
-				if err := json.Unmarshal(rawLine, &wrapper); err != nil {
-					// Output the raw line and panic on parse failure
-					fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err)
-					fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(rawLine))
-					panic(fmt.Sprintf("JSON parse error: %v", err))
-				}
+			continue
+		}
-				// Check if it's a ris_message wrapper
-				if wrapper.Type != "ris_message" {
-					s.logger.Error("Unexpected wrapper type",
-						"type", wrapper.Type,
-						"line", string(rawLine),
-					)
+		// Get the actual message
+		msg := wrapper.Data
-					return
-				}
+		// Parse the timestamp
+		msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0).UTC()
-				// Get the actual message
-				msg := wrapper.Data
+		// Process based on message type
+		switch msg.Type {
+		case "UPDATE":
+			// Process BGP UPDATE messages
+			// Will be dispatched to handlers
+		case "RIS_PEER_STATE":
+			// RIS peer state messages - silently ignore
+			continue
+		case "KEEPALIVE":
+			// BGP keepalive messages - silently process
+			continue
+		case "OPEN":
+			// BGP open messages
+			s.logger.Info("BGP session opened",
+				"peer", msg.Peer,
+				"peer_asn", msg.PeerASN,
+			)
-				// Parse the timestamp
-				msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0).
-					UTC()
+			continue
+		case "NOTIFICATION":
+			// BGP notification messages (errors)
+			s.logger.Warn("BGP notification",
+				"peer", msg.Peer,
+				"peer_asn", msg.PeerASN,
+			)
-				// Process based on message type
-				switch msg.Type {
-				case "UPDATE":
-					// Process BGP UPDATE messages
-					// Will be handled by registered handlers
-				case "RIS_PEER_STATE":
-					// RIS peer state messages - silently ignore
-				case "KEEPALIVE":
-					// BGP keepalive messages - silently process
-				case "OPEN":
-					// BGP open messages
-					s.logger.Info("BGP session opened",
-						"peer", msg.Peer,
-						"peer_asn", msg.PeerASN,
-					)
-				case "NOTIFICATION":
-					// BGP notification messages (errors)
-					s.logger.Warn("BGP notification",
-						"peer", msg.Peer,
-						"peer_asn", msg.PeerASN,
-					)
-				case "STATE":
-					// Peer state changes - silently ignore
-				default:
-					fmt.Fprintf(
-						os.Stderr,
-						"UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n",
-						msg.Type,
-						string(rawLine),
-					)
-					panic(
-						fmt.Sprintf(
-							"Unknown RIS message type: %s",
-							msg.Type,
-						),
-					)
-				}
-
-				// Call handlers synchronously within this goroutine
-				// This prevents unbounded goroutine growth at the handler level
-				for _, handler := range messageHandlers {
-					if handler.WantsMessage(msg.Type) {
-						handler.HandleMessage(&msg)
-					}
-				}
-			}(append([]byte(nil), line...), handlers) // Copy the line to avoid data races
+			continue
+		case "STATE":
+			// Peer state changes - silently ignore
+			continue
 		default:
-			// Semaphore is full, drop the message
-			dropped := atomic.AddUint64(&s.droppedMessages, 1)
-			if dropped%1000 == 0 { // Log every 1000 dropped messages
-				s.logger.Warn(
-					"Dropping messages due to overload",
-					"total_dropped",
-					dropped,
-					"max_handlers",
-					maxConcurrentHandlers,
-				)
+			fmt.Fprintf(
+				os.Stderr,
+				"UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n",
+				msg.Type,
+				string(line),
+			)
+			panic(
+				fmt.Sprintf(
+					"Unknown RIS message type: %s",
+					msg.Type,
+				),
+			)
+		}
+
+		// Dispatch to interested handlers
+		s.mu.RLock()
+		for _, info := range s.handlers {
+			if info.handler.WantsMessage(msg.Type) {
+				select {
+				case info.queue <- &msg:
+					// Message queued successfully
+				default:
+					// Queue is full, drop the message
+					atomic.AddUint64(&info.metrics.droppedCount, 1)
+					atomic.AddUint64(&s.totalDropped, 1)
+				}
 			}
 		}
+		s.mu.RUnlock()
 	}
 
 	if err := scanner.Err(); err != nil {