From 155c08d735501fa440b16cc308dcdebf2390bb0c Mon Sep 17 00:00:00 2001
From: sneak
Date: Mon, 28 Jul 2025 01:01:27 +0200
Subject: [PATCH] Implement batched database operations for improved performance

- Add BatchedDatabaseHandler that batches prefix, ASN, and peering operations
- Add BatchedPeerHandler that batches peer update operations
- Batch operations are deduped and flushed every 100-200ms or when batch size is reached
- Add EnableBatchedDatabaseWrites config option (enabled by default)
- Properly flush remaining batches on shutdown
- This significantly reduces database write pressure and improves throughput
---
 internal/config/config.go                  |   8 +-
 internal/routewatch/app.go                 |  52 ++--
 internal/routewatch/dbhandler_batched.go   | 277 +++++++++++++++++++++
 internal/routewatch/peerhandler_batched.go | 172 +++++++++++++
 4 files changed, 492 insertions(+), 17 deletions(-)
 create mode 100644 internal/routewatch/dbhandler_batched.go
 create mode 100644 internal/routewatch/peerhandler_batched.go

diff --git a/internal/config/config.go b/internal/config/config.go
index a59ff4a..1ae3c5e 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -24,6 +24,9 @@ type Config struct {
 
 	// MaxRuntime is the maximum runtime (0 = run forever)
 	MaxRuntime time.Duration
+
+	// EnableBatchedDatabaseWrites enables batched database operations for better performance
+	EnableBatchedDatabaseWrites bool
 }
 
 // New creates a new Config with default paths based on the OS
@@ -34,8 +37,9 @@ func New() (*Config, error) {
 	}
 
 	return &Config{
-		StateDir:   stateDir,
-		MaxRuntime: 0, // Run forever by default
+		StateDir:                    stateDir,
+		MaxRuntime:                  0,    // Run forever by default
+		EnableBatchedDatabaseWrites: true, // Enable batching by default for better performance
 	}, nil
 }
 
diff --git a/internal/routewatch/app.go b/internal/routewatch/app.go
index cbd704d..5bb9677 100644
--- a/internal/routewatch/app.go
+++ b/internal/routewatch/app.go
@@ -41,15 +41,18 @@ type Dependencies struct {
 
 // RouteWatch represents the main application instance
 type RouteWatch struct {
-	db           database.Store
-	routingTable *routingtable.RoutingTable
-	streamer     *streamer.Streamer
-	server       *server.Server
-	snapshotter  *snapshotter.Snapshotter
-	logger       *slog.Logger
-	maxRuntime   time.Duration
-	shutdown     bool
-	mu           sync.Mutex
+	db                 database.Store
+	routingTable       *routingtable.RoutingTable
+	streamer           *streamer.Streamer
+	server             *server.Server
+	snapshotter        *snapshotter.Snapshotter
+	logger             *slog.Logger
+	maxRuntime         time.Duration
+	shutdown           bool
+	mu                 sync.Mutex
+	config             *config.Config
+	batchedDBHandler   *BatchedDatabaseHandler
+	batchedPeerHandler *BatchedPeerHandler
 }
 
 // isTruthy returns true if the value is considered truthy
@@ -72,6 +75,7 @@ func New(deps Dependencies) *RouteWatch {
 		server:       deps.Server,
 		logger:       deps.Logger,
 		maxRuntime:   deps.Config.MaxRuntime,
+		config:       deps.Config,
 	}
 
 	// Create snapshotter if enabled
@@ -101,17 +105,25 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
 	}
 
 	// Register database handler to process BGP UPDATE messages
