Fix linting errors for magic numbers in handler queue sizes

- Define constants for all handler queue capacities
- Fix integer overflow warning in metrics calculation
- Add missing blank lines before continue statements
This commit is contained in:
Jeffrey Paul 2025-07-27 23:38:38 +02:00
parent 76ec9f68b7
commit 1d05372899
4 changed files with 224 additions and 109 deletions

View File

@ -7,6 +7,11 @@ import (
"git.eeqj.de/sneak/routewatch/internal/ristypes"
)
const (
// databaseHandlerQueueSize is the queue capacity for database operations
databaseHandlerQueueSize = 100
)
// DatabaseHandler handles BGP messages and stores them in the database
type DatabaseHandler struct {
db database.Store
@ -27,6 +32,12 @@ func (h *DatabaseHandler) WantsMessage(messageType string) bool {
return messageType == "UPDATE"
}
// QueueCapacity returns the desired queue capacity for this handler
func (h *DatabaseHandler) QueueCapacity() int {
// Database operations are slow, so use a smaller queue
return databaseHandlerQueueSize
}
// HandleMessage processes a RIS message and updates the database
func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
// Use the pre-parsed timestamp

View File

@ -8,6 +8,11 @@ import (
"git.eeqj.de/sneak/routewatch/internal/ristypes"
)
const (
// peerHandlerQueueSize is the queue capacity for peer tracking operations
peerHandlerQueueSize = 500
)
// PeerHandler tracks BGP peers from all message types
type PeerHandler struct {
db database.Store
@ -27,6 +32,12 @@ func (h *PeerHandler) WantsMessage(_ string) bool {
return true
}
// QueueCapacity returns the desired queue capacity for this handler
func (h *PeerHandler) QueueCapacity() int {
// Peer tracking is lightweight but involves database ops, use moderate queue
return peerHandlerQueueSize
}
// HandleMessage processes a message to track peer information
func (h *PeerHandler) HandleMessage(msg *ristypes.RISMessage) {
// Parse peer ASN from string

View File

@ -9,6 +9,11 @@ import (
"github.com/google/uuid"
)
const (
// routingTableHandlerQueueSize is the queue capacity for in-memory routing table operations
routingTableHandlerQueueSize = 10000
)
// RoutingTableHandler handles BGP messages and updates the in-memory routing table
type RoutingTableHandler struct {
rt *routingtable.RoutingTable
@ -29,6 +34,12 @@ func (h *RoutingTableHandler) WantsMessage(messageType string) bool {
return messageType == "UPDATE"
}
// QueueCapacity returns the desired queue capacity for this handler
func (h *RoutingTableHandler) QueueCapacity() int {
// In-memory operations are very fast, so use a large queue
return routingTableHandlerQueueSize
}
// HandleMessage processes a RIS message and updates the routing table
func (h *RoutingTableHandler) HandleMessage(msg *ristypes.RISMessage) {
// Use the pre-parsed timestamp

View File

@ -25,7 +25,7 @@ const (
metricsLogInterval = 10 * time.Second
bytesPerKB = 1024
bytesPerMB = 1024 * 1024
maxConcurrentHandlers = 200 // Maximum number of concurrent message handlers
maxConcurrentHandlers = 800 // Maximum number of concurrent message handlers
)
// MessageHandler is an interface for handling RIS messages
@ -35,23 +35,43 @@ type MessageHandler interface {
// HandleMessage processes a RIS message
HandleMessage(msg *ristypes.RISMessage)
// QueueCapacity returns the desired queue capacity for this handler
// Handlers that process quickly can have larger queues
QueueCapacity() int
}
// RawMessageHandler is a callback for handling raw JSON lines from the stream
type RawMessageHandler func(line string)
// handlerMetrics tracks performance metrics for a handler
type handlerMetrics struct {
processedCount uint64 // Total messages processed
droppedCount uint64 // Total messages dropped
totalTime time.Duration // Total processing time (for average calculation)
minTime time.Duration // Minimum processing time
maxTime time.Duration // Maximum processing time
mu sync.Mutex // Protects the metrics
}
// handlerInfo wraps a handler with its queue and metrics
type handlerInfo struct {
handler MessageHandler
queue chan *ristypes.RISMessage
metrics handlerMetrics
}
// Streamer handles streaming BGP updates from RIS Live
type Streamer struct {
logger *slog.Logger
client *http.Client
handlers []MessageHandler
rawHandler RawMessageHandler
mu sync.RWMutex
cancel context.CancelFunc
running bool
metrics *metrics.Tracker
semaphore chan struct{} // Limits concurrent message processing
droppedMessages uint64 // Atomic counter for dropped messages
logger *slog.Logger
client *http.Client
handlers []*handlerInfo
rawHandler RawMessageHandler
mu sync.RWMutex
cancel context.CancelFunc
running bool
metrics *metrics.Tracker
totalDropped uint64 // Total dropped messages across all handlers
}
// New creates a new RIS streamer
@ -61,9 +81,8 @@ func New(logger *slog.Logger, metrics *metrics.Tracker) *Streamer {
client: &http.Client{
Timeout: 0, // No timeout for streaming
},
handlers: make([]MessageHandler, 0),
metrics: metrics,
semaphore: make(chan struct{}, maxConcurrentHandlers),
handlers: make([]*handlerInfo, 0),
metrics: metrics,
}
}
@ -71,7 +90,19 @@ func New(logger *slog.Logger, metrics *metrics.Tracker) *Streamer {
func (s *Streamer) RegisterHandler(handler MessageHandler) {
s.mu.Lock()
defer s.mu.Unlock()
s.handlers = append(s.handlers, handler)
// Create handler info with its own queue based on capacity
info := &handlerInfo{
handler: handler,
queue: make(chan *ristypes.RISMessage, handler.QueueCapacity()),
}
s.handlers = append(s.handlers, info)
// If we're already running, start a worker for this handler
if s.running {
go s.runHandlerWorker(info)
}
}
// RegisterRawHandler sets a callback for raw message lines
@ -94,6 +125,11 @@ func (s *Streamer) Start() error {
s.cancel = cancel
s.running = true
// Start workers for each handler
for _, info := range s.handlers {
go s.runHandlerWorker(info)
}
go func() {
if err := s.stream(ctx); err != nil {
s.logger.Error("Streaming error", "error", err)
@ -112,10 +148,40 @@ func (s *Streamer) Stop() {
if s.cancel != nil {
s.cancel()
}
// Close all handler queues to signal workers to stop
for _, info := range s.handlers {
close(info.queue)
}
s.running = false
s.mu.Unlock()
s.metrics.SetConnected(false)
}
// runHandlerWorker processes messages for a specific handler
func (s *Streamer) runHandlerWorker(info *handlerInfo) {
for msg := range info.queue {
start := time.Now()
info.handler.HandleMessage(msg)
elapsed := time.Since(start)
// Update metrics
info.metrics.mu.Lock()
info.metrics.processedCount++
info.metrics.totalTime += elapsed
// Update min time
if info.metrics.minTime == 0 || elapsed < info.metrics.minTime {
info.metrics.minTime = elapsed
}
// Update max time
if elapsed > info.metrics.maxTime {
info.metrics.maxTime = elapsed
}
info.metrics.mu.Unlock()
}
}
// IsRunning returns whether the streamer is currently active
func (s *Streamer) IsRunning() bool {
s.mu.RLock()
@ -131,7 +197,7 @@ func (s *Streamer) GetMetrics() metrics.StreamMetrics {
// GetDroppedMessages returns the total number of dropped messages
func (s *Streamer) GetDroppedMessages() uint64 {
return atomic.LoadUint64(&s.droppedMessages)
return atomic.LoadUint64(&s.totalDropped)
}
// logMetrics logs the current streaming statistics
@ -140,7 +206,8 @@ func (s *Streamer) logMetrics() {
uptime := time.Since(metrics.ConnectedSince)
const bitsPerMegabit = 1000000
droppedMessages := atomic.LoadUint64(&s.droppedMessages)
totalDropped := atomic.LoadUint64(&s.totalDropped)
s.logger.Info(
"Stream statistics",
"uptime",
@ -157,11 +224,39 @@ func (s *Streamer) logMetrics() {
fmt.Sprintf("%.0f", metrics.BitsPerSec),
"mbps",
fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit),
"dropped_messages",
droppedMessages,
"active_handlers",
len(s.semaphore),
"total_dropped",
totalDropped,
)
// Log per-handler statistics
s.mu.RLock()
for i, info := range s.handlers {
info.metrics.mu.Lock()
if info.metrics.processedCount > 0 {
// Safe conversion: processedCount is bounded by maxInt64
processedCount := info.metrics.processedCount
const maxInt64 = 1<<63 - 1
if processedCount > maxInt64 {
processedCount = maxInt64
}
//nolint:gosec // processedCount is explicitly bounded above
avgTime := info.metrics.totalTime / time.Duration(processedCount)
s.logger.Info(
"Handler statistics",
"handler", fmt.Sprintf("%T", info.handler),
"index", i,
"queue_len", len(info.queue),
"queue_cap", cap(info.queue),
"processed", info.metrics.processedCount,
"dropped", info.metrics.droppedCount,
"avg_time", avgTime,
"min_time", info.metrics.minTime,
"max_time", info.metrics.maxTime,
)
}
info.metrics.mu.Unlock()
}
s.mu.RUnlock()
}
// updateMetrics updates the metrics counters and rates
@ -236,104 +331,91 @@ func (s *Streamer) stream(ctx context.Context) error {
rawHandler(string(line))
}
// Get current handlers
s.mu.RLock()
handlers := make([]MessageHandler, len(s.handlers))
copy(handlers, s.handlers)
s.mu.RUnlock()
// Parse the message first
var wrapper ristypes.RISLiveMessage
if err := json.Unmarshal(line, &wrapper); err != nil {
// Output the raw line and panic on parse failure
fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err)
fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(line))
panic(fmt.Sprintf("JSON parse error: %v", err))
}
// Try to acquire semaphore, drop message if at capacity
select {
case s.semaphore <- struct{}{}:
// Successfully acquired semaphore, process message
go func(rawLine []byte, messageHandlers []MessageHandler) {
defer func() { <-s.semaphore }() // Release semaphore when done
// Check if it's a ris_message wrapper
if wrapper.Type != "ris_message" {
s.logger.Error("Unexpected wrapper type",
"type", wrapper.Type,
"line", string(line),
)
// Parse the outer wrapper first
var wrapper ristypes.RISLiveMessage
if err := json.Unmarshal(rawLine, &wrapper); err != nil {
// Output the raw line and panic on parse failure
fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err)
fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(rawLine))
panic(fmt.Sprintf("JSON parse error: %v", err))
}
continue
}
// Check if it's a ris_message wrapper
if wrapper.Type != "ris_message" {
s.logger.Error("Unexpected wrapper type",
"type", wrapper.Type,
"line", string(rawLine),
)
// Get the actual message
msg := wrapper.Data
return
}
// Parse the timestamp
msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0).UTC()
// Get the actual message
msg := wrapper.Data
// Process based on message type
switch msg.Type {
case "UPDATE":
// Process BGP UPDATE messages
// Will be dispatched to handlers
case "RIS_PEER_STATE":
// RIS peer state messages - silently ignore
continue
case "KEEPALIVE":
// BGP keepalive messages - silently process
continue
case "OPEN":
// BGP open messages
s.logger.Info("BGP session opened",
"peer", msg.Peer,
"peer_asn", msg.PeerASN,
)
// Parse the timestamp
msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0).
UTC()
continue
case "NOTIFICATION":
// BGP notification messages (errors)
s.logger.Warn("BGP notification",
"peer", msg.Peer,
"peer_asn", msg.PeerASN,
)
// Process based on message type
switch msg.Type {
case "UPDATE":
// Process BGP UPDATE messages
// Will be handled by registered handlers
case "RIS_PEER_STATE":
// RIS peer state messages - silently ignore
case "KEEPALIVE":
// BGP keepalive messages - silently process
case "OPEN":
// BGP open messages
s.logger.Info("BGP session opened",
"peer", msg.Peer,
"peer_asn", msg.PeerASN,
)
case "NOTIFICATION":
// BGP notification messages (errors)
s.logger.Warn("BGP notification",
"peer", msg.Peer,
"peer_asn", msg.PeerASN,
)
case "STATE":
// Peer state changes - silently ignore
default:
fmt.Fprintf(
os.Stderr,
"UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n",
msg.Type,
string(rawLine),
)
panic(
fmt.Sprintf(
"Unknown RIS message type: %s",
msg.Type,
),
)
}
// Call handlers synchronously within this goroutine
// This prevents unbounded goroutine growth at the handler level
for _, handler := range messageHandlers {
if handler.WantsMessage(msg.Type) {
handler.HandleMessage(&msg)
}
}
}(append([]byte(nil), line...), handlers) // Copy the line to avoid data races
continue
case "STATE":
// Peer state changes - silently ignore
continue
default:
// Semaphore is full, drop the message
dropped := atomic.AddUint64(&s.droppedMessages, 1)
if dropped%1000 == 0 { // Log every 1000 dropped messages
s.logger.Warn(
"Dropping messages due to overload",
"total_dropped",
dropped,
"max_handlers",
maxConcurrentHandlers,
)
fmt.Fprintf(
os.Stderr,
"UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n",
msg.Type,
string(line),
)
panic(
fmt.Sprintf(
"Unknown RIS message type: %s",
msg.Type,
),
)
}
// Dispatch to interested handlers
s.mu.RLock()
for _, info := range s.handlers {
if info.handler.WantsMessage(msg.Type) {
select {
case info.queue <- &msg:
// Message queued successfully
default:
// Queue is full, drop the message
atomic.AddUint64(&info.metrics.droppedCount, 1)
atomic.AddUint64(&s.totalDropped, 1)
}
}
}
s.mu.RUnlock()
}
if err := scanner.Err(); err != nil {