package routewatch import ( "strconv" "sync" "time" "git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/logger" "git.eeqj.de/sneak/routewatch/internal/ristypes" ) const ( // batchedPeerHandlerQueueSize is the queue capacity for peer tracking operations batchedPeerHandlerQueueSize = 2000 // peerBatchSize is the number of peer updates to batch together peerBatchSize = 500 // peerBatchTimeout is the maximum time to wait before flushing a batch peerBatchTimeout = 5 * time.Second ) // BatchedPeerHandler tracks BGP peers from all message types using batched operations type BatchedPeerHandler struct { db database.Store logger *logger.Logger // Batching mu sync.Mutex peerBatch []peerUpdate lastFlush time.Time stopCh chan struct{} wg sync.WaitGroup } type peerUpdate struct { peerIP string peerASN int messageType string timestamp time.Time } // NewBatchedPeerHandler creates a new batched peer tracking handler func NewBatchedPeerHandler(db database.Store, logger *logger.Logger) *BatchedPeerHandler { h := &BatchedPeerHandler{ db: db, logger: logger, peerBatch: make([]peerUpdate, 0, peerBatchSize), lastFlush: time.Now(), stopCh: make(chan struct{}), } // Start the flush timer goroutine h.wg.Add(1) go h.flushLoop() return h } // WantsMessage returns true for all message types since we track peers from all messages func (h *BatchedPeerHandler) WantsMessage(_ string) bool { return true } // QueueCapacity returns the desired queue capacity for this handler func (h *BatchedPeerHandler) QueueCapacity() int { // Batching allows us to use a larger queue return batchedPeerHandlerQueueSize } // HandleMessage processes a message to track peer information func (h *BatchedPeerHandler) HandleMessage(msg *ristypes.RISMessage) { // Parse peer ASN from string peerASN := 0 if msg.PeerASN != "" { if asn, err := strconv.Atoi(msg.PeerASN); err == nil { peerASN = asn } } h.mu.Lock() defer h.mu.Unlock() // Add to batch h.peerBatch = append(h.peerBatch, peerUpdate{ peerIP: msg.Peer, peerASN: peerASN, messageType: msg.Type, timestamp: msg.ParsedTimestamp, }) // Check if we need to flush if len(h.peerBatch) >= peerBatchSize { h.flushBatchLocked() } } // flushLoop runs in a goroutine and periodically flushes batches func (h *BatchedPeerHandler) flushLoop() { defer h.wg.Done() ticker := time.NewTicker(peerBatchTimeout) defer ticker.Stop() for { select { case <-ticker.C: h.mu.Lock() if time.Since(h.lastFlush) >= peerBatchTimeout { h.flushBatchLocked() } h.mu.Unlock() case <-h.stopCh: // Final flush h.mu.Lock() h.flushBatchLocked() h.mu.Unlock() return } } } // flushBatchLocked flushes the peer batch to the database (must be called with mutex held) func (h *BatchedPeerHandler) flushBatchLocked() { if len(h.peerBatch) == 0 { return } start := time.Now() // Deduplicate by peer IP, keeping the latest update for each peer peerMap := make(map[string]peerUpdate) for _, update := range h.peerBatch { if existing, ok := peerMap[update.peerIP]; !ok || update.timestamp.After(existing.timestamp) { peerMap[update.peerIP] = update } } // Apply updates successCount := 0 for _, update := range peerMap { if err := h.db.UpdatePeer(update.peerIP, update.peerASN, update.messageType, update.timestamp); err != nil { h.logger.Error("Failed to update peer", "peer", update.peerIP, "peer_asn", update.peerASN, "message_type", update.messageType, "error", err, ) } else { successCount++ } } // Clear batch h.peerBatch = h.peerBatch[:0] h.lastFlush = time.Now() h.logger.Debug("Flushed peer batch", "duration", time.Since(start), "total_updates", len(peerMap), "successful", successCount, ) } // Stop gracefully stops the handler and flushes remaining batches func (h *BatchedPeerHandler) Stop() { close(h.stopCh) h.wg.Wait() }