diff --git a/README.md b/README.md new file mode 100644 index 0000000..dfe81f8 --- /dev/null +++ b/README.md @@ -0,0 +1,193 @@ +# RouteWatch + +RouteWatch is a real-time BGP routing table monitor that streams BGP UPDATE messages from the RIPE RIS Live service, maintains a live routing table in SQLite, and provides HTTP APIs for querying routing information. + +## Features + +- Real-time streaming of BGP updates from RIPE RIS Live +- Maintains live IPv4 and IPv6 routing tables +- Tracks AS peering relationships +- HTTP API for IP-to-AS lookups, prefix details, and AS information +- Automatic reconnection with exponential backoff +- Batched database writes for high performance +- Backpressure handling to prevent memory exhaustion + +## Installation + +```bash +go build -o routewatch ./cmd/routewatch +``` + +## Usage + +```bash +# Run the daemon (listens on port 8080 by default) +./routewatch + +# Set custom port +PORT=3000 ./routewatch + +# Enable debug logging +DEBUG=routewatch ./routewatch +``` + +## HTTP Endpoints + +### Web Interface +- `GET /` - Redirects to /status +- `GET /status` - HTML status dashboard +- `GET /status.json` - JSON statistics +- `GET /as/{asn}` - AS detail page (HTML) +- `GET /prefix/{prefix}` - Prefix detail page (HTML) +- `GET /prefixlength/{length}` - IPv4 prefixes by mask length +- `GET /prefixlength6/{length}` - IPv6 prefixes by mask length +- `GET /ip/{ip}` - Redirects to prefix containing the IP + +### API v1 +- `GET /api/v1/stats` - Detailed statistics with handler metrics +- `GET /api/v1/ip/{ip}` - Look up AS information for an IP address +- `GET /api/v1/as/{asn}` - Get prefixes announced by an AS +- `GET /api/v1/prefix/{prefix}` - Get routes for a specific prefix + +## Code Structure + +``` +routewatch/ +├── cmd/ +│ ├── routewatch/ # Main daemon entry point +│ ├── asinfo-gen/ # Utility to generate AS info data +│ └── streamdumper/ # Debug utility for raw stream output +├── internal/ +│ ├── routewatch/ # Core application logic +│ ├── 
server/ # HTTP server and handlers +│ ├── database/ # SQLite storage layer +│ ├── streamer/ # RIPE RIS Live client +│ ├── ristypes/ # BGP message data structures +│ ├── logger/ # Structured logging wrapper +│ ├── metrics/ # Performance metrics tracking +│ ├── config/ # Configuration management +│ └── templates/ # HTML templates +└── pkg/ + └── asinfo/ # AS information lookup (public API) +``` + +## Architecture Overview + +### Component Relationships + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ RouteWatch │ +│ (internal/routewatch/app.go - main orchestrator) │ +├─────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ +│ │ Streamer │───▶│ Handlers │───▶│ Database │ │ +│ │ │ │ │ │ │ │ +│ │ RIS Live │ │ - ASHandler │ │ SQLite with │ │ +│ │ HTTP stream │ │ - PeerHandler│ │ WAL mode │ │ +│ │ client │ │ - PrefixHdlr │ │ │ │ +│ │ │ │ - PeeringHdlr│ │ Tables: │ │ +│ └──────────────┘ └──────────────┘ │ - asns │ │ +│ │ - prefixes │ │ +│ ┌──────────────┐ ┌──────────────┐ │ - live_routes│ │ +│ │ Server │───▶│ Handlers │───▶│ - peerings │ │ +│ │ │ │ │ │ - bgp_peers │ │ +│ │ Chi router │ │ Status, API │ └──────────────┘ │ +│ │ port 8080 │ │ AS, Prefix │ │ +│ └──────────────┘ └──────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +### Execution Flow + +1. **Startup** (`cmd/routewatch/main.go` → `internal/routewatch/cli.go`) + - Uber fx dependency injection initializes all components + - Signal handlers registered for graceful shutdown + +2. **Initialization** (`internal/routewatch/app.go`) + - Database created with SQLite schema (WAL mode, 3GB cache) + - Message handlers registered with the streamer + - HTTP server started on configured port + +3. 
**Message Processing Pipeline** + ``` + RIS Live Stream → JSON Parser → Message Dispatcher → Handler Queues → Batch Writers → SQLite + ``` + - Streamer connects to `ris-live.ripe.net` via HTTP + - Parses BGP UPDATE messages from the JSON stream + - Dispatches to registered handlers based on message type + - Each handler has its own queue with backpressure handling + - Handlers batch writes for efficiency (25K-30K ops, 1-2s timeout) + +4. **Handler Details** + - **ASHandler**: Tracks all ASNs seen in AS paths + - **PeerHandler**: Records BGP peer information + - **PrefixHandler**: Maintains live routing table (upserts on announcement, deletes on withdrawal) + - **PeeringHandler**: Extracts AS peering relationships from AS paths + +5. **HTTP Request Flow** + ``` + Request → Chi Router → Middleware (timeout, logging) → Handler → Database Query → Response + ``` + +### Key Design Patterns + +- **Batched Writes**: All database writes are batched for performance +- **Backpressure**: Probabilistic message dropping when queues exceed 50% capacity +- **Graceful Shutdown**: 60-second timeout, flushes all pending batches +- **Reconnection**: Exponential backoff (5s-320s) with reset after 30s of stable connection +- **IPv4 Optimization**: IP ranges stored as uint32 start/end values (`v4_ip_start`, `v4_ip_end`) so range lookups use integer comparisons + +### Database Schema + +```sql +-- Core tables +asns(id, number, handle, description, first_seen, last_seen) +prefixes_v4(id, prefix, mask_length, first_seen, last_seen) +prefixes_v6(id, prefix, mask_length, first_seen, last_seen) + +-- Live routing tables (one per IP version) +live_routes_v4(id, prefix, mask_length, origin_asn, peer_ip, as_path, + next_hop, last_updated, v4_ip_start, v4_ip_end) +live_routes_v6(id, prefix, mask_length, origin_asn, peer_ip, as_path, + next_hop, last_updated) + +-- Relationship tracking +peerings(id, as_a, as_b, first_seen, last_seen) +bgp_peers(id, peer_ip, peer_asn, last_message_type, last_seen) +``` + +## Configuration + +Configuration is handled via environment 
variables and OS-specific paths: + +| Variable | Default | Description | +|----------|---------|-------------| +| `PORT` | `8080` | HTTP server port | +| `DEBUG` | (empty) | Set to `routewatch` for debug logging | + +State directory (database location): +- macOS: `~/Library/Application Support/routewatch/` +- Linux: `/var/lib/routewatch/` or `~/.local/share/routewatch/` + +## Development + +```bash +# Run tests +make test + +# Format code +make fmt + +# Run linter +make lint + +# Build +make +``` + +## License + +See LICENSE file. diff --git a/internal/database/database.go b/internal/database/database.go index bac1891..0b3a627 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -1435,7 +1435,7 @@ func (d *Database) GetASDetailsContext(ctx context.Context, asn int) (*ASN, []Li return &asnInfo, allPrefixes, nil } -// ASPeer represents a peering relationship with another AS +// ASPeer represents a peering relationship with another AS including handle, description, and timestamps. type ASPeer struct { ASN int `json:"asn"` Handle string `json:"handle"` diff --git a/internal/logger/logger.go b/internal/logger/logger.go index b81228c..3e90ad8 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -1,4 +1,8 @@ -// Package logger provides a structured logger with source location tracking +// Package logger provides a structured logger with source location tracking. +// It wraps the standard library's log/slog package and automatically enriches +// log messages with the file name, line number, and function name of the caller. +// The output format is automatically selected based on the runtime environment: +// human-readable text for terminals, JSON for non-terminal output. package logger import ( @@ -12,17 +16,25 @@ import ( "golang.org/x/term" ) -// Logger wraps slog.Logger to add source location information +// Logger wraps slog.Logger to add automatic source location information +// to all log messages. 
It embeds slog.Logger and provides the same logging +// methods (Debug, Info, Warn, Error) but enriches each message with the +// file name, line number, and function name of the caller. type Logger struct { *slog.Logger } -// AsSlog returns the underlying slog.Logger +// AsSlog returns the underlying slog.Logger for use with APIs that require +// a standard slog.Logger instance rather than the custom Logger type. func (l *Logger) AsSlog() *slog.Logger { return l.Logger } -// New creates a new logger with appropriate handler based on environment +// New creates a new Logger with an appropriate handler based on the runtime +// environment. If stdout is a terminal, it uses a human-readable text format; +// otherwise, it outputs JSON for structured log aggregation. The log level +// defaults to Info, but can be set to Debug by including "routewatch" in the +// DEBUG environment variable. func New() *Logger { level := slog.LevelInfo if debug := os.Getenv("DEBUG"); strings.Contains(debug, "routewatch") { @@ -45,7 +57,10 @@ func New() *Logger { return &Logger{Logger: slog.New(handler)} } -const sourceSkipLevel = 2 // Skip levels for source location tracking +// sourceSkipLevel defines the number of call stack frames to skip when +// determining the caller's source location. This accounts for the logger +// method itself and the getSourceAttrs helper function. +const sourceSkipLevel = 2 // getSourceAttrs returns attributes for the calling source location func getSourceAttrs() []slog.Attr { @@ -75,7 +90,10 @@ func getSourceAttrs() []slog.Attr { return attrs } -// Debug logs at debug level with source location +// Debug logs a message at debug level with automatic source location tracking. +// Additional structured attributes can be passed as key-value pairs in args. +// Debug messages are only output when the DEBUG environment variable contains +// "routewatch". 
func (l *Logger) Debug(msg string, args ...any) { sourceAttrs := getSourceAttrs() allArgs := make([]any, 0, len(args)+len(sourceAttrs)*2) @@ -91,7 +109,8 @@ func (l *Logger) Debug(msg string, args ...any) { l.Logger.Debug(msg, allArgs...) } -// Info logs at info level with source location +// Info logs a message at info level with automatic source location tracking. +// Additional structured attributes can be passed as key-value pairs in args. func (l *Logger) Info(msg string, args ...any) { sourceAttrs := getSourceAttrs() allArgs := make([]any, 0, len(args)+len(sourceAttrs)*2) @@ -107,7 +126,8 @@ func (l *Logger) Info(msg string, args ...any) { l.Logger.Info(msg, allArgs...) } -// Warn logs at warn level with source location +// Warn logs a message at warn level with automatic source location tracking. +// Additional structured attributes can be passed as key-value pairs in args. func (l *Logger) Warn(msg string, args ...any) { sourceAttrs := getSourceAttrs() allArgs := make([]any, 0, len(args)+len(sourceAttrs)*2) @@ -123,7 +143,8 @@ func (l *Logger) Warn(msg string, args ...any) { l.Logger.Warn(msg, allArgs...) } -// Error logs at error level with source location +// Error logs a message at error level with automatic source location tracking. +// Additional structured attributes can be passed as key-value pairs in args. func (l *Logger) Error(msg string, args ...any) { sourceAttrs := getSourceAttrs() allArgs := make([]any, 0, len(args)+len(sourceAttrs)*2) @@ -139,12 +160,16 @@ func (l *Logger) Error(msg string, args ...any) { l.Logger.Error(msg, allArgs...) } -// With returns a new logger with additional attributes +// With returns a new Logger with additional structured attributes that will +// be included in all subsequent log messages. The args parameter accepts +// key-value pairs in the same format as the logging methods. 
func (l *Logger) With(args ...any) *Logger { return &Logger{Logger: l.Logger.With(args...)} } -// WithGroup returns a new logger with a group prefix +// WithGroup returns a new Logger that adds the specified group name as a +// prefix to all attribute keys in subsequent log messages. This is useful +// for organizing related attributes under a common namespace. func (l *Logger) WithGroup(name string) *Logger { return &Logger{Logger: l.Logger.WithGroup(name)} } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 64398d7..f3d5c87 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -115,16 +115,24 @@ func (t *Tracker) GetRouteMetrics() RouteMetrics { // StreamMetrics contains streaming statistics type StreamMetrics struct { - TotalMessages uint64 - TotalBytes uint64 + // TotalMessages is the total number of messages received since startup + TotalMessages uint64 + // TotalBytes is the total number of bytes received since startup + TotalBytes uint64 + // ConnectedSince is the time when the current connection was established ConnectedSince time.Time - Connected bool + // Connected indicates whether the stream is currently connected + Connected bool + // MessagesPerSec is the rate of messages received per second (1-minute average) MessagesPerSec float64 - BitsPerSec float64 + // BitsPerSec is the rate of bits received per second (1-minute average) + BitsPerSec float64 } // RouteMetrics contains route update statistics type RouteMetrics struct { + // IPv4UpdatesPerSec is the rate of IPv4 route updates per second (1-minute average) IPv4UpdatesPerSec float64 + // IPv6UpdatesPerSec is the rate of IPv6 route updates per second (1-minute average) IPv6UpdatesPerSec float64 } diff --git a/internal/ristypes/ris.go b/internal/ristypes/ris.go index 69c6b2c..ba36b91 100644 --- a/internal/ristypes/ris.go +++ b/internal/ristypes/ris.go @@ -58,7 +58,10 @@ type RISLiveMessage struct { Data RISMessage `json:"data"` } -// RISMessage 
represents a message from the RIS Live stream +// RISMessage represents a BGP update message from the RIPE RIS Live stream. +// It contains metadata about the BGP session (peer, ASN, host) along with +// the actual BGP update data including AS path, communities, announcements, +// and withdrawals. type RISMessage struct { Type string `json:"type"` Timestamp float64 `json:"timestamp"` @@ -80,7 +83,9 @@ type RISMessage struct { Raw string `json:"raw,omitempty"` } -// RISAnnouncement represents announcement data within a RIS message +// RISAnnouncement represents a BGP route announcement within a RIS message. +// It contains the next hop IP address and the list of prefixes being announced +// via that next hop. type RISAnnouncement struct { NextHop string `json:"next_hop"` Prefixes []string `json:"prefixes"` diff --git a/internal/routewatch/peerhandler.go b/internal/routewatch/peerhandler.go index 79980d1..7ffabab 100644 --- a/internal/routewatch/peerhandler.go +++ b/internal/routewatch/peerhandler.go @@ -74,13 +74,18 @@ func (h *PeerHandler) WantsMessage(_ string) bool { return true } -// QueueCapacity returns the desired queue capacity for this handler +// QueueCapacity returns the desired queue capacity for this handler. +// The PeerHandler uses a large queue capacity because batching allows +// for efficient processing of many updates at once. func (h *PeerHandler) QueueCapacity() int { // Batching allows us to use a larger queue return peerHandlerQueueSize } -// HandleMessage processes a message to track peer information +// HandleMessage processes a RIS message to track peer information. +// It extracts the peer IP address and ASN from the message and adds +// the update to an internal batch. When the batch reaches peerBatchSize +// or the batch timeout expires, the batch is flushed to the database. 
func (h *PeerHandler) HandleMessage(msg *ristypes.RISMessage) { // Parse peer ASN from string peerASN := 0 diff --git a/internal/streamer/streamer.go b/internal/streamer/streamer.go index c4a5b92..48c0fe2 100644 --- a/internal/streamer/streamer.go +++ b/internal/streamer/streamer.go @@ -56,7 +56,8 @@ type MessageHandler interface { QueueCapacity() int } -// RawMessageHandler is a callback for handling raw JSON lines from the stream +// RawMessageHandler is a function type for processing raw JSON lines from the stream. +// It receives the unmodified JSON line as a string before any parsing occurs. type RawMessageHandler func(line string) // handlerMetrics tracks performance metrics for a handler @@ -77,7 +78,10 @@ type handlerInfo struct { metrics handlerMetrics } -// Streamer handles streaming BGP updates from RIS Live +// Streamer manages a connection to the RIPE RIS Live streaming API for receiving +// real-time BGP UPDATE messages. It handles automatic reconnection with exponential +// backoff, dispatches messages to registered handlers via per-handler queues, and +// implements backpressure to prevent queue overflow during high traffic periods. type Streamer struct { logger *logger.Logger client *http.Client @@ -91,7 +95,9 @@ type Streamer struct { random *rand.Rand // Random number generator for backpressure drops } -// New creates a new RIS streamer +// New creates a new Streamer instance configured to connect to the RIS Live API. +// The logger is used for structured logging of connection events and errors. +// The metrics tracker is used to record message counts, bytes received, and connection status. func New(logger *logger.Logger, metrics *metrics.Tracker) *Streamer { return &Streamer{ logger: logger, @@ -105,7 +111,9 @@ func New(logger *logger.Logger, metrics *metrics.Tracker) *Streamer { } } -// RegisterHandler adds a callback for message processing +// RegisterHandler adds a MessageHandler to receive parsed RIS messages. 
+// Each handler gets its own dedicated queue and worker goroutine for processing. +// If the streamer is already running, the handler's worker is started immediately. func (s *Streamer) RegisterHandler(handler MessageHandler) { s.mu.Lock() defer s.mu.Unlock() @@ -127,14 +135,19 @@ func (s *Streamer) RegisterHandler(handler MessageHandler) { } } -// RegisterRawHandler sets a callback for raw message lines +// RegisterRawHandler sets a callback to receive raw JSON lines from the stream +// before they are parsed. Only one raw handler can be registered at a time; +// subsequent calls will replace the previous handler. func (s *Streamer) RegisterRawHandler(handler RawMessageHandler) { s.mu.Lock() defer s.mu.Unlock() s.rawHandler = handler } -// Start begins streaming in a goroutine +// Start begins streaming BGP updates from the RIS Live API in a background goroutine. +// It starts worker goroutines for each registered handler and manages automatic +// reconnection with exponential backoff on connection failures. +// Returns an error if the streamer is already running. func (s *Streamer) Start() error { s.mu.Lock() defer s.mu.Unlock() @@ -162,7 +175,9 @@ func (s *Streamer) Start() error { return nil } -// Stop halts the streaming +// Stop halts the streaming connection and shuts down all handler workers. +// It cancels the streaming context, closes all handler queues, and updates +// the connection status in metrics. This method is safe to call multiple times. func (s *Streamer) Stop() { s.mu.Lock() if s.cancel != nil { @@ -202,7 +217,8 @@ func (s *Streamer) runHandlerWorker(info *handlerInfo) { } } -// IsRunning returns whether the streamer is currently active +// IsRunning reports whether the streamer is currently running. +// This is safe to call concurrently from multiple goroutines. 
func (s *Streamer) IsRunning() bool { s.mu.RLock() defer s.mu.RUnlock() @@ -210,17 +226,21 @@ func (s *Streamer) IsRunning() bool { return s.running } -// GetMetrics returns current streaming metrics +// GetMetrics returns the current streaming metrics including message counts, +// bytes received, and throughput rates. The returned struct is a snapshot +// of the current state and is safe to use without synchronization. func (s *Streamer) GetMetrics() metrics.StreamMetrics { return s.metrics.GetStreamMetrics() } -// GetMetricsTracker returns the metrics tracker instance +// GetMetricsTracker returns the underlying metrics.Tracker instance for direct access +// to metrics recording and retrieval functionality. func (s *Streamer) GetMetricsTracker() *metrics.Tracker { return s.metrics } -// HandlerStats represents metrics for a single handler +// HandlerStats contains performance metrics for a single message handler. +// It includes queue utilization, message counts, and processing time statistics. type HandlerStats struct { Name string QueueLength int @@ -233,7 +253,9 @@ type HandlerStats struct { MaxProcessTime time.Duration } -// GetHandlerStats returns current handler statistics +// GetHandlerStats returns a snapshot of performance statistics for all registered +// handlers. The returned slice contains one HandlerStats entry per handler with +// current queue depth, processed/dropped counts, and processing time statistics. func (s *Streamer) GetHandlerStats() []HandlerStats { s.mu.RLock() defer s.mu.RUnlock() @@ -273,7 +295,9 @@ func (s *Streamer) GetHandlerStats() []HandlerStats { return stats } -// GetDroppedMessages returns the total number of dropped messages +// GetDroppedMessages returns the total number of messages dropped across all handlers +// due to queue overflow or backpressure. The counter is monotonically increasing, +// and this method is safe to call concurrently. 
func (s *Streamer) GetDroppedMessages() uint64 { return atomic.LoadUint64(&s.totalDropped) } diff --git a/internal/templates/templates.go b/internal/templates/templates.go index a5b755e..0a507e3 100644 --- a/internal/templates/templates.go +++ b/internal/templates/templates.go @@ -23,9 +23,13 @@ var prefixLengthHTML string // Templates contains all parsed templates type Templates struct { - Status *template.Template - ASDetail *template.Template + // Status is the template for the main status page + Status *template.Template + // ASDetail is the template for displaying AS (Autonomous System) details + ASDetail *template.Template + // PrefixDetail is the template for displaying prefix details PrefixDetail *template.Template + // PrefixLength is the template for displaying prefixes by length PrefixLength *template.Template }
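
The reconnection policy stated in the README hunk of this diff (exponential backoff between 5s and 320s, reset after 30s of stable connection) can be sketched roughly as follows. This is a minimal illustration and not the code in `internal/streamer`: the helper name `nextBackoff`, the constant names, and the doubling factor are assumptions. Only the 5s, 320s, and 30s figures come from the README.

```go
package main

import (
	"fmt"
	"time"
)

// Bounds and reset threshold as quoted in the README; the names are hypothetical.
const (
	minBackoff     = 5 * time.Second
	maxBackoff     = 320 * time.Second
	stableDuration = 30 * time.Second
)

// nextBackoff is a hypothetical helper that returns the delay to wait before
// the next reconnection attempt. If the previous connection stayed up for at
// least stableDuration, the delay resets to minBackoff. Otherwise the current
// delay is doubled (the factor of two is an assumption) and clamped to the
// range [minBackoff, maxBackoff].
func nextBackoff(current, connectedFor time.Duration) time.Duration {
	if connectedFor >= stableDuration {
		return minBackoff
	}
	next := current * 2
	if next < minBackoff {
		next = minBackoff
	}
	if next > maxBackoff {
		next = maxBackoff
	}
	return next
}

func main() {
	delay := minBackoff
	// Simulate a run of connections that fail immediately.
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, delay)
		delay = nextBackoff(delay, 0)
	}
	// A connection that stayed up for a minute resets the delay.
	fmt.Println("after stable connection:", nextBackoff(delay, time.Minute))
}
```

Keeping the policy in a pure function like this makes the limits easy to unit test without opening a network connection; the real streamer may structure this differently.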