routewatch/internal/streamer/streamer.go

// Package streamer implements an HTTP client that connects to the RIPE RIS Live streaming API,
// parses BGP UPDATE messages from the JSON stream, and dispatches them to registered handlers.
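//
// A minimal usage sketch (log, tracker, and myHandler are assumed to exist
// elsewhere; myHandler must implement MessageHandler):
//
//	s := streamer.New(log, tracker)
//	s.RegisterHandler(myHandler)
//	if err := s.Start(); err != nil {
//		log.Error("failed to start streamer", "error", err)
//	}
//	defer s.Stop()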
package streamer

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"math"
	"math/rand"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"git.eeqj.de/sneak/routewatch/internal/logger"
	"git.eeqj.de/sneak/routewatch/internal/metrics"
	"git.eeqj.de/sneak/routewatch/internal/ristypes"
)
const (
risLiveURL = "https://ris-live.ripe.net/v1/stream/?format=json&" +
"client=https%3A%2F%2Fgit.eeqj.de%2Fsneak%2Froutewatch"
metricsWindowSize = 60 // seconds for rolling average
metricsUpdateRate = time.Second
minBackoffDelay = 5 * time.Second
maxBackoffDelay = 320 * time.Second
metricsLogInterval = 10 * time.Second
bytesPerKB = 1024
bytesPerMB = 1024 * 1024
maxConcurrentHandlers = 800 // Maximum number of concurrent message handlers
// Backpressure constants
backpressureThreshold = 0.5 // Start dropping at 50% queue utilization
backpressureSlope = 2.0 // Slope for linear drop probability increase
)

// MessageHandler is an interface for handling RIS messages
type MessageHandler interface {
	// WantsMessage returns true if this handler wants to process messages of the given type
	WantsMessage(messageType string) bool

	// HandleMessage processes a RIS message
	HandleMessage(msg *ristypes.RISMessage)

	// QueueCapacity returns the desired queue capacity for this handler
	// Handlers that process quickly can have larger queues
	QueueCapacity() int
}
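
// A minimal MessageHandler sketch (the type, counter, and capacity are
// illustrative, not part of this package):
//
//	type updateCounter struct{ n uint64 }
//
//	func (c *updateCounter) WantsMessage(t string) bool           { return t == "UPDATE" }
//	func (c *updateCounter) HandleMessage(m *ristypes.RISMessage) { atomic.AddUint64(&c.n, 1) }
//	func (c *updateCounter) QueueCapacity() int                   { return 10000 }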

// RawMessageHandler is a callback for handling raw JSON lines from the stream
type RawMessageHandler func(line string)

// handlerMetrics tracks performance metrics for a handler
type handlerMetrics struct {
	processedCount     uint64        // Total messages processed
	droppedCount       uint64        // Total messages dropped
	totalTime          time.Duration // Total processing time (for average calculation)
	minTime            time.Duration // Minimum processing time
	maxTime            time.Duration // Maximum processing time
	queueHighWaterMark int           // Maximum queue length seen
	mu                 sync.Mutex    // Protects the metrics
}

// handlerInfo wraps a handler with its queue and metrics
type handlerInfo struct {
	handler MessageHandler
	queue   chan *ristypes.RISMessage
	metrics handlerMetrics
}

// Streamer handles streaming BGP updates from RIS Live
type Streamer struct {
	logger       *logger.Logger
	client       *http.Client
	handlers     []*handlerInfo
	rawHandler   RawMessageHandler
	mu           sync.RWMutex
	cancel       context.CancelFunc
	running      bool
	metrics      *metrics.Tracker
	totalDropped uint64     // Total dropped messages across all handlers
	random       *rand.Rand // Random number generator for backpressure drops
}

// New creates a new RIS streamer
func New(logger *logger.Logger, metrics *metrics.Tracker) *Streamer {
	return &Streamer{
		logger: logger,
		client: &http.Client{
			Timeout: 0, // No timeout for streaming
		},
		handlers: make([]*handlerInfo, 0),
		metrics:  metrics,
		//nolint:gosec // Non-cryptographic randomness is fine for backpressure
		random: rand.New(rand.NewSource(time.Now().UnixNano())),
	}
}

// RegisterHandler adds a callback for message processing
func (s *Streamer) RegisterHandler(handler MessageHandler) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Create handler info with its own queue based on capacity
	info := &handlerInfo{
		handler: handler,
		queue:   make(chan *ristypes.RISMessage, handler.QueueCapacity()),
		metrics: handlerMetrics{
			minTime: time.Duration(math.MaxInt64), // Initialize to max so first value sets the floor
		},
	}
	s.handlers = append(s.handlers, info)

	// If we're already running, start a worker for this handler
	if s.running {
		go s.runHandlerWorker(info)
	}
}

// RegisterRawHandler sets a callback for raw message lines
func (s *Streamer) RegisterRawHandler(handler RawMessageHandler) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rawHandler = handler
}

// Start begins streaming in a goroutine
func (s *Streamer) Start() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.running {
		return fmt.Errorf("streamer already running")
	}

	ctx, cancel := context.WithCancel(context.Background())
	s.cancel = cancel
	s.running = true

	// Start workers for each handler
	for _, info := range s.handlers {
		go s.runHandlerWorker(info)
	}

	go func() {
		s.streamWithReconnect(ctx)
		s.mu.Lock()
		s.running = false
		s.mu.Unlock()
	}()

	return nil
}

// Stop halts the streaming
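// Closing each handler queue lets its worker drain any buffered messages and
// exit; Stop does not wait for the workers to finish before returning.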
func (s *Streamer) Stop() {
	s.mu.Lock()
	if s.cancel != nil {
		s.cancel()
	}

	// Close all handler queues to signal workers to stop
	for _, info := range s.handlers {
		close(info.queue)
	}
	s.running = false
	s.mu.Unlock()

	s.metrics.SetConnected(false)
}

// runHandlerWorker processes messages for a specific handler
func (s *Streamer) runHandlerWorker(info *handlerInfo) {
	for msg := range info.queue {
		start := time.Now()
		info.handler.HandleMessage(msg)
		elapsed := time.Since(start)

		// Update metrics
		info.metrics.mu.Lock()
		info.metrics.processedCount++
		info.metrics.totalTime += elapsed

		// Update min time
		if elapsed < info.metrics.minTime {
			info.metrics.minTime = elapsed
		}

		// Update max time
		if elapsed > info.metrics.maxTime {
			info.metrics.maxTime = elapsed
		}
		info.metrics.mu.Unlock()
	}
}

// IsRunning returns whether the streamer is currently active
func (s *Streamer) IsRunning() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.running
}

// GetMetrics returns current streaming metrics
func (s *Streamer) GetMetrics() metrics.StreamMetrics {
	return s.metrics.GetStreamMetrics()
}

// GetMetricsTracker returns the metrics tracker instance
func (s *Streamer) GetMetricsTracker() *metrics.Tracker {
	return s.metrics
}

// HandlerStats represents metrics for a single handler
type HandlerStats struct {
	Name               string
	QueueLength        int
	QueueCapacity      int
	QueueHighWaterMark int
	ProcessedCount     uint64
	DroppedCount       uint64
	AvgProcessTime     time.Duration
	MinProcessTime     time.Duration
	MaxProcessTime     time.Duration
}
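
// Derived values a consumer of HandlerStats might compute (illustrative sketch,
// for an hs returned by GetHandlerStats; guard against zero counts in real code):
//
//	peakUtilization := float64(hs.QueueHighWaterMark) / float64(hs.QueueCapacity)
//	dropRate := float64(hs.DroppedCount) / float64(hs.ProcessedCount+hs.DroppedCount)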

// GetHandlerStats returns current handler statistics
func (s *Streamer) GetHandlerStats() []HandlerStats {
	s.mu.RLock()
	defer s.mu.RUnlock()

	stats := make([]HandlerStats, 0, len(s.handlers))
	for _, info := range s.handlers {
		info.metrics.mu.Lock()
		hs := HandlerStats{
			Name:               fmt.Sprintf("%T", info.handler),
			QueueLength:        len(info.queue),
			QueueCapacity:      cap(info.queue),
			QueueHighWaterMark: info.metrics.queueHighWaterMark,
			ProcessedCount:     info.metrics.processedCount,
			// droppedCount is incremented atomically on the dispatch path,
			// so read it atomically here as well.
			DroppedCount:   atomic.LoadUint64(&info.metrics.droppedCount),
			MinProcessTime: info.metrics.minTime,
			MaxProcessTime: info.metrics.maxTime,
		}

		// Calculate average time
		if info.metrics.processedCount > 0 {
			processedCount := info.metrics.processedCount
			const maxInt64 = 1<<63 - 1
			if processedCount > maxInt64 {
				processedCount = maxInt64
			}
			//nolint:gosec // processedCount is explicitly bounded above
			hs.AvgProcessTime = info.metrics.totalTime / time.Duration(processedCount)
		}
		info.metrics.mu.Unlock()

		stats = append(stats, hs)
	}

	return stats
}

// GetDroppedMessages returns the total number of dropped messages
func (s *Streamer) GetDroppedMessages() uint64 {
	return atomic.LoadUint64(&s.totalDropped)
}

// logMetrics logs the current streaming statistics
func (s *Streamer) logMetrics() {
	metrics := s.metrics.GetStreamMetrics()
	uptime := time.Since(metrics.ConnectedSince)

	const bitsPerMegabit = 1000000
	totalDropped := atomic.LoadUint64(&s.totalDropped)

	s.logger.Info(
		"Stream statistics",
		"uptime",
		uptime,
		"total_messages",
		metrics.TotalMessages,
		"total_bytes",
		metrics.TotalBytes,
		"total_mb",
		fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB),
		"messages_per_sec",
		fmt.Sprintf("%.2f", metrics.MessagesPerSec),
		"bits_per_sec",
		fmt.Sprintf("%.0f", metrics.BitsPerSec),
		"mbps",
		fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit),
		"total_dropped",
		totalDropped,
	)

	// Log per-handler statistics
	s.mu.RLock()
	for i, info := range s.handlers {
		info.metrics.mu.Lock()
		if info.metrics.processedCount > 0 {
			// Safe conversion: processedCount is bounded by maxInt64
			processedCount := info.metrics.processedCount
			const maxInt64 = 1<<63 - 1
			if processedCount > maxInt64 {
				processedCount = maxInt64
			}
			//nolint:gosec // processedCount is explicitly bounded above
			avgTime := info.metrics.totalTime / time.Duration(processedCount)

			s.logger.Info(
				"Handler statistics",
				"handler", fmt.Sprintf("%T", info.handler),
				"index", i,
				"queue_len", len(info.queue),
				"queue_cap", cap(info.queue),
				"processed", info.metrics.processedCount,
				// droppedCount is incremented atomically on the dispatch path
				"dropped", atomic.LoadUint64(&info.metrics.droppedCount),
				"avg_time", avgTime,
				"min_time", info.metrics.minTime,
				"max_time", info.metrics.maxTime,
			)
		}
		info.metrics.mu.Unlock()
	}
	s.mu.RUnlock()
}

// updateMetrics updates the metrics counters and rates
func (s *Streamer) updateMetrics(messageBytes int) {
	s.metrics.RecordMessage(int64(messageBytes))
}

// streamWithReconnect handles streaming with automatic reconnection and exponential backoff
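// With minBackoffDelay = 5s and maxBackoffDelay = 320s, successive failures wait
// 5s, 10s, 20s, 40s, 80s, 160s, then 320s between attempts; a connection that
// survives longer than 30 seconds resets the delay to 5s.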
func (s *Streamer) streamWithReconnect(ctx context.Context) {
	backoffDelay := minBackoffDelay
	consecutiveFailures := 0

	for {
		select {
		case <-ctx.Done():
			s.logger.Info("Stream context cancelled, stopping reconnection attempts")
			return
		default:
		}

		// Attempt to stream
		startTime := time.Now()
		err := s.stream(ctx)
		streamDuration := time.Since(startTime)

		if err == nil {
			// Clean exit (context cancelled)
			return
		}

		// Log the error
		s.logger.Error("Stream disconnected",
			"error", err,
			"consecutive_failures", consecutiveFailures+1,
			"stream_duration", streamDuration)
		s.metrics.SetConnected(false)

		// Check if context is cancelled
		if ctx.Err() != nil {
			return
		}

		// If we streamed for more than 30 seconds, reset the backoff
		// This indicates we had a successful connection that received data
		if streamDuration > 30*time.Second {
			s.logger.Info("Resetting backoff delay due to successful connection",
				"stream_duration", streamDuration)
			backoffDelay = minBackoffDelay
			consecutiveFailures = 0
		} else {
			// Increment consecutive failures
			consecutiveFailures++
		}

		// Wait with exponential backoff
		s.logger.Info("Waiting before reconnection attempt",
			"delay_seconds", backoffDelay.Seconds(),
			"consecutive_failures", consecutiveFailures)

		select {
		case <-ctx.Done():
			return
		case <-time.After(backoffDelay):
			// Double the backoff delay for next time, up to max
			backoffDelay *= 2
			if backoffDelay > maxBackoffDelay {
				backoffDelay = maxBackoffDelay
			}
		}
	}
}

func (s *Streamer) stream(ctx context.Context) error {
	req, err := http.NewRequestWithContext(ctx, "GET", risLiveURL, nil)
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}

	resp, err := s.client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to connect to RIS Live: %w", err)
	}
	defer func() {
		if err := resp.Body.Close(); err != nil {
			s.logger.Error("Failed to close response body", "error", err)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	s.logger.Info("Connected to RIS Live stream")
	s.metrics.SetConnected(true)

	// Start metrics logging goroutine
	metricsTicker := time.NewTicker(metricsLogInterval)
	defer metricsTicker.Stop()
	go func() {
		for {
			select {
			case <-metricsTicker.C:
				s.logMetrics()
			case <-ctx.Done():
				return
			}
		}
	}()

	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		select {
		case <-ctx.Done():
			s.logger.Info("Stream stopped by context")
			return ctx.Err()
		default:
		}

		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}

		// Update metrics with message size
		s.updateMetrics(len(line))

		// Call raw handler if registered
		s.mu.RLock()
		rawHandler := s.rawHandler
		s.mu.RUnlock()
		if rawHandler != nil {
			// Call raw handler synchronously to preserve order
			rawHandler(string(line))
		}

		// Parse the message first
		var wrapper ristypes.RISLiveMessage
		if err := json.Unmarshal(line, &wrapper); err != nil {
			// Log the error and return to trigger reconnection
			s.logger.Error("Failed to parse JSON",
				"error", err,
				"line", string(line),
				"line_length", len(line))
			return fmt.Errorf("JSON parse error: %w", err)
		}

		// Check if it's a ris_message wrapper
		if wrapper.Type != "ris_message" {
			s.logger.Error("Unexpected wrapper type",
				"type", wrapper.Type,
				"line", string(line),
			)
			continue
		}

		// Get the actual message
		msg := wrapper.Data

		// Parse the timestamp
		msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0).UTC()

		// Process based on message type
		switch msg.Type {
		case "UPDATE":
			// Process BGP UPDATE messages
			// Will be dispatched to handlers
		case "RIS_PEER_STATE":
			// RIS peer state messages - silently ignore
			continue
		case "KEEPALIVE":
			// BGP keepalive messages - silently ignore
			continue
		case "OPEN":
			// BGP open messages
			s.logger.Info("BGP session opened",
				"peer", msg.Peer,
				"peer_asn", msg.PeerASN,
			)
			continue
		case "NOTIFICATION":
			// BGP notification messages (errors)
			s.logger.Warn("BGP notification",
				"peer", msg.Peer,
				"peer_asn", msg.PeerASN,
			)
			continue
		case "STATE":
			// Peer state changes - silently ignore
			continue
		default:
			s.logger.Error("Unknown message type",
				"type", msg.Type,
				"line", string(line),
			)
			panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type))
		}

		// Dispatch to interested handlers
		s.mu.RLock()
		for _, info := range s.handlers {
			if !info.handler.WantsMessage(msg.Type) {
				continue
			}

			// Check if we should drop due to backpressure
			if s.shouldDropForBackpressure(info) {
				atomic.AddUint64(&info.metrics.droppedCount, 1)
				atomic.AddUint64(&s.totalDropped, 1)
				continue
			}

			// Try to queue the message
			select {
			case info.queue <- &msg:
				// Message queued successfully
				// Update high water mark if needed
				queueLen := len(info.queue)
				info.metrics.mu.Lock()
				if queueLen > info.metrics.queueHighWaterMark {
					info.metrics.queueHighWaterMark = queueLen
				}
				info.metrics.mu.Unlock()
			default:
				// Queue is full, drop the message
				atomic.AddUint64(&info.metrics.droppedCount, 1)
				atomic.AddUint64(&s.totalDropped, 1)
			}
		}
		s.mu.RUnlock()
	}

	if err := scanner.Err(); err != nil {
		return fmt.Errorf("scanner error: %w", err)
	}

	return nil
}

// shouldDropForBackpressure determines if a message should be dropped based on queue utilization
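// With backpressureThreshold = 0.5 and backpressureSlope = 2.0, the drop
// probability is 0.0 at 50% queue utilization, 0.5 at 75%, and 1.0 at 100%.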
func (s *Streamer) shouldDropForBackpressure(info *handlerInfo) bool {
	// Calculate queue utilization
	queueLen := len(info.queue)
	queueCap := cap(info.queue)
	utilization := float64(queueLen) / float64(queueCap)

	// No drops below threshold
	if utilization < backpressureThreshold {
		return false
	}

	// Calculate drop probability (0.0 at threshold, 1.0 at 100% full)
	dropProbability := (utilization - backpressureThreshold) * backpressureSlope
	if dropProbability > 1.0 {
		dropProbability = 1.0
	}

	// Random drop based on probability
	return s.random.Float64() < dropProbability
}