Implement batched database operations for improved performance
- Add BatchedDatabaseHandler that batches prefix, ASN, and peering operations - Add BatchedPeerHandler that batches peer update operations - Batch operations are deduped and flushed every 100-200ms or when batch size is reached - Add EnableBatchedDatabaseWrites config option (enabled by default) - Properly flush remaining batches on shutdown - This significantly reduces database write pressure and improves throughput
This commit is contained in:
parent
d15a5e91b9
commit
155c08d735
@ -24,6 +24,9 @@ type Config struct {
|
||||
|
||||
// MaxRuntime is the maximum runtime (0 = run forever)
|
||||
MaxRuntime time.Duration
|
||||
|
||||
// EnableBatchedDatabaseWrites enables batched database operations for better performance
|
||||
EnableBatchedDatabaseWrites bool
|
||||
}
|
||||
|
||||
// New creates a new Config with default paths based on the OS
|
||||
@ -34,8 +37,9 @@ func New() (*Config, error) {
|
||||
}
|
||||
|
||||
return &Config{
|
||||
StateDir: stateDir,
|
||||
MaxRuntime: 0, // Run forever by default
|
||||
StateDir: stateDir,
|
||||
MaxRuntime: 0, // Run forever by default
|
||||
EnableBatchedDatabaseWrites: true, // Enable batching by default for better performance
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -41,15 +41,18 @@ type Dependencies struct {
|
||||
|
||||
// RouteWatch represents the main application instance
|
||||
type RouteWatch struct {
|
||||
db database.Store
|
||||
routingTable *routingtable.RoutingTable
|
||||
streamer *streamer.Streamer
|
||||
server *server.Server
|
||||
snapshotter *snapshotter.Snapshotter
|
||||
logger *slog.Logger
|
||||
maxRuntime time.Duration
|
||||
shutdown bool
|
||||
mu sync.Mutex
|
||||
db database.Store
|
||||
routingTable *routingtable.RoutingTable
|
||||
streamer *streamer.Streamer
|
||||
server *server.Server
|
||||
snapshotter *snapshotter.Snapshotter
|
||||
logger *slog.Logger
|
||||
maxRuntime time.Duration
|
||||
shutdown bool
|
||||
mu sync.Mutex
|
||||
config *config.Config
|
||||
batchedDBHandler *BatchedDatabaseHandler
|
||||
batchedPeerHandler *BatchedPeerHandler
|
||||
}
|
||||
|
||||
// isTruthy returns true if the value is considered truthy
|
||||
@ -72,6 +75,7 @@ func New(deps Dependencies) *RouteWatch {
|
||||
server: deps.Server,
|
||||
logger: deps.Logger,
|
||||
maxRuntime: deps.Config.MaxRuntime,
|
||||
config: deps.Config,
|
||||
}
|
||||
|
||||
// Create snapshotter if enabled
|
||||
@ -101,17 +105,25 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// Register database handler to process BGP UPDATE messages
|
||||
dbHandler := NewDatabaseHandler(rw.db, rw.logger)
|
||||
rw.streamer.RegisterHandler(dbHandler)
|
||||
if rw.config.EnableBatchedDatabaseWrites {
|
||||
rw.logger.Info("Using batched database handlers for improved performance")
|
||||
rw.batchedDBHandler = NewBatchedDatabaseHandler(rw.db, rw.logger)
|
||||
rw.streamer.RegisterHandler(rw.batchedDBHandler)
|
||||
|
||||
rw.batchedPeerHandler = NewBatchedPeerHandler(rw.db, rw.logger)
|
||||
rw.streamer.RegisterHandler(rw.batchedPeerHandler)
|
||||
} else {
|
||||
dbHandler := NewDatabaseHandler(rw.db, rw.logger)
|
||||
rw.streamer.RegisterHandler(dbHandler)
|
||||
|
||||
peerHandler := NewPeerHandler(rw.db, rw.logger)
|
||||
rw.streamer.RegisterHandler(peerHandler)
|
||||
}
|
||||
|
||||
// Register routing table handler to maintain in-memory routing table
|
||||
rtHandler := NewRoutingTableHandler(rw.routingTable, rw.logger)
|
||||
rw.streamer.RegisterHandler(rtHandler)
|
||||
|
||||
// Register peer tracking handler to track all peers
|
||||
peerHandler := NewPeerHandler(rw.db, rw.logger)
|
||||
rw.streamer.RegisterHandler(peerHandler)
|
||||
|
||||
// Start periodic routing table stats logging
|
||||
go rw.logRoutingTableStats(ctx)
|
||||
|
||||
@ -147,6 +159,16 @@ func (rw *RouteWatch) Shutdown() {
|
||||
rw.shutdown = true
|
||||
rw.mu.Unlock()
|
||||
|
||||
// Stop batched handlers first to flush remaining batches
|
||||
if rw.batchedDBHandler != nil {
|
||||
rw.logger.Info("Flushing batched database handler")
|
||||
rw.batchedDBHandler.Stop()
|
||||
}
|
||||
if rw.batchedPeerHandler != nil {
|
||||
rw.logger.Info("Flushing batched peer handler")
|
||||
rw.batchedPeerHandler.Stop()
|
||||
}
|
||||
|
||||
// Stop services
|
||||
rw.streamer.Stop()
|
||||
|
||||
|
272
internal/routewatch/dbhandler_batched.go
Normal file
272
internal/routewatch/dbhandler_batched.go
Normal file
@ -0,0 +1,272 @@
|
||||
package routewatch
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.eeqj.de/sneak/routewatch/internal/database"
|
||||
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
||||
)
|
||||
|
||||
const (
|
||||
// batchedDatabaseHandlerQueueSize is the queue capacity for database operations
|
||||
batchedDatabaseHandlerQueueSize = 1000
|
||||
|
||||
// batchSize is the number of operations to batch together
|
||||
batchSize = 100
|
||||
|
||||
// batchTimeout is the maximum time to wait before flushing a batch
|
||||
batchTimeout = 100 * time.Millisecond
|
||||
)
|
||||
|
||||
// BatchedDatabaseHandler handles BGP messages and stores them in the database using batched operations
|
||||
type BatchedDatabaseHandler struct {
|
||||
db database.Store
|
||||
logger *slog.Logger
|
||||
|
||||
// Batching
|
||||
mu sync.Mutex
|
||||
prefixBatch []prefixOp
|
||||
asnBatch []asnOp
|
||||
peeringBatch []peeringOp
|
||||
lastFlush time.Time
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
type prefixOp struct {
|
||||
prefix string
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
type asnOp struct {
|
||||
number int
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
type peeringOp struct {
|
||||
fromASN int
|
||||
toASN int
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
// NewBatchedDatabaseHandler creates a new batched database handler
|
||||
func NewBatchedDatabaseHandler(
|
||||
db database.Store,
|
||||
logger *slog.Logger,
|
||||
) *BatchedDatabaseHandler {
|
||||
h := &BatchedDatabaseHandler{
|
||||
db: db,
|
||||
logger: logger,
|
||||
prefixBatch: make([]prefixOp, 0, batchSize),
|
||||
asnBatch: make([]asnOp, 0, batchSize),
|
||||
peeringBatch: make([]peeringOp, 0, batchSize),
|
||||
lastFlush: time.Now(),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Start the flush timer goroutine
|
||||
h.wg.Add(1)
|
||||
go h.flushLoop()
|
||||
|
||||
return h
|
||||
}
|
||||
|
||||
// WantsMessage returns true if this handler wants to process messages of the given type
|
||||
func (h *BatchedDatabaseHandler) WantsMessage(messageType string) bool {
|
||||
// We only care about UPDATE messages for the database
|
||||
return messageType == "UPDATE"
|
||||
}
|
||||
|
||||
// QueueCapacity returns the desired queue capacity for this handler
|
||||
func (h *BatchedDatabaseHandler) QueueCapacity() int {
|
||||
// Batching allows us to use a larger queue
|
||||
return batchedDatabaseHandlerQueueSize
|
||||
}
|
||||
|
||||
// HandleMessage processes a RIS message and queues database operations
|
||||
func (h *BatchedDatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||
// Use the pre-parsed timestamp
|
||||
timestamp := msg.ParsedTimestamp
|
||||
|
||||
// Get origin ASN from path (last element)
|
||||
var originASN int
|
||||
if len(msg.Path) > 0 {
|
||||
originASN = msg.Path[len(msg.Path)-1]
|
||||
}
|
||||
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
// Queue operations for announcements
|
||||
for _, announcement := range msg.Announcements {
|
||||
for _, prefix := range announcement.Prefixes {
|
||||
// Queue prefix operation
|
||||
h.prefixBatch = append(h.prefixBatch, prefixOp{
|
||||
prefix: prefix,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
|
||||
// Queue origin ASN operation
|
||||
if originASN > 0 {
|
||||
h.asnBatch = append(h.asnBatch, asnOp{
|
||||
number: originASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
// Process AS path to queue peering operations
|
||||
if len(msg.Path) > 1 {
|
||||
for i := range len(msg.Path) - 1 {
|
||||
fromASN := msg.Path[i]
|
||||
toASN := msg.Path[i+1]
|
||||
|
||||
// Queue ASN operations
|
||||
h.asnBatch = append(h.asnBatch, asnOp{
|
||||
number: fromASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
h.asnBatch = append(h.asnBatch, asnOp{
|
||||
number: toASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
|
||||
// Queue peering operation
|
||||
h.peeringBatch = append(h.peeringBatch, peeringOp{
|
||||
fromASN: fromASN,
|
||||
toASN: toASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Queue operations for withdrawals
|
||||
for _, prefix := range msg.Withdrawals {
|
||||
h.prefixBatch = append(h.prefixBatch, prefixOp{
|
||||
prefix: prefix,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
// Check if we need to flush
|
||||
if len(h.prefixBatch) >= batchSize || len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize {
|
||||
h.flushBatchesLocked()
|
||||
}
|
||||
}
|
||||
|
||||
// flushLoop runs in a goroutine and periodically flushes batches
|
||||
func (h *BatchedDatabaseHandler) flushLoop() {
|
||||
defer h.wg.Done()
|
||||
ticker := time.NewTicker(batchTimeout)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
h.mu.Lock()
|
||||
if time.Since(h.lastFlush) >= batchTimeout {
|
||||
h.flushBatchesLocked()
|
||||
}
|
||||
h.mu.Unlock()
|
||||
case <-h.stopCh:
|
||||
// Final flush
|
||||
h.mu.Lock()
|
||||
h.flushBatchesLocked()
|
||||
h.mu.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// flushBatchesLocked flushes all batches to the database (must be called with mutex held)
|
||||
func (h *BatchedDatabaseHandler) flushBatchesLocked() {
|
||||
if len(h.prefixBatch) == 0 && len(h.asnBatch) == 0 && len(h.peeringBatch) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
// Process ASNs first (deduped)
|
||||
asnMap := make(map[int]time.Time)
|
||||
for _, op := range h.asnBatch {
|
||||
if existing, ok := asnMap[op.number]; !ok || op.timestamp.After(existing) {
|
||||
asnMap[op.number] = op.timestamp
|
||||
}
|
||||
}
|
||||
|
||||
asnCache := make(map[int]*database.ASN)
|
||||
for asn, ts := range asnMap {
|
||||
asnObj, err := h.db.GetOrCreateASN(asn, ts)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err)
|
||||
|
||||
continue
|
||||
}
|
||||
asnCache[asn] = asnObj
|
||||
}
|
||||
|
||||
// Process prefixes (deduped)
|
||||
prefixMap := make(map[string]time.Time)
|
||||
for _, op := range h.prefixBatch {
|
||||
if existing, ok := prefixMap[op.prefix]; !ok || op.timestamp.After(existing) {
|
||||
prefixMap[op.prefix] = op.timestamp
|
||||
}
|
||||
}
|
||||
|
||||
for prefix, ts := range prefixMap {
|
||||
_, err := h.db.GetOrCreatePrefix(prefix, ts)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Process peerings (deduped)
|
||||
type peeringKey struct {
|
||||
from, to int
|
||||
}
|
||||
peeringMap := make(map[peeringKey]time.Time)
|
||||
for _, op := range h.peeringBatch {
|
||||
key := peeringKey{from: op.fromASN, to: op.toASN}
|
||||
if existing, ok := peeringMap[key]; !ok || op.timestamp.After(existing) {
|
||||
peeringMap[key] = op.timestamp
|
||||
}
|
||||
}
|
||||
|
||||
for key, ts := range peeringMap {
|
||||
fromAS := asnCache[key.from]
|
||||
toAS := asnCache[key.to]
|
||||
if fromAS != nil && toAS != nil {
|
||||
err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), ts)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to record peering",
|
||||
"from_asn", key.from,
|
||||
"to_asn", key.to,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clear batches
|
||||
h.prefixBatch = h.prefixBatch[:0]
|
||||
h.asnBatch = h.asnBatch[:0]
|
||||
h.peeringBatch = h.peeringBatch[:0]
|
||||
h.lastFlush = time.Now()
|
||||
|
||||
h.logger.Debug("Flushed database batches",
|
||||
"duration", time.Since(start),
|
||||
"asns", len(asnMap),
|
||||
"prefixes", len(prefixMap),
|
||||
"peerings", len(peeringMap),
|
||||
)
|
||||
}
|
||||
|
||||
// Stop gracefully stops the handler and flushes remaining batches
|
||||
func (h *BatchedDatabaseHandler) Stop() {
|
||||
close(h.stopCh)
|
||||
h.wg.Wait()
|
||||
}
|
170
internal/routewatch/peerhandler_batched.go
Normal file
170
internal/routewatch/peerhandler_batched.go
Normal file
@ -0,0 +1,170 @@
|
||||
package routewatch
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.eeqj.de/sneak/routewatch/internal/database"
|
||||
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
||||
)
|
||||
|
||||
const (
|
||||
// batchedPeerHandlerQueueSize is the queue capacity for peer tracking operations
|
||||
batchedPeerHandlerQueueSize = 2000
|
||||
|
||||
// peerBatchSize is the number of peer updates to batch together
|
||||
peerBatchSize = 50
|
||||
|
||||
// peerBatchTimeout is the maximum time to wait before flushing a batch
|
||||
peerBatchTimeout = 200 * time.Millisecond
|
||||
)
|
||||
|
||||
// BatchedPeerHandler tracks BGP peers from all message types using batched operations
|
||||
type BatchedPeerHandler struct {
|
||||
db database.Store
|
||||
logger *slog.Logger
|
||||
|
||||
// Batching
|
||||
mu sync.Mutex
|
||||
peerBatch []peerUpdate
|
||||
lastFlush time.Time
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
type peerUpdate struct {
|
||||
peerIP string
|
||||
peerASN int
|
||||
messageType string
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
// NewBatchedPeerHandler creates a new batched peer tracking handler
|
||||
func NewBatchedPeerHandler(db database.Store, logger *slog.Logger) *BatchedPeerHandler {
|
||||
h := &BatchedPeerHandler{
|
||||
db: db,
|
||||
logger: logger,
|
||||
peerBatch: make([]peerUpdate, 0, peerBatchSize),
|
||||
lastFlush: time.Now(),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Start the flush timer goroutine
|
||||
h.wg.Add(1)
|
||||
go h.flushLoop()
|
||||
|
||||
return h
|
||||
}
|
||||
|
||||
// WantsMessage returns true for all message types since we track peers from all messages
|
||||
func (h *BatchedPeerHandler) WantsMessage(_ string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// QueueCapacity returns the desired queue capacity for this handler
|
||||
func (h *BatchedPeerHandler) QueueCapacity() int {
|
||||
// Batching allows us to use a larger queue
|
||||
return batchedPeerHandlerQueueSize
|
||||
}
|
||||
|
||||
// HandleMessage processes a message to track peer information
|
||||
func (h *BatchedPeerHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||
// Parse peer ASN from string
|
||||
peerASN := 0
|
||||
if msg.PeerASN != "" {
|
||||
if asn, err := strconv.Atoi(msg.PeerASN); err == nil {
|
||||
peerASN = asn
|
||||
}
|
||||
}
|
||||
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
// Add to batch
|
||||
h.peerBatch = append(h.peerBatch, peerUpdate{
|
||||
peerIP: msg.Peer,
|
||||
peerASN: peerASN,
|
||||
messageType: msg.Type,
|
||||
timestamp: msg.ParsedTimestamp,
|
||||
})
|
||||
|
||||
// Check if we need to flush
|
||||
if len(h.peerBatch) >= peerBatchSize {
|
||||
h.flushBatchLocked()
|
||||
}
|
||||
}
|
||||
|
||||
// flushLoop runs in a goroutine and periodically flushes batches
|
||||
func (h *BatchedPeerHandler) flushLoop() {
|
||||
defer h.wg.Done()
|
||||
ticker := time.NewTicker(peerBatchTimeout)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
h.mu.Lock()
|
||||
if time.Since(h.lastFlush) >= peerBatchTimeout {
|
||||
h.flushBatchLocked()
|
||||
}
|
||||
h.mu.Unlock()
|
||||
case <-h.stopCh:
|
||||
// Final flush
|
||||
h.mu.Lock()
|
||||
h.flushBatchLocked()
|
||||
h.mu.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// flushBatchLocked flushes the peer batch to the database (must be called with mutex held)
|
||||
func (h *BatchedPeerHandler) flushBatchLocked() {
|
||||
if len(h.peerBatch) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
// Deduplicate by peer IP, keeping the latest update for each peer
|
||||
peerMap := make(map[string]peerUpdate)
|
||||
for _, update := range h.peerBatch {
|
||||
if existing, ok := peerMap[update.peerIP]; !ok || update.timestamp.After(existing.timestamp) {
|
||||
peerMap[update.peerIP] = update
|
||||
}
|
||||
}
|
||||
|
||||
// Apply updates
|
||||
successCount := 0
|
||||
for _, update := range peerMap {
|
||||
if err := h.db.UpdatePeer(update.peerIP, update.peerASN, update.messageType, update.timestamp); err != nil {
|
||||
h.logger.Error("Failed to update peer",
|
||||
"peer", update.peerIP,
|
||||
"peer_asn", update.peerASN,
|
||||
"message_type", update.messageType,
|
||||
"error", err,
|
||||
)
|
||||
} else {
|
||||
successCount++
|
||||
}
|
||||
}
|
||||
|
||||
// Clear batch
|
||||
h.peerBatch = h.peerBatch[:0]
|
||||
h.lastFlush = time.Now()
|
||||
|
||||
h.logger.Debug("Flushed peer batch",
|
||||
"duration", time.Since(start),
|
||||
"total_updates", len(peerMap),
|
||||
"successful", successCount,
|
||||
)
|
||||
}
|
||||
|
||||
// Stop gracefully stops the handler and flushes remaining batches
|
||||
func (h *BatchedPeerHandler) Stop() {
|
||||
close(h.stopCh)
|
||||
h.wg.Wait()
|
||||
}
|
Loading…
Reference in New Issue
Block a user