Refactor database handlers and optimize PeeringHandler

- Create PeeringHandler for asn_peerings table maintenance
- Rename DBHandler to ASHandler (now only handles asns table)
- Move prefixes table maintenance to PrefixHandler
- Optimize PeeringHandler with in-memory AS path tracking (a sketch of the idea follows this list):
  - Stores AS paths in memory with timestamps
  - Processes peerings in batch every 2 minutes
  - Prunes old paths (>30 minutes) every 5 minutes
  - Normalizes peerings with lower AS number first
- Each handler now has a single responsibility:
  - ASHandler: asns table
  - PeerHandler: bgp_peers table
  - PrefixHandler: prefixes and live_routes tables
  - PeeringHandler: asn_peerings table
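
For reference, the normalize-and-dedup step is small enough to show in isolation. A minimal standalone sketch of the idea (illustrative only, not the committed code; the extractPeerings helper and the AS numbers are made up for the example):

package main

import (
	"fmt"
	"time"
)

// peeringKey identifies an adjacency, normalized so the lower AS
// number always comes first; A->B and B->A map to the same key.
type peeringKey struct {
	low, high int
}

// extractPeerings walks an AS path, records each adjacent pair under
// its normalized key, and keeps the newest timestamp seen per pair.
func extractPeerings(path []int, ts time.Time, peerings map[peeringKey]time.Time) {
	for i := 0; i+1 < len(path); i++ {
		low, high := path[i], path[i+1]
		if low > high {
			low, high = high, low
		}
		key := peeringKey{low: low, high: high}
		if existing, ok := peerings[key]; !ok || ts.After(existing) {
			peerings[key] = ts
		}
	}
}

func main() {
	peerings := make(map[peeringKey]time.Time)

	// Two observed paths: the 3356/13335 adjacency appears in both
	// directions but collapses to a single normalized peering.
	extractPeerings([]int{3320, 3356, 13335}, time.Now(), peerings)
	extractPeerings([]int{1299, 13335, 3356}, time.Now(), peerings)

	for key, ts := range peerings {
		fmt.Printf("AS%d <-> AS%d last seen %s\n", key.low, key.high, ts.Format(time.RFC3339))
	}
}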
Jeffrey Paul 2025-07-28 02:31:04 +02:00
parent eaa11b5f8d
commit 1157003db7
5 changed files with 438 additions and 235 deletions


@@ -40,19 +40,20 @@ type Dependencies struct {
 // RouteWatch represents the main application instance
 type RouteWatch struct {
-	db            database.Store
-	routingTable  *routingtable.RoutingTable
-	streamer      *streamer.Streamer
-	server        *server.Server
-	snapshotter   *snapshotter.Snapshotter
-	logger        *logger.Logger
-	maxRuntime    time.Duration
-	shutdown      bool
-	mu            sync.Mutex
-	config        *config.Config
-	dbHandler     *DBHandler
-	peerHandler   *PeerHandler
-	prefixHandler *PrefixHandler
+	db             database.Store
+	routingTable   *routingtable.RoutingTable
+	streamer       *streamer.Streamer
+	server         *server.Server
+	snapshotter    *snapshotter.Snapshotter
+	logger         *logger.Logger
+	maxRuntime     time.Duration
+	shutdown       bool
+	mu             sync.Mutex
+	config         *config.Config
+	dbHandler      *ASHandler
+	peerHandler    *PeerHandler
+	prefixHandler  *PrefixHandler
+	peeringHandler *PeeringHandler
 }

 // isTruthy returns true if the value is considered truthy
@@ -107,14 +108,21 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
 	// Register database handler to process BGP UPDATE messages
 	if rw.config.EnableBatchedDatabaseWrites {
 		rw.logger.Info("Using batched database handlers for improved performance")
-		rw.dbHandler = NewDBHandler(rw.db, rw.logger)
+		// ASHandler maintains the asns table
+		rw.dbHandler = NewASHandler(rw.db, rw.logger)
 		rw.streamer.RegisterHandler(rw.dbHandler)
+		// PeerHandler maintains the bgp_peers table
 		rw.peerHandler = NewPeerHandler(rw.db, rw.logger)
 		rw.streamer.RegisterHandler(rw.peerHandler)
+		// PrefixHandler maintains the prefixes and live_routes tables
 		rw.prefixHandler = NewPrefixHandler(rw.db, rw.logger)
 		rw.streamer.RegisterHandler(rw.prefixHandler)
+		// PeeringHandler maintains the asn_peerings table
+		rw.peeringHandler = NewPeeringHandler(rw.db, rw.logger)
+		rw.streamer.RegisterHandler(rw.peeringHandler)
 	} else {
 		// Non-batched handlers not implemented yet
 		rw.logger.Error("Non-batched handlers not implemented")


@@ -224,6 +224,11 @@ func TestRouteWatchLiveFeed(t *testing.T) {
 	// Wait for the configured duration
 	time.Sleep(5 * time.Second)
+
+	// Force peering processing for test
+	if rw.peeringHandler != nil {
+		rw.peeringHandler.ProcessPeeringsNow()
+	}
 	// Get statistics
 	stats, err := mockDB.GetStats()
 	if err != nil {


@@ -0,0 +1,163 @@
package routewatch

import (
	"sync"
	"time"

	"git.eeqj.de/sneak/routewatch/internal/database"
	"git.eeqj.de/sneak/routewatch/internal/logger"
	"git.eeqj.de/sneak/routewatch/internal/ristypes"
)

const (
	// asHandlerQueueSize is the queue capacity for ASN operations
	asHandlerQueueSize = 200000

	// asnBatchSize is the number of ASN operations to batch together
	asnBatchSize = 10000

	// asnBatchTimeout is the maximum time to wait before flushing a batch
	asnBatchTimeout = 2 * time.Second
)

// ASHandler handles ASN information from BGP messages using batched operations
type ASHandler struct {
	db     database.Store
	logger *logger.Logger

	// Batching
	mu        sync.Mutex
	batch     []asnOp
	lastFlush time.Time
	stopCh    chan struct{}
	wg        sync.WaitGroup
}

type asnOp struct {
	number    int
	timestamp time.Time
}

// NewASHandler creates a new batched ASN handler
func NewASHandler(db database.Store, logger *logger.Logger) *ASHandler {
	h := &ASHandler{
		db:        db,
		logger:    logger,
		batch:     make([]asnOp, 0, asnBatchSize),
		lastFlush: time.Now(),
		stopCh:    make(chan struct{}),
	}

	// Start the flush timer goroutine
	h.wg.Add(1)
	go h.flushLoop()

	return h
}

// WantsMessage returns true if this handler wants to process messages of the given type
func (h *ASHandler) WantsMessage(messageType string) bool {
	// We only care about UPDATE messages for the database
	return messageType == "UPDATE"
}

// QueueCapacity returns the desired queue capacity for this handler
func (h *ASHandler) QueueCapacity() int {
	// Batching allows us to use a larger queue
	return asHandlerQueueSize
}

// HandleMessage processes a RIS message and queues database operations
func (h *ASHandler) HandleMessage(msg *ristypes.RISMessage) {
	// Use the pre-parsed timestamp
	timestamp := msg.ParsedTimestamp

	// Get origin ASN from path (last element)
	var originASN int
	if len(msg.Path) > 0 {
		originASN = msg.Path[len(msg.Path)-1]
	}

	h.mu.Lock()
	defer h.mu.Unlock()

	// Queue origin ASN operation
	if originASN > 0 {
		h.batch = append(h.batch, asnOp{
			number:    originASN,
			timestamp: timestamp,
		})
	}

	// Also track all ASNs in the path
	for _, asn := range msg.Path {
		if asn > 0 {
			h.batch = append(h.batch, asnOp{
				number:    asn,
				timestamp: timestamp,
			})
		}
	}

	// Check if we need to flush
	if len(h.batch) >= asnBatchSize {
		h.flushBatchLocked()
	}
}

// flushLoop runs in a goroutine and periodically flushes batches
func (h *ASHandler) flushLoop() {
	defer h.wg.Done()

	ticker := time.NewTicker(asnBatchTimeout)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			h.mu.Lock()
			if time.Since(h.lastFlush) >= asnBatchTimeout {
				h.flushBatchLocked()
			}
			h.mu.Unlock()
		case <-h.stopCh:
			// Final flush
			h.mu.Lock()
			h.flushBatchLocked()
			h.mu.Unlock()

			return
		}
	}
}

// flushBatchLocked flushes the ASN batch to the database (must be called with mutex held)
func (h *ASHandler) flushBatchLocked() {
	if len(h.batch) == 0 {
		return
	}

	// Process ASNs first (deduped)
	asnMap := make(map[int]time.Time)
	for _, op := range h.batch {
		if existing, ok := asnMap[op.number]; !ok || op.timestamp.After(existing) {
			asnMap[op.number] = op.timestamp
		}
	}

	for asn, ts := range asnMap {
		_, err := h.db.GetOrCreateASN(asn, ts)
		if err != nil {
			h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err)
		}
	}

	// Clear batch
	h.batch = h.batch[:0]
	h.lastFlush = time.Now()
}

// Stop gracefully stops the handler and flushes remaining batches
func (h *ASHandler) Stop() {
	close(h.stopCh)
	h.wg.Wait()
}


@@ -1,221 +0,0 @@
package routewatch

import (
	"sync"
	"time"

	"git.eeqj.de/sneak/routewatch/internal/database"
	"git.eeqj.de/sneak/routewatch/internal/logger"
	"git.eeqj.de/sneak/routewatch/internal/ristypes"
)

const (
	// dbHandlerQueueSize is the queue capacity for database operations
	dbHandlerQueueSize = 200000

	// batchSize is the number of operations to batch together
	batchSize = 10000

	// batchTimeout is the maximum time to wait before flushing a batch
	batchTimeout = 2 * time.Second
)

// DBHandler handles BGP messages and stores them in the database using batched operations
type DBHandler struct {
	db     database.Store
	logger *logger.Logger

	// Batching
	mu           sync.Mutex
	asnBatch     []asnOp
	peeringBatch []peeringOp
	lastFlush    time.Time
	stopCh       chan struct{}
	wg           sync.WaitGroup
}

type asnOp struct {
	number    int
	timestamp time.Time
}

type peeringOp struct {
	fromASN   int
	toASN     int
	timestamp time.Time
}

// NewDBHandler creates a new batched database handler
func NewDBHandler(
	db database.Store,
	logger *logger.Logger,
) *DBHandler {
	h := &DBHandler{
		db:           db,
		logger:       logger,
		asnBatch:     make([]asnOp, 0, batchSize),
		peeringBatch: make([]peeringOp, 0, batchSize),
		lastFlush:    time.Now(),
		stopCh:       make(chan struct{}),
	}

	// Start the flush timer goroutine
	h.wg.Add(1)
	go h.flushLoop()

	return h
}

// WantsMessage returns true if this handler wants to process messages of the given type
func (h *DBHandler) WantsMessage(messageType string) bool {
	// We only care about UPDATE messages for the database
	return messageType == "UPDATE"
}

// QueueCapacity returns the desired queue capacity for this handler
func (h *DBHandler) QueueCapacity() int {
	// Batching allows us to use a larger queue
	return dbHandlerQueueSize
}

// HandleMessage processes a RIS message and queues database operations
func (h *DBHandler) HandleMessage(msg *ristypes.RISMessage) {
	// Use the pre-parsed timestamp
	timestamp := msg.ParsedTimestamp

	// Get origin ASN from path (last element)
	var originASN int
	if len(msg.Path) > 0 {
		originASN = msg.Path[len(msg.Path)-1]
	}

	h.mu.Lock()
	defer h.mu.Unlock()

	// Queue origin ASN operation
	if originASN > 0 {
		h.asnBatch = append(h.asnBatch, asnOp{
			number:    originASN,
			timestamp: timestamp,
		})
	}

	// Process AS path to queue peering operations
	if len(msg.Path) > 1 {
		for i := range len(msg.Path) - 1 {
			fromASN := msg.Path[i]
			toASN := msg.Path[i+1]

			// Queue ASN operations
			h.asnBatch = append(h.asnBatch, asnOp{
				number:    fromASN,
				timestamp: timestamp,
			})
			h.asnBatch = append(h.asnBatch, asnOp{
				number:    toASN,
				timestamp: timestamp,
			})

			// Queue peering operation
			h.peeringBatch = append(h.peeringBatch, peeringOp{
				fromASN:   fromASN,
				toASN:     toASN,
				timestamp: timestamp,
			})
		}
	}

	// Check if we need to flush
	if len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize {
		h.flushBatchesLocked()
	}
}

// flushLoop runs in a goroutine and periodically flushes batches
func (h *DBHandler) flushLoop() {
	defer h.wg.Done()

	ticker := time.NewTicker(batchTimeout)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			h.mu.Lock()
			if time.Since(h.lastFlush) >= batchTimeout {
				h.flushBatchesLocked()
			}
			h.mu.Unlock()
		case <-h.stopCh:
			// Final flush
			h.mu.Lock()
			h.flushBatchesLocked()
			h.mu.Unlock()

			return
		}
	}
}

// flushBatchesLocked flushes all batches to the database (must be called with mutex held)
func (h *DBHandler) flushBatchesLocked() {
	if len(h.asnBatch) == 0 && len(h.peeringBatch) == 0 {
		return
	}

	// Process ASNs first (deduped)
	asnMap := make(map[int]time.Time)
	for _, op := range h.asnBatch {
		if existing, ok := asnMap[op.number]; !ok || op.timestamp.After(existing) {
			asnMap[op.number] = op.timestamp
		}
	}

	asnCache := make(map[int]*database.ASN)
	for asn, ts := range asnMap {
		asnObj, err := h.db.GetOrCreateASN(asn, ts)
		if err != nil {
			h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err)

			continue
		}
		asnCache[asn] = asnObj
	}

	// Process peerings (deduped)
	type peeringKey struct {
		from, to int
	}
	peeringMap := make(map[peeringKey]time.Time)
	for _, op := range h.peeringBatch {
		key := peeringKey{from: op.fromASN, to: op.toASN}
		if existing, ok := peeringMap[key]; !ok || op.timestamp.After(existing) {
			peeringMap[key] = op.timestamp
		}
	}

	for key, ts := range peeringMap {
		fromAS := asnCache[key.from]
		toAS := asnCache[key.to]
		if fromAS != nil && toAS != nil {
			err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), ts)
			if err != nil {
				h.logger.Error("Failed to record peering",
					"from_asn", key.from,
					"to_asn", key.to,
					"error", err,
				)
			}
		}
	}

	// Clear batches
	h.asnBatch = h.asnBatch[:0]
	h.peeringBatch = h.peeringBatch[:0]
	h.lastFlush = time.Now()
}

// Stop gracefully stops the handler and flushes remaining batches
func (h *DBHandler) Stop() {
	close(h.stopCh)
	h.wg.Wait()
}


@@ -0,0 +1,248 @@
package routewatch

import (
	"encoding/json"
	"sync"
	"time"

	"git.eeqj.de/sneak/routewatch/internal/database"
	"git.eeqj.de/sneak/routewatch/internal/logger"
	"git.eeqj.de/sneak/routewatch/internal/ristypes"
)

const (
	// peeringHandlerQueueSize is the queue capacity for peering operations
	peeringHandlerQueueSize = 200000

	// minPathLengthForPeering is the minimum AS path length to extract peerings
	minPathLengthForPeering = 2

	// pathExpirationTime is how long to keep AS paths in memory
	pathExpirationTime = 30 * time.Minute

	// peeringProcessInterval is how often to process AS paths into peerings
	peeringProcessInterval = 2 * time.Minute

	// pathPruneInterval is how often to prune old AS paths
	pathPruneInterval = 5 * time.Minute
)

// PeeringHandler handles AS peering relationships from BGP path data
type PeeringHandler struct {
	db     database.Store
	logger *logger.Logger

	// In-memory AS path tracking
	mu       sync.RWMutex
	asPaths  map[string]time.Time // key is JSON-encoded AS path
	asnCache map[int]*database.ASN

	stopCh chan struct{}
}

// NewPeeringHandler creates a new batched peering handler
func NewPeeringHandler(db database.Store, logger *logger.Logger) *PeeringHandler {
	h := &PeeringHandler{
		db:       db,
		logger:   logger,
		asPaths:  make(map[string]time.Time),
		asnCache: make(map[int]*database.ASN),
		stopCh:   make(chan struct{}),
	}

	// Start the periodic processing goroutines
	go h.processLoop()
	go h.pruneLoop()

	return h
}

// WantsMessage returns true if this handler wants to process messages of the given type
func (h *PeeringHandler) WantsMessage(messageType string) bool {
	// We only care about UPDATE messages that have AS paths
	return messageType == "UPDATE"
}

// QueueCapacity returns the desired queue capacity for this handler
func (h *PeeringHandler) QueueCapacity() int {
	return peeringHandlerQueueSize
}

// HandleMessage processes a message to extract AS paths
func (h *PeeringHandler) HandleMessage(msg *ristypes.RISMessage) {
	// Skip if no AS path or only one AS
	if len(msg.Path) < minPathLengthForPeering {
		return
	}

	timestamp := msg.ParsedTimestamp

	// Encode AS path as JSON for use as map key
	pathJSON, err := json.Marshal(msg.Path)
	if err != nil {
		h.logger.Error("Failed to encode AS path", "error", err)

		return
	}

	h.mu.Lock()
	h.asPaths[string(pathJSON)] = timestamp
	h.mu.Unlock()
}

// processLoop runs periodically to process AS paths into peerings
func (h *PeeringHandler) processLoop() {
	ticker := time.NewTicker(peeringProcessInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			h.processPeerings()
		case <-h.stopCh:
			// Final processing
			h.processPeerings()

			return
		}
	}
}

// pruneLoop runs periodically to remove old AS paths
func (h *PeeringHandler) pruneLoop() {
	ticker := time.NewTicker(pathPruneInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			h.prunePaths()
		case <-h.stopCh:
			return
		}
	}
}

// prunePaths removes AS paths older than pathExpirationTime
func (h *PeeringHandler) prunePaths() {
	cutoff := time.Now().Add(-pathExpirationTime)

	var removed int

	h.mu.Lock()
	for pathKey, timestamp := range h.asPaths {
		if timestamp.Before(cutoff) {
			delete(h.asPaths, pathKey)
			removed++
		}
	}
	pathCount := len(h.asPaths)
	h.mu.Unlock()

	if removed > 0 {
		h.logger.Debug("Pruned old AS paths", "removed", removed, "remaining", pathCount)
	}
}

// ProcessPeeringsNow forces immediate processing of peerings (for testing)
func (h *PeeringHandler) ProcessPeeringsNow() {
	h.processPeerings()
}

// processPeerings extracts peerings from AS paths and writes to database
func (h *PeeringHandler) processPeerings() {
	// Take a snapshot of current AS paths
	h.mu.RLock()
	pathsCopy := make(map[string]time.Time, len(h.asPaths))
	for k, v := range h.asPaths {
		pathsCopy[k] = v
	}
	h.mu.RUnlock()

	if len(pathsCopy) == 0 {
		return
	}

	// Extract unique peerings from AS paths
	type peeringKey struct {
		low, high int
	}
	peerings := make(map[peeringKey]time.Time)
	uniqueASNs := make(map[int]struct{})

	for pathJSON, timestamp := range pathsCopy {
		var path []int
		if err := json.Unmarshal([]byte(pathJSON), &path); err != nil {
			h.logger.Error("Failed to decode AS path", "error", err)

			continue
		}

		// Extract peerings from path
		for i := range len(path) - 1 {
			asn1 := path[i]
			asn2 := path[i+1]

			// Normalize: lower AS number first
			low, high := asn1, asn2
			if low > high {
				low, high = high, low
			}

			key := peeringKey{low: low, high: high}

			// Update timestamp if this is newer
			if existing, ok := peerings[key]; !ok || timestamp.After(existing) {
				peerings[key] = timestamp
			}

			uniqueASNs[asn1] = struct{}{}
			uniqueASNs[asn2] = struct{}{}
		}
	}

	// Get or create ASNs
	for asn := range uniqueASNs {
		if _, ok := h.asnCache[asn]; !ok {
			asnObj, err := h.db.GetOrCreateASN(asn, time.Now())
			if err != nil {
				h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err)

				continue
			}
			h.asnCache[asn] = asnObj
		}
	}

	// Record peerings in database
	start := time.Now()
	successCount := 0

	for key, ts := range peerings {
		fromAS := h.asnCache[key.low]
		toAS := h.asnCache[key.high]
		if fromAS != nil && toAS != nil {
			err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), ts)
			if err != nil {
				h.logger.Error("Failed to record peering",
					"from_asn", key.low,
					"to_asn", key.high,
					"error", err,
				)
			} else {
				successCount++
			}
		}
	}

	h.logger.Info("Processed AS peerings",
		"paths", len(pathsCopy),
		"unique_peerings", len(peerings),
		"success", successCount,
		"duration", time.Since(start),
	)
}

// Stop gracefully stops the handler and processes remaining peerings
func (h *PeeringHandler) Stop() {
	close(h.stopCh)

	// Process any remaining peerings synchronously
	h.processPeerings()
}