Initial commit: RouteWatch BGP stream monitor
- Connects to RIPE RIS Live stream to receive real-time BGP updates - Stores BGP data in SQLite database: - ASNs with first/last seen timestamps - Prefixes with IPv4/IPv6 classification - BGP announcements and withdrawals - AS-to-AS peering relationships from AS paths - Live routing table tracking active routes - HTTP server with statistics endpoints - Metrics tracking with go-metrics - Custom JSON unmarshaling to handle nested AS sets in paths - Dependency injection with uber/fx - Pure Go implementation (no CGO) - Includes streamdumper utility for debugging raw messages
This commit is contained in:
310
internal/streamer/streamer.go
Normal file
310
internal/streamer/streamer.go
Normal file
@@ -0,0 +1,310 @@
|
||||
// Package streamer implements an HTTP client that connects to the RIPE RIS Live streaming API,
|
||||
// parses BGP UPDATE messages from the JSON stream, and dispatches them to registered handlers.
|
||||
package streamer
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.eeqj.de/sneak/routewatch/internal/metrics"
|
||||
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
||||
)
|
||||
|
||||
const (
|
||||
risLiveURL = "https://ris-live.ripe.net/v1/stream/?format=json"
|
||||
metricsWindowSize = 60 // seconds for rolling average
|
||||
metricsUpdateRate = time.Second
|
||||
metricsLogInterval = 10 * time.Second
|
||||
bytesPerKB = 1024
|
||||
bytesPerMB = 1024 * 1024
|
||||
)
|
||||
|
||||
// MessageHandler is an interface for handling RIS messages
|
||||
type MessageHandler interface {
|
||||
// WantsMessage returns true if this handler wants to process messages of the given type
|
||||
WantsMessage(messageType string) bool
|
||||
|
||||
// HandleMessage processes a RIS message
|
||||
HandleMessage(msg *ristypes.RISMessage)
|
||||
}
|
||||
|
||||
// RawMessageHandler is a callback for handling raw JSON lines from the stream
|
||||
type RawMessageHandler func(line string)
|
||||
|
||||
// Streamer handles streaming BGP updates from RIS Live
|
||||
type Streamer struct {
|
||||
logger *slog.Logger
|
||||
client *http.Client
|
||||
handlers []MessageHandler
|
||||
rawHandler RawMessageHandler
|
||||
mu sync.RWMutex
|
||||
cancel context.CancelFunc
|
||||
running bool
|
||||
metrics *metrics.Tracker
|
||||
}
|
||||
|
||||
// New creates a new RIS streamer
|
||||
func New(logger *slog.Logger, metrics *metrics.Tracker) *Streamer {
|
||||
return &Streamer{
|
||||
logger: logger,
|
||||
client: &http.Client{
|
||||
Timeout: 0, // No timeout for streaming
|
||||
},
|
||||
handlers: make([]MessageHandler, 0),
|
||||
metrics: metrics,
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterHandler adds a callback for message processing
|
||||
func (s *Streamer) RegisterHandler(handler MessageHandler) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.handlers = append(s.handlers, handler)
|
||||
}
|
||||
|
||||
// RegisterRawHandler sets a callback for raw message lines
|
||||
func (s *Streamer) RegisterRawHandler(handler RawMessageHandler) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.rawHandler = handler
|
||||
}
|
||||
|
||||
// Start begins streaming in a goroutine
|
||||
func (s *Streamer) Start() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.running {
|
||||
return fmt.Errorf("streamer already running")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s.cancel = cancel
|
||||
s.running = true
|
||||
|
||||
go func() {
|
||||
if err := s.stream(ctx); err != nil {
|
||||
s.logger.Error("Streaming error", "error", err)
|
||||
}
|
||||
s.mu.Lock()
|
||||
s.running = false
|
||||
s.mu.Unlock()
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop halts the streaming
|
||||
func (s *Streamer) Stop() {
|
||||
s.mu.Lock()
|
||||
if s.cancel != nil {
|
||||
s.cancel()
|
||||
}
|
||||
s.mu.Unlock()
|
||||
s.metrics.SetConnected(false)
|
||||
}
|
||||
|
||||
// IsRunning returns whether the streamer is currently active
|
||||
func (s *Streamer) IsRunning() bool {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
return s.running
|
||||
}
|
||||
|
||||
// GetMetrics returns current streaming metrics
|
||||
func (s *Streamer) GetMetrics() metrics.StreamMetrics {
|
||||
return s.metrics.GetStreamMetrics()
|
||||
}
|
||||
|
||||
// logMetrics logs the current streaming statistics
|
||||
func (s *Streamer) logMetrics() {
|
||||
metrics := s.metrics.GetStreamMetrics()
|
||||
uptime := time.Since(metrics.ConnectedSince)
|
||||
|
||||
const bitsPerMegabit = 1000000
|
||||
s.logger.Info("Stream statistics",
|
||||
"uptime", uptime,
|
||||
"total_messages", metrics.TotalMessages,
|
||||
"total_bytes", metrics.TotalBytes,
|
||||
"total_mb", fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB),
|
||||
"messages_per_sec", fmt.Sprintf("%.2f", metrics.MessagesPerSec),
|
||||
"bits_per_sec", fmt.Sprintf("%.0f", metrics.BitsPerSec),
|
||||
"mbps", fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit),
|
||||
)
|
||||
}
|
||||
|
||||
// updateMetrics updates the metrics counters and rates
|
||||
func (s *Streamer) updateMetrics(messageBytes int) {
|
||||
s.metrics.RecordMessage(int64(messageBytes))
|
||||
}
|
||||
|
||||
func (s *Streamer) stream(ctx context.Context) error {
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", risLiveURL, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := s.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to RIS Live: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := resp.Body.Close(); err != nil {
|
||||
s.logger.Error("Failed to close response body", "error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
s.logger.Info("Connected to RIS Live stream")
|
||||
s.metrics.SetConnected(true)
|
||||
|
||||
// Start metrics logging goroutine
|
||||
metricsTicker := time.NewTicker(metricsLogInterval)
|
||||
defer metricsTicker.Stop()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-metricsTicker.C:
|
||||
s.logMetrics()
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
|
||||
for scanner.Scan() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
s.logger.Info("Stream stopped by context")
|
||||
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
line := scanner.Bytes()
|
||||
if len(line) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Update metrics with message size
|
||||
s.updateMetrics(len(line))
|
||||
|
||||
// Call raw handler if registered
|
||||
s.mu.RLock()
|
||||
rawHandler := s.rawHandler
|
||||
s.mu.RUnlock()
|
||||
|
||||
if rawHandler != nil {
|
||||
// Call raw handler synchronously to preserve order
|
||||
rawHandler(string(line))
|
||||
}
|
||||
|
||||
// Get current handlers
|
||||
s.mu.RLock()
|
||||
handlers := make([]MessageHandler, len(s.handlers))
|
||||
copy(handlers, s.handlers)
|
||||
s.mu.RUnlock()
|
||||
|
||||
// Spawn goroutine to parse and process the message
|
||||
go func(rawLine []byte, messageHandlers []MessageHandler) {
|
||||
|
||||
// Parse the outer wrapper first
|
||||
var wrapper ristypes.RISLiveMessage
|
||||
if err := json.Unmarshal(rawLine, &wrapper); err != nil {
|
||||
// Output the raw line and panic on parse failure
|
||||
fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err)
|
||||
fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(rawLine))
|
||||
panic(fmt.Sprintf("JSON parse error: %v", err))
|
||||
}
|
||||
|
||||
// Check if it's a ris_message wrapper
|
||||
if wrapper.Type != "ris_message" {
|
||||
s.logger.Error("Unexpected wrapper type",
|
||||
"type", wrapper.Type,
|
||||
"line", string(rawLine),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Get the actual message
|
||||
msg := wrapper.Data
|
||||
|
||||
// Parse the timestamp
|
||||
msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0).UTC()
|
||||
|
||||
// Process based on message type
|
||||
switch msg.Type {
|
||||
case "UPDATE":
|
||||
// Process BGP UPDATE messages
|
||||
// Will be handled by registered handlers
|
||||
case "RIS_PEER_STATE":
|
||||
s.logger.Info("RIS peer state change",
|
||||
"peer", msg.Peer,
|
||||
"peer_asn", msg.PeerASN,
|
||||
)
|
||||
case "KEEPALIVE":
|
||||
// BGP keepalive messages - just log at debug level
|
||||
s.logger.Debug("BGP keepalive",
|
||||
"peer", msg.Peer,
|
||||
"peer_asn", msg.PeerASN,
|
||||
)
|
||||
case "OPEN":
|
||||
// BGP open messages
|
||||
s.logger.Info("BGP session opened",
|
||||
"peer", msg.Peer,
|
||||
"peer_asn", msg.PeerASN,
|
||||
)
|
||||
case "NOTIFICATION":
|
||||
// BGP notification messages (errors)
|
||||
s.logger.Warn("BGP notification",
|
||||
"peer", msg.Peer,
|
||||
"peer_asn", msg.PeerASN,
|
||||
)
|
||||
case "STATE":
|
||||
// Peer state changes
|
||||
s.logger.Info("Peer state change",
|
||||
"peer", msg.Peer,
|
||||
"peer_asn", msg.PeerASN,
|
||||
)
|
||||
default:
|
||||
fmt.Fprintf(
|
||||
os.Stderr,
|
||||
"UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n",
|
||||
msg.Type,
|
||||
string(rawLine),
|
||||
)
|
||||
panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type))
|
||||
}
|
||||
|
||||
// Spawn goroutine for each handler callback that wants this message type
|
||||
for _, handler := range messageHandlers {
|
||||
if handler.WantsMessage(msg.Type) {
|
||||
go func(h MessageHandler) {
|
||||
h.HandleMessage(&msg)
|
||||
}(handler)
|
||||
}
|
||||
}
|
||||
}(append([]byte(nil), line...), handlers) // Copy the line to avoid data races
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
return fmt.Errorf("scanner error: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
34
internal/streamer/streamer_test.go
Normal file
34
internal/streamer/streamer_test.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package streamer
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.eeqj.de/sneak/routewatch/internal/metrics"
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
func TestNewStreamer(t *testing.T) {
|
||||
logger := slog.Default()
|
||||
metricsTracker := metrics.New()
|
||||
s := New(logger, metricsTracker)
|
||||
|
||||
if s == nil {
|
||||
t.Fatal("New() returned nil")
|
||||
}
|
||||
|
||||
if s.logger != logger {
|
||||
t.Error("logger not set correctly")
|
||||
}
|
||||
|
||||
if s.client == nil {
|
||||
t.Error("HTTP client not initialized")
|
||||
}
|
||||
|
||||
if s.handlers == nil {
|
||||
t.Error("handlers slice not initialized")
|
||||
}
|
||||
|
||||
if s.metrics != metricsTracker {
|
||||
t.Error("metrics tracker not set correctly")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user