-	dbHandler := NewDatabaseHandler(rw.db, rw.logger)
-	rw.streamer.RegisterHandler(dbHandler)
+	if rw.config.EnableBatchedDatabaseWrites {
+		rw.logger.Info("Using batched database handlers for improved performance")
+		rw.batchedDBHandler = NewBatchedDatabaseHandler(rw.db, rw.logger)
+		rw.streamer.RegisterHandler(rw.batchedDBHandler)
+
+		rw.batchedPeerHandler = NewBatchedPeerHandler(rw.db, rw.logger)
+		rw.streamer.RegisterHandler(rw.batchedPeerHandler)
+	} else {
+		dbHandler := NewDatabaseHandler(rw.db, rw.logger)
+		rw.streamer.RegisterHandler(dbHandler)
+
+		peerHandler := NewPeerHandler(rw.db, rw.logger)
+		rw.streamer.RegisterHandler(peerHandler)
+	}
 
 	// Register routing table handler to maintain in-memory routing table
 	rtHandler := NewRoutingTableHandler(rw.routingTable, rw.logger)
 	rw.streamer.RegisterHandler(rtHandler)
 
-	// Register peer tracking handler to track all peers
-	peerHandler := NewPeerHandler(rw.db, rw.logger)
-	rw.streamer.RegisterHandler(peerHandler)
-
 	// Start periodic routing table stats logging
 	go rw.logRoutingTableStats(ctx)
 
