routewatch/internal/routewatch/prefixhandler.go

274 lines
6.9 KiB
Go

package routewatch
import (
"net"
"strings"
"sync"
"time"
"git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger"
"git.eeqj.de/sneak/routewatch/internal/ristypes"
"github.com/google/uuid"
)
const (
// prefixHandlerQueueSize is the queue capacity for prefix tracking operations
prefixHandlerQueueSize = 100000
// prefixBatchSize is the number of prefix updates to batch together
prefixBatchSize = 25000
// prefixBatchTimeout is the maximum time to wait before flushing a batch
prefixBatchTimeout = 2 * time.Second
// IP version constants
ipv4Version = 4
ipv6Version = 6
)
// PrefixHandler tracks BGP prefixes and maintains a live routing table in the database.
// Routes are added on announcement and deleted on withdrawal.
type PrefixHandler struct {
db database.Store
logger *logger.Logger
// Batching
mu sync.Mutex
batch []prefixUpdate
lastFlush time.Time
stopCh chan struct{}
wg sync.WaitGroup
}
type prefixUpdate struct {
prefix string
originASN int
peer string
messageType string
timestamp time.Time
path []int
}
// NewPrefixHandler creates a new batched prefix tracking handler
func NewPrefixHandler(db database.Store, logger *logger.Logger) *PrefixHandler {
h := &PrefixHandler{
db: db,
logger: logger,
batch: make([]prefixUpdate, 0, prefixBatchSize),
lastFlush: time.Now(),
stopCh: make(chan struct{}),
}
// Start the flush timer goroutine
h.wg.Add(1)
go h.flushLoop()
return h
}
// WantsMessage returns true if this handler wants to process messages of the given type
func (h *PrefixHandler) WantsMessage(messageType string) bool {
// We only care about UPDATE messages for the routing table
return messageType == "UPDATE"
}
// QueueCapacity returns the desired queue capacity for this handler
func (h *PrefixHandler) QueueCapacity() int {
// Batching allows us to use a larger queue
return prefixHandlerQueueSize
}
// HandleMessage processes a message to track prefix information
func (h *PrefixHandler) HandleMessage(msg *ristypes.RISMessage) {
// Use the pre-parsed timestamp
timestamp := msg.ParsedTimestamp
// Get origin ASN from path (last element)
var originASN int
if len(msg.Path) > 0 {
originASN = msg.Path[len(msg.Path)-1]
}
h.mu.Lock()
defer h.mu.Unlock()
// Process announcements
for _, announcement := range msg.Announcements {
for _, prefix := range announcement.Prefixes {
h.batch = append(h.batch, prefixUpdate{
prefix: prefix,
originASN: originASN,
peer: msg.Peer,
messageType: "announcement",
timestamp: timestamp,
path: msg.Path,
})
}
}
// Process withdrawals
for _, prefix := range msg.Withdrawals {
h.batch = append(h.batch, prefixUpdate{
prefix: prefix,
originASN: originASN, // Use the originASN from path if available
peer: msg.Peer,
messageType: "withdrawal",
timestamp: timestamp,
path: msg.Path,
})
}
// Check if we need to flush
if len(h.batch) >= prefixBatchSize {
h.flushBatchLocked()
}
}
// flushLoop runs in a goroutine and periodically flushes batches
func (h *PrefixHandler) flushLoop() {
defer h.wg.Done()
ticker := time.NewTicker(prefixBatchTimeout)
defer ticker.Stop()
for {
select {
case <-ticker.C:
h.mu.Lock()
if time.Since(h.lastFlush) >= prefixBatchTimeout {
h.flushBatchLocked()
}
h.mu.Unlock()
case <-h.stopCh:
// Final flush
h.mu.Lock()
h.flushBatchLocked()
h.mu.Unlock()
return
}
}
}
// flushBatchLocked flushes the prefix batch to the database (must be called with mutex held)
func (h *PrefixHandler) flushBatchLocked() {
if len(h.batch) == 0 {
return
}
// Group updates by prefix to deduplicate
// For each prefix, keep the latest update
prefixMap := make(map[string]prefixUpdate)
for _, update := range h.batch {
key := update.prefix
if existing, ok := prefixMap[key]; !ok || update.timestamp.After(existing.timestamp) {
prefixMap[key] = update
}
}
// Apply updates to database
for _, update := range prefixMap {
// Get or create prefix (this maintains the prefixes table)
prefix, err := h.db.GetOrCreatePrefix(update.prefix, update.timestamp)
if err != nil {
h.logger.Error("Failed to get/create prefix",
"prefix", update.prefix,
"error", err,
)
continue
}
// For announcements, get ASN info and create announcement record
if update.messageType == "announcement" && update.originASN > 0 {
h.processAnnouncement(prefix, update)
} else if update.messageType == "withdrawal" {
h.processWithdrawal(prefix, update)
}
}
// Clear batch
h.batch = h.batch[:0]
h.lastFlush = time.Now()
}
// parseCIDR extracts the mask length and IP version from a prefix string
func parseCIDR(prefix string) (maskLength int, ipVersion int, err error) {
_, ipNet, err := net.ParseCIDR(prefix)
if err != nil {
return 0, 0, err
}
ones, _ := ipNet.Mask.Size()
if strings.Contains(prefix, ":") {
return ones, ipv6Version, nil
}
return ones, ipv4Version, nil
}
// processAnnouncement handles storing an announcement in the database
func (h *PrefixHandler) processAnnouncement(_ *database.Prefix, update prefixUpdate) {
// Parse CIDR to get mask length
maskLength, ipVersion, err := parseCIDR(update.prefix)
if err != nil {
h.logger.Error("Failed to parse CIDR",
"prefix", update.prefix,
"error", err,
)
return
}
// Create live route record
liveRoute := &database.LiveRoute{
ID: uuid.New(),
Prefix: update.prefix,
MaskLength: maskLength,
IPVersion: ipVersion,
OriginASN: update.originASN,
PeerIP: update.peer,
ASPath: update.path,
NextHop: update.peer, // Using peer as next hop
LastUpdated: update.timestamp,
}
if err := h.db.UpsertLiveRoute(liveRoute); err != nil {
h.logger.Error("Failed to upsert live route",
"prefix", update.prefix,
"error", err,
)
}
}
// processWithdrawal handles removing a route from the live routing table
func (h *PrefixHandler) processWithdrawal(_ *database.Prefix, update prefixUpdate) {
// For withdrawals, we need to delete the route from live_routes
// Since we have the origin ASN from the update, we can delete the specific route
if update.originASN > 0 {
if err := h.db.DeleteLiveRoute(update.prefix, update.originASN, update.peer); err != nil {
h.logger.Error("Failed to delete live route",
"prefix", update.prefix,
"origin_asn", update.originASN,
"peer", update.peer,
"error", err,
)
}
} else {
// If no origin ASN, just delete all routes for this prefix from this peer
if err := h.db.DeleteLiveRoute(update.prefix, 0, update.peer); err != nil {
h.logger.Error("Failed to delete live route (no origin ASN)",
"prefix", update.prefix,
"peer", update.peer,
"error", err,
)
}
}
}
// Stop gracefully stops the handler and flushes remaining batches
func (h *PrefixHandler) Stop() {
close(h.stopCh)
h.wg.Wait()
}