- Add a semaphore to limit concurrent message handlers to 100
- Drop messages when at capacity instead of creating unbounded goroutines
- Track and log dropped messages (every 1000 drops)
- Remove nested goroutine spawning in the handler loop
- Add metrics for dropped messages and active handlers

This prevents memory usage from growing without bound when the database can't keep up with the incoming BGP message stream. Messages are dropped gracefully rather than causing out-of-memory (OOM) errors.
323 lines
8.4 KiB
Go
323 lines
8.4 KiB
Go
// Package streamer implements an HTTP client that connects to the RIPE RIS Live streaming API,
|
|
// parses BGP UPDATE messages from the JSON stream, and dispatches them to registered handlers.
|
|
package streamer
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/http"
|
|
"os"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"git.eeqj.de/sneak/routewatch/internal/metrics"
|
|
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
|
)
|
|
|
|
const (
	// risLiveURL is the RIS Live streaming endpoint, requesting JSON output.
	risLiveURL = "https://ris-live.ripe.net/v1/stream/?format=json"

	// Metrics timing.
	metricsWindowSize  = 60 // seconds for rolling average
	metricsUpdateRate  = time.Second
	metricsLogInterval = 10 * time.Second

	// Byte-size units.
	bytesPerKB = 1 << 10
	bytesPerMB = bytesPerKB * bytesPerKB

	// maxConcurrentHandlers bounds how many message-processing goroutines the
	// streamer runs at once; lines that arrive while every slot is busy are
	// dropped rather than queued.
	maxConcurrentHandlers = 100
)
|
|
|
|
// MessageHandler is an interface for handling RIS messages
|
|
type MessageHandler interface {
|
|
// WantsMessage returns true if this handler wants to process messages of the given type
|
|
WantsMessage(messageType string) bool
|
|
|
|
// HandleMessage processes a RIS message
|
|
HandleMessage(msg *ristypes.RISMessage)
|
|
}
|
|
|
|
// RawMessageHandler receives each non-empty line of the RIS Live stream as
// unparsed JSON text. The Streamer calls it synchronously on the reading
// goroutine, in stream order, so a slow callback slows ingestion.
type RawMessageHandler func(string)
|
|
|
|
// Streamer handles streaming BGP updates from RIS Live
|
|
type Streamer struct {
|
|
logger *slog.Logger
|
|
client *http.Client
|
|
handlers []MessageHandler
|
|
rawHandler RawMessageHandler
|
|
mu sync.RWMutex
|
|
cancel context.CancelFunc
|
|
running bool
|
|
metrics *metrics.Tracker
|
|
semaphore chan struct{} // Limits concurrent message processing
|
|
droppedMessages uint64 // Atomic counter for dropped messages
|
|
}
|
|
|
|
// New creates a new RIS streamer
|
|
func New(logger *slog.Logger, metrics *metrics.Tracker) *Streamer {
|
|
return &Streamer{
|
|
logger: logger,
|
|
client: &http.Client{
|
|
Timeout: 0, // No timeout for streaming
|
|
},
|
|
handlers: make([]MessageHandler, 0),
|
|
metrics: metrics,
|
|
semaphore: make(chan struct{}, maxConcurrentHandlers),
|
|
}
|
|
}
|
|
|
|
// RegisterHandler adds a callback for message processing
|
|
func (s *Streamer) RegisterHandler(handler MessageHandler) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.handlers = append(s.handlers, handler)
|
|
}
|
|
|
|
// RegisterRawHandler sets a callback for raw message lines
|
|
func (s *Streamer) RegisterRawHandler(handler RawMessageHandler) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.rawHandler = handler
|
|
}
|
|
|
|
// Start begins streaming in a goroutine
|
|
func (s *Streamer) Start() error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if s.running {
|
|
return fmt.Errorf("streamer already running")
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
s.cancel = cancel
|
|
s.running = true
|
|
|
|
go func() {
|
|
if err := s.stream(ctx); err != nil {
|
|
s.logger.Error("Streaming error", "error", err)
|
|
}
|
|
s.mu.Lock()
|
|
s.running = false
|
|
s.mu.Unlock()
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop halts the streaming
|
|
func (s *Streamer) Stop() {
|
|
s.mu.Lock()
|
|
if s.cancel != nil {
|
|
s.cancel()
|
|
}
|
|
s.mu.Unlock()
|
|
s.metrics.SetConnected(false)
|
|
}
|
|
|
|
// IsRunning returns whether the streamer is currently active
|
|
func (s *Streamer) IsRunning() bool {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
return s.running
|
|
}
|
|
|
|
// GetMetrics returns current streaming metrics
|
|
func (s *Streamer) GetMetrics() metrics.StreamMetrics {
|
|
return s.metrics.GetStreamMetrics()
|
|
}
|
|
|
|
// GetDroppedMessages returns the total number of dropped messages
|
|
func (s *Streamer) GetDroppedMessages() uint64 {
|
|
return atomic.LoadUint64(&s.droppedMessages)
|
|
}
|
|
|
|
// logMetrics logs the current streaming statistics
|
|
func (s *Streamer) logMetrics() {
|
|
metrics := s.metrics.GetStreamMetrics()
|
|
uptime := time.Since(metrics.ConnectedSince)
|
|
|
|
const bitsPerMegabit = 1000000
|
|
droppedMessages := atomic.LoadUint64(&s.droppedMessages)
|
|
s.logger.Info("Stream statistics",
|
|
"uptime", uptime,
|
|
"total_messages", metrics.TotalMessages,
|
|
"total_bytes", metrics.TotalBytes,
|
|
"total_mb", fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB),
|
|
"messages_per_sec", fmt.Sprintf("%.2f", metrics.MessagesPerSec),
|
|
"bits_per_sec", fmt.Sprintf("%.0f", metrics.BitsPerSec),
|
|
"mbps", fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit),
|
|
"dropped_messages", droppedMessages,
|
|
"active_handlers", len(s.semaphore),
|
|
)
|
|
}
|
|
|
|
// updateMetrics updates the metrics counters and rates
|
|
func (s *Streamer) updateMetrics(messageBytes int) {
|
|
s.metrics.RecordMessage(int64(messageBytes))
|
|
}
|
|
|
|
func (s *Streamer) stream(ctx context.Context) error {
|
|
req, err := http.NewRequestWithContext(ctx, "GET", risLiveURL, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create request: %w", err)
|
|
}
|
|
|
|
resp, err := s.client.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to RIS Live: %w", err)
|
|
}
|
|
defer func() {
|
|
if err := resp.Body.Close(); err != nil {
|
|
s.logger.Error("Failed to close response body", "error", err)
|
|
}
|
|
}()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
|
}
|
|
|
|
s.logger.Info("Connected to RIS Live stream")
|
|
s.metrics.SetConnected(true)
|
|
|
|
// Start metrics logging goroutine
|
|
metricsTicker := time.NewTicker(metricsLogInterval)
|
|
defer metricsTicker.Stop()
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-metricsTicker.C:
|
|
s.logMetrics()
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
scanner := bufio.NewScanner(resp.Body)
|
|
|
|
for scanner.Scan() {
|
|
select {
|
|
case <-ctx.Done():
|
|
s.logger.Info("Stream stopped by context")
|
|
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
line := scanner.Bytes()
|
|
if len(line) == 0 {
|
|
continue
|
|
}
|
|
|
|
// Update metrics with message size
|
|
s.updateMetrics(len(line))
|
|
|
|
// Call raw handler if registered
|
|
s.mu.RLock()
|
|
rawHandler := s.rawHandler
|
|
s.mu.RUnlock()
|
|
|
|
if rawHandler != nil {
|
|
// Call raw handler synchronously to preserve order
|
|
rawHandler(string(line))
|
|
}
|
|
|
|
// Get current handlers
|
|
s.mu.RLock()
|
|
handlers := make([]MessageHandler, len(s.handlers))
|
|
copy(handlers, s.handlers)
|
|
s.mu.RUnlock()
|
|
|
|
// Try to acquire semaphore, drop message if at capacity
|
|
select {
|
|
case s.semaphore <- struct{}{}:
|
|
// Successfully acquired semaphore, process message
|
|
go func(rawLine []byte, messageHandlers []MessageHandler) {
|
|
defer func() { <-s.semaphore }() // Release semaphore when done
|
|
|
|
// Parse the outer wrapper first
|
|
var wrapper ristypes.RISLiveMessage
|
|
if err := json.Unmarshal(rawLine, &wrapper); err != nil {
|
|
// Output the raw line and panic on parse failure
|
|
fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err)
|
|
fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(rawLine))
|
|
panic(fmt.Sprintf("JSON parse error: %v", err))
|
|
}
|
|
|
|
// Check if it's a ris_message wrapper
|
|
if wrapper.Type != "ris_message" {
|
|
s.logger.Error("Unexpected wrapper type",
|
|
"type", wrapper.Type,
|
|
"line", string(rawLine),
|
|
)
|
|
|
|
return
|
|
}
|
|
|
|
// Get the actual message
|
|
msg := wrapper.Data
|
|
|
|
// Parse the timestamp
|
|
msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0).UTC()
|
|
|
|
// Process based on message type
|
|
switch msg.Type {
|
|
case "UPDATE":
|
|
// Process BGP UPDATE messages
|
|
// Will be handled by registered handlers
|
|
case "RIS_PEER_STATE":
|
|
// RIS peer state messages - silently ignore
|
|
case "KEEPALIVE":
|
|
// BGP keepalive messages - silently process
|
|
case "OPEN":
|
|
// BGP open messages
|
|
s.logger.Info("BGP session opened",
|
|
"peer", msg.Peer,
|
|
"peer_asn", msg.PeerASN,
|
|
)
|
|
case "NOTIFICATION":
|
|
// BGP notification messages (errors)
|
|
s.logger.Warn("BGP notification",
|
|
"peer", msg.Peer,
|
|
"peer_asn", msg.PeerASN,
|
|
)
|
|
case "STATE":
|
|
// Peer state changes - silently ignore
|
|
default:
|
|
fmt.Fprintf(
|
|
os.Stderr,
|
|
"UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n",
|
|
msg.Type,
|
|
string(rawLine),
|
|
)
|
|
panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type))
|
|
}
|
|
|
|
// Call handlers synchronously within this goroutine
|
|
// This prevents unbounded goroutine growth at the handler level
|
|
for _, handler := range messageHandlers {
|
|
if handler.WantsMessage(msg.Type) {
|
|
handler.HandleMessage(&msg)
|
|
}
|
|
}
|
|
}(append([]byte(nil), line...), handlers) // Copy the line to avoid data races
|
|
default:
|
|
// Semaphore is full, drop the message
|
|
dropped := atomic.AddUint64(&s.droppedMessages, 1)
|
|
if dropped%1000 == 0 { // Log every 1000 dropped messages
|
|
s.logger.Warn("Dropping messages due to overload", "total_dropped", dropped, "max_handlers", maxConcurrentHandlers)
|
|
}
|
|
}
|
|
}
|
|
|
|
if err := scanner.Err(); err != nil {
|
|
return fmt.Errorf("scanner error: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|