package routewatch

import (
	"strconv"
	"sync"
	"time"

	"git.eeqj.de/sneak/routewatch/internal/database"
	"git.eeqj.de/sneak/routewatch/internal/logger"
	"git.eeqj.de/sneak/routewatch/internal/ristypes"
)

const (
	// peerHandlerQueueSize is the queue capacity for peer tracking operations.
	peerHandlerQueueSize = 100000

	// peerBatchSize is the number of peer updates to batch together.
	peerBatchSize = 10000

	// peerBatchTimeout is the maximum time to wait before flushing a batch.
	peerBatchTimeout = 2 * time.Second
)

// PeerHandler tracks BGP peers from all message types using batched operations.
// Updates are accumulated under mu and flushed to the database either when the
// batch reaches peerBatchSize or after peerBatchTimeout, whichever comes first.
type PeerHandler struct {
	db     database.Store
	logger *logger.Logger

	// Batching state; mu guards peerBatch and lastFlush.
	mu        sync.Mutex
	peerBatch []peerUpdate
	lastFlush time.Time

	stopOnce sync.Once // makes Stop idempotent (close of stopCh happens once)
	stopCh   chan struct{}
	wg       sync.WaitGroup
}

// peerUpdate is a single pending peer observation awaiting a batch flush.
type peerUpdate struct {
	peerIP      string
	peerASN     int
	messageType string
	timestamp   time.Time
}

// NewPeerHandler creates a new batched peer tracking handler and starts its
// background flush goroutine. Call Stop to shut the handler down and flush
// any remaining updates.
func NewPeerHandler(db database.Store, log *logger.Logger) *PeerHandler {
	h := &PeerHandler{
		db:        db,
		logger:    log,
		peerBatch: make([]peerUpdate, 0, peerBatchSize),
		lastFlush: time.Now(),
		stopCh:    make(chan struct{}),
	}

	// Start the flush timer goroutine.
	h.wg.Add(1)
	go h.flushLoop()

	return h
}

// WantsMessage returns true for all message types since we track peers from
// all messages.
func (h *PeerHandler) WantsMessage(_ string) bool {
	return true
}

// QueueCapacity returns the desired queue capacity for this handler.
func (h *PeerHandler) QueueCapacity() int {
	// Batching allows us to use a larger queue.
	return peerHandlerQueueSize
}

// HandleMessage records peer information from msg into the pending batch and
// flushes synchronously once the batch reaches peerBatchSize. A peer ASN that
// is empty or fails to parse is deliberately recorded as 0 (best effort).
func (h *PeerHandler) HandleMessage(msg *ristypes.RISMessage) {
	// Parse peer ASN from string; fall back to 0 on any parse failure.
	peerASN := 0
	if msg.PeerASN != "" {
		if asn, err := strconv.Atoi(msg.PeerASN); err == nil {
			peerASN = asn
		}
	}

	h.mu.Lock()
	defer h.mu.Unlock()

	// Add to batch.
	h.peerBatch = append(h.peerBatch, peerUpdate{
		peerIP:      msg.Peer,
		peerASN:     peerASN,
		messageType: msg.Type,
		timestamp:   msg.ParsedTimestamp,
	})

	// Check if we need to flush.
	if len(h.peerBatch) >= peerBatchSize {
		h.flushBatchLocked()
	}
}

// flushLoop runs in a goroutine and periodically flushes batches until
// stopCh is closed, performing one final flush on shutdown.
func (h *PeerHandler) flushLoop() {
	defer h.wg.Done()

	ticker := time.NewTicker(peerBatchTimeout)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			h.mu.Lock()
			// Skip the timed flush if a size-triggered flush ran recently.
			if time.Since(h.lastFlush) >= peerBatchTimeout {
				h.flushBatchLocked()
			}
			h.mu.Unlock()
		case <-h.stopCh:
			// Final flush before exiting.
			h.mu.Lock()
			h.flushBatchLocked()
			h.mu.Unlock()

			return
		}
	}
}

// flushBatchLocked deduplicates the pending batch by peer IP (keeping the
// most recent update per peer) and writes it to the database in a single
// batch transaction. On database error the batch is logged and dropped.
// Must be called with h.mu held.
func (h *PeerHandler) flushBatchLocked() {
	if len(h.peerBatch) == 0 {
		return
	}

	// Deduplicate by peer IP, keeping the latest update for each peer.
	peerMap := make(map[string]peerUpdate, len(h.peerBatch))
	for _, update := range h.peerBatch {
		if existing, ok := peerMap[update.peerIP]; !ok || update.timestamp.After(existing.timestamp) {
			peerMap[update.peerIP] = update
		}
	}

	// Convert to database format.
	dbPeerMap := make(map[string]database.PeerUpdate, len(peerMap))
	for peerIP, update := range peerMap {
		dbPeerMap[peerIP] = database.PeerUpdate{
			PeerIP:      update.peerIP,
			PeerASN:     update.peerASN,
			MessageType: update.messageType,
			Timestamp:   update.timestamp,
		}
	}

	// Process all peers in a single batch transaction.
	if err := h.db.UpdatePeerBatch(dbPeerMap); err != nil {
		h.logger.Error("Failed to process peer batch", "error", err, "count", len(dbPeerMap))
	}

	// Clear the batch but keep its capacity for reuse.
	h.peerBatch = h.peerBatch[:0]
	h.lastFlush = time.Now()
}

// Stop gracefully stops the handler and flushes remaining batches. It is
// safe to call Stop more than once; every caller blocks until the flush
// goroutine has exited.
func (h *PeerHandler) Stop() {
	h.stopOnce.Do(func() {
		close(h.stopCh)
	})
	h.wg.Wait()
}