- Increase prefix batch size from 5K to 20K - Increase ASN batch size from 10K to 30K - Add comments warning not to reduce batch timeouts - Add comments warning not to increase queue sizes above 100K - Maintains existing batch timeouts for efficiency
164 lines
3.7 KiB
Go
164 lines
3.7 KiB
Go
package routewatch
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"git.eeqj.de/sneak/routewatch/internal/database"
|
|
"git.eeqj.de/sneak/routewatch/internal/logger"
|
|
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
|
)
|
|
|
|
const (
|
|
// asHandlerQueueSize is the queue capacity for ASN operations
|
|
// DO NOT set this higher than 100000 without explicit instructions
|
|
asHandlerQueueSize = 100000
|
|
|
|
// asnBatchSize is the number of ASN operations to batch together
|
|
asnBatchSize = 30000
|
|
|
|
// asnBatchTimeout is the maximum time to wait before flushing a batch
|
|
// DO NOT reduce this timeout - larger batches are more efficient
|
|
asnBatchTimeout = 2 * time.Second
|
|
)
|
|
|
|
// ASHandler handles ASN information from BGP messages using batched operations
|
|
type ASHandler struct {
|
|
db database.Store
|
|
logger *logger.Logger
|
|
|
|
// Batching
|
|
mu sync.Mutex
|
|
batch []asnOp
|
|
lastFlush time.Time
|
|
stopCh chan struct{}
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
type asnOp struct {
|
|
number int
|
|
timestamp time.Time
|
|
}
|
|
|
|
// NewASHandler creates a new batched ASN handler
|
|
func NewASHandler(db database.Store, logger *logger.Logger) *ASHandler {
|
|
h := &ASHandler{
|
|
db: db,
|
|
logger: logger,
|
|
batch: make([]asnOp, 0, asnBatchSize),
|
|
lastFlush: time.Now(),
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
|
|
// Start the flush timer goroutine
|
|
h.wg.Add(1)
|
|
go h.flushLoop()
|
|
|
|
return h
|
|
}
|
|
|
|
// WantsMessage returns true if this handler wants to process messages of the given type
|
|
func (h *ASHandler) WantsMessage(messageType string) bool {
|
|
// We only care about UPDATE messages for the database
|
|
return messageType == "UPDATE"
|
|
}
|
|
|
|
// QueueCapacity returns the desired queue capacity for this handler
|
|
func (h *ASHandler) QueueCapacity() int {
|
|
// Batching allows us to use a larger queue
|
|
return asHandlerQueueSize
|
|
}
|
|
|
|
// HandleMessage processes a RIS message and queues database operations
|
|
func (h *ASHandler) HandleMessage(msg *ristypes.RISMessage) {
|
|
// Use the pre-parsed timestamp
|
|
timestamp := msg.ParsedTimestamp
|
|
|
|
// Get origin ASN from path (last element)
|
|
var originASN int
|
|
if len(msg.Path) > 0 {
|
|
originASN = msg.Path[len(msg.Path)-1]
|
|
}
|
|
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
// Queue origin ASN operation
|
|
if originASN > 0 {
|
|
h.batch = append(h.batch, asnOp{
|
|
number: originASN,
|
|
timestamp: timestamp,
|
|
})
|
|
}
|
|
|
|
// Also track all ASNs in the path
|
|
for _, asn := range msg.Path {
|
|
if asn > 0 {
|
|
h.batch = append(h.batch, asnOp{
|
|
number: asn,
|
|
timestamp: timestamp,
|
|
})
|
|
}
|
|
}
|
|
|
|
// Check if we need to flush
|
|
if len(h.batch) >= asnBatchSize {
|
|
h.flushBatchLocked()
|
|
}
|
|
}
|
|
|
|
// flushLoop runs in a goroutine and periodically flushes batches
|
|
func (h *ASHandler) flushLoop() {
|
|
defer h.wg.Done()
|
|
ticker := time.NewTicker(asnBatchTimeout)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
h.mu.Lock()
|
|
if time.Since(h.lastFlush) >= asnBatchTimeout {
|
|
h.flushBatchLocked()
|
|
}
|
|
h.mu.Unlock()
|
|
case <-h.stopCh:
|
|
// Final flush
|
|
h.mu.Lock()
|
|
h.flushBatchLocked()
|
|
h.mu.Unlock()
|
|
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// flushBatchLocked flushes the ASN batch to the database (must be called with mutex held)
|
|
func (h *ASHandler) flushBatchLocked() {
|
|
if len(h.batch) == 0 {
|
|
return
|
|
}
|
|
|
|
// Process ASNs first (deduped)
|
|
asnMap := make(map[int]time.Time)
|
|
for _, op := range h.batch {
|
|
if existing, ok := asnMap[op.number]; !ok || op.timestamp.After(existing) {
|
|
asnMap[op.number] = op.timestamp
|
|
}
|
|
}
|
|
|
|
// Process all ASNs in a single batch transaction
|
|
if err := h.db.GetOrCreateASNBatch(asnMap); err != nil {
|
|
h.logger.Error("Failed to process ASN batch", "error", err, "count", len(asnMap))
|
|
}
|
|
|
|
// Clear batch
|
|
h.batch = h.batch[:0]
|
|
h.lastFlush = time.Now()
|
|
}
|
|
|
|
// Stop gracefully stops the handler and flushes remaining batches
|
|
func (h *ASHandler) Stop() {
|
|
close(h.stopCh)
|
|
h.wg.Wait()
|
|
}
|