routewatch/internal/streamer/streamer.go
sneak a555a1dee2 Replace live_routes database table with in-memory routing table
- Remove live_routes table from SQL schema and all related indexes
- Create new internal/routingtable package with thread-safe RoutingTable
- Implement RouteKey-based indexing with secondary indexes for efficient lookups
- Add RoutingTableHandler to manage in-memory routes separately from database
- Update DatabaseHandler to only handle persistent database operations
- Wire up RoutingTable through fx dependency injection
- Update server to get live route count from routing table instead of database
- Remove LiveRoutes field from database.Stats struct
- Update tests to work with new architecture
2025-07-27 23:16:19 +02:00


// Package streamer implements an HTTP client that connects to the RIPE RIS Live streaming API,
// parses BGP UPDATE messages from the JSON stream, and dispatches them to registered handlers.
package streamer

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"sync"
	"sync/atomic"
	"time"

	"git.eeqj.de/sneak/routewatch/internal/metrics"
	"git.eeqj.de/sneak/routewatch/internal/ristypes"
)

const (
	risLiveURL            = "https://ris-live.ripe.net/v1/stream/?format=json"
	metricsWindowSize     = 60 // seconds for rolling average
	metricsUpdateRate     = time.Second
	metricsLogInterval    = 10 * time.Second
	bytesPerKB            = 1024
	bytesPerMB            = 1024 * 1024
	maxConcurrentHandlers = 200 // Maximum number of concurrent message handlers
)
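
// The maxConcurrentHandlers cap is enforced further down with a buffered channel
// used as a counting semaphore: a non-blocking send acquires a slot, a receive
// releases it, and a full channel means the message is dropped instead of queued.
// A minimal sketch of that pattern (handleMessage is a hypothetical placeholder,
// not a function in this package):
//
//	sem := make(chan struct{}, maxConcurrentHandlers)
//	select {
//	case sem <- struct{}{}:
//		go func() {
//			defer func() { <-sem }()
//			handleMessage()
//		}()
//	default:
//		// At capacity: drop the message rather than block the stream reader.
//	}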

// MessageHandler is an interface for handling RIS messages
type MessageHandler interface {
	// WantsMessage returns true if this handler wants to process messages of the given type
	WantsMessage(messageType string) bool

	// HandleMessage processes a RIS message
	HandleMessage(msg *ristypes.RISMessage)
}
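
// A minimal handler might look like the following sketch (updateCounter is a
// hypothetical type, not part of this package); it counts BGP UPDATE messages
// and ignores everything else. Because HandleMessage can be invoked from up to
// maxConcurrentHandlers goroutines at once, handlers must be safe for
// concurrent use:
//
//	type updateCounter struct{ n uint64 }
//
//	func (c *updateCounter) WantsMessage(messageType string) bool {
//		return messageType == "UPDATE"
//	}
//
//	func (c *updateCounter) HandleMessage(msg *ristypes.RISMessage) {
//		atomic.AddUint64(&c.n, 1)
//	}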

// RawMessageHandler is a callback for handling raw JSON lines from the stream
type RawMessageHandler func(line string)

// Streamer handles streaming BGP updates from RIS Live
type Streamer struct {
	logger          *slog.Logger
	client          *http.Client
	handlers        []MessageHandler
	rawHandler      RawMessageHandler
	mu              sync.RWMutex
	cancel          context.CancelFunc
	running         bool
	metrics         *metrics.Tracker
	semaphore       chan struct{} // Limits concurrent message processing
	droppedMessages uint64        // Atomic counter for dropped messages
}

// New creates a new RIS streamer
func New(logger *slog.Logger, metrics *metrics.Tracker) *Streamer {
	return &Streamer{
		logger: logger,
		client: &http.Client{
			Timeout: 0, // No timeout for streaming
		},
		handlers:  make([]MessageHandler, 0),
		metrics:   metrics,
		semaphore: make(chan struct{}, maxConcurrentHandlers),
	}
}
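
// Typical wiring is sketched below; metrics.NewTracker is an assumed
// constructor name used only for illustration (it may not match the real
// metrics package API), and updateCounter refers to the hypothetical handler
// sketched above:
//
//	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
//	s := streamer.New(logger, metrics.NewTracker())
//	s.RegisterHandler(&updateCounter{})
//	if err := s.Start(); err != nil {
//		logger.Error("failed to start streamer", "error", err)
//	}
//	defer s.Stop()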

// RegisterHandler adds a callback for message processing
func (s *Streamer) RegisterHandler(handler MessageHandler) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.handlers = append(s.handlers, handler)
}

// RegisterRawHandler sets a callback for raw message lines
func (s *Streamer) RegisterRawHandler(handler RawMessageHandler) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rawHandler = handler
}

// Start begins streaming in a goroutine
func (s *Streamer) Start() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.running {
		return fmt.Errorf("streamer already running")
	}

	ctx, cancel := context.WithCancel(context.Background())
	s.cancel = cancel
	s.running = true

	go func() {
		if err := s.stream(ctx); err != nil {
			s.logger.Error("Streaming error", "error", err)
		}
		s.mu.Lock()
		s.running = false
		s.mu.Unlock()
	}()

	return nil
}

// Stop halts the streaming
func (s *Streamer) Stop() {
	s.mu.Lock()
	if s.cancel != nil {
		s.cancel()
	}
	s.mu.Unlock()
	s.metrics.SetConnected(false)
}

// IsRunning returns whether the streamer is currently active
func (s *Streamer) IsRunning() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.running
}

// GetMetrics returns current streaming metrics
func (s *Streamer) GetMetrics() metrics.StreamMetrics {
	return s.metrics.GetStreamMetrics()
}

// GetDroppedMessages returns the total number of dropped messages
func (s *Streamer) GetDroppedMessages() uint64 {
	return atomic.LoadUint64(&s.droppedMessages)
}

// logMetrics logs the current streaming statistics
func (s *Streamer) logMetrics() {
	metrics := s.metrics.GetStreamMetrics()
	uptime := time.Since(metrics.ConnectedSince)
	const bitsPerMegabit = 1000000
	droppedMessages := atomic.LoadUint64(&s.droppedMessages)
	s.logger.Info(
		"Stream statistics",
		"uptime",
		uptime,
		"total_messages",
		metrics.TotalMessages,
		"total_bytes",
		metrics.TotalBytes,
		"total_mb",
		fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB),
		"messages_per_sec",
		fmt.Sprintf("%.2f", metrics.MessagesPerSec),
		"bits_per_sec",
		fmt.Sprintf("%.0f", metrics.BitsPerSec),
		"mbps",
		fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit),
		"dropped_messages",
		droppedMessages,
		"active_handlers",
		len(s.semaphore),
	)
}

// updateMetrics updates the metrics counters and rates
func (s *Streamer) updateMetrics(messageBytes int) {
	s.metrics.RecordMessage(int64(messageBytes))
}

func (s *Streamer) stream(ctx context.Context) error {
	req, err := http.NewRequestWithContext(ctx, "GET", risLiveURL, nil)
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}

	resp, err := s.client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to connect to RIS Live: %w", err)
	}
	defer func() {
		if err := resp.Body.Close(); err != nil {
			s.logger.Error("Failed to close response body", "error", err)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	s.logger.Info("Connected to RIS Live stream")
	s.metrics.SetConnected(true)

	// Start metrics logging goroutine
	metricsTicker := time.NewTicker(metricsLogInterval)
	defer metricsTicker.Stop()
	go func() {
		for {
			select {
			case <-metricsTicker.C:
				s.logMetrics()
			case <-ctx.Done():
				return
			}
		}
	}()

	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		select {
		case <-ctx.Done():
			s.logger.Info("Stream stopped by context")
			return ctx.Err()
		default:
		}

		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}

		// Update metrics with message size
		s.updateMetrics(len(line))

		// Call raw handler if registered
		s.mu.RLock()
		rawHandler := s.rawHandler
		s.mu.RUnlock()
		if rawHandler != nil {
			// Call raw handler synchronously to preserve order
			rawHandler(string(line))
		}

		// Get current handlers
		s.mu.RLock()
		handlers := make([]MessageHandler, len(s.handlers))
		copy(handlers, s.handlers)
		s.mu.RUnlock()

		// Try to acquire semaphore, drop message if at capacity
		select {
		case s.semaphore <- struct{}{}:
			// Successfully acquired semaphore, process message
			go func(rawLine []byte, messageHandlers []MessageHandler) {
				defer func() { <-s.semaphore }() // Release semaphore when done

				// Parse the outer wrapper first
				var wrapper ristypes.RISLiveMessage
				if err := json.Unmarshal(rawLine, &wrapper); err != nil {
					// Output the raw line and panic on parse failure
					fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err)
					fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(rawLine))
					panic(fmt.Sprintf("JSON parse error: %v", err))
				}

				// Check if it's a ris_message wrapper
				if wrapper.Type != "ris_message" {
					s.logger.Error("Unexpected wrapper type",
						"type", wrapper.Type,
						"line", string(rawLine),
					)
					return
				}

				// Get the actual message
				msg := wrapper.Data

				// Parse the timestamp
				msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0).UTC()

				// Process based on message type
				switch msg.Type {
				case "UPDATE":
					// Process BGP UPDATE messages
					// Will be handled by registered handlers
				case "RIS_PEER_STATE":
					// RIS peer state messages - silently ignore
				case "KEEPALIVE":
					// BGP keepalive messages - silently process
				case "OPEN":
					// BGP open messages
					s.logger.Info("BGP session opened",
						"peer", msg.Peer,
						"peer_asn", msg.PeerASN,
					)
				case "NOTIFICATION":
					// BGP notification messages (errors)
					s.logger.Warn("BGP notification",
						"peer", msg.Peer,
						"peer_asn", msg.PeerASN,
					)
				case "STATE":
					// Peer state changes - silently ignore
				default:
					fmt.Fprintf(
						os.Stderr,
						"UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n",
						msg.Type,
						string(rawLine),
					)
					panic(
						fmt.Sprintf(
							"Unknown RIS message type: %s",
							msg.Type,
						),
					)
				}

				// Call handlers synchronously within this goroutine
				// This prevents unbounded goroutine growth at the handler level
				for _, handler := range messageHandlers {
					if handler.WantsMessage(msg.Type) {
						handler.HandleMessage(&msg)
					}
				}
			}(append([]byte(nil), line...), handlers) // Copy the line to avoid data races
		default:
			// Semaphore is full, drop the message
			dropped := atomic.AddUint64(&s.droppedMessages, 1)
			if dropped%1000 == 0 { // Log every 1000 dropped messages
				s.logger.Warn(
					"Dropping messages due to overload",
					"total_dropped",
					dropped,
					"max_handlers",
					maxConcurrentHandlers,
				)
			}
		}
	}

	if err := scanner.Err(); err != nil {
		return fmt.Errorf("scanner error: %w", err)
	}

	return nil
}