diff --git a/internal/database/interface.go b/internal/database/interface.go index 213368d..a083fe4 100644 --- a/internal/database/interface.go +++ b/internal/database/interface.go @@ -1,3 +1,5 @@ +// Package database provides SQLite storage for BGP routing data including ASNs, +// prefixes, announcements, peerings, and live route tables. package database import ( @@ -5,7 +7,8 @@ import ( "time" ) -// Stats contains database statistics +// Stats contains database statistics including counts for ASNs, prefixes, +// peerings, peers, and live routes, as well as file size and prefix distribution data. type Stats struct { ASNs int Prefixes int @@ -19,7 +22,9 @@ type Stats struct { IPv6PrefixDistribution []PrefixDistribution } -// Store defines the interface for database operations +// Store defines the interface for database operations. It provides methods for +// managing ASNs, prefixes, announcements, peerings, BGP peers, and live routes. +// Implementations must be safe for concurrent use. type Store interface { // ASN operations GetOrCreateASN(number int, timestamp time.Time) (*ASN, error) diff --git a/internal/database/models.go b/internal/database/models.go index 2bd79b4..35a9d2a 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -1,3 +1,4 @@ +// Package database provides SQLite storage for BGP routing data. package database import ( @@ -6,7 +7,8 @@ import ( "github.com/google/uuid" ) -// ASN represents an Autonomous System Number +// ASN represents an Autonomous System Number with its metadata including +// handle, description, and first/last seen timestamps. type ASN struct { ASN int `json:"asn"` Handle string `json:"handle"` diff --git a/internal/ristypes/ris.go b/internal/ristypes/ris.go index 7ff46b3..7a46e27 100644 --- a/internal/ristypes/ris.go +++ b/internal/ristypes/ris.go @@ -6,7 +6,9 @@ import ( "time" ) -// ASPath represents an AS path that may contain nested AS sets +// ASPath represents a BGP AS path as a slice of AS numbers. +// It handles JSON unmarshaling of both simple arrays and nested AS sets, +// flattening any nested structures into a single sequence of AS numbers. type ASPath []int // UnmarshalJSON implements custom JSON unmarshaling to flatten nested arrays diff --git a/internal/routewatch/ashandler.go b/internal/routewatch/ashandler.go index 0e1e2c9..7ea5630 100644 --- a/internal/routewatch/ashandler.go +++ b/internal/routewatch/ashandler.go @@ -22,7 +22,10 @@ const ( asnBatchTimeout = 2 * time.Second ) -// ASHandler handles ASN information from BGP messages using batched operations +// ASHandler processes Autonomous System Number (ASN) information extracted from +// BGP UPDATE messages. It uses batched database operations to efficiently store +// ASN data, collecting operations into batches that are flushed either when the +// batch reaches a size threshold or after a timeout period. type ASHandler struct { db database.Store logger *logger.Logger @@ -40,7 +43,11 @@ type asnOp struct { timestamp time.Time } -// NewASHandler creates a new batched ASN handler +// NewASHandler creates and returns a new ASHandler instance. It initializes +// the batching system and starts a background goroutine that periodically +// flushes accumulated ASN operations to the database. The caller must call +// Stop when finished to ensure all pending operations are flushed and the +// background goroutine is terminated. func NewASHandler(db database.Store, logger *logger.Logger) *ASHandler { h := &ASHandler{ db: db, @@ -57,19 +64,27 @@ func NewASHandler(db database.Store, logger *logger.Logger) *ASHandler { return h } -// WantsMessage returns true if this handler wants to process messages of the given type +// WantsMessage reports whether this handler should process messages of the +// given type. ASHandler only processes "UPDATE" messages, as these contain +// the AS path information needed to track autonomous systems. func (h *ASHandler) WantsMessage(messageType string) bool { // We only care about UPDATE messages for the database return messageType == "UPDATE" } -// QueueCapacity returns the desired queue capacity for this handler +// QueueCapacity returns the recommended message queue size for this handler. +// ASHandler uses a large queue capacity to accommodate high-volume BGP streams, +// as the batching mechanism allows efficient processing of accumulated messages. func (h *ASHandler) QueueCapacity() int { // Batching allows us to use a larger queue return asHandlerQueueSize } -// HandleMessage processes a RIS message and queues database operations +// HandleMessage processes a RIS Live BGP message by extracting all ASNs from +// the AS path and queuing them for batch insertion into the database. The +// origin ASN (last element in the path) and all transit ASNs are recorded +// with their associated timestamps. The batch is automatically flushed when +// it reaches the configured size threshold. func (h *ASHandler) HandleMessage(msg *ristypes.RISMessage) { // Use the pre-parsed timestamp timestamp := msg.ParsedTimestamp diff --git a/internal/routewatch/cli.go b/internal/routewatch/cli.go index 92e707d..d857de7 100644 --- a/internal/routewatch/cli.go +++ b/internal/routewatch/cli.go @@ -53,7 +53,11 @@ func logDebugStats(logger *logger.Logger) { } } -// CLIEntry is the main entry point for the CLI +// CLIEntry is the main entry point for the routewatch command-line interface. +// It initializes the application using the fx dependency injection framework, +// sets up signal handling for graceful shutdown, and starts the RouteWatch service. +// This function blocks until the application receives a shutdown signal or encounters +// a fatal error. func CLIEntry() { app := fx.New( getModule(), diff --git a/internal/routewatch/handler.go b/internal/routewatch/handler.go index c6a6789..328b7ff 100644 --- a/internal/routewatch/handler.go +++ b/internal/routewatch/handler.go @@ -5,14 +5,20 @@ import ( "git.eeqj.de/sneak/routewatch/internal/ristypes" ) -// SimpleHandler is a basic implementation of streamer.MessageHandler +// SimpleHandler is a basic implementation of streamer.MessageHandler that +// filters messages by type and delegates processing to a callback function. +// It provides a simple way to handle specific RIS message types without +// implementing the full MessageHandler interface from scratch. type SimpleHandler struct { logger *logger.Logger messageTypes []string callback func(*ristypes.RISMessage) } -// NewSimpleHandler creates a handler that accepts specific message types +// NewSimpleHandler creates a new SimpleHandler that accepts specific message types. +// The messageTypes parameter specifies which RIS message types this handler will process. +// If messageTypes is empty, the handler will accept all message types. +// The callback function is invoked for each message that passes the type filter. func NewSimpleHandler( logger *logger.Logger, messageTypes []string, @@ -25,7 +31,9 @@ func NewSimpleHandler( } } -// WantsMessage returns true if this handler wants to process messages of the given type +// WantsMessage returns true if this handler wants to process messages of the given type. +// It checks whether messageType is in the handler's configured list of accepted types. +// If no specific types were configured (empty messageTypes slice), it returns true for all types. func (h *SimpleHandler) WantsMessage(messageType string) bool { // If no specific types are set, accept all messages if len(h.messageTypes) == 0 { @@ -41,7 +49,8 @@ func (h *SimpleHandler) WantsMessage(messageType string) bool { return false } -// HandleMessage processes a RIS message +// HandleMessage processes a RIS message by invoking the configured callback function. +// If no callback was provided during construction, the message is silently ignored. func (h *SimpleHandler) HandleMessage(msg *ristypes.RISMessage) { if h.callback != nil { h.callback(msg) diff --git a/internal/routewatch/peerhandler.go b/internal/routewatch/peerhandler.go index d2f0014..0e0182c 100644 --- a/internal/routewatch/peerhandler.go +++ b/internal/routewatch/peerhandler.go @@ -1,5 +1,8 @@ package routewatch +// peerhandler.go provides batched peer tracking functionality for BGP route monitoring. +// It tracks BGP peers from all incoming RIS messages and maintains peer state in the database. + import ( "strconv" "sync" @@ -21,7 +24,10 @@ const ( peerBatchTimeout = 2 * time.Second ) -// PeerHandler tracks BGP peers from all message types using batched operations +// PeerHandler tracks BGP peers from all message types using batched operations. +// It maintains a queue of peer updates and periodically flushes them to the database +// in batches to improve performance. The handler deduplicates peer updates within +// each batch, keeping only the most recent update for each peer IP address. type PeerHandler struct { db database.Store logger *logger.Logger diff --git a/internal/routewatch/peeringhandler.go b/internal/routewatch/peeringhandler.go index a807a33..5fce75f 100644 --- a/internal/routewatch/peeringhandler.go +++ b/internal/routewatch/peeringhandler.go @@ -11,23 +11,36 @@ import ( ) const ( - // peeringHandlerQueueSize is the queue capacity for peering operations + // peeringHandlerQueueSize defines the buffer capacity for the peering + // handler's message queue. This should be large enough to handle bursts + // of BGP UPDATE messages without blocking. peeringHandlerQueueSize = 100000 - // minPathLengthForPeering is the minimum AS path length to extract peerings + // minPathLengthForPeering specifies the minimum number of ASNs required + // in a BGP AS path to extract peering relationships. A path with fewer + // than 2 ASNs cannot contain any peering information. minPathLengthForPeering = 2 - // pathExpirationTime is how long to keep AS paths in memory + // pathExpirationTime determines how long AS paths are kept in memory + // before being eligible for pruning. Paths older than this are removed + // to prevent unbounded memory growth. pathExpirationTime = 30 * time.Minute - // peeringProcessInterval is how often to process AS paths into peerings + // peeringProcessInterval controls how frequently the handler processes + // accumulated AS paths and extracts peering relationships to store + // in the database. peeringProcessInterval = 30 * time.Second - // pathPruneInterval is how often to prune old AS paths + // pathPruneInterval determines how often the handler checks for and + // removes expired AS paths from memory. pathPruneInterval = 5 * time.Minute ) -// PeeringHandler handles AS peering relationships from BGP path data +// PeeringHandler processes BGP UPDATE messages to extract and track +// AS peering relationships. It accumulates AS paths in memory and +// periodically processes them to extract unique peering pairs, which +// are then stored in the database. The handler implements the Handler +// interface for integration with the message processing pipeline. type PeeringHandler struct { db database.Store logger *logger.Logger @@ -39,7 +52,11 @@ type PeeringHandler struct { stopCh chan struct{} } -// NewPeeringHandler creates a new batched peering handler +// NewPeeringHandler creates and initializes a new PeeringHandler with the +// provided database store and logger. It starts two background goroutines: +// one for periodic processing of accumulated AS paths into peering records, +// and one for pruning expired paths from memory. The handler begins +// processing immediately upon creation. func NewPeeringHandler(db database.Store, logger *logger.Logger) *PeeringHandler { h := &PeeringHandler{ db: db, diff --git a/internal/server/handlers.go b/internal/server/handlers.go index 2d54ead..4af06cb 100644 --- a/internal/server/handlers.go +++ b/internal/server/handlers.go @@ -21,18 +21,19 @@ import ( ) const ( - // statsContextTimeout is the timeout for stats API operations + // statsContextTimeout is the timeout for stats API operations. statsContextTimeout = 4 * time.Second ) -// handleRoot returns a handler that redirects to /status +// handleRoot returns a handler that redirects to /status. func (s *Server) handleRoot() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, "/status", http.StatusSeeOther) } } -// writeJSONError writes a standardized JSON error response +// writeJSONError writes a standardized JSON error response with the given +// status code and error message. func writeJSONError(w http.ResponseWriter, statusCode int, message string) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(statusCode) @@ -45,7 +46,8 @@ func writeJSONError(w http.ResponseWriter, statusCode int, message string) { }) } -// writeJSONSuccess writes a standardized JSON success response +// writeJSONSuccess writes a standardized JSON success response containing +// the provided data wrapped in a status envelope. func writeJSONSuccess(w http.ResponseWriter, data interface{}) error { w.Header().Set("Content-Type", "application/json") diff --git a/internal/server/middleware.go b/internal/server/middleware.go index 786fba7..4e279f8 100644 --- a/internal/server/middleware.go +++ b/internal/server/middleware.go @@ -44,7 +44,12 @@ func (rw *responseWriter) Header() http.Header { return rw.ResponseWriter.Header() } -// JSONResponseMiddleware wraps all JSON responses with metadata +// JSONResponseMiddleware is an HTTP middleware that wraps all JSON responses +// with a @meta field containing execution metadata. The metadata includes the +// time zone (always UTC), API version, and request execution time in milliseconds. +// +// Endpoints "/" and "/status" are excluded from this processing and passed through +// unchanged. Non-JSON responses and empty responses are also passed through unchanged. func JSONResponseMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Skip non-JSON endpoints @@ -155,7 +160,13 @@ func (tw *timeoutWriter) markWritten() { tw.written = true } -// TimeoutMiddleware creates a timeout middleware that returns JSON errors +// TimeoutMiddleware creates an HTTP middleware that enforces a request timeout. +// If the handler does not complete within the specified duration, the middleware +// returns a JSON error response with HTTP status 408 (Request Timeout). +// +// The timeout parameter specifies the maximum duration allowed for request processing. +// The returned middleware handles panics from the wrapped handler by re-panicking +// after cleanup, and prevents concurrent writes to the response after timeout occurs. func TimeoutMiddleware(timeout time.Duration) func(http.Handler) http.Handler { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -218,7 +229,15 @@ func TimeoutMiddleware(timeout time.Duration) func(http.Handler) http.Handler { } } -// JSONValidationMiddleware ensures all JSON API responses are valid JSON +// JSONValidationMiddleware is an HTTP middleware that validates JSON API responses. +// It ensures that responses with Content-Type "application/json" contain valid JSON. +// +// If a response is not valid JSON or is empty when JSON is expected, the middleware +// returns a properly formatted JSON error response. For timeout errors (status 408), +// the error message will be "Request timeout". For other errors, it returns +// "Internal server error" with status 500 if the original status was 200. +// +// Non-JSON responses are passed through unchanged. func JSONValidationMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Create a custom response writer to capture the response