diff --git a/internal/database/models.go b/internal/database/models.go index 35a9d2a..cd7ef4b 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -17,7 +17,8 @@ type ASN struct { LastSeen time.Time `json:"last_seen"` } -// Prefix represents an IP prefix (CIDR block) +// Prefix represents an IP prefix (CIDR block) with its IP version (4 or 6) +// and first/last seen timestamps. type Prefix struct { ID uuid.UUID `json:"id"` Prefix string `json:"prefix"` @@ -26,7 +27,8 @@ type Prefix struct { LastSeen time.Time `json:"last_seen"` } -// Announcement represents a BGP announcement +// Announcement represents a BGP announcement or withdrawal event, +// containing the prefix, AS path, origin ASN, peer ASN, next hop, and timestamp. type Announcement struct { ID uuid.UUID `json:"id"` PrefixID uuid.UUID `json:"prefix_id"` @@ -38,7 +40,8 @@ type Announcement struct { IsWithdrawal bool `json:"is_withdrawal"` } -// ASNPeering represents a peering relationship between two ASNs +// ASNPeering represents a peering relationship between two ASNs, +// stored with the lower ASN as ASA and the higher as ASB. type ASNPeering struct { ID uuid.UUID `json:"id"` ASA int `json:"as_a"` diff --git a/internal/ristypes/ris.go b/internal/ristypes/ris.go index 7a46e27..69c6b2c 100644 --- a/internal/ristypes/ris.go +++ b/internal/ristypes/ris.go @@ -11,7 +11,9 @@ import ( // flattening any nested structures into a single sequence of AS numbers. type ASPath []int -// UnmarshalJSON implements custom JSON unmarshaling to flatten nested arrays +// UnmarshalJSON implements the json.Unmarshaler interface for ASPath. +// It handles both simple integer arrays [1, 2, 3] and nested AS sets +// like [1, [2, 3], 4], flattening them into a single slice of integers. func (p *ASPath) UnmarshalJSON(data []byte) error { // First try to unmarshal as a simple array of integers var simple []int @@ -48,7 +50,9 @@ func (p *ASPath) UnmarshalJSON(data []byte) error { return nil } -// RISLiveMessage represents the outer wrapper from the RIS Live stream +// RISLiveMessage represents the outer wrapper message from the RIPE RIS Live stream. +// Each message contains a Type field indicating the message type and a Data field +// containing the actual BGP message payload. type RISLiveMessage struct { Type string `json:"type"` Data RISMessage `json:"data"` diff --git a/internal/routewatch/ashandler.go b/internal/routewatch/ashandler.go index 7ea5630..c1ff34f 100644 --- a/internal/routewatch/ashandler.go +++ b/internal/routewatch/ashandler.go @@ -171,7 +171,11 @@ func (h *ASHandler) flushBatchLocked() { h.lastFlush = time.Now() } -// Stop gracefully stops the handler and flushes remaining batches +// Stop gracefully shuts down the ASHandler by signaling the background flush +// goroutine to terminate and waiting for it to complete. Any pending ASN +// operations in the current batch are flushed to the database before Stop +// returns. This method should be called during application shutdown to ensure +// no data is lost. func (h *ASHandler) Stop() { close(h.stopCh) h.wg.Wait() diff --git a/internal/routewatch/peerhandler.go b/internal/routewatch/peerhandler.go index 0e0182c..79980d1 100644 --- a/internal/routewatch/peerhandler.go +++ b/internal/routewatch/peerhandler.go @@ -47,7 +47,10 @@ type peerUpdate struct { timestamp time.Time } -// NewPeerHandler creates a new batched peer tracking handler +// NewPeerHandler creates a new PeerHandler with the given database store and logger. +// It initializes the peer batch buffer and starts a background goroutine that +// periodically flushes accumulated peer updates to the database. The handler +// should be stopped by calling Stop when it is no longer needed. func NewPeerHandler(db database.Store, logger *logger.Logger) *PeerHandler { h := &PeerHandler{ db: db, @@ -64,7 +67,9 @@ func NewPeerHandler(db database.Store, logger *logger.Logger) *PeerHandler { return h } -// WantsMessage returns true for all message types since we track peers from all messages +// WantsMessage returns true for all message types since peer information +// is extracted from every RIS message regardless of type. This satisfies +// the MessageHandler interface. func (h *PeerHandler) WantsMessage(_ string) bool { return true } diff --git a/internal/routewatch/peeringhandler.go b/internal/routewatch/peeringhandler.go index 5fce75f..d58d0ff 100644 --- a/internal/routewatch/peeringhandler.go +++ b/internal/routewatch/peeringhandler.go @@ -72,18 +72,25 @@ func NewPeeringHandler(db database.Store, logger *logger.Logger) *PeeringHandler return h } -// WantsMessage returns true if this handler wants to process messages of the given type +// WantsMessage reports whether the handler should receive messages of the +// given type. PeeringHandler only processes UPDATE messages, as these contain +// the AS path information needed to extract peering relationships. func (h *PeeringHandler) WantsMessage(messageType string) bool { // We only care about UPDATE messages that have AS paths return messageType == "UPDATE" } -// QueueCapacity returns the desired queue capacity for this handler +// QueueCapacity returns the buffer size for the handler's message queue. +// This value is used by the message dispatcher to allocate the channel +// buffer when registering the handler. func (h *PeeringHandler) QueueCapacity() int { return peeringHandlerQueueSize } -// HandleMessage processes a message to extract AS paths +// HandleMessage processes a BGP UPDATE message by storing its AS path +// in memory for later batch processing. Messages with AS paths shorter +// than minPathLengthForPeering are ignored as they cannot contain valid +// peering information. func (h *PeeringHandler) HandleMessage(msg *ristypes.RISMessage) { // Skip if no AS path or only one AS if len(msg.Path) < minPathLengthForPeering { diff --git a/internal/server/handlers.go b/internal/server/handlers.go index 4af06cb..ba7219e 100644 --- a/internal/server/handlers.go +++ b/internal/server/handlers.go @@ -57,7 +57,8 @@ func writeJSONSuccess(w http.ResponseWriter, data interface{}) error { }) } -// handleStatusJSON returns a handler that serves JSON statistics +// handleStatusJSON returns a handler that serves JSON statistics including +// uptime, message counts, database stats, and route information. func (s *Server) handleStatusJSON() http.HandlerFunc { // Stats represents the statistics response type Stats struct { @@ -177,7 +178,8 @@ func (s *Server) handleStatusJSON() http.HandlerFunc { } } -// handleStats returns a handler that serves API v1 statistics +// handleStats returns a handler that serves API v1 statistics including +// detailed handler queue statistics and performance metrics. func (s *Server) handleStats() http.HandlerFunc { // HandlerStatsInfo represents handler statistics in the API response type HandlerStatsInfo struct { @@ -338,7 +340,8 @@ func (s *Server) handleStats() http.HandlerFunc { } } -// handleStatusHTML returns a handler that serves the HTML status page +// handleStatusHTML returns a handler that serves the HTML status page, +// which displays real-time statistics fetched via JavaScript. func (s *Server) handleStatusHTML() http.HandlerFunc { return func(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "text/html; charset=utf-8") diff --git a/internal/streamer/streamer.go b/internal/streamer/streamer.go index 09d9d67..c4a5b92 100644 --- a/internal/streamer/streamer.go +++ b/internal/streamer/streamer.go @@ -19,6 +19,7 @@ import ( "git.eeqj.de/sneak/routewatch/internal/ristypes" ) +// Configuration constants for the RIS Live streamer. const ( risLiveURL = "https://ris-live.ripe.net/v1/stream/?format=json&" + "client=https%3A%2F%2Fgit.eeqj.de%2Fsneak%2Froutewatch" @@ -36,16 +37,22 @@ const ( backpressureSlope = 2.0 // Slope for linear drop probability increase ) -// MessageHandler is an interface for handling RIS messages +// MessageHandler defines the interface for processing RIS messages. +// Implementations must specify which message types they want to receive, +// how to process messages, and their desired queue capacity. type MessageHandler interface { - // WantsMessage returns true if this handler wants to process messages of the given type + // WantsMessage returns true if this handler wants to process messages of the given type. WantsMessage(messageType string) bool - // HandleMessage processes a RIS message + // HandleMessage processes a RIS message. This method is called from a dedicated + // goroutine for each handler, so implementations do not need to be goroutine-safe + // with respect to other handlers. HandleMessage(msg *ristypes.RISMessage) - // QueueCapacity returns the desired queue capacity for this handler - // Handlers that process quickly can have larger queues + // QueueCapacity returns the desired queue capacity for this handler. + // Handlers that process quickly can have larger queues to buffer bursts. + // When the queue fills up, messages will be dropped according to the + // backpressure algorithm. QueueCapacity() int }