@@ -147,6 +159,16 @@ func (rw *RouteWatch) Shutdown() {
 	rw.shutdown = true
 	rw.mu.Unlock()
 
 	// Stop services
 	rw.streamer.Stop()
+
+	// Flush batched handlers after stopping the streamer so late messages are not stranded
+	if rw.batchedDBHandler != nil {
+		rw.logger.Info("Flushing batched database handler")
+		rw.batchedDBHandler.Stop()
+	}
+	if rw.batchedPeerHandler != nil {
+		rw.logger.Info("Flushing batched peer handler")
+		rw.batchedPeerHandler.Stop()
+	}
 
diff --git a/internal/routewatch/dbhandler_batched.go b/internal/routewatch/dbhandler_batched.go
new file mode 100644
index 0000000..a73baab
--- /dev/null
+++ b/internal/routewatch/dbhandler_batched.go
@@ -0,0 +1,277 @@
+package routewatch
+
+import (
+	"log/slog"
+	"sync"
+	"time"
+
+	"git.eeqj.de/sneak/routewatch/internal/database"
+	"git.eeqj.de/sneak/routewatch/internal/ristypes"
+)
+
+const (
+	// batchedDatabaseHandlerQueueSize is the queue capacity for database operations
+	batchedDatabaseHandlerQueueSize = 1000
+
+	// batchSize is the number of operations to batch together
+	batchSize = 100
+
+	// batchTimeout is the maximum time to wait before flushing a batch
+	batchTimeout = 100 * time.Millisecond
+)
+
+// BatchedDatabaseHandler handles BGP messages and stores them in the database using batched operations
+type BatchedDatabaseHandler struct {
+	db     database.Store
+	logger *slog.Logger
+
+	// Batching
+	mu           sync.Mutex
+	prefixBatch  []prefixOp
+	asnBatch     []asnOp
+	peeringBatch []peeringOp
+	lastFlush    time.Time
+	stopCh       chan struct{}
+	stopOnce     sync.Once
+	wg           sync.WaitGroup
+}
+
+type prefixOp struct {
+	prefix    string
+	timestamp time.Time
+}
+
+type asnOp struct {
+	number    int
+	timestamp time.Time
+}
+
+type peeringOp struct {
+	fromASN   int
+	toASN     int
+	timestamp time.Time
+}
+
+// NewBatchedDatabaseHandler creates a new batched database handler
+func NewBatchedDatabaseHandler(
+	db database.Store,
+	logger *slog.Logger,
+) *BatchedDatabaseHandler {
+	h := &BatchedDatabaseHandler{
+		db:           db,
+		logger:       logger,
+		prefixBatch:  make([]prefixOp, 0, batchSize),
+		asnBatch:     make([]asnOp, 0, batchSize),
+		peeringBatch: make([]peeringOp, 0, batchSize),
+		lastFlush:    time.Now(),
+		stopCh:       make(chan struct{}),
+	}
+
+	// Start the flush timer goroutine
+	h.wg.Add(1)
+	go h.flushLoop()
+
+	return h
+}
+
+// WantsMessage returns true if this handler wants to process messages of the given type
+func (h *BatchedDatabaseHandler) WantsMessage(messageType string) bool {
+	// We only care about UPDATE messages for the database
+	return messageType == "UPDATE"
+}
+
+// QueueCapacity returns the desired queue capacity for this handler
+func (h *BatchedDatabaseHandler) QueueCapacity() int {
+	// Batching allows us to use a larger queue
+	return batchedDatabaseHandlerQueueSize
+}
+
+// HandleMessage processes a RIS message and queues database operations.
+//
+// The AS path belongs to the message, not to an individual prefix, so its
+// ASN and peering operations are queued once per message rather than once
+// per announced prefix. The flush dedupes by key, so the rows written are
+// the same while the batches stay proportional to the path length.
+func (h *BatchedDatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
+	// Use the pre-parsed timestamp
+	timestamp := msg.ParsedTimestamp
+
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	// Queue a prefix operation for every announced prefix
+	announced := false
+	for _, announcement := range msg.Announcements {
+		for _, prefix := range announcement.Prefixes {
+			announced = true
+			h.prefixBatch = append(h.prefixBatch, prefixOp{
+				prefix:    prefix,
+				timestamp: timestamp,
+			})
+		}
+	}
+
+	// Path operations are only recorded when something was announced
+	if announced {
+		h.queuePathLocked(msg.Path, timestamp)
+	}
+
+	// Queue operations for withdrawals
+	for _, prefix := range msg.Withdrawals {
+		h.prefixBatch = append(h.prefixBatch, prefixOp{
+			prefix:    prefix,
+			timestamp: timestamp,
+		})
+	}
+
+	// Check if we need to flush
+	if len(h.prefixBatch) >= batchSize || len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize {
+		h.flushBatchesLocked()
+	}
+}
+
+// queuePathLocked queues the ASN and peering operations for one AS path
+// (must be called with mutex held)
+func (h *BatchedDatabaseHandler) queuePathLocked(path []int, timestamp time.Time) {
+	switch {
+	case len(path) > 1:
+		// Record every ASN on the path and each adjacent pair as a peering
+		for i, asn := range path {
+			h.asnBatch = append(h.asnBatch, asnOp{
+				number:    asn,
+				timestamp: timestamp,
+			})
+
+			if i == 0 {
+				continue
+			}
+
+			h.peeringBatch = append(h.peeringBatch, peeringOp{
+				fromASN:   path[i-1],
+				toASN:     asn,
+				timestamp: timestamp,
+			})
+		}
+	case len(path) == 1 && path[0] > 0:
+		// A single-element path only yields its origin ASN
+		h.asnBatch = append(h.asnBatch, asnOp{
+			number:    path[0],
+			timestamp: timestamp,
+		})
+	}
+}
+
+// flushLoop runs in a goroutine and periodically flushes batches
+func (h *BatchedDatabaseHandler) flushLoop() {
+	defer h.wg.Done()
+	ticker := time.NewTicker(batchTimeout)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			h.mu.Lock()
+			if time.Since(h.lastFlush) >= batchTimeout {
+				h.flushBatchesLocked()
+			}
+			h.mu.Unlock()
+		case <-h.stopCh:
+			// Final flush
+			h.mu.Lock()
+			h.flushBatchesLocked()
+			h.mu.Unlock()
+
+			return
+		}
+	}
+}
+
+// flushBatchesLocked flushes all batches to the database (must be called with mutex held)
+func (h *BatchedDatabaseHandler) flushBatchesLocked() {
+	if len(h.prefixBatch) == 0 && len(h.asnBatch) == 0 && len(h.peeringBatch) == 0 {
+		return
+	}
+
+	start := time.Now()
+
+	// Process ASNs first (deduped)
+	asnMap := make(map[int]time.Time)
+	for _, op := range h.asnBatch {
+		if existing, ok := asnMap[op.number]; !ok || op.timestamp.After(existing) {
+			asnMap[op.number] = op.timestamp
+		}
+	}
+
+	asnCache := make(map[int]*database.ASN)
+	for asn, ts := range asnMap {
+		asnObj, err := h.db.GetOrCreateASN(asn, ts)
+		if err != nil {
+			h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err)
+
+			continue
+		}
+		asnCache[asn] = asnObj
+	}
+
+	// Process prefixes (deduped)
+	prefixMap := make(map[string]time.Time)
+	for _, op := range h.prefixBatch {
+		if existing, ok := prefixMap[op.prefix]; !ok || op.timestamp.After(existing) {
+			prefixMap[op.prefix] = op.timestamp
+		}
+	}
+
+	for prefix, ts := range prefixMap {
+		_, err := h.db.GetOrCreatePrefix(prefix, ts)
+		if err != nil {
+			h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err)
+		}
+	}
+
+	// Process peerings (deduped)
+	type peeringKey struct {
+		from, to int
+	}
+	peeringMap := make(map[peeringKey]time.Time)
+	for _, op := range h.peeringBatch {
+		key := peeringKey{from: op.fromASN, to: op.toASN}
+		if existing, ok := peeringMap[key]; !ok || op.timestamp.After(existing) {
+			peeringMap[key] = op.timestamp
+		}
+	}
+
+	for key, ts := range peeringMap {
+		fromAS := asnCache[key.from]
+		toAS := asnCache[key.to]
+		if fromAS != nil && toAS != nil {
+			err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), ts)
+			if err != nil {
+				h.logger.Error("Failed to record peering",
+					"from_asn", key.from,
+					"to_asn", key.to,
+					"error", err,
+				)
+			}
+		}
+	}
+
+	// Clear batches
+	h.prefixBatch = h.prefixBatch[:0]
+	h.asnBatch = h.asnBatch[:0]
+	h.peeringBatch = h.peeringBatch[:0]
+	h.lastFlush = time.Now()
+
+	h.logger.Debug("Flushed database batches",
+		"duration", time.Since(start),
+		"asns", len(asnMap),
+		"prefixes", len(prefixMap),
+		"peerings", len(peeringMap),
+	)
+}
+
+// Stop gracefully stops the handler and flushes remaining batches.
+// It is safe to call more than once; only the first call closes stopCh.
+func (h *BatchedDatabaseHandler) Stop() {
+	h.stopOnce.Do(func() { close(h.stopCh) })
+	h.wg.Wait()
+}
diff --git a/internal/routewatch/peerhandler_batched.go b/internal/routewatch/peerhandler_batched.go
new file mode 100644
index 0000000..d0aac62
--- /dev/null
+++ b/internal/routewatch/peerhandler_batched.go
@@ -0,0 +1,172 @@
+package routewatch
+
+import (
+	"log/slog"
+	"strconv"
+	"sync"
+	"time"
+
+	"git.eeqj.de/sneak/routewatch/internal/database"
+	"git.eeqj.de/sneak/routewatch/internal/ristypes"
+)
+
+const (
+	// batchedPeerHandlerQueueSize is the queue capacity for peer tracking operations
+	batchedPeerHandlerQueueSize = 2000
+
+	// peerBatchSize is the number of peer updates to batch together
+	peerBatchSize = 50
+
+	// peerBatchTimeout is the maximum time to wait before flushing a batch
+	peerBatchTimeout = 200 * time.Millisecond
+)
+
+// BatchedPeerHandler tracks BGP peers from all message types using batched operations
+type BatchedPeerHandler struct {
+	db     database.Store
+	logger *slog.Logger
+
+	// Batching
+	mu        sync.Mutex
+	peerBatch []peerUpdate
+	lastFlush time.Time
+	stopCh    chan struct{}
+	stopOnce  sync.Once
+	wg        sync.WaitGroup
+}
+
+type peerUpdate struct {
+	peerIP      string
+	peerASN     int
+	messageType string
+	timestamp   time.Time
+}
+
+// NewBatchedPeerHandler creates a new batched peer tracking handler
+func NewBatchedPeerHandler(db database.Store, logger *slog.Logger) *BatchedPeerHandler {
+	h := &BatchedPeerHandler{
+		db:        db,
+		logger:    logger,
+		peerBatch: make([]peerUpdate, 0, peerBatchSize),
+		lastFlush: time.Now(),
+		stopCh:    make(chan struct{}),
+	}
+
+	// Start the flush timer goroutine
+	h.wg.Add(1)
+	go h.flushLoop()
+
+	return h
+}
+
+// WantsMessage returns true for all message types since we track peers from all messages
+func (h *BatchedPeerHandler) WantsMessage(_ string) bool {
+	return true
+}
+
+// QueueCapacity returns the desired queue capacity for this handler
+func (h *BatchedPeerHandler) QueueCapacity() int {
+	// Batching allows us to use a larger queue
+	return batchedPeerHandlerQueueSize
+}
+
+// HandleMessage processes a message to track peer information
+func (h *BatchedPeerHandler) HandleMessage(msg *ristypes.RISMessage) {
+	// Parse peer ASN from string
+	peerASN := 0
+	if msg.PeerASN != "" {
+		if asn, err := strconv.Atoi(msg.PeerASN); err == nil {
+			peerASN = asn
+		}
+	}
+
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	// Add to batch
+	h.peerBatch = append(h.peerBatch, peerUpdate{
+		peerIP:      msg.Peer,
+		peerASN:     peerASN,
+		messageType: msg.Type,
+		timestamp:   msg.ParsedTimestamp,
+	})
+
+	// Check if we need to flush
+	if len(h.peerBatch) >= peerBatchSize {
+		h.flushBatchLocked()
+	}
+}
+
+// flushLoop runs in a goroutine and periodically flushes batches
+func (h *BatchedPeerHandler) flushLoop() {
+	defer h.wg.Done()
+	ticker := time.NewTicker(peerBatchTimeout)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			h.mu.Lock()
+			if time.Since(h.lastFlush) >= peerBatchTimeout {
+				h.flushBatchLocked()
+			}
+			h.mu.Unlock()
+		case <-h.stopCh:
+			// Final flush
+			h.mu.Lock()
+			h.flushBatchLocked()
+			h.mu.Unlock()
+
+			return
+		}
+	}
+}
+
+// flushBatchLocked flushes the peer batch to the database (must be called with mutex held)
+func (h *BatchedPeerHandler) flushBatchLocked() {
+	if len(h.peerBatch) == 0 {
+		return
+	}
+
+	start := time.Now()
+
+	// Deduplicate by peer IP, keeping the latest update for each peer
+	peerMap := make(map[string]peerUpdate)
+	for _, update := range h.peerBatch {
+		if existing, ok := peerMap[update.peerIP]; !ok || update.timestamp.After(existing.timestamp) {
+			peerMap[update.peerIP] = update
+		}
+	}
+
+	// Apply updates
+	successCount := 0
+	for _, update := range peerMap {
+		if err := h.db.UpdatePeer(update.peerIP, update.peerASN, update.messageType, update.timestamp); err != nil {
+			h.logger.Error("Failed to update peer",
+				"peer", update.peerIP,
+				"peer_asn", update.peerASN,
+				"message_type", update.messageType,
+				"error", err,
+			)
+		} else {
+			successCount++
+		}
+	}
+
+	// Clear batch
+	h.peerBatch = h.peerBatch[:0]
+	h.lastFlush = time.Now()
+
+	h.logger.Debug("Flushed peer batch",
+		"duration", time.Since(start),
+		"total_updates", len(peerMap),
+		"successful", successCount,
+	)
+}
+
+// Stop gracefully stops the handler and flushes remaining batches.
+// It is safe to call more than once; only the first call closes stopCh.
+func (h *BatchedPeerHandler) Stop() {
+	h.stopOnce.Do(func() { close(h.stopCh) })
+	h.wg.Wait()
+}