routewatch/internal/routewatch/ashandler.go
sneak c292fef0ac Add comprehensive godoc documentation to handler.go
Expand documentation comments for SimpleHandler type and its methods
to better explain their purpose, parameters, and behavior.
2025-12-27 12:24:36 +07:00

183 lines
5.0 KiB
Go

package routewatch
import (
"sync"
"time"
"git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger"
"git.eeqj.de/sneak/routewatch/internal/ristypes"
)
const (
// asHandlerQueueSize is the queue capacity for ASN operations
// DO NOT set this higher than 100000 without explicit instructions
asHandlerQueueSize = 100000
// asnBatchSize is the number of ASN operations to batch together
asnBatchSize = 30000
// asnBatchTimeout is the maximum time to wait before flushing a batch
// DO NOT reduce this timeout - larger batches are more efficient
asnBatchTimeout = 2 * time.Second
)
// ASHandler processes Autonomous System Number (ASN) information extracted from
// BGP UPDATE messages. It uses batched database operations to efficiently store
// ASN data, collecting operations into batches that are flushed either when the
// batch reaches a size threshold or after a timeout period.
type ASHandler struct {
db database.Store
logger *logger.Logger
// Batching
mu sync.Mutex
batch []asnOp
lastFlush time.Time
stopCh chan struct{}
wg sync.WaitGroup
}
type asnOp struct {
number int
timestamp time.Time
}
// NewASHandler creates and returns a new ASHandler instance. It initializes
// the batching system and starts a background goroutine that periodically
// flushes accumulated ASN operations to the database. The caller must call
// Stop when finished to ensure all pending operations are flushed and the
// background goroutine is terminated.
func NewASHandler(db database.Store, logger *logger.Logger) *ASHandler {
h := &ASHandler{
db: db,
logger: logger,
batch: make([]asnOp, 0, asnBatchSize),
lastFlush: time.Now(),
stopCh: make(chan struct{}),
}
// Start the flush timer goroutine
h.wg.Add(1)
go h.flushLoop()
return h
}
// WantsMessage reports whether this handler should process messages of the
// given type. ASHandler only processes "UPDATE" messages, as these contain
// the AS path information needed to track autonomous systems.
func (h *ASHandler) WantsMessage(messageType string) bool {
// We only care about UPDATE messages for the database
return messageType == "UPDATE"
}
// QueueCapacity returns the recommended message queue size for this handler.
// ASHandler uses a large queue capacity to accommodate high-volume BGP streams,
// as the batching mechanism allows efficient processing of accumulated messages.
func (h *ASHandler) QueueCapacity() int {
// Batching allows us to use a larger queue
return asHandlerQueueSize
}
// HandleMessage processes a RIS Live BGP message by extracting all ASNs from
// the AS path and queuing them for batch insertion into the database. The
// origin ASN (last element in the path) and all transit ASNs are recorded
// with their associated timestamps. The batch is automatically flushed when
// it reaches the configured size threshold.
func (h *ASHandler) HandleMessage(msg *ristypes.RISMessage) {
// Use the pre-parsed timestamp
timestamp := msg.ParsedTimestamp
// Get origin ASN from path (last element)
var originASN int
if len(msg.Path) > 0 {
originASN = msg.Path[len(msg.Path)-1]
}
h.mu.Lock()
defer h.mu.Unlock()
// Queue origin ASN operation
if originASN > 0 {
h.batch = append(h.batch, asnOp{
number: originASN,
timestamp: timestamp,
})
}
// Also track all ASNs in the path
for _, asn := range msg.Path {
if asn > 0 {
h.batch = append(h.batch, asnOp{
number: asn,
timestamp: timestamp,
})
}
}
// Check if we need to flush
if len(h.batch) >= asnBatchSize {
h.flushBatchLocked()
}
}
// flushLoop runs in a goroutine and periodically flushes batches
func (h *ASHandler) flushLoop() {
defer h.wg.Done()
ticker := time.NewTicker(asnBatchTimeout)
defer ticker.Stop()
for {
select {
case <-ticker.C:
h.mu.Lock()
if time.Since(h.lastFlush) >= asnBatchTimeout {
h.flushBatchLocked()
}
h.mu.Unlock()
case <-h.stopCh:
// Final flush
h.mu.Lock()
h.flushBatchLocked()
h.mu.Unlock()
return
}
}
}
// flushBatchLocked flushes the ASN batch to the database (must be called with mutex held)
func (h *ASHandler) flushBatchLocked() {
if len(h.batch) == 0 {
return
}
// Process ASNs first (deduped)
asnMap := make(map[int]time.Time)
for _, op := range h.batch {
if existing, ok := asnMap[op.number]; !ok || op.timestamp.After(existing) {
asnMap[op.number] = op.timestamp
}
}
// Process all ASNs in a single batch transaction
if err := h.db.GetOrCreateASNBatch(asnMap); err != nil {
h.logger.Error("Failed to process ASN batch", "error", err, "count", len(asnMap))
}
// Clear batch
h.batch = h.batch[:0]
h.lastFlush = time.Now()
}
// Stop gracefully shuts down the ASHandler by signaling the background flush
// goroutine to terminate and waiting for it to complete. Any pending ASN
// operations in the current batch are flushed to the database before Stop
// returns. This method should be called during application shutdown to ensure
// no data is lost.
func (h *ASHandler) Stop() {
close(h.stopCh)
h.wg.Wait()
}