Rename handlers and add PrefixHandler for database routing table

- Renamed BatchedDatabaseHandler to DBHandler
- Renamed BatchedPeerHandler to PeerHandler
- Quadrupled DBHandler batch size from 4000 to 16000
- Created new PrefixHandler using same batching strategy to maintain routing table in database
- Removed verbose batch flush logging from all handlers
- Updated app.go to use renamed handlers and register PrefixHandler
- Fixed test configuration to enable batched database writes
This commit is contained in:
Jeffrey Paul 2025-07-28 01:37:19 +02:00
parent 3aef3f9a07
commit cea7c3dfd3
8 changed files with 623 additions and 584 deletions

View File

@ -40,18 +40,19 @@ type Dependencies struct {
// RouteWatch represents the main application instance
type RouteWatch struct {
db database.Store
routingTable *routingtable.RoutingTable
streamer *streamer.Streamer
server *server.Server
snapshotter *snapshotter.Snapshotter
logger *logger.Logger
maxRuntime time.Duration
shutdown bool
mu sync.Mutex
config *config.Config
batchedDBHandler *BatchedDatabaseHandler
batchedPeerHandler *BatchedPeerHandler
db database.Store
routingTable *routingtable.RoutingTable
streamer *streamer.Streamer
server *server.Server
snapshotter *snapshotter.Snapshotter
logger *logger.Logger
maxRuntime time.Duration
shutdown bool
mu sync.Mutex
config *config.Config
dbHandler *DBHandler
peerHandler *PeerHandler
prefixHandler *PrefixHandler
}
// isTruthy returns true if the value is considered truthy
@ -106,17 +107,19 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
// Register database handler to process BGP UPDATE messages
if rw.config.EnableBatchedDatabaseWrites {
rw.logger.Info("Using batched database handlers for improved performance")
rw.batchedDBHandler = NewBatchedDatabaseHandler(rw.db, rw.logger)
rw.streamer.RegisterHandler(rw.batchedDBHandler)
rw.dbHandler = NewDBHandler(rw.db, rw.logger)
rw.streamer.RegisterHandler(rw.dbHandler)
rw.batchedPeerHandler = NewBatchedPeerHandler(rw.db, rw.logger)
rw.streamer.RegisterHandler(rw.batchedPeerHandler)
rw.peerHandler = NewPeerHandler(rw.db, rw.logger)
rw.streamer.RegisterHandler(rw.peerHandler)
rw.prefixHandler = NewPrefixHandler(rw.db, rw.logger)
rw.streamer.RegisterHandler(rw.prefixHandler)
} else {
dbHandler := NewDatabaseHandler(rw.db, rw.logger)
rw.streamer.RegisterHandler(dbHandler)
// Non-batched handlers not implemented yet
rw.logger.Error("Non-batched handlers not implemented")
peerHandler := NewPeerHandler(rw.db, rw.logger)
rw.streamer.RegisterHandler(peerHandler)
return fmt.Errorf("non-batched handlers not implemented")
}
// Register routing table handler to maintain in-memory routing table
@ -159,13 +162,17 @@ func (rw *RouteWatch) Shutdown() {
rw.mu.Unlock()
// Stop batched handlers first to flush remaining batches
if rw.batchedDBHandler != nil {
rw.logger.Info("Flushing batched database handler")
rw.batchedDBHandler.Stop()
if rw.dbHandler != nil {
rw.logger.Info("Flushing database handler")
rw.dbHandler.Stop()
}
if rw.batchedPeerHandler != nil {
rw.logger.Info("Flushing batched peer handler")
rw.batchedPeerHandler.Stop()
if rw.peerHandler != nil {
rw.logger.Info("Flushing peer handler")
rw.peerHandler.Stop()
}
if rw.prefixHandler != nil {
rw.logger.Info("Flushing prefix handler")
rw.prefixHandler.Stop()
}
// Stop services

View File

@ -175,8 +175,9 @@ func TestRouteWatchLiveFeed(t *testing.T) {
// Create test config with empty state dir (no snapshot loading)
cfg := &config.Config{
StateDir: "",
MaxRuntime: 5 * time.Second,
StateDir: "",
MaxRuntime: 5 * time.Second,
EnableBatchedDatabaseWrites: true,
}
// Create routing table

View File

@ -1,47 +1,92 @@
package routewatch
import (
"sync"
"time"
"git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger"
"git.eeqj.de/sneak/routewatch/internal/ristypes"
)
const (
// databaseHandlerQueueSize is the queue capacity for database operations
databaseHandlerQueueSize = 200
// dbHandlerQueueSize is the queue capacity for database operations
dbHandlerQueueSize = 200000
// batchSize is the number of operations to batch together
batchSize = 16000
// batchTimeout is the maximum time to wait before flushing a batch
batchTimeout = 5 * time.Second
)
// DatabaseHandler handles BGP messages and stores them in the database
type DatabaseHandler struct {
// DBHandler handles BGP messages and stores them in the database using batched operations
type DBHandler struct {
db database.Store
logger *logger.Logger
// Batching
mu sync.Mutex
prefixBatch []prefixOp
asnBatch []asnOp
peeringBatch []peeringOp
lastFlush time.Time
stopCh chan struct{}
wg sync.WaitGroup
}
// NewDatabaseHandler creates a new database handler
func NewDatabaseHandler(
type prefixOp struct {
prefix string
timestamp time.Time
}
type asnOp struct {
number int
timestamp time.Time
}
type peeringOp struct {
fromASN int
toASN int
timestamp time.Time
}
// NewDBHandler creates a new batched database handler
func NewDBHandler(
db database.Store,
logger *logger.Logger,
) *DatabaseHandler {
return &DatabaseHandler{
db: db,
logger: logger,
) *DBHandler {
h := &DBHandler{
db: db,
logger: logger,
prefixBatch: make([]prefixOp, 0, batchSize),
asnBatch: make([]asnOp, 0, batchSize),
peeringBatch: make([]peeringOp, 0, batchSize),
lastFlush: time.Now(),
stopCh: make(chan struct{}),
}
// Start the flush timer goroutine
h.wg.Add(1)
go h.flushLoop()
return h
}
// WantsMessage returns true if this handler wants to process messages of the given type
func (h *DatabaseHandler) WantsMessage(messageType string) bool {
func (h *DBHandler) WantsMessage(messageType string) bool {
// We only care about UPDATE messages for the database
return messageType == "UPDATE"
}
// QueueCapacity returns the desired queue capacity for this handler
func (h *DatabaseHandler) QueueCapacity() int {
// Database operations are slow, so use a smaller queue
return databaseHandlerQueueSize
func (h *DBHandler) QueueCapacity() int {
// Batching allows us to use a larger queue
return dbHandlerQueueSize
}
// HandleMessage processes a RIS message and updates the database
func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
// HandleMessage processes a RIS message and queues database operations
func (h *DBHandler) HandleMessage(msg *ristypes.RISMessage) {
// Use the pre-parsed timestamp
timestamp := msg.ParsedTimestamp
@ -51,105 +96,168 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
originASN = msg.Path[len(msg.Path)-1]
}
// Process announcements
h.mu.Lock()
defer h.mu.Unlock()
// Queue operations for announcements
for _, announcement := range msg.Announcements {
for _, prefix := range announcement.Prefixes {
// Get or create prefix
_, err := h.db.GetOrCreatePrefix(prefix, timestamp)
if err != nil {
h.logger.Error(
"Failed to get/create prefix",
"prefix",
prefix,
"error",
err,
)
// Queue prefix operation
h.prefixBatch = append(h.prefixBatch, prefixOp{
prefix: prefix,
timestamp: timestamp,
})
continue
// Queue origin ASN operation
if originASN > 0 {
h.asnBatch = append(h.asnBatch, asnOp{
number: originASN,
timestamp: timestamp,
})
}
// Get or create origin ASN
_, err = h.db.GetOrCreateASN(originASN, timestamp)
if err != nil {
h.logger.Error(
"Failed to get/create ASN",
"asn",
originASN,
"error",
err,
)
continue
}
// TODO: Record the announcement in the announcements table
// Process AS path to update peerings
// Process AS path to queue peering operations
if len(msg.Path) > 1 {
for i := range len(msg.Path) - 1 {
fromASN := msg.Path[i]
toASN := msg.Path[i+1]
// Get or create both ASNs
fromAS, err := h.db.GetOrCreateASN(fromASN, timestamp)
if err != nil {
h.logger.Error(
"Failed to get/create from ASN",
"asn",
fromASN,
"error",
err,
)
// Queue ASN operations
h.asnBatch = append(h.asnBatch, asnOp{
number: fromASN,
timestamp: timestamp,
})
h.asnBatch = append(h.asnBatch, asnOp{
number: toASN,
timestamp: timestamp,
})
continue
}
toAS, err := h.db.GetOrCreateASN(toASN, timestamp)
if err != nil {
h.logger.Error(
"Failed to get/create to ASN",
"asn",
toASN,
"error",
err,
)
continue
}
// Record the peering
err = h.db.RecordPeering(
fromAS.ID.String(),
toAS.ID.String(),
timestamp,
)
if err != nil {
h.logger.Error("Failed to record peering",
"from_asn", fromASN,
"to_asn", toASN,
"error", err,
)
}
// Queue peering operation
h.peeringBatch = append(h.peeringBatch, peeringOp{
fromASN: fromASN,
toASN: toASN,
timestamp: timestamp,
})
}
}
}
}
// Process withdrawals
// Queue operations for withdrawals
for _, prefix := range msg.Withdrawals {
// Get prefix
_, err := h.db.GetOrCreatePrefix(prefix, timestamp)
h.prefixBatch = append(h.prefixBatch, prefixOp{
prefix: prefix,
timestamp: timestamp,
})
}
// Check if we need to flush
if len(h.prefixBatch) >= batchSize || len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize {
h.flushBatchesLocked()
}
}
// flushLoop runs in a goroutine and periodically flushes batches
func (h *DBHandler) flushLoop() {
defer h.wg.Done()
ticker := time.NewTicker(batchTimeout)
defer ticker.Stop()
for {
select {
case <-ticker.C:
h.mu.Lock()
if time.Since(h.lastFlush) >= batchTimeout {
h.flushBatchesLocked()
}
h.mu.Unlock()
case <-h.stopCh:
// Final flush
h.mu.Lock()
h.flushBatchesLocked()
h.mu.Unlock()
return
}
}
}
// flushBatchesLocked flushes all batches to the database (must be called with mutex held)
func (h *DBHandler) flushBatchesLocked() {
if len(h.prefixBatch) == 0 && len(h.asnBatch) == 0 && len(h.peeringBatch) == 0 {
return
}
// Process ASNs first (deduped)
asnMap := make(map[int]time.Time)
for _, op := range h.asnBatch {
if existing, ok := asnMap[op.number]; !ok || op.timestamp.After(existing) {
asnMap[op.number] = op.timestamp
}
}
asnCache := make(map[int]*database.ASN)
for asn, ts := range asnMap {
asnObj, err := h.db.GetOrCreateASN(asn, ts)
if err != nil {
h.logger.Error(
"Failed to get prefix for withdrawal",
"prefix",
prefix,
"error",
err,
)
h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err)
continue
}
// TODO: Record the withdrawal in the announcements table as a withdrawal
asnCache[asn] = asnObj
}
// Process prefixes (deduped)
prefixMap := make(map[string]time.Time)
for _, op := range h.prefixBatch {
if existing, ok := prefixMap[op.prefix]; !ok || op.timestamp.After(existing) {
prefixMap[op.prefix] = op.timestamp
}
}
for prefix, ts := range prefixMap {
_, err := h.db.GetOrCreatePrefix(prefix, ts)
if err != nil {
h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err)
}
}
// Process peerings (deduped)
type peeringKey struct {
from, to int
}
peeringMap := make(map[peeringKey]time.Time)
for _, op := range h.peeringBatch {
key := peeringKey{from: op.fromASN, to: op.toASN}
if existing, ok := peeringMap[key]; !ok || op.timestamp.After(existing) {
peeringMap[key] = op.timestamp
}
}
for key, ts := range peeringMap {
fromAS := asnCache[key.from]
toAS := asnCache[key.to]
if fromAS != nil && toAS != nil {
err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), ts)
if err != nil {
h.logger.Error("Failed to record peering",
"from_asn", key.from,
"to_asn", key.to,
"error", err,
)
}
}
}
// Clear batches
h.prefixBatch = h.prefixBatch[:0]
h.asnBatch = h.asnBatch[:0]
h.peeringBatch = h.peeringBatch[:0]
h.lastFlush = time.Now()
}
// Stop gracefully stops the handler and flushes remaining batches
func (h *DBHandler) Stop() {
close(h.stopCh)
h.wg.Wait()
}

View File

@ -1,272 +0,0 @@
package routewatch
import (
"sync"
"time"
"git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger"
"git.eeqj.de/sneak/routewatch/internal/ristypes"
)
const (
// batchedDatabaseHandlerQueueSize is the queue capacity for database operations
batchedDatabaseHandlerQueueSize = 1000
// batchSize is the number of operations to batch together
batchSize = 500
// batchTimeout is the maximum time to wait before flushing a batch
batchTimeout = 5 * time.Second
)
// BatchedDatabaseHandler handles BGP messages and stores them in the database using batched operations
type BatchedDatabaseHandler struct {
db database.Store
logger *logger.Logger
// Batching
mu sync.Mutex
prefixBatch []prefixOp
asnBatch []asnOp
peeringBatch []peeringOp
lastFlush time.Time
stopCh chan struct{}
wg sync.WaitGroup
}
type prefixOp struct {
prefix string
timestamp time.Time
}
type asnOp struct {
number int
timestamp time.Time
}
type peeringOp struct {
fromASN int
toASN int
timestamp time.Time
}
// NewBatchedDatabaseHandler creates a new batched database handler
func NewBatchedDatabaseHandler(
db database.Store,
logger *logger.Logger,
) *BatchedDatabaseHandler {
h := &BatchedDatabaseHandler{
db: db,
logger: logger,
prefixBatch: make([]prefixOp, 0, batchSize),
asnBatch: make([]asnOp, 0, batchSize),
peeringBatch: make([]peeringOp, 0, batchSize),
lastFlush: time.Now(),
stopCh: make(chan struct{}),
}
// Start the flush timer goroutine
h.wg.Add(1)
go h.flushLoop()
return h
}
// WantsMessage returns true if this handler wants to process messages of the given type
func (h *BatchedDatabaseHandler) WantsMessage(messageType string) bool {
// We only care about UPDATE messages for the database
return messageType == "UPDATE"
}
// QueueCapacity returns the desired queue capacity for this handler
func (h *BatchedDatabaseHandler) QueueCapacity() int {
// Batching allows us to use a larger queue
return batchedDatabaseHandlerQueueSize
}
// HandleMessage processes a RIS message and queues database operations
func (h *BatchedDatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
// Use the pre-parsed timestamp
timestamp := msg.ParsedTimestamp
// Get origin ASN from path (last element)
var originASN int
if len(msg.Path) > 0 {
originASN = msg.Path[len(msg.Path)-1]
}
h.mu.Lock()
defer h.mu.Unlock()
// Queue operations for announcements
for _, announcement := range msg.Announcements {
for _, prefix := range announcement.Prefixes {
// Queue prefix operation
h.prefixBatch = append(h.prefixBatch, prefixOp{
prefix: prefix,
timestamp: timestamp,
})
// Queue origin ASN operation
if originASN > 0 {
h.asnBatch = append(h.asnBatch, asnOp{
number: originASN,
timestamp: timestamp,
})
}
// Process AS path to queue peering operations
if len(msg.Path) > 1 {
for i := range len(msg.Path) - 1 {
fromASN := msg.Path[i]
toASN := msg.Path[i+1]
// Queue ASN operations
h.asnBatch = append(h.asnBatch, asnOp{
number: fromASN,
timestamp: timestamp,
})
h.asnBatch = append(h.asnBatch, asnOp{
number: toASN,
timestamp: timestamp,
})
// Queue peering operation
h.peeringBatch = append(h.peeringBatch, peeringOp{
fromASN: fromASN,
toASN: toASN,
timestamp: timestamp,
})
}
}
}
}
// Queue operations for withdrawals
for _, prefix := range msg.Withdrawals {
h.prefixBatch = append(h.prefixBatch, prefixOp{
prefix: prefix,
timestamp: timestamp,
})
}
// Check if we need to flush
if len(h.prefixBatch) >= batchSize || len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize {
h.flushBatchesLocked()
}
}
// flushLoop runs in a goroutine and periodically flushes batches
func (h *BatchedDatabaseHandler) flushLoop() {
defer h.wg.Done()
ticker := time.NewTicker(batchTimeout)
defer ticker.Stop()
for {
select {
case <-ticker.C:
h.mu.Lock()
if time.Since(h.lastFlush) >= batchTimeout {
h.flushBatchesLocked()
}
h.mu.Unlock()
case <-h.stopCh:
// Final flush
h.mu.Lock()
h.flushBatchesLocked()
h.mu.Unlock()
return
}
}
}
// flushBatchesLocked flushes all batches to the database (must be called with mutex held)
func (h *BatchedDatabaseHandler) flushBatchesLocked() {
if len(h.prefixBatch) == 0 && len(h.asnBatch) == 0 && len(h.peeringBatch) == 0 {
return
}
start := time.Now()
// Process ASNs first (deduped)
asnMap := make(map[int]time.Time)
for _, op := range h.asnBatch {
if existing, ok := asnMap[op.number]; !ok || op.timestamp.After(existing) {
asnMap[op.number] = op.timestamp
}
}
asnCache := make(map[int]*database.ASN)
for asn, ts := range asnMap {
asnObj, err := h.db.GetOrCreateASN(asn, ts)
if err != nil {
h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err)
continue
}
asnCache[asn] = asnObj
}
// Process prefixes (deduped)
prefixMap := make(map[string]time.Time)
for _, op := range h.prefixBatch {
if existing, ok := prefixMap[op.prefix]; !ok || op.timestamp.After(existing) {
prefixMap[op.prefix] = op.timestamp
}
}
for prefix, ts := range prefixMap {
_, err := h.db.GetOrCreatePrefix(prefix, ts)
if err != nil {
h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err)
}
}
// Process peerings (deduped)
type peeringKey struct {
from, to int
}
peeringMap := make(map[peeringKey]time.Time)
for _, op := range h.peeringBatch {
key := peeringKey{from: op.fromASN, to: op.toASN}
if existing, ok := peeringMap[key]; !ok || op.timestamp.After(existing) {
peeringMap[key] = op.timestamp
}
}
for key, ts := range peeringMap {
fromAS := asnCache[key.from]
toAS := asnCache[key.to]
if fromAS != nil && toAS != nil {
err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), ts)
if err != nil {
h.logger.Error("Failed to record peering",
"from_asn", key.from,
"to_asn", key.to,
"error", err,
)
}
}
}
// Clear batches
h.prefixBatch = h.prefixBatch[:0]
h.asnBatch = h.asnBatch[:0]
h.peeringBatch = h.peeringBatch[:0]
h.lastFlush = time.Now()
h.logger.Debug("Flushed database batches",
"duration", time.Since(start),
"asns", len(asnMap),
"prefixes", len(prefixMap),
"peerings", len(peeringMap),
)
}
// Stop gracefully stops the handler and flushes remaining batches
func (h *BatchedDatabaseHandler) Stop() {
close(h.stopCh)
h.wg.Wait()
}

View File

@ -2,6 +2,8 @@ package routewatch
import (
"strconv"
"sync"
"time"
"git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger"
@ -10,21 +12,50 @@ import (
const (
// peerHandlerQueueSize is the queue capacity for peer tracking operations
peerHandlerQueueSize = 500
peerHandlerQueueSize = 2000
// peerBatchSize is the number of peer updates to batch together
peerBatchSize = 500
// peerBatchTimeout is the maximum time to wait before flushing a batch
peerBatchTimeout = 5 * time.Second
)
// PeerHandler tracks BGP peers from all message types
// PeerHandler tracks BGP peers from all message types using batched operations
type PeerHandler struct {
db database.Store
logger *logger.Logger
// Batching
mu sync.Mutex
peerBatch []peerUpdate
lastFlush time.Time
stopCh chan struct{}
wg sync.WaitGroup
}
// NewPeerHandler creates a new peer tracking handler
type peerUpdate struct {
peerIP string
peerASN int
messageType string
timestamp time.Time
}
// NewPeerHandler creates a new batched peer tracking handler
func NewPeerHandler(db database.Store, logger *logger.Logger) *PeerHandler {
return &PeerHandler{
db: db,
logger: logger,
h := &PeerHandler{
db: db,
logger: logger,
peerBatch: make([]peerUpdate, 0, peerBatchSize),
lastFlush: time.Now(),
stopCh: make(chan struct{}),
}
// Start the flush timer goroutine
h.wg.Add(1)
go h.flushLoop()
return h
}
// WantsMessage returns true for all message types since we track peers from all messages
@ -34,7 +65,7 @@ func (h *PeerHandler) WantsMessage(_ string) bool {
// QueueCapacity returns the desired queue capacity for this handler
func (h *PeerHandler) QueueCapacity() int {
// Peer tracking is lightweight but involves database ops, use moderate queue
// Batching allows us to use a larger queue
return peerHandlerQueueSize
}
@ -48,13 +79,81 @@ func (h *PeerHandler) HandleMessage(msg *ristypes.RISMessage) {
}
}
// Update peer in database
if err := h.db.UpdatePeer(msg.Peer, peerASN, msg.Type, msg.ParsedTimestamp); err != nil {
h.logger.Error("Failed to update peer",
"peer", msg.Peer,
"peer_asn", peerASN,
"message_type", msg.Type,
"error", err,
)
h.mu.Lock()
defer h.mu.Unlock()
// Add to batch
h.peerBatch = append(h.peerBatch, peerUpdate{
peerIP: msg.Peer,
peerASN: peerASN,
messageType: msg.Type,
timestamp: msg.ParsedTimestamp,
})
// Check if we need to flush
if len(h.peerBatch) >= peerBatchSize {
h.flushBatchLocked()
}
}
// flushLoop runs in a goroutine and periodically flushes batches
func (h *PeerHandler) flushLoop() {
defer h.wg.Done()
ticker := time.NewTicker(peerBatchTimeout)
defer ticker.Stop()
for {
select {
case <-ticker.C:
h.mu.Lock()
if time.Since(h.lastFlush) >= peerBatchTimeout {
h.flushBatchLocked()
}
h.mu.Unlock()
case <-h.stopCh:
// Final flush
h.mu.Lock()
h.flushBatchLocked()
h.mu.Unlock()
return
}
}
}
// flushBatchLocked flushes the peer batch to the database (must be called with mutex held)
func (h *PeerHandler) flushBatchLocked() {
if len(h.peerBatch) == 0 {
return
}
// Deduplicate by peer IP, keeping the latest update for each peer
peerMap := make(map[string]peerUpdate)
for _, update := range h.peerBatch {
if existing, ok := peerMap[update.peerIP]; !ok || update.timestamp.After(existing.timestamp) {
peerMap[update.peerIP] = update
}
}
// Apply updates
for _, update := range peerMap {
if err := h.db.UpdatePeer(update.peerIP, update.peerASN, update.messageType, update.timestamp); err != nil {
h.logger.Error("Failed to update peer",
"peer", update.peerIP,
"peer_asn", update.peerASN,
"message_type", update.messageType,
"error", err,
)
}
}
// Clear batch
h.peerBatch = h.peerBatch[:0]
h.lastFlush = time.Now()
}
// Stop gracefully stops the handler and flushes remaining batches
func (h *PeerHandler) Stop() {
close(h.stopCh)
h.wg.Wait()
}

View File

@ -1,170 +0,0 @@
package routewatch
import (
"strconv"
"sync"
"time"
"git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger"
"git.eeqj.de/sneak/routewatch/internal/ristypes"
)
const (
// batchedPeerHandlerQueueSize is the queue capacity for peer tracking operations
batchedPeerHandlerQueueSize = 2000
// peerBatchSize is the number of peer updates to batch together
peerBatchSize = 500
// peerBatchTimeout is the maximum time to wait before flushing a batch
peerBatchTimeout = 5 * time.Second
)
// BatchedPeerHandler tracks BGP peers from all message types using batched operations
type BatchedPeerHandler struct {
db database.Store
logger *logger.Logger
// Batching
mu sync.Mutex
peerBatch []peerUpdate
lastFlush time.Time
stopCh chan struct{}
wg sync.WaitGroup
}
type peerUpdate struct {
peerIP string
peerASN int
messageType string
timestamp time.Time
}
// NewBatchedPeerHandler creates a new batched peer tracking handler
func NewBatchedPeerHandler(db database.Store, logger *logger.Logger) *BatchedPeerHandler {
h := &BatchedPeerHandler{
db: db,
logger: logger,
peerBatch: make([]peerUpdate, 0, peerBatchSize),
lastFlush: time.Now(),
stopCh: make(chan struct{}),
}
// Start the flush timer goroutine
h.wg.Add(1)
go h.flushLoop()
return h
}
// WantsMessage returns true for all message types since we track peers from all messages
func (h *BatchedPeerHandler) WantsMessage(_ string) bool {
return true
}
// QueueCapacity returns the desired queue capacity for this handler
func (h *BatchedPeerHandler) QueueCapacity() int {
// Batching allows us to use a larger queue
return batchedPeerHandlerQueueSize
}
// HandleMessage processes a message to track peer information
func (h *BatchedPeerHandler) HandleMessage(msg *ristypes.RISMessage) {
// Parse peer ASN from string
peerASN := 0
if msg.PeerASN != "" {
if asn, err := strconv.Atoi(msg.PeerASN); err == nil {
peerASN = asn
}
}
h.mu.Lock()
defer h.mu.Unlock()
// Add to batch
h.peerBatch = append(h.peerBatch, peerUpdate{
peerIP: msg.Peer,
peerASN: peerASN,
messageType: msg.Type,
timestamp: msg.ParsedTimestamp,
})
// Check if we need to flush
if len(h.peerBatch) >= peerBatchSize {
h.flushBatchLocked()
}
}
// flushLoop runs in a goroutine and periodically flushes batches
func (h *BatchedPeerHandler) flushLoop() {
defer h.wg.Done()
ticker := time.NewTicker(peerBatchTimeout)
defer ticker.Stop()
for {
select {
case <-ticker.C:
h.mu.Lock()
if time.Since(h.lastFlush) >= peerBatchTimeout {
h.flushBatchLocked()
}
h.mu.Unlock()
case <-h.stopCh:
// Final flush
h.mu.Lock()
h.flushBatchLocked()
h.mu.Unlock()
return
}
}
}
// flushBatchLocked flushes the peer batch to the database (must be called with mutex held)
func (h *BatchedPeerHandler) flushBatchLocked() {
if len(h.peerBatch) == 0 {
return
}
start := time.Now()
// Deduplicate by peer IP, keeping the latest update for each peer
peerMap := make(map[string]peerUpdate)
for _, update := range h.peerBatch {
if existing, ok := peerMap[update.peerIP]; !ok || update.timestamp.After(existing.timestamp) {
peerMap[update.peerIP] = update
}
}
// Apply updates
successCount := 0
for _, update := range peerMap {
if err := h.db.UpdatePeer(update.peerIP, update.peerASN, update.messageType, update.timestamp); err != nil {
h.logger.Error("Failed to update peer",
"peer", update.peerIP,
"peer_asn", update.peerASN,
"message_type", update.messageType,
"error", err,
)
} else {
successCount++
}
}
// Clear batch
h.peerBatch = h.peerBatch[:0]
h.lastFlush = time.Now()
h.logger.Debug("Flushed peer batch",
"duration", time.Since(start),
"total_updates", len(peerMap),
"successful", successCount,
)
}
// Stop gracefully stops the handler and flushes remaining batches
func (h *BatchedPeerHandler) Stop() {
close(h.stopCh)
h.wg.Wait()
}

View File

@ -0,0 +1,270 @@
package routewatch
import (
"encoding/json"
"sync"
"time"
"git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger"
"git.eeqj.de/sneak/routewatch/internal/ristypes"
)
const (
// prefixHandlerQueueSize is the queue capacity for prefix tracking operations
prefixHandlerQueueSize = 50000
// prefixBatchSize is the number of prefix updates to batch together
prefixBatchSize = 2000
// prefixBatchTimeout is the maximum time to wait before flushing a batch
prefixBatchTimeout = 5 * time.Second
)
// PrefixHandler tracks BGP prefixes and maintains a routing table in the database
type PrefixHandler struct {
db database.Store
logger *logger.Logger
// Batching
mu sync.Mutex
batch []prefixUpdate
lastFlush time.Time
stopCh chan struct{}
wg sync.WaitGroup
}
type prefixUpdate struct {
prefix string
originASN int
peer string
messageType string
timestamp time.Time
path []int
}
// NewPrefixHandler creates a new batched prefix tracking handler
func NewPrefixHandler(db database.Store, logger *logger.Logger) *PrefixHandler {
h := &PrefixHandler{
db: db,
logger: logger,
batch: make([]prefixUpdate, 0, prefixBatchSize),
lastFlush: time.Now(),
stopCh: make(chan struct{}),
}
// Start the flush timer goroutine
h.wg.Add(1)
go h.flushLoop()
return h
}
// WantsMessage returns true if this handler wants to process messages of the given type
func (h *PrefixHandler) WantsMessage(messageType string) bool {
// We only care about UPDATE messages for the routing table
return messageType == "UPDATE"
}
// QueueCapacity returns the desired queue capacity for this handler
func (h *PrefixHandler) QueueCapacity() int {
// Batching allows us to use a larger queue
return prefixHandlerQueueSize
}
// HandleMessage processes a message to track prefix information
func (h *PrefixHandler) HandleMessage(msg *ristypes.RISMessage) {
// Use the pre-parsed timestamp
timestamp := msg.ParsedTimestamp
// Get origin ASN from path (last element)
var originASN int
if len(msg.Path) > 0 {
originASN = msg.Path[len(msg.Path)-1]
}
h.mu.Lock()
defer h.mu.Unlock()
// Process announcements
for _, announcement := range msg.Announcements {
for _, prefix := range announcement.Prefixes {
h.batch = append(h.batch, prefixUpdate{
prefix: prefix,
originASN: originASN,
peer: msg.Peer,
messageType: "announcement",
timestamp: timestamp,
path: msg.Path,
})
}
}
// Process withdrawals
for _, prefix := range msg.Withdrawals {
h.batch = append(h.batch, prefixUpdate{
prefix: prefix,
originASN: 0, // No origin for withdrawals
peer: msg.Peer,
messageType: "withdrawal",
timestamp: timestamp,
path: nil,
})
}
// Check if we need to flush
if len(h.batch) >= prefixBatchSize {
h.flushBatchLocked()
}
}
// flushLoop runs in a goroutine and periodically flushes batches
func (h *PrefixHandler) flushLoop() {
defer h.wg.Done()
ticker := time.NewTicker(prefixBatchTimeout)
defer ticker.Stop()
for {
select {
case <-ticker.C:
h.mu.Lock()
if time.Since(h.lastFlush) >= prefixBatchTimeout {
h.flushBatchLocked()
}
h.mu.Unlock()
case <-h.stopCh:
// Final flush
h.mu.Lock()
h.flushBatchLocked()
h.mu.Unlock()
return
}
}
}
// flushBatchLocked flushes the prefix batch to the database (must be called with mutex held)
func (h *PrefixHandler) flushBatchLocked() {
if len(h.batch) == 0 {
return
}
// Group updates by prefix to deduplicate
// For each prefix, keep the latest update
prefixMap := make(map[string]prefixUpdate)
for _, update := range h.batch {
key := update.prefix
if existing, ok := prefixMap[key]; !ok || update.timestamp.After(existing.timestamp) {
prefixMap[key] = update
}
}
// Apply updates to database
for _, update := range prefixMap {
// Get or create prefix
prefix, err := h.db.GetOrCreatePrefix(update.prefix, update.timestamp)
if err != nil {
h.logger.Error("Failed to get/create prefix",
"prefix", update.prefix,
"error", err,
)
continue
}
// For announcements, get ASN info and create announcement record
if update.messageType == "announcement" && update.originASN > 0 {
h.processAnnouncement(prefix, update)
} else if update.messageType == "withdrawal" {
h.processWithdrawal(prefix, update)
}
}
// Clear batch
h.batch = h.batch[:0]
h.lastFlush = time.Now()
}
// processAnnouncement handles storing an announcement in the database
func (h *PrefixHandler) processAnnouncement(prefix *database.Prefix, update prefixUpdate) {
// Get or create origin ASN
originASN, err := h.db.GetOrCreateASN(update.originASN, update.timestamp)
if err != nil {
h.logger.Error("Failed to get/create origin ASN",
"asn", update.originASN,
"error", err,
)
return
}
// Get or create peer ASN (first element in path if exists)
var peerASN *database.ASN
if len(update.path) > 0 {
peerASN, err = h.db.GetOrCreateASN(update.path[0], update.timestamp)
if err != nil {
h.logger.Error("Failed to get/create peer ASN",
"asn", update.path[0],
"error", err,
)
return
}
} else {
// If no path, use origin as peer
peerASN = originASN
}
// Encode AS path as JSON
pathJSON, err := json.Marshal(update.path)
if err != nil {
h.logger.Error("Failed to encode AS path",
"path", update.path,
"error", err,
)
return
}
// Create announcement record
announcement := &database.Announcement{
PrefixID: prefix.ID,
ASNID: peerASN.ID,
OriginASNID: originASN.ID,
Path: string(pathJSON),
NextHop: update.peer,
Timestamp: update.timestamp,
IsWithdrawal: false,
}
if err := h.db.RecordAnnouncement(announcement); err != nil {
h.logger.Error("Failed to record announcement",
"prefix", update.prefix,
"error", err,
)
}
}
// processWithdrawal handles storing a withdrawal in the database
func (h *PrefixHandler) processWithdrawal(prefix *database.Prefix, update prefixUpdate) {
// For withdrawals, create a withdrawal record
announcement := &database.Announcement{
PrefixID: prefix.ID,
NextHop: update.peer,
Timestamp: update.timestamp,
IsWithdrawal: true,
}
if err := h.db.RecordAnnouncement(announcement); err != nil {
h.logger.Error("Failed to record withdrawal",
"prefix", update.prefix,
"error", err,
)
}
}
// Stop gracefully stops the handler and flushes remaining batches
func (h *PrefixHandler) Stop() {
close(h.stopCh)
h.wg.Wait()
}

View File

@ -145,7 +145,6 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
errChan := make(chan error)
go func() {
s.logger.Debug("Starting database stats query")
dbStats, err := s.db.GetStats()
if err != nil {
s.logger.Debug("Database stats query failed", "error", err)
@ -153,7 +152,6 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
return
}
s.logger.Debug("Database stats query completed")
statsChan <- dbStats
}()
@ -287,7 +285,6 @@ func (s *Server) handleStats() http.HandlerFunc {
errChan := make(chan error)
go func() {
s.logger.Debug("Starting database stats query")
dbStats, err := s.db.GetStats()
if err != nil {
s.logger.Debug("Database stats query failed", "error", err)
@ -295,7 +292,6 @@ func (s *Server) handleStats() http.HandlerFunc {
return
}
s.logger.Debug("Database stats query completed")
statsChan <- dbStats
}()