Add comprehensive godoc comments to all exported types, functions, and constants throughout the codebase. Create README.md documenting the project architecture, execution flow, database schema, and component relationships.
// Package streamer implements an HTTP client that connects to the RIPE RIS Live streaming API,
// parses BGP UPDATE messages from the JSON stream, and dispatches them to registered handlers.
package streamer

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"math"
	"math/rand"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"git.eeqj.de/sneak/routewatch/internal/logger"
	"git.eeqj.de/sneak/routewatch/internal/metrics"
	"git.eeqj.de/sneak/routewatch/internal/ristypes"
)

// Configuration constants for the RIS Live streamer.
const (
	risLiveURL = "https://ris-live.ripe.net/v1/stream/?format=json&" +
		"client=https%3A%2F%2Fgit.eeqj.de%2Fsneak%2Froutewatch"
	metricsWindowSize     = 60 // seconds for rolling average
	metricsUpdateRate     = time.Second
	minBackoffDelay       = 5 * time.Second
	maxBackoffDelay       = 320 * time.Second
	metricsLogInterval    = 10 * time.Second
	bytesPerKB            = 1024
	bytesPerMB            = 1024 * 1024
	maxConcurrentHandlers = 800 // Maximum number of concurrent message handlers

	// Backpressure constants
	backpressureThreshold = 0.5 // Start dropping at 50% queue utilization
	backpressureSlope     = 2.0 // Slope for linear drop probability increase
)
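
// Illustrative arithmetic (derived from the constants above, not present in the
// original source): a handler queue at 75% utilization yields a drop probability
// of (0.75 - 0.5) * 2.0 = 0.5, and a completely full queue reaches
// (1.0 - 0.5) * 2.0 = 1.0, so every further message is dropped before enqueueing.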

// MessageHandler defines the interface for processing RIS messages.
// Implementations must specify which message types they want to receive,
// how to process messages, and their desired queue capacity.
type MessageHandler interface {
	// WantsMessage returns true if this handler wants to process messages of the given type.
	WantsMessage(messageType string) bool

	// HandleMessage processes a RIS message. This method is called from a dedicated
	// goroutine for each handler, so implementations do not need to be goroutine-safe
	// with respect to other handlers.
	HandleMessage(msg *ristypes.RISMessage)

	// QueueCapacity returns the desired queue capacity for this handler.
	// Handlers that process quickly can have larger queues to buffer bursts.
	// When the queue fills up, messages will be dropped according to the
	// backpressure algorithm.
	QueueCapacity() int
}
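
// Example (illustrative sketch only, not part of this package): a minimal handler
// that counts BGP UPDATE messages might look like the following.
//
//	type updateCounter struct{ count uint64 }
//
//	func (c *updateCounter) WantsMessage(t string) bool { return t == "UPDATE" }
//
//	func (c *updateCounter) HandleMessage(msg *ristypes.RISMessage) {
//		atomic.AddUint64(&c.count, 1)
//	}
//
//	func (c *updateCounter) QueueCapacity() int { return 10000 }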

// RawMessageHandler is a function type for processing raw JSON lines from the stream.
// It receives the unmodified JSON line as a string before any parsing occurs.
type RawMessageHandler func(line string)

// handlerMetrics tracks performance metrics for a handler
type handlerMetrics struct {
	processedCount     uint64        // Total messages processed
	droppedCount       uint64        // Total messages dropped
	totalTime          time.Duration // Total processing time (for average calculation)
	minTime            time.Duration // Minimum processing time
	maxTime            time.Duration // Maximum processing time
	queueHighWaterMark int           // Maximum queue length seen
	mu                 sync.Mutex    // Protects the metrics
}

// handlerInfo wraps a handler with its queue and metrics
type handlerInfo struct {
	handler MessageHandler
	queue   chan *ristypes.RISMessage
	metrics handlerMetrics
}

// Streamer manages a connection to the RIPE RIS Live streaming API for receiving
// real-time BGP UPDATE messages. It handles automatic reconnection with exponential
// backoff, dispatches messages to registered handlers via per-handler queues, and
// implements backpressure to prevent queue overflow during high traffic periods.
type Streamer struct {
	logger       *logger.Logger
	client       *http.Client
	handlers     []*handlerInfo
	rawHandler   RawMessageHandler
	mu           sync.RWMutex
	cancel       context.CancelFunc
	running      bool
	metrics      *metrics.Tracker
	totalDropped uint64     // Total dropped messages across all handlers
	random       *rand.Rand // Random number generator for backpressure drops
}

// New creates a new Streamer instance configured to connect to the RIS Live API.
// The logger is used for structured logging of connection events and errors.
// The metrics tracker is used to record message counts, bytes received, and connection status.
func New(logger *logger.Logger, metrics *metrics.Tracker) *Streamer {
	return &Streamer{
		logger: logger,
		client: &http.Client{
			Timeout: 0, // No timeout for streaming
		},
		handlers: make([]*handlerInfo, 0),
		metrics:  metrics,
		//nolint:gosec // Non-cryptographic randomness is fine for backpressure
		random: rand.New(rand.NewSource(time.Now().UnixNano())),
	}
}
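
// Typical wiring (illustrative sketch; updateCounter is the hypothetical handler
// from the MessageHandler example above):
//
//	s := streamer.New(log, tracker)
//	s.RegisterHandler(&updateCounter{})
//	if err := s.Start(); err != nil {
//		// streamer was already running
//	}
//	defer s.Stop()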

// RegisterHandler adds a MessageHandler to receive parsed RIS messages.
// Each handler gets its own dedicated queue and worker goroutine for processing.
// If the streamer is already running, the handler's worker is started immediately.
func (s *Streamer) RegisterHandler(handler MessageHandler) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Create handler info with its own queue based on capacity
	info := &handlerInfo{
		handler: handler,
		queue:   make(chan *ristypes.RISMessage, handler.QueueCapacity()),
		metrics: handlerMetrics{
			minTime: time.Duration(math.MaxInt64), // Initialize to max so first value sets the floor
		},
	}

	s.handlers = append(s.handlers, info)

	// If we're already running, start a worker for this handler
	if s.running {
		go s.runHandlerWorker(info)
	}
}

// RegisterRawHandler sets a callback to receive raw JSON lines from the stream
// before they are parsed. Only one raw handler can be registered at a time;
// subsequent calls will replace the previous handler.
func (s *Streamer) RegisterRawHandler(handler RawMessageHandler) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rawHandler = handler
}

// Start begins streaming BGP updates from the RIS Live API in a background goroutine.
// It starts worker goroutines for each registered handler and manages automatic
// reconnection with exponential backoff on connection failures.
// Returns an error if the streamer is already running.
func (s *Streamer) Start() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.running {
		return fmt.Errorf("streamer already running")
	}

	ctx, cancel := context.WithCancel(context.Background())
	s.cancel = cancel
	s.running = true

	// Start workers for each handler
	for _, info := range s.handlers {
		go s.runHandlerWorker(info)
	}

	go func() {
		s.streamWithReconnect(ctx)
		s.mu.Lock()
		s.running = false
		s.mu.Unlock()
	}()

	return nil
}

// Stop halts the streaming connection and shuts down all handler workers.
// It cancels the streaming context, closes all handler queues, and updates
// the connection status in metrics. This method is safe to call multiple times.
func (s *Streamer) Stop() {
	s.mu.Lock()
	if s.cancel != nil {
		s.cancel()
	}
	// Close all handler queues to signal workers to stop
	for _, info := range s.handlers {
		close(info.queue)
	}
	s.running = false
	s.mu.Unlock()
	s.metrics.SetConnected(false)
}

// runHandlerWorker processes messages for a specific handler
func (s *Streamer) runHandlerWorker(info *handlerInfo) {
	for msg := range info.queue {
		start := time.Now()
		info.handler.HandleMessage(msg)
		elapsed := time.Since(start)

		// Update metrics
		info.metrics.mu.Lock()
		info.metrics.processedCount++
		info.metrics.totalTime += elapsed

		// Update min time
		if elapsed < info.metrics.minTime {
			info.metrics.minTime = elapsed
		}

		// Update max time
		if elapsed > info.metrics.maxTime {
			info.metrics.maxTime = elapsed
		}
		info.metrics.mu.Unlock()
	}
}

// IsRunning reports whether the streamer is currently connected and processing messages.
// This is safe to call concurrently from multiple goroutines.
func (s *Streamer) IsRunning() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()

	return s.running
}

// GetMetrics returns the current streaming metrics including message counts,
// bytes received, and throughput rates. The returned struct is a snapshot
// of the current state and is safe to use without synchronization.
func (s *Streamer) GetMetrics() metrics.StreamMetrics {
	return s.metrics.GetStreamMetrics()
}

// GetMetricsTracker returns the underlying metrics.Tracker instance for direct access
// to metrics recording and retrieval functionality.
func (s *Streamer) GetMetricsTracker() *metrics.Tracker {
	return s.metrics
}

// HandlerStats contains performance metrics for a single message handler.
// It includes queue utilization, message counts, and processing time statistics.
type HandlerStats struct {
	Name               string
	QueueLength        int
	QueueCapacity      int
	QueueHighWaterMark int
	ProcessedCount     uint64
	DroppedCount       uint64
	AvgProcessTime     time.Duration
	MinProcessTime     time.Duration
	MaxProcessTime     time.Duration
}

// GetHandlerStats returns a snapshot of performance statistics for all registered
// handlers. The returned slice contains one HandlerStats entry per handler with
// current queue depth, processed/dropped counts, and processing time statistics.
func (s *Streamer) GetHandlerStats() []HandlerStats {
	s.mu.RLock()
	defer s.mu.RUnlock()

	stats := make([]HandlerStats, 0, len(s.handlers))

	for _, info := range s.handlers {
		info.metrics.mu.Lock()

		hs := HandlerStats{
			Name:               fmt.Sprintf("%T", info.handler),
			QueueLength:        len(info.queue),
			QueueCapacity:      cap(info.queue),
			QueueHighWaterMark: info.metrics.queueHighWaterMark,
			ProcessedCount:     info.metrics.processedCount,
			DroppedCount:       info.metrics.droppedCount,
			MinProcessTime:     info.metrics.minTime,
			MaxProcessTime:     info.metrics.maxTime,
		}

		// Calculate average time
		if info.metrics.processedCount > 0 {
			processedCount := info.metrics.processedCount
			const maxInt64 = 1<<63 - 1
			if processedCount > maxInt64 {
				processedCount = maxInt64
			}
			//nolint:gosec // processedCount is explicitly bounded above
			hs.AvgProcessTime = info.metrics.totalTime / time.Duration(processedCount)
		}

		info.metrics.mu.Unlock()

		stats = append(stats, hs)
	}

	return stats
}
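
// Illustrative use (sketch, not part of the original source): callers can poll
// these statistics periodically to watch for queue pressure, for example:
//
//	for _, hs := range s.GetHandlerStats() {
//		if hs.QueueLength > hs.QueueCapacity/2 {
//			// hs.Name is falling behind; backpressure drops are now possible
//		}
//	}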

// GetDroppedMessages returns the total number of messages dropped across all handlers
// due to queue overflow or backpressure. This counter is monotonically increasing
// and is safe to call concurrently.
func (s *Streamer) GetDroppedMessages() uint64 {
	return atomic.LoadUint64(&s.totalDropped)
}

// logMetrics logs the current streaming statistics
func (s *Streamer) logMetrics() {
	metrics := s.metrics.GetStreamMetrics()
	uptime := time.Since(metrics.ConnectedSince)

	const bitsPerMegabit = 1000000
	totalDropped := atomic.LoadUint64(&s.totalDropped)

	s.logger.Info(
		"Stream statistics",
		"uptime",
		uptime,
		"total_messages",
		metrics.TotalMessages,
		"total_bytes",
		metrics.TotalBytes,
		"total_mb",
		fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB),
		"messages_per_sec",
		fmt.Sprintf("%.2f", metrics.MessagesPerSec),
		"bits_per_sec",
		fmt.Sprintf("%.0f", metrics.BitsPerSec),
		"mbps",
		fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit),
		"total_dropped",
		totalDropped,
	)

	// Log per-handler statistics
	s.mu.RLock()
	for i, info := range s.handlers {
		info.metrics.mu.Lock()
		if info.metrics.processedCount > 0 {
			// Safe conversion: processedCount is bounded by maxInt64
			processedCount := info.metrics.processedCount
			const maxInt64 = 1<<63 - 1
			if processedCount > maxInt64 {
				processedCount = maxInt64
			}
			//nolint:gosec // processedCount is explicitly bounded above
			avgTime := info.metrics.totalTime / time.Duration(processedCount)
			s.logger.Info(
				"Handler statistics",
				"handler", fmt.Sprintf("%T", info.handler),
				"index", i,
				"queue_len", len(info.queue),
				"queue_cap", cap(info.queue),
				"processed", info.metrics.processedCount,
				"dropped", info.metrics.droppedCount,
				"avg_time", avgTime,
				"min_time", info.metrics.minTime,
				"max_time", info.metrics.maxTime,
			)
		}
		info.metrics.mu.Unlock()
	}
	s.mu.RUnlock()
}

// updateMetrics updates the metrics counters and rates
func (s *Streamer) updateMetrics(messageBytes int) {
	s.metrics.RecordMessage(int64(messageBytes))
}

// streamWithReconnect handles streaming with automatic reconnection and exponential backoff
func (s *Streamer) streamWithReconnect(ctx context.Context) {
	backoffDelay := minBackoffDelay
	consecutiveFailures := 0

	for {
		select {
		case <-ctx.Done():
			s.logger.Info("Stream context cancelled, stopping reconnection attempts")

			return
		default:
		}

		// Attempt to stream
		startTime := time.Now()
		err := s.stream(ctx)
		streamDuration := time.Since(startTime)

		if err == nil {
			// Clean exit (context cancelled)
			return
		}

		// Log the error
		s.logger.Error("Stream disconnected",
			"error", err,
			"consecutive_failures", consecutiveFailures+1,
			"stream_duration", streamDuration)
		s.metrics.SetConnected(false)

		// Check if context is cancelled
		if ctx.Err() != nil {
			return
		}

		// If we streamed for more than 30 seconds, reset the backoff
		// This indicates we had a successful connection that received data
		if streamDuration > 30*time.Second {
			s.logger.Info("Resetting backoff delay due to successful connection",
				"stream_duration", streamDuration)
			backoffDelay = minBackoffDelay
			consecutiveFailures = 0
		} else {
			// Increment consecutive failures
			consecutiveFailures++
		}

		// Wait with exponential backoff
		s.logger.Info("Waiting before reconnection attempt",
			"delay_seconds", backoffDelay.Seconds(),
			"consecutive_failures", consecutiveFailures)

		select {
		case <-ctx.Done():
			return
		case <-time.After(backoffDelay):
			// Double the backoff delay for next time, up to max
			backoffDelay *= 2
			if backoffDelay > maxBackoffDelay {
				backoffDelay = maxBackoffDelay
			}
		}
	}
}
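
// Note (illustrative, derived from the constants above): reconnection delays grow
// as 5s, 10s, 20s, 40s, 80s, 160s, 320s and then stay at maxBackoffDelay, while a
// connection that survives longer than 30 seconds resets the delay back to 5s.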

func (s *Streamer) stream(ctx context.Context) error {
	req, err := http.NewRequestWithContext(ctx, "GET", risLiveURL, nil)
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}

	resp, err := s.client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to connect to RIS Live: %w", err)
	}
	defer func() {
		if err := resp.Body.Close(); err != nil {
			s.logger.Error("Failed to close response body", "error", err)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	s.logger.Info("Connected to RIS Live stream")
	s.metrics.SetConnected(true)

	// Start metrics logging goroutine
	metricsTicker := time.NewTicker(metricsLogInterval)
	defer metricsTicker.Stop()

	go func() {
		for {
			select {
			case <-metricsTicker.C:
				s.logMetrics()
			case <-ctx.Done():
				return
			}
		}
	}()

	scanner := bufio.NewScanner(resp.Body)

	for scanner.Scan() {
		select {
		case <-ctx.Done():
			s.logger.Info("Stream stopped by context")

			return ctx.Err()
		default:
		}

		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}

		// Update metrics with message size
		s.updateMetrics(len(line))

		// Call raw handler if registered
		s.mu.RLock()
		rawHandler := s.rawHandler
		s.mu.RUnlock()

		if rawHandler != nil {
			// Call raw handler synchronously to preserve order
			rawHandler(string(line))
		}

		// Parse the message first
		var wrapper ristypes.RISLiveMessage
		if err := json.Unmarshal(line, &wrapper); err != nil {
			// Log the error and return to trigger reconnection
			s.logger.Error("Failed to parse JSON",
				"error", err,
				"line", string(line),
				"line_length", len(line))

			return fmt.Errorf("JSON parse error: %w", err)
		}

		// Check if it's a ris_message wrapper
		if wrapper.Type != "ris_message" {
			s.logger.Error("Unexpected wrapper type",
				"type", wrapper.Type,
				"line", string(line),
			)

			continue
		}

		// Get the actual message
		msg := wrapper.Data

		// Parse the timestamp
		msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0).UTC()

		// Process based on message type
		switch msg.Type {
		case "UPDATE":
			// Process BGP UPDATE messages
			// Will be dispatched to handlers
		case "RIS_PEER_STATE":
			// RIS peer state messages - silently ignore
			continue
		case "KEEPALIVE":
			// BGP keepalive messages - silently process
			continue
		case "OPEN":
			// BGP open messages
			s.logger.Info("BGP session opened",
				"peer", msg.Peer,
				"peer_asn", msg.PeerASN,
			)

			continue
		case "NOTIFICATION":
			// BGP notification messages (errors)
			s.logger.Warn("BGP notification",
				"peer", msg.Peer,
				"peer_asn", msg.PeerASN,
			)

			continue
		case "STATE":
			// Peer state changes - silently ignore
			continue
		default:
			s.logger.Error("Unknown message type",
				"type", msg.Type,
				"line", string(line),
			)
			panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type))
		}

		// Dispatch to interested handlers
		s.mu.RLock()
		for _, info := range s.handlers {
			if !info.handler.WantsMessage(msg.Type) {
				continue
			}

			// Check if we should drop due to backpressure
			if s.shouldDropForBackpressure(info) {
				atomic.AddUint64(&info.metrics.droppedCount, 1)
				atomic.AddUint64(&s.totalDropped, 1)

				continue
			}

			// Try to queue the message
			select {
			case info.queue <- &msg:
				// Message queued successfully
				// Update high water mark if needed
				queueLen := len(info.queue)
				info.metrics.mu.Lock()
				if queueLen > info.metrics.queueHighWaterMark {
					info.metrics.queueHighWaterMark = queueLen
				}
				info.metrics.mu.Unlock()
			default:
				// Queue is full, drop the message
				atomic.AddUint64(&info.metrics.droppedCount, 1)
				atomic.AddUint64(&s.totalDropped, 1)
			}
		}
		s.mu.RUnlock()
	}

	if err := scanner.Err(); err != nil {
		return fmt.Errorf("scanner error: %w", err)
	}

	return nil
}

// shouldDropForBackpressure determines if a message should be dropped based on queue utilization
func (s *Streamer) shouldDropForBackpressure(info *handlerInfo) bool {
	// Calculate queue utilization
	queueLen := len(info.queue)
	queueCap := cap(info.queue)
	utilization := float64(queueLen) / float64(queueCap)

	// No drops below threshold
	if utilization < backpressureThreshold {
		return false
	}

	// Calculate drop probability (0.0 at threshold, 1.0 at 100% full)
	dropProbability := (utilization - backpressureThreshold) * backpressureSlope
	if dropProbability > 1.0 {
		dropProbability = 1.0
	}

	// Random drop based on probability
	return s.random.Float64() < dropProbability
}