// Package streamer implements an HTTP client that connects to the RIPE RIS Live streaming API,
// parses BGP UPDATE messages from the JSON stream, and dispatches them to registered handlers.
package streamer

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"math"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"git.eeqj.de/sneak/routewatch/internal/logger"
	"git.eeqj.de/sneak/routewatch/internal/metrics"
	"git.eeqj.de/sneak/routewatch/internal/ristypes"
)

const (
	risLiveURL = "https://ris-live.ripe.net/v1/stream/?format=json&" +
		"client=https%3A%2F%2Fgit.eeqj.de%2Fsneak%2Froutewatch"
	metricsWindowSize     = 60 // seconds for rolling average
	metricsUpdateRate     = time.Second
	minBackoffDelay       = 5 * time.Second
	maxBackoffDelay       = 320 * time.Second
	metricsLogInterval    = 10 * time.Second
	bytesPerKB            = 1024
	bytesPerMB            = 1024 * 1024
	maxConcurrentHandlers = 800 // Maximum number of concurrent message handlers
)

// MessageHandler is an interface for handling RIS messages
type MessageHandler interface {
	// WantsMessage returns true if this handler wants to process messages of the given type
	WantsMessage(messageType string) bool

	// HandleMessage processes a RIS message
	HandleMessage(msg *ristypes.RISMessage)

	// QueueCapacity returns the desired queue capacity for this handler
	// Handlers that process quickly can have larger queues
	QueueCapacity() int
}
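// The following is an illustrative sketch (not used by this package) of a
// minimal MessageHandler that counts UPDATE messages; the type and field
// names are hypothetical:
//
//	type updateCounter struct{ n atomic.Uint64 }
//
//	func (c *updateCounter) WantsMessage(t string) bool           { return t == "UPDATE" }
//	func (c *updateCounter) HandleMessage(_ *ristypes.RISMessage) { c.n.Add(1) }
//	func (c *updateCounter) QueueCapacity() int                   { return 10000 }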

// RawMessageHandler is a callback for handling raw JSON lines from the stream
type RawMessageHandler func(line string)

// handlerMetrics tracks performance metrics for a handler
type handlerMetrics struct {
	processedCount uint64        // Total messages processed
	droppedCount   uint64        // Total messages dropped
	totalTime      time.Duration // Total processing time (for average calculation)
	minTime        time.Duration // Minimum processing time
	maxTime        time.Duration // Maximum processing time
	mu             sync.Mutex    // Protects the metrics
}

// handlerInfo wraps a handler with its queue and metrics
type handlerInfo struct {
	handler MessageHandler
	queue   chan *ristypes.RISMessage
	metrics handlerMetrics
}

// Streamer handles streaming BGP updates from RIS Live
type Streamer struct {
	logger       *logger.Logger
	client       *http.Client
	handlers     []*handlerInfo
	rawHandler   RawMessageHandler
	mu           sync.RWMutex
	cancel       context.CancelFunc
	running      bool
	metrics      *metrics.Tracker
	totalDropped uint64 // Total dropped messages across all handlers
}

// New creates a new RIS streamer
func New(logger *logger.Logger, metrics *metrics.Tracker) *Streamer {
	return &Streamer{
		logger: logger,
		client: &http.Client{
			Timeout: 0, // No timeout for streaming
		},
		handlers: make([]*handlerInfo, 0),
		metrics:  metrics,
	}
}

// RegisterHandler adds a callback for message processing
func (s *Streamer) RegisterHandler(handler MessageHandler) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Create handler info with its own queue based on capacity
	info := &handlerInfo{
		handler: handler,
		queue:   make(chan *ristypes.RISMessage, handler.QueueCapacity()),
		metrics: handlerMetrics{
			minTime: time.Duration(math.MaxInt64), // Initialize to max so first value sets the floor
		},
	}

	s.handlers = append(s.handlers, info)

	// If we're already running, start a worker for this handler
	if s.running {
		go s.runHandlerWorker(info)
	}
}
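// Typical wiring (illustrative sketch; the logger, tracker, and handler
// values are hypothetical, and updateCounter refers to the sketch above):
//
//	st := streamer.New(log, tracker)
//	st.RegisterHandler(&updateCounter{})
//	if err := st.Start(); err != nil {
//		log.Error("failed to start streamer", "error", err)
//	}
//	defer st.Stop()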

// RegisterRawHandler sets a callback for raw message lines
func (s *Streamer) RegisterRawHandler(handler RawMessageHandler) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rawHandler = handler
}

// Start begins streaming in a goroutine
func (s *Streamer) Start() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.running {
		return fmt.Errorf("streamer already running")
	}

	ctx, cancel := context.WithCancel(context.Background())
	s.cancel = cancel
	s.running = true

	// Start workers for each handler
	for _, info := range s.handlers {
		go s.runHandlerWorker(info)
	}

	go func() {
		s.streamWithReconnect(ctx)
		s.mu.Lock()
		s.running = false
		s.mu.Unlock()
	}()

	return nil
}

// Stop halts the streaming
func (s *Streamer) Stop() {
	s.mu.Lock()
	if s.cancel != nil {
		s.cancel()
	}
	// Close all handler queues to signal workers to stop
	for _, info := range s.handlers {
		close(info.queue)
	}
	s.running = false
	s.mu.Unlock()
	s.metrics.SetConnected(false)
}

// runHandlerWorker processes messages for a specific handler
func (s *Streamer) runHandlerWorker(info *handlerInfo) {
	for msg := range info.queue {
		start := time.Now()
		info.handler.HandleMessage(msg)
		elapsed := time.Since(start)

		// Update metrics
		info.metrics.mu.Lock()
		info.metrics.processedCount++
		info.metrics.totalTime += elapsed

		// Update min time
		if elapsed < info.metrics.minTime {
			info.metrics.minTime = elapsed
		}

		// Update max time
		if elapsed > info.metrics.maxTime {
			info.metrics.maxTime = elapsed
		}
		info.metrics.mu.Unlock()
	}
}

// IsRunning returns whether the streamer is currently active
func (s *Streamer) IsRunning() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()

	return s.running
}

// GetMetrics returns current streaming metrics
func (s *Streamer) GetMetrics() metrics.StreamMetrics {
	return s.metrics.GetStreamMetrics()
}

// GetMetricsTracker returns the metrics tracker instance
func (s *Streamer) GetMetricsTracker() *metrics.Tracker {
	return s.metrics
}

// HandlerStats represents metrics for a single handler
type HandlerStats struct {
	Name           string
	QueueLength    int
	QueueCapacity  int
	ProcessedCount uint64
	DroppedCount   uint64
	AvgProcessTime time.Duration
	MinProcessTime time.Duration
	MaxProcessTime time.Duration
}

// GetHandlerStats returns current handler statistics
func (s *Streamer) GetHandlerStats() []HandlerStats {
	s.mu.RLock()
	defer s.mu.RUnlock()

	stats := make([]HandlerStats, 0, len(s.handlers))

	for _, info := range s.handlers {
		info.metrics.mu.Lock()

		hs := HandlerStats{
			Name:           fmt.Sprintf("%T", info.handler),
			QueueLength:    len(info.queue),
			QueueCapacity:  cap(info.queue),
			ProcessedCount: info.metrics.processedCount,
			// droppedCount is incremented atomically by the dispatcher, so read it atomically too
			DroppedCount:   atomic.LoadUint64(&info.metrics.droppedCount),
			MinProcessTime: info.metrics.minTime,
			MaxProcessTime: info.metrics.maxTime,
		}

		// Calculate average time
		if info.metrics.processedCount > 0 {
			processedCount := info.metrics.processedCount
			const maxInt64 = 1<<63 - 1
			if processedCount > maxInt64 {
				processedCount = maxInt64
			}
			//nolint:gosec // processedCount is explicitly bounded above
			hs.AvgProcessTime = info.metrics.totalTime / time.Duration(processedCount)
		}

		info.metrics.mu.Unlock()

		stats = append(stats, hs)
	}

	return stats
}

// GetDroppedMessages returns the total number of dropped messages
func (s *Streamer) GetDroppedMessages() uint64 {
	return atomic.LoadUint64(&s.totalDropped)
}

// logMetrics logs the current streaming statistics
func (s *Streamer) logMetrics() {
	metrics := s.metrics.GetStreamMetrics()
	uptime := time.Since(metrics.ConnectedSince)

	const bitsPerMegabit = 1000000
	totalDropped := atomic.LoadUint64(&s.totalDropped)

	s.logger.Info(
		"Stream statistics",
		"uptime",
		uptime,
		"total_messages",
		metrics.TotalMessages,
		"total_bytes",
		metrics.TotalBytes,
		"total_mb",
		fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB),
		"messages_per_sec",
		fmt.Sprintf("%.2f", metrics.MessagesPerSec),
		"bits_per_sec",
		fmt.Sprintf("%.0f", metrics.BitsPerSec),
		"mbps",
		fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit),
		"total_dropped",
		totalDropped,
	)

	// Log per-handler statistics
	s.mu.RLock()
	for i, info := range s.handlers {
		info.metrics.mu.Lock()
		if info.metrics.processedCount > 0 {
			// Safe conversion: processedCount is bounded by maxInt64
			processedCount := info.metrics.processedCount
			const maxInt64 = 1<<63 - 1
			if processedCount > maxInt64 {
				processedCount = maxInt64
			}
			//nolint:gosec // processedCount is explicitly bounded above
			avgTime := info.metrics.totalTime / time.Duration(processedCount)
			s.logger.Info(
				"Handler statistics",
				"handler", fmt.Sprintf("%T", info.handler),
				"index", i,
				"queue_len", len(info.queue),
				"queue_cap", cap(info.queue),
				"processed", info.metrics.processedCount,
				// droppedCount is written atomically by the dispatcher, so read it atomically
				"dropped", atomic.LoadUint64(&info.metrics.droppedCount),
				"avg_time", avgTime,
				"min_time", info.metrics.minTime,
				"max_time", info.metrics.maxTime,
			)
		}
		info.metrics.mu.Unlock()
	}
	s.mu.RUnlock()
}

// updateMetrics updates the metrics counters and rates
func (s *Streamer) updateMetrics(messageBytes int) {
	s.metrics.RecordMessage(int64(messageBytes))
}

// streamWithReconnect handles streaming with automatic reconnection and exponential backoff
func (s *Streamer) streamWithReconnect(ctx context.Context) {
	backoffDelay := minBackoffDelay
	consecutiveFailures := 0
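	// With minBackoffDelay = 5s and maxBackoffDelay = 320s, retry delays
	// progress 5s, 10s, 20s, 40s, 80s, 160s, 320s and then stay capped at
	// 320s; a connection that survives longer than 30s resets the schedule.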
	for {
		select {
		case <-ctx.Done():
			s.logger.Info("Stream context cancelled, stopping reconnection attempts")

			return
		default:
		}

		// Attempt to stream
		startTime := time.Now()
		err := s.stream(ctx)
		streamDuration := time.Since(startTime)

		if err == nil {
			// Clean exit (context cancelled)
			return
		}

		// Log the error
		s.logger.Error("Stream disconnected",
			"error", err,
			"consecutive_failures", consecutiveFailures+1,
			"stream_duration", streamDuration)
		s.metrics.SetConnected(false)

		// Check if context is cancelled
		if ctx.Err() != nil {
			return
		}

		// If we streamed for more than 30 seconds, reset the backoff
		// This indicates we had a successful connection that received data
		if streamDuration > 30*time.Second {
			s.logger.Info("Resetting backoff delay due to successful connection",
				"stream_duration", streamDuration)
			backoffDelay = minBackoffDelay
			consecutiveFailures = 0
		} else {
			// Increment consecutive failures
			consecutiveFailures++
		}

		// Wait with exponential backoff
		s.logger.Info("Waiting before reconnection attempt",
			"delay_seconds", backoffDelay.Seconds(),
			"consecutive_failures", consecutiveFailures)

		select {
		case <-ctx.Done():
			return
		case <-time.After(backoffDelay):
			// Double the backoff delay for next time, up to max
			backoffDelay *= 2
			if backoffDelay > maxBackoffDelay {
				backoffDelay = maxBackoffDelay
			}
		}
	}
}

func (s *Streamer) stream(ctx context.Context) error {
	req, err := http.NewRequestWithContext(ctx, "GET", risLiveURL, nil)
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}

	resp, err := s.client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to connect to RIS Live: %w", err)
	}
	defer func() {
		if err := resp.Body.Close(); err != nil {
			s.logger.Error("Failed to close response body", "error", err)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	s.logger.Info("Connected to RIS Live stream")
	s.metrics.SetConnected(true)

	// Start the metrics logging goroutine with a per-connection context so it
	// exits when this stream ends instead of lingering until the streamer is
	// stopped, even across reconnects.
	logCtx, stopLogging := context.WithCancel(ctx)
	defer stopLogging()

	metricsTicker := time.NewTicker(metricsLogInterval)
	defer metricsTicker.Stop()

	go func() {
		for {
			select {
			case <-metricsTicker.C:
				s.logMetrics()
			case <-logCtx.Done():
				return
			}
		}
	}()

	scanner := bufio.NewScanner(resp.Body)
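	// Note: bufio.Scanner's default buffer caps lines at 64 KiB
	// (bufio.MaxScanTokenSize); a longer line ends the scan with
	// bufio.ErrTooLong, which surfaces below as a scanner error and
	// triggers a reconnect. scanner.Buffer can raise the limit if
	// oversized RIS messages turn out to be a problem.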

	for scanner.Scan() {
		select {
		case <-ctx.Done():
			s.logger.Info("Stream stopped by context")

			return ctx.Err()
		default:
		}

		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}

		// Update metrics with message size
		s.updateMetrics(len(line))

		// Call raw handler if registered
		s.mu.RLock()
		rawHandler := s.rawHandler
		s.mu.RUnlock()

		if rawHandler != nil {
			// Call raw handler synchronously to preserve order
			rawHandler(string(line))
		}

		// Parse the message first
		var wrapper ristypes.RISLiveMessage
		if err := json.Unmarshal(line, &wrapper); err != nil {
			// Log the error and return to trigger reconnection
			s.logger.Error("Failed to parse JSON",
				"error", err,
				"line", string(line),
				"line_length", len(line))

			return fmt.Errorf("JSON parse error: %w", err)
		}

		// Check if it's a ris_message wrapper
		if wrapper.Type != "ris_message" {
			s.logger.Error("Unexpected wrapper type",
				"type", wrapper.Type,
				"line", string(line),
			)

			continue
		}

		// Get the actual message
		msg := wrapper.Data

		// Parse the timestamp
		msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0).UTC()

		// Process based on message type
		switch msg.Type {
		case "UPDATE":
			// Process BGP UPDATE messages
			// Will be dispatched to handlers
		case "RIS_PEER_STATE":
			// RIS peer state messages - silently ignore
			continue
		case "KEEPALIVE":
			// BGP keepalive messages - silently ignore
			continue
		case "OPEN":
			// BGP open messages
			s.logger.Info("BGP session opened",
				"peer", msg.Peer,
				"peer_asn", msg.PeerASN,
			)

			continue
		case "NOTIFICATION":
			// BGP notification messages (errors)
			s.logger.Warn("BGP notification",
				"peer", msg.Peer,
				"peer_asn", msg.PeerASN,
			)

			continue
		case "STATE":
			// Peer state changes - silently ignore
			continue
		default:
			s.logger.Error("Unknown message type",
				"type", msg.Type,
				"line", string(line),
			)
			panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type))
		}

		// Dispatch to interested handlers
		s.mu.RLock()
		for _, info := range s.handlers {
			if info.handler.WantsMessage(msg.Type) {
				select {
				case info.queue <- &msg:
					// Message queued successfully
				default:
					// Queue is full, drop the message
					atomic.AddUint64(&info.metrics.droppedCount, 1)
					atomic.AddUint64(&s.totalDropped, 1)
				}
			}
		}
		s.mu.RUnlock()
	}

	if err := scanner.Err(); err != nil {
		return fmt.Errorf("scanner error: %w", err)
	}

	return nil
}