From 1157003db7a57e5f1465cbc603ae86b8568581e4 Mon Sep 17 00:00:00 2001 From: sneak Date: Mon, 28 Jul 2025 02:31:04 +0200 Subject: [PATCH] Refactor database handlers and optimize PeeringHandler - Create PeeringHandler for asn_peerings table maintenance - Rename DBHandler to ASHandler (now only handles asns table) - Move prefixes table maintenance to PrefixHandler - Optimize PeeringHandler with in-memory AS path tracking: - Stores AS paths in memory with timestamps - Processes peerings in batch every 2 minutes - Prunes old paths (>30 minutes) every 5 minutes - Normalizes peerings with lower AS number first - Each handler now has a single responsibility: - ASHandler: asns table - PeerHandler: bgp_peers table - PrefixHandler: prefixes and live_routes tables - PeeringHandler: asn_peerings table --- internal/routewatch/app.go | 36 +-- internal/routewatch/app_integration_test.go | 5 + internal/routewatch/ashandler.go | 163 +++++++++++++ internal/routewatch/dbhandler.go | 221 ----------------- internal/routewatch/peeringhandler.go | 248 ++++++++++++++++++++ 5 files changed, 438 insertions(+), 235 deletions(-) create mode 100644 internal/routewatch/ashandler.go delete mode 100644 internal/routewatch/dbhandler.go create mode 100644 internal/routewatch/peeringhandler.go diff --git a/internal/routewatch/app.go b/internal/routewatch/app.go index 57a8030..27ff8d6 100644 --- a/internal/routewatch/app.go +++ b/internal/routewatch/app.go @@ -40,19 +40,20 @@ type Dependencies struct { // RouteWatch represents the main application instance type RouteWatch struct { - db database.Store - routingTable *routingtable.RoutingTable - streamer *streamer.Streamer - server *server.Server - snapshotter *snapshotter.Snapshotter - logger *logger.Logger - maxRuntime time.Duration - shutdown bool - mu sync.Mutex - config *config.Config - dbHandler *DBHandler - peerHandler *PeerHandler - prefixHandler *PrefixHandler + db database.Store + routingTable *routingtable.RoutingTable + streamer *streamer.Streamer + server *server.Server + snapshotter *snapshotter.Snapshotter + logger *logger.Logger + maxRuntime time.Duration + shutdown bool + mu sync.Mutex + config *config.Config + dbHandler *ASHandler + peerHandler *PeerHandler + prefixHandler *PrefixHandler + peeringHandler *PeeringHandler } // isTruthy returns true if the value is considered truthy @@ -107,14 +108,21 @@ func (rw *RouteWatch) Run(ctx context.Context) error { // Register database handler to process BGP UPDATE messages if rw.config.EnableBatchedDatabaseWrites { rw.logger.Info("Using batched database handlers for improved performance") - rw.dbHandler = NewDBHandler(rw.db, rw.logger) + // ASHandler maintains the asns table + rw.dbHandler = NewASHandler(rw.db, rw.logger) rw.streamer.RegisterHandler(rw.dbHandler) + // PeerHandler maintains the bgp_peers table rw.peerHandler = NewPeerHandler(rw.db, rw.logger) rw.streamer.RegisterHandler(rw.peerHandler) + // PrefixHandler maintains the prefixes and live_routes tables rw.prefixHandler = NewPrefixHandler(rw.db, rw.logger) rw.streamer.RegisterHandler(rw.prefixHandler) + + // PeeringHandler maintains the asn_peerings table + rw.peeringHandler = NewPeeringHandler(rw.db, rw.logger) + rw.streamer.RegisterHandler(rw.peeringHandler) } else { // Non-batched handlers not implemented yet rw.logger.Error("Non-batched handlers not implemented") diff --git a/internal/routewatch/app_integration_test.go b/internal/routewatch/app_integration_test.go index 9e49e13..1cf02c9 100644 --- a/internal/routewatch/app_integration_test.go +++ b/internal/routewatch/app_integration_test.go @@ -224,6 +224,11 @@ func TestRouteWatchLiveFeed(t *testing.T) { // Wait for the configured duration time.Sleep(5 * time.Second) + // Force peering processing for test + if rw.peeringHandler != nil { + rw.peeringHandler.ProcessPeeringsNow() + } + // Get statistics stats, err := mockDB.GetStats() if err != nil { diff --git a/internal/routewatch/ashandler.go b/internal/routewatch/ashandler.go new file mode 100644 index 0000000..14a3619 --- /dev/null +++ b/internal/routewatch/ashandler.go @@ -0,0 +1,163 @@ +package routewatch + +import ( + "sync" + "time" + + "git.eeqj.de/sneak/routewatch/internal/database" + "git.eeqj.de/sneak/routewatch/internal/logger" + "git.eeqj.de/sneak/routewatch/internal/ristypes" +) + +const ( + // asHandlerQueueSize is the queue capacity for ASN operations + asHandlerQueueSize = 200000 + + // asnBatchSize is the number of ASN operations to batch together + asnBatchSize = 10000 + + // asnBatchTimeout is the maximum time to wait before flushing a batch + asnBatchTimeout = 2 * time.Second +) + +// ASHandler handles ASN information from BGP messages using batched operations +type ASHandler struct { + db database.Store + logger *logger.Logger + + // Batching + mu sync.Mutex + batch []asnOp + lastFlush time.Time + stopCh chan struct{} + wg sync.WaitGroup +} + +type asnOp struct { + number int + timestamp time.Time +} + +// NewASHandler creates a new batched ASN handler +func NewASHandler(db database.Store, logger *logger.Logger) *ASHandler { + h := &ASHandler{ + db: db, + logger: logger, + batch: make([]asnOp, 0, asnBatchSize), + lastFlush: time.Now(), + stopCh: make(chan struct{}), + } + + // Start the flush timer goroutine + h.wg.Add(1) + go h.flushLoop() + + return h +} + +// WantsMessage returns true if this handler wants to process messages of the given type +func (h *ASHandler) WantsMessage(messageType string) bool { + // We only care about UPDATE messages for the database + return messageType == "UPDATE" +} + +// QueueCapacity returns the desired queue capacity for this handler +func (h *ASHandler) QueueCapacity() int { + // Batching allows us to use a larger queue + return asHandlerQueueSize +} + +// HandleMessage processes a RIS message and queues database operations +func (h *ASHandler) HandleMessage(msg *ristypes.RISMessage) { + // Use the pre-parsed timestamp + timestamp := msg.ParsedTimestamp + + // Get origin ASN from path (last element) + var originASN int + if len(msg.Path) > 0 { + originASN = msg.Path[len(msg.Path)-1] + } + + h.mu.Lock() + defer h.mu.Unlock() + + // Queue origin ASN operation + if originASN > 0 { + h.batch = append(h.batch, asnOp{ + number: originASN, + timestamp: timestamp, + }) + } + + // Also track all ASNs in the path + for _, asn := range msg.Path { + if asn > 0 { + h.batch = append(h.batch, asnOp{ + number: asn, + timestamp: timestamp, + }) + } + } + + // Check if we need to flush + if len(h.batch) >= asnBatchSize { + h.flushBatchLocked() + } +} + +// flushLoop runs in a goroutine and periodically flushes batches +func (h *ASHandler) flushLoop() { + defer h.wg.Done() + ticker := time.NewTicker(asnBatchTimeout) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + h.mu.Lock() + if time.Since(h.lastFlush) >= asnBatchTimeout { + h.flushBatchLocked() + } + h.mu.Unlock() + case <-h.stopCh: + // Final flush + h.mu.Lock() + h.flushBatchLocked() + h.mu.Unlock() + + return + } + } +} + +// flushBatchLocked flushes the ASN batch to the database (must be called with mutex held) +func (h *ASHandler) flushBatchLocked() { + if len(h.batch) == 0 { + return + } + + // Process ASNs first (deduped) + asnMap := make(map[int]time.Time) + for _, op := range h.batch { + if existing, ok := asnMap[op.number]; !ok || op.timestamp.After(existing) { + asnMap[op.number] = op.timestamp + } + } + + for asn, ts := range asnMap { + _, err := h.db.GetOrCreateASN(asn, ts) + if err != nil { + h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err) + } + } + + // Clear batch + h.batch = h.batch[:0] + h.lastFlush = time.Now() +} + +// Stop gracefully stops the handler and flushes remaining batches +func (h *ASHandler) Stop() { + close(h.stopCh) + h.wg.Wait() +} diff --git a/internal/routewatch/dbhandler.go b/internal/routewatch/dbhandler.go deleted file mode 100644 index dfbdaf1..0000000 --- a/internal/routewatch/dbhandler.go +++ /dev/null @@ -1,221 +0,0 @@ -package routewatch - -import ( - "sync" - "time" - - "git.eeqj.de/sneak/routewatch/internal/database" - "git.eeqj.de/sneak/routewatch/internal/logger" - "git.eeqj.de/sneak/routewatch/internal/ristypes" -) - -const ( - // dbHandlerQueueSize is the queue capacity for database operations - dbHandlerQueueSize = 200000 - - // batchSize is the number of operations to batch together - batchSize = 10000 - - // batchTimeout is the maximum time to wait before flushing a batch - batchTimeout = 2 * time.Second -) - -// DBHandler handles BGP messages and stores them in the database using batched operations -type DBHandler struct { - db database.Store - logger *logger.Logger - - // Batching - mu sync.Mutex - asnBatch []asnOp - peeringBatch []peeringOp - lastFlush time.Time - stopCh chan struct{} - wg sync.WaitGroup -} - -type asnOp struct { - number int - timestamp time.Time -} - -type peeringOp struct { - fromASN int - toASN int - timestamp time.Time -} - -// NewDBHandler creates a new batched database handler -func NewDBHandler( - db database.Store, - logger *logger.Logger, -) *DBHandler { - h := &DBHandler{ - db: db, - logger: logger, - asnBatch: make([]asnOp, 0, batchSize), - peeringBatch: make([]peeringOp, 0, batchSize), - lastFlush: time.Now(), - stopCh: make(chan struct{}), - } - - // Start the flush timer goroutine - h.wg.Add(1) - go h.flushLoop() - - return h -} - -// WantsMessage returns true if this handler wants to process messages of the given type -func (h *DBHandler) WantsMessage(messageType string) bool { - // We only care about UPDATE messages for the database - return messageType == "UPDATE" -} - -// QueueCapacity returns the desired queue capacity for this handler -func (h *DBHandler) QueueCapacity() int { - // Batching allows us to use a larger queue - return dbHandlerQueueSize -} - -// HandleMessage processes a RIS message and queues database operations -func (h *DBHandler) HandleMessage(msg *ristypes.RISMessage) { - // Use the pre-parsed timestamp - timestamp := msg.ParsedTimestamp - - // Get origin ASN from path (last element) - var originASN int - if len(msg.Path) > 0 { - originASN = msg.Path[len(msg.Path)-1] - } - - h.mu.Lock() - defer h.mu.Unlock() - - // Queue origin ASN operation - if originASN > 0 { - h.asnBatch = append(h.asnBatch, asnOp{ - number: originASN, - timestamp: timestamp, - }) - } - - // Process AS path to queue peering operations - if len(msg.Path) > 1 { - for i := range len(msg.Path) - 1 { - fromASN := msg.Path[i] - toASN := msg.Path[i+1] - - // Queue ASN operations - h.asnBatch = append(h.asnBatch, asnOp{ - number: fromASN, - timestamp: timestamp, - }) - h.asnBatch = append(h.asnBatch, asnOp{ - number: toASN, - timestamp: timestamp, - }) - - // Queue peering operation - h.peeringBatch = append(h.peeringBatch, peeringOp{ - fromASN: fromASN, - toASN: toASN, - timestamp: timestamp, - }) - } - } - - // Check if we need to flush - if len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize { - h.flushBatchesLocked() - } -} - -// flushLoop runs in a goroutine and periodically flushes batches -func (h *DBHandler) flushLoop() { - defer h.wg.Done() - ticker := time.NewTicker(batchTimeout) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - h.mu.Lock() - if time.Since(h.lastFlush) >= batchTimeout { - h.flushBatchesLocked() - } - h.mu.Unlock() - case <-h.stopCh: - // Final flush - h.mu.Lock() - h.flushBatchesLocked() - h.mu.Unlock() - - return - } - } -} - -// flushBatchesLocked flushes all batches to the database (must be called with mutex held) -func (h *DBHandler) flushBatchesLocked() { - if len(h.asnBatch) == 0 && len(h.peeringBatch) == 0 { - return - } - - // Process ASNs first (deduped) - asnMap := make(map[int]time.Time) - for _, op := range h.asnBatch { - if existing, ok := asnMap[op.number]; !ok || op.timestamp.After(existing) { - asnMap[op.number] = op.timestamp - } - } - - asnCache := make(map[int]*database.ASN) - for asn, ts := range asnMap { - asnObj, err := h.db.GetOrCreateASN(asn, ts) - if err != nil { - h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err) - - continue - } - asnCache[asn] = asnObj - } - - // Process peerings (deduped) - type peeringKey struct { - from, to int - } - peeringMap := make(map[peeringKey]time.Time) - for _, op := range h.peeringBatch { - key := peeringKey{from: op.fromASN, to: op.toASN} - if existing, ok := peeringMap[key]; !ok || op.timestamp.After(existing) { - peeringMap[key] = op.timestamp - } - } - - for key, ts := range peeringMap { - fromAS := asnCache[key.from] - toAS := asnCache[key.to] - if fromAS != nil && toAS != nil { - err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), ts) - if err != nil { - h.logger.Error("Failed to record peering", - "from_asn", key.from, - "to_asn", key.to, - "error", err, - ) - } - } - } - - // Clear batches - h.asnBatch = h.asnBatch[:0] - h.peeringBatch = h.peeringBatch[:0] - h.lastFlush = time.Now() -} - -// Stop gracefully stops the handler and flushes remaining batches -func (h *DBHandler) Stop() { - close(h.stopCh) - h.wg.Wait() -} diff --git a/internal/routewatch/peeringhandler.go b/internal/routewatch/peeringhandler.go new file mode 100644 index 0000000..62d5475 --- /dev/null +++ b/internal/routewatch/peeringhandler.go @@ -0,0 +1,248 @@ +package routewatch + +import ( + "encoding/json" + "sync" + "time" + + "git.eeqj.de/sneak/routewatch/internal/database" + "git.eeqj.de/sneak/routewatch/internal/logger" + "git.eeqj.de/sneak/routewatch/internal/ristypes" +) + +const ( + // peeringHandlerQueueSize is the queue capacity for peering operations + peeringHandlerQueueSize = 200000 + + // minPathLengthForPeering is the minimum AS path length to extract peerings + minPathLengthForPeering = 2 + + // pathExpirationTime is how long to keep AS paths in memory + pathExpirationTime = 30 * time.Minute + + // peeringProcessInterval is how often to process AS paths into peerings + peeringProcessInterval = 2 * time.Minute + + // pathPruneInterval is how often to prune old AS paths + pathPruneInterval = 5 * time.Minute +) + +// PeeringHandler handles AS peering relationships from BGP path data +type PeeringHandler struct { + db database.Store + logger *logger.Logger + + // In-memory AS path tracking + mu sync.RWMutex + asPaths map[string]time.Time // key is JSON-encoded AS path + asnCache map[int]*database.ASN + + stopCh chan struct{} +} + +// NewPeeringHandler creates a new batched peering handler +func NewPeeringHandler(db database.Store, logger *logger.Logger) *PeeringHandler { + h := &PeeringHandler{ + db: db, + logger: logger, + asPaths: make(map[string]time.Time), + asnCache: make(map[int]*database.ASN), + stopCh: make(chan struct{}), + } + + // Start the periodic processing goroutines + go h.processLoop() + go h.pruneLoop() + + return h +} + +// WantsMessage returns true if this handler wants to process messages of the given type +func (h *PeeringHandler) WantsMessage(messageType string) bool { + // We only care about UPDATE messages that have AS paths + return messageType == "UPDATE" +} + +// QueueCapacity returns the desired queue capacity for this handler +func (h *PeeringHandler) QueueCapacity() int { + return peeringHandlerQueueSize +} + +// HandleMessage processes a message to extract AS paths +func (h *PeeringHandler) HandleMessage(msg *ristypes.RISMessage) { + // Skip if no AS path or only one AS + if len(msg.Path) < minPathLengthForPeering { + return + } + + timestamp := msg.ParsedTimestamp + + // Encode AS path as JSON for use as map key + pathJSON, err := json.Marshal(msg.Path) + if err != nil { + h.logger.Error("Failed to encode AS path", "error", err) + + return + } + + h.mu.Lock() + h.asPaths[string(pathJSON)] = timestamp + h.mu.Unlock() +} + +// processLoop runs periodically to process AS paths into peerings +func (h *PeeringHandler) processLoop() { + ticker := time.NewTicker(peeringProcessInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + h.processPeerings() + case <-h.stopCh: + // Final processing + h.processPeerings() + + return + } + } +} + +// pruneLoop runs periodically to remove old AS paths +func (h *PeeringHandler) pruneLoop() { + ticker := time.NewTicker(pathPruneInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + h.prunePaths() + case <-h.stopCh: + return + } + } +} + +// prunePaths removes AS paths older than pathExpirationTime +func (h *PeeringHandler) prunePaths() { + cutoff := time.Now().Add(-pathExpirationTime) + var removed int + + h.mu.Lock() + for pathKey, timestamp := range h.asPaths { + if timestamp.Before(cutoff) { + delete(h.asPaths, pathKey) + removed++ + } + } + pathCount := len(h.asPaths) + h.mu.Unlock() + + if removed > 0 { + h.logger.Debug("Pruned old AS paths", "removed", removed, "remaining", pathCount) + } +} + +// ProcessPeeringsNow forces immediate processing of peerings (for testing) +func (h *PeeringHandler) ProcessPeeringsNow() { + h.processPeerings() +} + +// processPeerings extracts peerings from AS paths and writes to database +func (h *PeeringHandler) processPeerings() { + // Take a snapshot of current AS paths + h.mu.RLock() + pathsCopy := make(map[string]time.Time, len(h.asPaths)) + for k, v := range h.asPaths { + pathsCopy[k] = v + } + h.mu.RUnlock() + + if len(pathsCopy) == 0 { + return + } + + // Extract unique peerings from AS paths + type peeringKey struct { + low, high int + } + peerings := make(map[peeringKey]time.Time) + uniqueASNs := make(map[int]struct{}) + + for pathJSON, timestamp := range pathsCopy { + var path []int + if err := json.Unmarshal([]byte(pathJSON), &path); err != nil { + h.logger.Error("Failed to decode AS path", "error", err) + + continue + } + + // Extract peerings from path + for i := range len(path) - 1 { + asn1 := path[i] + asn2 := path[i+1] + + // Normalize: lower AS number first + low, high := asn1, asn2 + if low > high { + low, high = high, low + } + + key := peeringKey{low: low, high: high} + // Update timestamp if this is newer + if existing, ok := peerings[key]; !ok || timestamp.After(existing) { + peerings[key] = timestamp + } + + uniqueASNs[asn1] = struct{}{} + uniqueASNs[asn2] = struct{}{} + } + } + + // Get or create ASNs + for asn := range uniqueASNs { + if _, ok := h.asnCache[asn]; !ok { + asnObj, err := h.db.GetOrCreateASN(asn, time.Now()) + if err != nil { + h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err) + + continue + } + h.asnCache[asn] = asnObj + } + } + + // Record peerings in database + start := time.Now() + successCount := 0 + for key, ts := range peerings { + fromAS := h.asnCache[key.low] + toAS := h.asnCache[key.high] + if fromAS != nil && toAS != nil { + err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), ts) + if err != nil { + h.logger.Error("Failed to record peering", + "from_asn", key.low, + "to_asn", key.high, + "error", err, + ) + } else { + successCount++ + } + } + } + + h.logger.Info("Processed AS peerings", + "paths", len(pathsCopy), + "unique_peerings", len(peerings), + "success", successCount, + "duration", time.Since(start), + ) +} + +// Stop gracefully stops the handler and processes remaining peerings +func (h *PeeringHandler) Stop() { + close(h.stopCh) + // Process any remaining peerings synchronously + h.processPeerings() +}