Add detailed godoc documentation to CLIEntry function
Expand the documentation comment for CLIEntry to provide more context about what the function does, including its use of the fx dependency injection framework, signal handling, and blocking behavior.
This commit is contained in:
parent
8323a95be9
commit
e1d0ab5ea6
@ -1,3 +1,5 @@
|
|||||||
|
// Package database provides SQLite storage for BGP routing data including ASNs,
|
||||||
|
// prefixes, announcements, peerings, and live route tables.
|
||||||
package database
|
package database
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@ -5,7 +7,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Stats contains database statistics
|
// Stats contains database statistics including counts for ASNs, prefixes,
|
||||||
|
// peerings, peers, and live routes, as well as file size and prefix distribution data.
|
||||||
type Stats struct {
|
type Stats struct {
|
||||||
ASNs int
|
ASNs int
|
||||||
Prefixes int
|
Prefixes int
|
||||||
@ -19,7 +22,9 @@ type Stats struct {
|
|||||||
IPv6PrefixDistribution []PrefixDistribution
|
IPv6PrefixDistribution []PrefixDistribution
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store defines the interface for database operations
|
// Store defines the interface for database operations. It provides methods for
|
||||||
|
// managing ASNs, prefixes, announcements, peerings, BGP peers, and live routes.
|
||||||
|
// Implementations must be safe for concurrent use.
|
||||||
type Store interface {
|
type Store interface {
|
||||||
// ASN operations
|
// ASN operations
|
||||||
GetOrCreateASN(number int, timestamp time.Time) (*ASN, error)
|
GetOrCreateASN(number int, timestamp time.Time) (*ASN, error)
|
||||||
|
|||||||
@ -1,3 +1,4 @@
|
|||||||
|
// Package database provides SQLite storage for BGP routing data.
|
||||||
package database
|
package database
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@ -6,7 +7,8 @@ import (
|
|||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ASN represents an Autonomous System Number
|
// ASN represents an Autonomous System Number with its metadata including
|
||||||
|
// handle, description, and first/last seen timestamps.
|
||||||
type ASN struct {
|
type ASN struct {
|
||||||
ASN int `json:"asn"`
|
ASN int `json:"asn"`
|
||||||
Handle string `json:"handle"`
|
Handle string `json:"handle"`
|
||||||
|
|||||||
@ -6,7 +6,9 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ASPath represents an AS path that may contain nested AS sets
|
// ASPath represents a BGP AS path as a slice of AS numbers.
|
||||||
|
// It handles JSON unmarshaling of both simple arrays and nested AS sets,
|
||||||
|
// flattening any nested structures into a single sequence of AS numbers.
|
||||||
type ASPath []int
|
type ASPath []int
|
||||||
|
|
||||||
// UnmarshalJSON implements custom JSON unmarshaling to flatten nested arrays
|
// UnmarshalJSON implements custom JSON unmarshaling to flatten nested arrays
|
||||||
|
|||||||
@ -22,7 +22,10 @@ const (
|
|||||||
asnBatchTimeout = 2 * time.Second
|
asnBatchTimeout = 2 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// ASHandler handles ASN information from BGP messages using batched operations
|
// ASHandler processes Autonomous System Number (ASN) information extracted from
|
||||||
|
// BGP UPDATE messages. It uses batched database operations to efficiently store
|
||||||
|
// ASN data, collecting operations into batches that are flushed either when the
|
||||||
|
// batch reaches a size threshold or after a timeout period.
|
||||||
type ASHandler struct {
|
type ASHandler struct {
|
||||||
db database.Store
|
db database.Store
|
||||||
logger *logger.Logger
|
logger *logger.Logger
|
||||||
@ -40,7 +43,11 @@ type asnOp struct {
|
|||||||
timestamp time.Time
|
timestamp time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewASHandler creates a new batched ASN handler
|
// NewASHandler creates and returns a new ASHandler instance. It initializes
|
||||||
|
// the batching system and starts a background goroutine that periodically
|
||||||
|
// flushes accumulated ASN operations to the database. The caller must call
|
||||||
|
// Stop when finished to ensure all pending operations are flushed and the
|
||||||
|
// background goroutine is terminated.
|
||||||
func NewASHandler(db database.Store, logger *logger.Logger) *ASHandler {
|
func NewASHandler(db database.Store, logger *logger.Logger) *ASHandler {
|
||||||
h := &ASHandler{
|
h := &ASHandler{
|
||||||
db: db,
|
db: db,
|
||||||
@ -57,19 +64,27 @@ func NewASHandler(db database.Store, logger *logger.Logger) *ASHandler {
|
|||||||
return h
|
return h
|
||||||
}
|
}
|
||||||
|
|
||||||
// WantsMessage returns true if this handler wants to process messages of the given type
|
// WantsMessage reports whether this handler should process messages of the
|
||||||
|
// given type. ASHandler only processes "UPDATE" messages, as these contain
|
||||||
|
// the AS path information needed to track autonomous systems.
|
||||||
func (h *ASHandler) WantsMessage(messageType string) bool {
|
func (h *ASHandler) WantsMessage(messageType string) bool {
|
||||||
// We only care about UPDATE messages for the database
|
// We only care about UPDATE messages for the database
|
||||||
return messageType == "UPDATE"
|
return messageType == "UPDATE"
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueueCapacity returns the desired queue capacity for this handler
|
// QueueCapacity returns the recommended message queue size for this handler.
|
||||||
|
// ASHandler uses a large queue capacity to accommodate high-volume BGP streams,
|
||||||
|
// as the batching mechanism allows efficient processing of accumulated messages.
|
||||||
func (h *ASHandler) QueueCapacity() int {
|
func (h *ASHandler) QueueCapacity() int {
|
||||||
// Batching allows us to use a larger queue
|
// Batching allows us to use a larger queue
|
||||||
return asHandlerQueueSize
|
return asHandlerQueueSize
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleMessage processes a RIS message and queues database operations
|
// HandleMessage processes a RIS Live BGP message by extracting all ASNs from
|
||||||
|
// the AS path and queuing them for batch insertion into the database. The
|
||||||
|
// origin ASN (last element in the path) and all transit ASNs are recorded
|
||||||
|
// with their associated timestamps. The batch is automatically flushed when
|
||||||
|
// it reaches the configured size threshold.
|
||||||
func (h *ASHandler) HandleMessage(msg *ristypes.RISMessage) {
|
func (h *ASHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||||
// Use the pre-parsed timestamp
|
// Use the pre-parsed timestamp
|
||||||
timestamp := msg.ParsedTimestamp
|
timestamp := msg.ParsedTimestamp
|
||||||
|
|||||||
@ -53,7 +53,11 @@ func logDebugStats(logger *logger.Logger) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CLIEntry is the main entry point for the CLI
|
// CLIEntry is the main entry point for the routewatch command-line interface.
|
||||||
|
// It initializes the application using the fx dependency injection framework,
|
||||||
|
// sets up signal handling for graceful shutdown, and starts the RouteWatch service.
|
||||||
|
// This function blocks until the application receives a shutdown signal or encounters
|
||||||
|
// a fatal error.
|
||||||
func CLIEntry() {
|
func CLIEntry() {
|
||||||
app := fx.New(
|
app := fx.New(
|
||||||
getModule(),
|
getModule(),
|
||||||
|
|||||||
@ -5,14 +5,20 @@ import (
|
|||||||
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SimpleHandler is a basic implementation of streamer.MessageHandler
|
// SimpleHandler is a basic implementation of streamer.MessageHandler that
|
||||||
|
// filters messages by type and delegates processing to a callback function.
|
||||||
|
// It provides a simple way to handle specific RIS message types without
|
||||||
|
// implementing the full MessageHandler interface from scratch.
|
||||||
type SimpleHandler struct {
|
type SimpleHandler struct {
|
||||||
logger *logger.Logger
|
logger *logger.Logger
|
||||||
messageTypes []string
|
messageTypes []string
|
||||||
callback func(*ristypes.RISMessage)
|
callback func(*ristypes.RISMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSimpleHandler creates a handler that accepts specific message types
|
// NewSimpleHandler creates a new SimpleHandler that accepts specific message types.
|
||||||
|
// The messageTypes parameter specifies which RIS message types this handler will process.
|
||||||
|
// If messageTypes is empty, the handler will accept all message types.
|
||||||
|
// The callback function is invoked for each message that passes the type filter.
|
||||||
func NewSimpleHandler(
|
func NewSimpleHandler(
|
||||||
logger *logger.Logger,
|
logger *logger.Logger,
|
||||||
messageTypes []string,
|
messageTypes []string,
|
||||||
@ -25,7 +31,9 @@ func NewSimpleHandler(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WantsMessage returns true if this handler wants to process messages of the given type
|
// WantsMessage returns true if this handler wants to process messages of the given type.
|
||||||
|
// It checks whether messageType is in the handler's configured list of accepted types.
|
||||||
|
// If no specific types were configured (empty messageTypes slice), it returns true for all types.
|
||||||
func (h *SimpleHandler) WantsMessage(messageType string) bool {
|
func (h *SimpleHandler) WantsMessage(messageType string) bool {
|
||||||
// If no specific types are set, accept all messages
|
// If no specific types are set, accept all messages
|
||||||
if len(h.messageTypes) == 0 {
|
if len(h.messageTypes) == 0 {
|
||||||
@ -41,7 +49,8 @@ func (h *SimpleHandler) WantsMessage(messageType string) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleMessage processes a RIS message
|
// HandleMessage processes a RIS message by invoking the configured callback function.
|
||||||
|
// If no callback was provided during construction, the message is silently ignored.
|
||||||
func (h *SimpleHandler) HandleMessage(msg *ristypes.RISMessage) {
|
func (h *SimpleHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||||
if h.callback != nil {
|
if h.callback != nil {
|
||||||
h.callback(msg)
|
h.callback(msg)
|
||||||
|
|||||||
@ -1,5 +1,8 @@
|
|||||||
package routewatch
|
package routewatch
|
||||||
|
|
||||||
|
// peerhandler.go provides batched peer tracking functionality for BGP route monitoring.
|
||||||
|
// It tracks BGP peers from all incoming RIS messages and maintains peer state in the database.
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
@ -21,7 +24,10 @@ const (
|
|||||||
peerBatchTimeout = 2 * time.Second
|
peerBatchTimeout = 2 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// PeerHandler tracks BGP peers from all message types using batched operations
|
// PeerHandler tracks BGP peers from all message types using batched operations.
|
||||||
|
// It maintains a queue of peer updates and periodically flushes them to the database
|
||||||
|
// in batches to improve performance. The handler deduplicates peer updates within
|
||||||
|
// each batch, keeping only the most recent update for each peer IP address.
|
||||||
type PeerHandler struct {
|
type PeerHandler struct {
|
||||||
db database.Store
|
db database.Store
|
||||||
logger *logger.Logger
|
logger *logger.Logger
|
||||||
|
|||||||
@ -11,23 +11,36 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// peeringHandlerQueueSize is the queue capacity for peering operations
|
// peeringHandlerQueueSize defines the buffer capacity for the peering
|
||||||
|
// handler's message queue. This should be large enough to handle bursts
|
||||||
|
// of BGP UPDATE messages without blocking.
|
||||||
peeringHandlerQueueSize = 100000
|
peeringHandlerQueueSize = 100000
|
||||||
|
|
||||||
// minPathLengthForPeering is the minimum AS path length to extract peerings
|
// minPathLengthForPeering specifies the minimum number of ASNs required
|
||||||
|
// in a BGP AS path to extract peering relationships. A path with fewer
|
||||||
|
// than 2 ASNs cannot contain any peering information.
|
||||||
minPathLengthForPeering = 2
|
minPathLengthForPeering = 2
|
||||||
|
|
||||||
// pathExpirationTime is how long to keep AS paths in memory
|
// pathExpirationTime determines how long AS paths are kept in memory
|
||||||
|
// before being eligible for pruning. Paths older than this are removed
|
||||||
|
// to prevent unbounded memory growth.
|
||||||
pathExpirationTime = 30 * time.Minute
|
pathExpirationTime = 30 * time.Minute
|
||||||
|
|
||||||
// peeringProcessInterval is how often to process AS paths into peerings
|
// peeringProcessInterval controls how frequently the handler processes
|
||||||
|
// accumulated AS paths and extracts peering relationships to store
|
||||||
|
// in the database.
|
||||||
peeringProcessInterval = 30 * time.Second
|
peeringProcessInterval = 30 * time.Second
|
||||||
|
|
||||||
// pathPruneInterval is how often to prune old AS paths
|
// pathPruneInterval determines how often the handler checks for and
|
||||||
|
// removes expired AS paths from memory.
|
||||||
pathPruneInterval = 5 * time.Minute
|
pathPruneInterval = 5 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
// PeeringHandler handles AS peering relationships from BGP path data
|
// PeeringHandler processes BGP UPDATE messages to extract and track
|
||||||
|
// AS peering relationships. It accumulates AS paths in memory and
|
||||||
|
// periodically processes them to extract unique peering pairs, which
|
||||||
|
// are then stored in the database. The handler implements the Handler
|
||||||
|
// interface for integration with the message processing pipeline.
|
||||||
type PeeringHandler struct {
|
type PeeringHandler struct {
|
||||||
db database.Store
|
db database.Store
|
||||||
logger *logger.Logger
|
logger *logger.Logger
|
||||||
@ -39,7 +52,11 @@ type PeeringHandler struct {
|
|||||||
stopCh chan struct{}
|
stopCh chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPeeringHandler creates a new batched peering handler
|
// NewPeeringHandler creates and initializes a new PeeringHandler with the
|
||||||
|
// provided database store and logger. It starts two background goroutines:
|
||||||
|
// one for periodic processing of accumulated AS paths into peering records,
|
||||||
|
// and one for pruning expired paths from memory. The handler begins
|
||||||
|
// processing immediately upon creation.
|
||||||
func NewPeeringHandler(db database.Store, logger *logger.Logger) *PeeringHandler {
|
func NewPeeringHandler(db database.Store, logger *logger.Logger) *PeeringHandler {
|
||||||
h := &PeeringHandler{
|
h := &PeeringHandler{
|
||||||
db: db,
|
db: db,
|
||||||
|
|||||||
@ -21,18 +21,19 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// statsContextTimeout is the timeout for stats API operations
|
// statsContextTimeout is the timeout for stats API operations.
|
||||||
statsContextTimeout = 4 * time.Second
|
statsContextTimeout = 4 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// handleRoot returns a handler that redirects to /status
|
// handleRoot returns a handler that redirects to /status.
|
||||||
func (s *Server) handleRoot() http.HandlerFunc {
|
func (s *Server) handleRoot() http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
http.Redirect(w, r, "/status", http.StatusSeeOther)
|
http.Redirect(w, r, "/status", http.StatusSeeOther)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeJSONError writes a standardized JSON error response
|
// writeJSONError writes a standardized JSON error response with the given
|
||||||
|
// status code and error message.
|
||||||
func writeJSONError(w http.ResponseWriter, statusCode int, message string) {
|
func writeJSONError(w http.ResponseWriter, statusCode int, message string) {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
w.WriteHeader(statusCode)
|
w.WriteHeader(statusCode)
|
||||||
@ -45,7 +46,8 @@ func writeJSONError(w http.ResponseWriter, statusCode int, message string) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeJSONSuccess writes a standardized JSON success response
|
// writeJSONSuccess writes a standardized JSON success response containing
|
||||||
|
// the provided data wrapped in a status envelope.
|
||||||
func writeJSONSuccess(w http.ResponseWriter, data interface{}) error {
|
func writeJSONSuccess(w http.ResponseWriter, data interface{}) error {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
|||||||
@ -44,7 +44,12 @@ func (rw *responseWriter) Header() http.Header {
|
|||||||
return rw.ResponseWriter.Header()
|
return rw.ResponseWriter.Header()
|
||||||
}
|
}
|
||||||
|
|
||||||
// JSONResponseMiddleware wraps all JSON responses with metadata
|
// JSONResponseMiddleware is an HTTP middleware that wraps all JSON responses
|
||||||
|
// with a @meta field containing execution metadata. The metadata includes the
|
||||||
|
// time zone (always UTC), API version, and request execution time in milliseconds.
|
||||||
|
//
|
||||||
|
// Endpoints "/" and "/status" are excluded from this processing and passed through
|
||||||
|
// unchanged. Non-JSON responses and empty responses are also passed through unchanged.
|
||||||
func JSONResponseMiddleware(next http.Handler) http.Handler {
|
func JSONResponseMiddleware(next http.Handler) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
// Skip non-JSON endpoints
|
// Skip non-JSON endpoints
|
||||||
@ -155,7 +160,13 @@ func (tw *timeoutWriter) markWritten() {
|
|||||||
tw.written = true
|
tw.written = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// TimeoutMiddleware creates a timeout middleware that returns JSON errors
|
// TimeoutMiddleware creates an HTTP middleware that enforces a request timeout.
|
||||||
|
// If the handler does not complete within the specified duration, the middleware
|
||||||
|
// returns a JSON error response with HTTP status 408 (Request Timeout).
|
||||||
|
//
|
||||||
|
// The timeout parameter specifies the maximum duration allowed for request processing.
|
||||||
|
// The returned middleware handles panics from the wrapped handler by re-panicking
|
||||||
|
// after cleanup, and prevents concurrent writes to the response after timeout occurs.
|
||||||
func TimeoutMiddleware(timeout time.Duration) func(http.Handler) http.Handler {
|
func TimeoutMiddleware(timeout time.Duration) func(http.Handler) http.Handler {
|
||||||
return func(next http.Handler) http.Handler {
|
return func(next http.Handler) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -218,7 +229,15 @@ func TimeoutMiddleware(timeout time.Duration) func(http.Handler) http.Handler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// JSONValidationMiddleware ensures all JSON API responses are valid JSON
|
// JSONValidationMiddleware is an HTTP middleware that validates JSON API responses.
|
||||||
|
// It ensures that responses with Content-Type "application/json" contain valid JSON.
|
||||||
|
//
|
||||||
|
// If a response is not valid JSON or is empty when JSON is expected, the middleware
|
||||||
|
// returns a properly formatted JSON error response. For timeout errors (status 408),
|
||||||
|
// the error message will be "Request timeout". For other errors, it returns
|
||||||
|
// "Internal server error" with status 500 if the original status was 200.
|
||||||
|
//
|
||||||
|
// Non-JSON responses are passed through unchanged.
|
||||||
func JSONValidationMiddleware(next http.Handler) http.Handler {
|
func JSONValidationMiddleware(next http.Handler) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
// Create a custom response writer to capture the response
|
// Create a custom response writer to capture the response
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user