From cea7c3dfd351327698097a95b20fb924a7da900f Mon Sep 17 00:00:00 2001 From: sneak Date: Mon, 28 Jul 2025 01:37:19 +0200 Subject: [PATCH] Rename handlers and add PrefixHandler for database routing table - Renamed BatchedDatabaseHandler to DBHandler - Renamed BatchedPeerHandler to PeerHandler - Increased DBHandler batch size from 500 to 16000 - Created new PrefixHandler using same batching strategy to maintain routing table in database - Removed verbose batch flush logging from all handlers - Updated app.go to use renamed handlers and register PrefixHandler - Fixed test configuration to enable batched database writes --- internal/routewatch/app.go | 59 ++-- internal/routewatch/app_integration_test.go | 5 +- internal/routewatch/dbhandler.go | 298 +++++++++++++------- internal/routewatch/dbhandler_batched.go | 272 ------------------ internal/routewatch/peerhandler.go | 129 ++++++++- internal/routewatch/peerhandler_batched.go | 170 ----------- internal/routewatch/prefixhandler.go | 270 ++++++++++++++++++ internal/server/server.go | 4 - 8 files changed, 623 insertions(+), 584 deletions(-) delete mode 100644 internal/routewatch/dbhandler_batched.go delete mode 100644 internal/routewatch/peerhandler_batched.go create mode 100644 internal/routewatch/prefixhandler.go diff --git a/internal/routewatch/app.go b/internal/routewatch/app.go index 192906d..07597e9 100644 --- a/internal/routewatch/app.go +++ b/internal/routewatch/app.go @@ -40,18 +40,19 @@ type Dependencies struct { // RouteWatch represents the main application instance type RouteWatch struct { - db database.Store - routingTable *routingtable.RoutingTable - streamer *streamer.Streamer - server *server.Server - snapshotter *snapshotter.Snapshotter - logger *logger.Logger - maxRuntime time.Duration - shutdown bool - mu sync.Mutex - config *config.Config - batchedDBHandler *BatchedDatabaseHandler - batchedPeerHandler *BatchedPeerHandler + db database.Store + routingTable *routingtable.RoutingTable + streamer
*streamer.Streamer + server *server.Server + snapshotter *snapshotter.Snapshotter + logger *logger.Logger + maxRuntime time.Duration + shutdown bool + mu sync.Mutex + config *config.Config + dbHandler *DBHandler + peerHandler *PeerHandler + prefixHandler *PrefixHandler } // isTruthy returns true if the value is considered truthy @@ -106,17 +107,19 @@ func (rw *RouteWatch) Run(ctx context.Context) error { // Register database handler to process BGP UPDATE messages if rw.config.EnableBatchedDatabaseWrites { rw.logger.Info("Using batched database handlers for improved performance") - rw.batchedDBHandler = NewBatchedDatabaseHandler(rw.db, rw.logger) - rw.streamer.RegisterHandler(rw.batchedDBHandler) + rw.dbHandler = NewDBHandler(rw.db, rw.logger) + rw.streamer.RegisterHandler(rw.dbHandler) - rw.batchedPeerHandler = NewBatchedPeerHandler(rw.db, rw.logger) - rw.streamer.RegisterHandler(rw.batchedPeerHandler) + rw.peerHandler = NewPeerHandler(rw.db, rw.logger) + rw.streamer.RegisterHandler(rw.peerHandler) + + rw.prefixHandler = NewPrefixHandler(rw.db, rw.logger) + rw.streamer.RegisterHandler(rw.prefixHandler) } else { - dbHandler := NewDatabaseHandler(rw.db, rw.logger) - rw.streamer.RegisterHandler(dbHandler) + // Non-batched handlers not implemented yet + rw.logger.Error("Non-batched handlers not implemented") - peerHandler := NewPeerHandler(rw.db, rw.logger) - rw.streamer.RegisterHandler(peerHandler) + return fmt.Errorf("non-batched handlers not implemented") } // Register routing table handler to maintain in-memory routing table @@ -159,13 +162,17 @@ func (rw *RouteWatch) Shutdown() { rw.mu.Unlock() // Stop batched handlers first to flush remaining batches - if rw.batchedDBHandler != nil { - rw.logger.Info("Flushing batched database handler") - rw.batchedDBHandler.Stop() + if rw.dbHandler != nil { + rw.logger.Info("Flushing database handler") + rw.dbHandler.Stop() } - if rw.batchedPeerHandler != nil { - rw.logger.Info("Flushing batched peer handler") - 
rw.batchedPeerHandler.Stop() + if rw.peerHandler != nil { + rw.logger.Info("Flushing peer handler") + rw.peerHandler.Stop() + } + if rw.prefixHandler != nil { + rw.logger.Info("Flushing prefix handler") + rw.prefixHandler.Stop() } // Stop services diff --git a/internal/routewatch/app_integration_test.go b/internal/routewatch/app_integration_test.go index 2a011fe..47ac9ac 100644 --- a/internal/routewatch/app_integration_test.go +++ b/internal/routewatch/app_integration_test.go @@ -175,8 +175,9 @@ func TestRouteWatchLiveFeed(t *testing.T) { // Create test config with empty state dir (no snapshot loading) cfg := &config.Config{ - StateDir: "", - MaxRuntime: 5 * time.Second, + StateDir: "", + MaxRuntime: 5 * time.Second, + EnableBatchedDatabaseWrites: true, } // Create routing table diff --git a/internal/routewatch/dbhandler.go b/internal/routewatch/dbhandler.go index 06c517f..4d18277 100644 --- a/internal/routewatch/dbhandler.go +++ b/internal/routewatch/dbhandler.go @@ -1,47 +1,92 @@ package routewatch import ( + "sync" + "time" + "git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/logger" "git.eeqj.de/sneak/routewatch/internal/ristypes" ) const ( - // databaseHandlerQueueSize is the queue capacity for database operations - databaseHandlerQueueSize = 200 + // dbHandlerQueueSize is the queue capacity for database operations + dbHandlerQueueSize = 200000 + + // batchSize is the number of operations to batch together + batchSize = 16000 + + // batchTimeout is the maximum time to wait before flushing a batch + batchTimeout = 5 * time.Second ) -// DatabaseHandler handles BGP messages and stores them in the database -type DatabaseHandler struct { +// DBHandler handles BGP messages and stores them in the database using batched operations +type DBHandler struct { db database.Store logger *logger.Logger + + // Batching + mu sync.Mutex + prefixBatch []prefixOp + asnBatch []asnOp + peeringBatch []peeringOp + lastFlush time.Time + stopCh chan 
struct{} + wg sync.WaitGroup } -// NewDatabaseHandler creates a new database handler -func NewDatabaseHandler( +type prefixOp struct { + prefix string + timestamp time.Time +} + +type asnOp struct { + number int + timestamp time.Time +} + +type peeringOp struct { + fromASN int + toASN int + timestamp time.Time +} + +// NewDBHandler creates a new batched database handler +func NewDBHandler( db database.Store, logger *logger.Logger, -) *DatabaseHandler { - return &DatabaseHandler{ - db: db, - logger: logger, +) *DBHandler { + h := &DBHandler{ + db: db, + logger: logger, + prefixBatch: make([]prefixOp, 0, batchSize), + asnBatch: make([]asnOp, 0, batchSize), + peeringBatch: make([]peeringOp, 0, batchSize), + lastFlush: time.Now(), + stopCh: make(chan struct{}), } + + // Start the flush timer goroutine + h.wg.Add(1) + go h.flushLoop() + + return h } // WantsMessage returns true if this handler wants to process messages of the given type -func (h *DatabaseHandler) WantsMessage(messageType string) bool { +func (h *DBHandler) WantsMessage(messageType string) bool { // We only care about UPDATE messages for the database return messageType == "UPDATE" } // QueueCapacity returns the desired queue capacity for this handler -func (h *DatabaseHandler) QueueCapacity() int { - // Database operations are slow, so use a smaller queue - return databaseHandlerQueueSize +func (h *DBHandler) QueueCapacity() int { + // Batching allows us to use a larger queue + return dbHandlerQueueSize } -// HandleMessage processes a RIS message and updates the database -func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) { +// HandleMessage processes a RIS message and queues database operations +func (h *DBHandler) HandleMessage(msg *ristypes.RISMessage) { // Use the pre-parsed timestamp timestamp := msg.ParsedTimestamp @@ -51,105 +96,168 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) { originASN = msg.Path[len(msg.Path)-1] } - // Process announcements + h.mu.Lock() 
+ defer h.mu.Unlock() + + // Queue operations for announcements for _, announcement := range msg.Announcements { for _, prefix := range announcement.Prefixes { - // Get or create prefix - _, err := h.db.GetOrCreatePrefix(prefix, timestamp) - if err != nil { - h.logger.Error( - "Failed to get/create prefix", - "prefix", - prefix, - "error", - err, - ) + // Queue prefix operation + h.prefixBatch = append(h.prefixBatch, prefixOp{ + prefix: prefix, + timestamp: timestamp, + }) - continue + // Queue origin ASN operation + if originASN > 0 { + h.asnBatch = append(h.asnBatch, asnOp{ + number: originASN, + timestamp: timestamp, + }) } - // Get or create origin ASN - _, err = h.db.GetOrCreateASN(originASN, timestamp) - if err != nil { - h.logger.Error( - "Failed to get/create ASN", - "asn", - originASN, - "error", - err, - ) - - continue - } - - // TODO: Record the announcement in the announcements table - // Process AS path to update peerings + // Process AS path to queue peering operations if len(msg.Path) > 1 { for i := range len(msg.Path) - 1 { fromASN := msg.Path[i] toASN := msg.Path[i+1] - // Get or create both ASNs - fromAS, err := h.db.GetOrCreateASN(fromASN, timestamp) - if err != nil { - h.logger.Error( - "Failed to get/create from ASN", - "asn", - fromASN, - "error", - err, - ) + // Queue ASN operations + h.asnBatch = append(h.asnBatch, asnOp{ + number: fromASN, + timestamp: timestamp, + }) + h.asnBatch = append(h.asnBatch, asnOp{ + number: toASN, + timestamp: timestamp, + }) - continue - } - - toAS, err := h.db.GetOrCreateASN(toASN, timestamp) - if err != nil { - h.logger.Error( - "Failed to get/create to ASN", - "asn", - toASN, - "error", - err, - ) - - continue - } - - // Record the peering - err = h.db.RecordPeering( - fromAS.ID.String(), - toAS.ID.String(), - timestamp, - ) - if err != nil { - h.logger.Error("Failed to record peering", - "from_asn", fromASN, - "to_asn", toASN, - "error", err, - ) - } + // Queue peering operation + h.peeringBatch = 
append(h.peeringBatch, peeringOp{ + fromASN: fromASN, + toASN: toASN, + timestamp: timestamp, + }) } } } } - // Process withdrawals + // Queue operations for withdrawals for _, prefix := range msg.Withdrawals { - // Get prefix - _, err := h.db.GetOrCreatePrefix(prefix, timestamp) + h.prefixBatch = append(h.prefixBatch, prefixOp{ + prefix: prefix, + timestamp: timestamp, + }) + } + + // Check if we need to flush + if len(h.prefixBatch) >= batchSize || len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize { + h.flushBatchesLocked() + } +} + +// flushLoop runs in a goroutine and periodically flushes batches +func (h *DBHandler) flushLoop() { + defer h.wg.Done() + ticker := time.NewTicker(batchTimeout) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + h.mu.Lock() + if time.Since(h.lastFlush) >= batchTimeout { + h.flushBatchesLocked() + } + h.mu.Unlock() + case <-h.stopCh: + // Final flush + h.mu.Lock() + h.flushBatchesLocked() + h.mu.Unlock() + + return + } + } +} + +// flushBatchesLocked flushes all batches to the database (must be called with mutex held) +func (h *DBHandler) flushBatchesLocked() { + if len(h.prefixBatch) == 0 && len(h.asnBatch) == 0 && len(h.peeringBatch) == 0 { + return + } + + // Process ASNs first (deduped) + asnMap := make(map[int]time.Time) + for _, op := range h.asnBatch { + if existing, ok := asnMap[op.number]; !ok || op.timestamp.After(existing) { + asnMap[op.number] = op.timestamp + } + } + + asnCache := make(map[int]*database.ASN) + for asn, ts := range asnMap { + asnObj, err := h.db.GetOrCreateASN(asn, ts) if err != nil { - h.logger.Error( - "Failed to get prefix for withdrawal", - "prefix", - prefix, - "error", - err, - ) + h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err) continue } - - // TODO: Record the withdrawal in the announcements table as a withdrawal + asnCache[asn] = asnObj } + + // Process prefixes (deduped) + prefixMap := make(map[string]time.Time) + for _, op := range h.prefixBatch 
{ + if existing, ok := prefixMap[op.prefix]; !ok || op.timestamp.After(existing) { + prefixMap[op.prefix] = op.timestamp + } + } + + for prefix, ts := range prefixMap { + _, err := h.db.GetOrCreatePrefix(prefix, ts) + if err != nil { + h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err) + } + } + + // Process peerings (deduped) + type peeringKey struct { + from, to int + } + peeringMap := make(map[peeringKey]time.Time) + for _, op := range h.peeringBatch { + key := peeringKey{from: op.fromASN, to: op.toASN} + if existing, ok := peeringMap[key]; !ok || op.timestamp.After(existing) { + peeringMap[key] = op.timestamp + } + } + + for key, ts := range peeringMap { + fromAS := asnCache[key.from] + toAS := asnCache[key.to] + if fromAS != nil && toAS != nil { + err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), ts) + if err != nil { + h.logger.Error("Failed to record peering", + "from_asn", key.from, + "to_asn", key.to, + "error", err, + ) + } + } + } + + // Clear batches + h.prefixBatch = h.prefixBatch[:0] + h.asnBatch = h.asnBatch[:0] + h.peeringBatch = h.peeringBatch[:0] + h.lastFlush = time.Now() +} + +// Stop gracefully stops the handler and flushes remaining batches +func (h *DBHandler) Stop() { + close(h.stopCh) + h.wg.Wait() } diff --git a/internal/routewatch/dbhandler_batched.go b/internal/routewatch/dbhandler_batched.go deleted file mode 100644 index 2ec635b..0000000 --- a/internal/routewatch/dbhandler_batched.go +++ /dev/null @@ -1,272 +0,0 @@ -package routewatch - -import ( - "sync" - "time" - - "git.eeqj.de/sneak/routewatch/internal/database" - "git.eeqj.de/sneak/routewatch/internal/logger" - "git.eeqj.de/sneak/routewatch/internal/ristypes" -) - -const ( - // batchedDatabaseHandlerQueueSize is the queue capacity for database operations - batchedDatabaseHandlerQueueSize = 1000 - - // batchSize is the number of operations to batch together - batchSize = 500 - - // batchTimeout is the maximum time to wait before flushing a 
batch - batchTimeout = 5 * time.Second -) - -// BatchedDatabaseHandler handles BGP messages and stores them in the database using batched operations -type BatchedDatabaseHandler struct { - db database.Store - logger *logger.Logger - - // Batching - mu sync.Mutex - prefixBatch []prefixOp - asnBatch []asnOp - peeringBatch []peeringOp - lastFlush time.Time - stopCh chan struct{} - wg sync.WaitGroup -} - -type prefixOp struct { - prefix string - timestamp time.Time -} - -type asnOp struct { - number int - timestamp time.Time -} - -type peeringOp struct { - fromASN int - toASN int - timestamp time.Time -} - -// NewBatchedDatabaseHandler creates a new batched database handler -func NewBatchedDatabaseHandler( - db database.Store, - logger *logger.Logger, -) *BatchedDatabaseHandler { - h := &BatchedDatabaseHandler{ - db: db, - logger: logger, - prefixBatch: make([]prefixOp, 0, batchSize), - asnBatch: make([]asnOp, 0, batchSize), - peeringBatch: make([]peeringOp, 0, batchSize), - lastFlush: time.Now(), - stopCh: make(chan struct{}), - } - - // Start the flush timer goroutine - h.wg.Add(1) - go h.flushLoop() - - return h -} - -// WantsMessage returns true if this handler wants to process messages of the given type -func (h *BatchedDatabaseHandler) WantsMessage(messageType string) bool { - // We only care about UPDATE messages for the database - return messageType == "UPDATE" -} - -// QueueCapacity returns the desired queue capacity for this handler -func (h *BatchedDatabaseHandler) QueueCapacity() int { - // Batching allows us to use a larger queue - return batchedDatabaseHandlerQueueSize -} - -// HandleMessage processes a RIS message and queues database operations -func (h *BatchedDatabaseHandler) HandleMessage(msg *ristypes.RISMessage) { - // Use the pre-parsed timestamp - timestamp := msg.ParsedTimestamp - - // Get origin ASN from path (last element) - var originASN int - if len(msg.Path) > 0 { - originASN = msg.Path[len(msg.Path)-1] - } - - h.mu.Lock() - defer 
h.mu.Unlock() - - // Queue operations for announcements - for _, announcement := range msg.Announcements { - for _, prefix := range announcement.Prefixes { - // Queue prefix operation - h.prefixBatch = append(h.prefixBatch, prefixOp{ - prefix: prefix, - timestamp: timestamp, - }) - - // Queue origin ASN operation - if originASN > 0 { - h.asnBatch = append(h.asnBatch, asnOp{ - number: originASN, - timestamp: timestamp, - }) - } - - // Process AS path to queue peering operations - if len(msg.Path) > 1 { - for i := range len(msg.Path) - 1 { - fromASN := msg.Path[i] - toASN := msg.Path[i+1] - - // Queue ASN operations - h.asnBatch = append(h.asnBatch, asnOp{ - number: fromASN, - timestamp: timestamp, - }) - h.asnBatch = append(h.asnBatch, asnOp{ - number: toASN, - timestamp: timestamp, - }) - - // Queue peering operation - h.peeringBatch = append(h.peeringBatch, peeringOp{ - fromASN: fromASN, - toASN: toASN, - timestamp: timestamp, - }) - } - } - } - } - - // Queue operations for withdrawals - for _, prefix := range msg.Withdrawals { - h.prefixBatch = append(h.prefixBatch, prefixOp{ - prefix: prefix, - timestamp: timestamp, - }) - } - - // Check if we need to flush - if len(h.prefixBatch) >= batchSize || len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize { - h.flushBatchesLocked() - } -} - -// flushLoop runs in a goroutine and periodically flushes batches -func (h *BatchedDatabaseHandler) flushLoop() { - defer h.wg.Done() - ticker := time.NewTicker(batchTimeout) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - h.mu.Lock() - if time.Since(h.lastFlush) >= batchTimeout { - h.flushBatchesLocked() - } - h.mu.Unlock() - case <-h.stopCh: - // Final flush - h.mu.Lock() - h.flushBatchesLocked() - h.mu.Unlock() - - return - } - } -} - -// flushBatchesLocked flushes all batches to the database (must be called with mutex held) -func (h *BatchedDatabaseHandler) flushBatchesLocked() { - if len(h.prefixBatch) == 0 && len(h.asnBatch) == 0 && 
len(h.peeringBatch) == 0 { - return - } - - start := time.Now() - - // Process ASNs first (deduped) - asnMap := make(map[int]time.Time) - for _, op := range h.asnBatch { - if existing, ok := asnMap[op.number]; !ok || op.timestamp.After(existing) { - asnMap[op.number] = op.timestamp - } - } - - asnCache := make(map[int]*database.ASN) - for asn, ts := range asnMap { - asnObj, err := h.db.GetOrCreateASN(asn, ts) - if err != nil { - h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err) - - continue - } - asnCache[asn] = asnObj - } - - // Process prefixes (deduped) - prefixMap := make(map[string]time.Time) - for _, op := range h.prefixBatch { - if existing, ok := prefixMap[op.prefix]; !ok || op.timestamp.After(existing) { - prefixMap[op.prefix] = op.timestamp - } - } - - for prefix, ts := range prefixMap { - _, err := h.db.GetOrCreatePrefix(prefix, ts) - if err != nil { - h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err) - } - } - - // Process peerings (deduped) - type peeringKey struct { - from, to int - } - peeringMap := make(map[peeringKey]time.Time) - for _, op := range h.peeringBatch { - key := peeringKey{from: op.fromASN, to: op.toASN} - if existing, ok := peeringMap[key]; !ok || op.timestamp.After(existing) { - peeringMap[key] = op.timestamp - } - } - - for key, ts := range peeringMap { - fromAS := asnCache[key.from] - toAS := asnCache[key.to] - if fromAS != nil && toAS != nil { - err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), ts) - if err != nil { - h.logger.Error("Failed to record peering", - "from_asn", key.from, - "to_asn", key.to, - "error", err, - ) - } - } - } - - // Clear batches - h.prefixBatch = h.prefixBatch[:0] - h.asnBatch = h.asnBatch[:0] - h.peeringBatch = h.peeringBatch[:0] - h.lastFlush = time.Now() - - h.logger.Debug("Flushed database batches", - "duration", time.Since(start), - "asns", len(asnMap), - "prefixes", len(prefixMap), - "peerings", len(peeringMap), - ) -} - -// Stop 
gracefully stops the handler and flushes remaining batches -func (h *BatchedDatabaseHandler) Stop() { - close(h.stopCh) - h.wg.Wait() -} diff --git a/internal/routewatch/peerhandler.go b/internal/routewatch/peerhandler.go index 3688c6a..2828819 100644 --- a/internal/routewatch/peerhandler.go +++ b/internal/routewatch/peerhandler.go @@ -2,6 +2,8 @@ package routewatch import ( "strconv" + "sync" + "time" "git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/logger" @@ -10,21 +12,50 @@ import ( const ( // peerHandlerQueueSize is the queue capacity for peer tracking operations - peerHandlerQueueSize = 500 + peerHandlerQueueSize = 2000 + + // peerBatchSize is the number of peer updates to batch together + peerBatchSize = 500 + + // peerBatchTimeout is the maximum time to wait before flushing a batch + peerBatchTimeout = 5 * time.Second ) -// PeerHandler tracks BGP peers from all message types +// PeerHandler tracks BGP peers from all message types using batched operations type PeerHandler struct { db database.Store logger *logger.Logger + + // Batching + mu sync.Mutex + peerBatch []peerUpdate + lastFlush time.Time + stopCh chan struct{} + wg sync.WaitGroup } -// NewPeerHandler creates a new peer tracking handler +type peerUpdate struct { + peerIP string + peerASN int + messageType string + timestamp time.Time +} + +// NewPeerHandler creates a new batched peer tracking handler func NewPeerHandler(db database.Store, logger *logger.Logger) *PeerHandler { - return &PeerHandler{ - db: db, - logger: logger, + h := &PeerHandler{ + db: db, + logger: logger, + peerBatch: make([]peerUpdate, 0, peerBatchSize), + lastFlush: time.Now(), + stopCh: make(chan struct{}), } + + // Start the flush timer goroutine + h.wg.Add(1) + go h.flushLoop() + + return h } // WantsMessage returns true for all message types since we track peers from all messages @@ -34,7 +65,7 @@ func (h *PeerHandler) WantsMessage(_ string) bool { // QueueCapacity returns the desired 
queue capacity for this handler func (h *PeerHandler) QueueCapacity() int { - // Peer tracking is lightweight but involves database ops, use moderate queue + // Batching allows us to use a larger queue return peerHandlerQueueSize } @@ -48,13 +79,81 @@ func (h *PeerHandler) HandleMessage(msg *ristypes.RISMessage) { } } - // Update peer in database - if err := h.db.UpdatePeer(msg.Peer, peerASN, msg.Type, msg.ParsedTimestamp); err != nil { - h.logger.Error("Failed to update peer", - "peer", msg.Peer, - "peer_asn", peerASN, - "message_type", msg.Type, - "error", err, - ) + h.mu.Lock() + defer h.mu.Unlock() + + // Add to batch + h.peerBatch = append(h.peerBatch, peerUpdate{ + peerIP: msg.Peer, + peerASN: peerASN, + messageType: msg.Type, + timestamp: msg.ParsedTimestamp, + }) + + // Check if we need to flush + if len(h.peerBatch) >= peerBatchSize { + h.flushBatchLocked() } } + +// flushLoop runs in a goroutine and periodically flushes batches +func (h *PeerHandler) flushLoop() { + defer h.wg.Done() + ticker := time.NewTicker(peerBatchTimeout) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + h.mu.Lock() + if time.Since(h.lastFlush) >= peerBatchTimeout { + h.flushBatchLocked() + } + h.mu.Unlock() + case <-h.stopCh: + // Final flush + h.mu.Lock() + h.flushBatchLocked() + h.mu.Unlock() + + return + } + } +} + +// flushBatchLocked flushes the peer batch to the database (must be called with mutex held) +func (h *PeerHandler) flushBatchLocked() { + if len(h.peerBatch) == 0 { + return + } + + // Deduplicate by peer IP, keeping the latest update for each peer + peerMap := make(map[string]peerUpdate) + for _, update := range h.peerBatch { + if existing, ok := peerMap[update.peerIP]; !ok || update.timestamp.After(existing.timestamp) { + peerMap[update.peerIP] = update + } + } + + // Apply updates + for _, update := range peerMap { + if err := h.db.UpdatePeer(update.peerIP, update.peerASN, update.messageType, update.timestamp); err != nil { + h.logger.Error("Failed 
to update peer", + "peer", update.peerIP, + "peer_asn", update.peerASN, + "message_type", update.messageType, + "error", err, + ) + } + } + + // Clear batch + h.peerBatch = h.peerBatch[:0] + h.lastFlush = time.Now() +} + +// Stop gracefully stops the handler and flushes remaining batches +func (h *PeerHandler) Stop() { + close(h.stopCh) + h.wg.Wait() +} diff --git a/internal/routewatch/peerhandler_batched.go b/internal/routewatch/peerhandler_batched.go deleted file mode 100644 index 4c1d5e6..0000000 --- a/internal/routewatch/peerhandler_batched.go +++ /dev/null @@ -1,170 +0,0 @@ -package routewatch - -import ( - "strconv" - "sync" - "time" - - "git.eeqj.de/sneak/routewatch/internal/database" - "git.eeqj.de/sneak/routewatch/internal/logger" - "git.eeqj.de/sneak/routewatch/internal/ristypes" -) - -const ( - // batchedPeerHandlerQueueSize is the queue capacity for peer tracking operations - batchedPeerHandlerQueueSize = 2000 - - // peerBatchSize is the number of peer updates to batch together - peerBatchSize = 500 - - // peerBatchTimeout is the maximum time to wait before flushing a batch - peerBatchTimeout = 5 * time.Second -) - -// BatchedPeerHandler tracks BGP peers from all message types using batched operations -type BatchedPeerHandler struct { - db database.Store - logger *logger.Logger - - // Batching - mu sync.Mutex - peerBatch []peerUpdate - lastFlush time.Time - stopCh chan struct{} - wg sync.WaitGroup -} - -type peerUpdate struct { - peerIP string - peerASN int - messageType string - timestamp time.Time -} - -// NewBatchedPeerHandler creates a new batched peer tracking handler -func NewBatchedPeerHandler(db database.Store, logger *logger.Logger) *BatchedPeerHandler { - h := &BatchedPeerHandler{ - db: db, - logger: logger, - peerBatch: make([]peerUpdate, 0, peerBatchSize), - lastFlush: time.Now(), - stopCh: make(chan struct{}), - } - - // Start the flush timer goroutine - h.wg.Add(1) - go h.flushLoop() - - return h -} - -// WantsMessage returns true for all 
message types since we track peers from all messages -func (h *BatchedPeerHandler) WantsMessage(_ string) bool { - return true -} - -// QueueCapacity returns the desired queue capacity for this handler -func (h *BatchedPeerHandler) QueueCapacity() int { - // Batching allows us to use a larger queue - return batchedPeerHandlerQueueSize -} - -// HandleMessage processes a message to track peer information -func (h *BatchedPeerHandler) HandleMessage(msg *ristypes.RISMessage) { - // Parse peer ASN from string - peerASN := 0 - if msg.PeerASN != "" { - if asn, err := strconv.Atoi(msg.PeerASN); err == nil { - peerASN = asn - } - } - - h.mu.Lock() - defer h.mu.Unlock() - - // Add to batch - h.peerBatch = append(h.peerBatch, peerUpdate{ - peerIP: msg.Peer, - peerASN: peerASN, - messageType: msg.Type, - timestamp: msg.ParsedTimestamp, - }) - - // Check if we need to flush - if len(h.peerBatch) >= peerBatchSize { - h.flushBatchLocked() - } -} - -// flushLoop runs in a goroutine and periodically flushes batches -func (h *BatchedPeerHandler) flushLoop() { - defer h.wg.Done() - ticker := time.NewTicker(peerBatchTimeout) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - h.mu.Lock() - if time.Since(h.lastFlush) >= peerBatchTimeout { - h.flushBatchLocked() - } - h.mu.Unlock() - case <-h.stopCh: - // Final flush - h.mu.Lock() - h.flushBatchLocked() - h.mu.Unlock() - - return - } - } -} - -// flushBatchLocked flushes the peer batch to the database (must be called with mutex held) -func (h *BatchedPeerHandler) flushBatchLocked() { - if len(h.peerBatch) == 0 { - return - } - - start := time.Now() - - // Deduplicate by peer IP, keeping the latest update for each peer - peerMap := make(map[string]peerUpdate) - for _, update := range h.peerBatch { - if existing, ok := peerMap[update.peerIP]; !ok || update.timestamp.After(existing.timestamp) { - peerMap[update.peerIP] = update - } - } - - // Apply updates - successCount := 0 - for _, update := range peerMap { - if err := 
h.db.UpdatePeer(update.peerIP, update.peerASN, update.messageType, update.timestamp); err != nil { - h.logger.Error("Failed to update peer", - "peer", update.peerIP, - "peer_asn", update.peerASN, - "message_type", update.messageType, - "error", err, - ) - } else { - successCount++ - } - } - - // Clear batch - h.peerBatch = h.peerBatch[:0] - h.lastFlush = time.Now() - - h.logger.Debug("Flushed peer batch", - "duration", time.Since(start), - "total_updates", len(peerMap), - "successful", successCount, - ) -} - -// Stop gracefully stops the handler and flushes remaining batches -func (h *BatchedPeerHandler) Stop() { - close(h.stopCh) - h.wg.Wait() -} diff --git a/internal/routewatch/prefixhandler.go b/internal/routewatch/prefixhandler.go new file mode 100644 index 0000000..d42b22c --- /dev/null +++ b/internal/routewatch/prefixhandler.go @@ -0,0 +1,270 @@ +package routewatch + +import ( + "encoding/json" + "sync" + "time" + + "git.eeqj.de/sneak/routewatch/internal/database" + "git.eeqj.de/sneak/routewatch/internal/logger" + "git.eeqj.de/sneak/routewatch/internal/ristypes" +) + +const ( + // prefixHandlerQueueSize is the queue capacity for prefix tracking operations + prefixHandlerQueueSize = 50000 + + // prefixBatchSize is the number of prefix updates to batch together + prefixBatchSize = 2000 + + // prefixBatchTimeout is the maximum time to wait before flushing a batch + prefixBatchTimeout = 5 * time.Second +) + +// PrefixHandler tracks BGP prefixes and maintains a routing table in the database +type PrefixHandler struct { + db database.Store + logger *logger.Logger + + // Batching + mu sync.Mutex + batch []prefixUpdate + lastFlush time.Time + stopCh chan struct{} + wg sync.WaitGroup +} + +type prefixUpdate struct { + prefix string + originASN int + peer string + messageType string + timestamp time.Time + path []int +} + +// NewPrefixHandler creates a new batched prefix tracking handler +func NewPrefixHandler(db database.Store, logger *logger.Logger) *PrefixHandler { 
+ h := &PrefixHandler{ + db: db, + logger: logger, + batch: make([]prefixUpdate, 0, prefixBatchSize), + lastFlush: time.Now(), + stopCh: make(chan struct{}), + } + + // Start the flush timer goroutine + h.wg.Add(1) + go h.flushLoop() + + return h +} + +// WantsMessage returns true if this handler wants to process messages of the given type +func (h *PrefixHandler) WantsMessage(messageType string) bool { + // We only care about UPDATE messages for the routing table + return messageType == "UPDATE" +} + +// QueueCapacity returns the desired queue capacity for this handler +func (h *PrefixHandler) QueueCapacity() int { + // Batching allows us to use a larger queue + return prefixHandlerQueueSize +} + +// HandleMessage processes a message to track prefix information +func (h *PrefixHandler) HandleMessage(msg *ristypes.RISMessage) { + // Use the pre-parsed timestamp + timestamp := msg.ParsedTimestamp + + // Get origin ASN from path (last element) + var originASN int + if len(msg.Path) > 0 { + originASN = msg.Path[len(msg.Path)-1] + } + + h.mu.Lock() + defer h.mu.Unlock() + + // Process announcements + for _, announcement := range msg.Announcements { + for _, prefix := range announcement.Prefixes { + h.batch = append(h.batch, prefixUpdate{ + prefix: prefix, + originASN: originASN, + peer: msg.Peer, + messageType: "announcement", + timestamp: timestamp, + path: msg.Path, + }) + } + } + + // Process withdrawals + for _, prefix := range msg.Withdrawals { + h.batch = append(h.batch, prefixUpdate{ + prefix: prefix, + originASN: 0, // No origin for withdrawals + peer: msg.Peer, + messageType: "withdrawal", + timestamp: timestamp, + path: nil, + }) + } + + // Check if we need to flush + if len(h.batch) >= prefixBatchSize { + h.flushBatchLocked() + } +} + +// flushLoop runs in a goroutine and periodically flushes batches +func (h *PrefixHandler) flushLoop() { + defer h.wg.Done() + ticker := time.NewTicker(prefixBatchTimeout) + defer ticker.Stop() + + for { + select { + case 
<-ticker.C: + h.mu.Lock() + if time.Since(h.lastFlush) >= prefixBatchTimeout { + h.flushBatchLocked() + } + h.mu.Unlock() + case <-h.stopCh: + // Final flush + h.mu.Lock() + h.flushBatchLocked() + h.mu.Unlock() + + return + } + } +} + +// flushBatchLocked flushes the prefix batch to the database (must be called with mutex held) +func (h *PrefixHandler) flushBatchLocked() { + if len(h.batch) == 0 { + return + } + + // Group updates by prefix to deduplicate + // For each prefix, keep the latest update + prefixMap := make(map[string]prefixUpdate) + for _, update := range h.batch { + key := update.prefix + if existing, ok := prefixMap[key]; !ok || update.timestamp.After(existing.timestamp) { + prefixMap[key] = update + } + } + + // Apply updates to database + for _, update := range prefixMap { + // Get or create prefix + prefix, err := h.db.GetOrCreatePrefix(update.prefix, update.timestamp) + if err != nil { + h.logger.Error("Failed to get/create prefix", + "prefix", update.prefix, + "error", err, + ) + + continue + } + + // For announcements, get ASN info and create announcement record + if update.messageType == "announcement" && update.originASN > 0 { + h.processAnnouncement(prefix, update) + } else if update.messageType == "withdrawal" { + h.processWithdrawal(prefix, update) + } + } + + // Clear batch + h.batch = h.batch[:0] + h.lastFlush = time.Now() +} + +// processAnnouncement handles storing an announcement in the database +func (h *PrefixHandler) processAnnouncement(prefix *database.Prefix, update prefixUpdate) { + // Get or create origin ASN + originASN, err := h.db.GetOrCreateASN(update.originASN, update.timestamp) + if err != nil { + h.logger.Error("Failed to get/create origin ASN", + "asn", update.originASN, + "error", err, + ) + + return + } + + // Get or create peer ASN (first element in path if exists) + var peerASN *database.ASN + if len(update.path) > 0 { + peerASN, err = h.db.GetOrCreateASN(update.path[0], update.timestamp) + if err != nil { + 
h.logger.Error("Failed to get/create peer ASN", + "asn", update.path[0], + "error", err, + ) + + return + } + } else { + // If no path, use origin as peer + peerASN = originASN + } + + // Encode AS path as JSON + pathJSON, err := json.Marshal(update.path) + if err != nil { + h.logger.Error("Failed to encode AS path", + "path", update.path, + "error", err, + ) + + return + } + + // Create announcement record + announcement := &database.Announcement{ + PrefixID: prefix.ID, + ASNID: peerASN.ID, + OriginASNID: originASN.ID, + Path: string(pathJSON), + NextHop: update.peer, + Timestamp: update.timestamp, + IsWithdrawal: false, + } + + if err := h.db.RecordAnnouncement(announcement); err != nil { + h.logger.Error("Failed to record announcement", + "prefix", update.prefix, + "error", err, + ) + } +} + +// processWithdrawal handles storing a withdrawal in the database +func (h *PrefixHandler) processWithdrawal(prefix *database.Prefix, update prefixUpdate) { + // For withdrawals, create a withdrawal record + announcement := &database.Announcement{ + PrefixID: prefix.ID, + NextHop: update.peer, + Timestamp: update.timestamp, + IsWithdrawal: true, + } + + if err := h.db.RecordAnnouncement(announcement); err != nil { + h.logger.Error("Failed to record withdrawal", + "prefix", update.prefix, + "error", err, + ) + } +} + +// Stop gracefully stops the handler and flushes remaining batches +func (h *PrefixHandler) Stop() { + close(h.stopCh) + h.wg.Wait() +} diff --git a/internal/server/server.go b/internal/server/server.go index eaba6de..24258e3 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -145,7 +145,6 @@ func (s *Server) handleStatusJSON() http.HandlerFunc { errChan := make(chan error) go func() { - s.logger.Debug("Starting database stats query") dbStats, err := s.db.GetStats() if err != nil { s.logger.Debug("Database stats query failed", "error", err) @@ -153,7 +152,6 @@ func (s *Server) handleStatusJSON() http.HandlerFunc { return } - 
s.logger.Debug("Database stats query completed") statsChan <- dbStats }() @@ -287,7 +285,6 @@ func (s *Server) handleStats() http.HandlerFunc { errChan := make(chan error) go func() { - s.logger.Debug("Starting database stats query") dbStats, err := s.db.GetStats() if err != nil { s.logger.Debug("Database stats query failed", "error", err) @@ -295,7 +292,6 @@ func (s *Server) handleStats() http.HandlerFunc { return } - s.logger.Debug("Database stats query completed") statsChan <- dbStats }()