Move prefixes table maintenance from DBHandler to PrefixHandler
- DBHandler now only maintains asns and asn_peerings tables - PrefixHandler maintains both prefixes and live_routes tables - This consolidates all prefix-related operations in one handler
This commit is contained in:
parent
8b43882526
commit
eaa11b5f8d
@ -27,7 +27,6 @@ type DBHandler struct {
|
||||
|
||||
// Batching
|
||||
mu sync.Mutex
|
||||
prefixBatch []prefixOp
|
||||
asnBatch []asnOp
|
||||
peeringBatch []peeringOp
|
||||
lastFlush time.Time
|
||||
@ -35,11 +34,6 @@ type DBHandler struct {
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
type prefixOp struct {
|
||||
prefix string
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
type asnOp struct {
|
||||
number int
|
||||
timestamp time.Time
|
||||
@ -59,7 +53,6 @@ func NewDBHandler(
|
||||
h := &DBHandler{
|
||||
db: db,
|
||||
logger: logger,
|
||||
prefixBatch: make([]prefixOp, 0, batchSize),
|
||||
asnBatch: make([]asnOp, 0, batchSize),
|
||||
peeringBatch: make([]peeringOp, 0, batchSize),
|
||||
lastFlush: time.Now(),
|
||||
@ -99,60 +92,41 @@ func (h *DBHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
// Queue operations for announcements
|
||||
for _, announcement := range msg.Announcements {
|
||||
for _, prefix := range announcement.Prefixes {
|
||||
// Queue prefix operation
|
||||
h.prefixBatch = append(h.prefixBatch, prefixOp{
|
||||
prefix: prefix,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
|
||||
// Queue origin ASN operation
|
||||
if originASN > 0 {
|
||||
h.asnBatch = append(h.asnBatch, asnOp{
|
||||
number: originASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
// Process AS path to queue peering operations
|
||||
if len(msg.Path) > 1 {
|
||||
for i := range len(msg.Path) - 1 {
|
||||
fromASN := msg.Path[i]
|
||||
toASN := msg.Path[i+1]
|
||||
|
||||
// Queue ASN operations
|
||||
h.asnBatch = append(h.asnBatch, asnOp{
|
||||
number: fromASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
h.asnBatch = append(h.asnBatch, asnOp{
|
||||
number: toASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
|
||||
// Queue peering operation
|
||||
h.peeringBatch = append(h.peeringBatch, peeringOp{
|
||||
fromASN: fromASN,
|
||||
toASN: toASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Queue operations for withdrawals
|
||||
for _, prefix := range msg.Withdrawals {
|
||||
h.prefixBatch = append(h.prefixBatch, prefixOp{
|
||||
prefix: prefix,
|
||||
// Queue origin ASN operation
|
||||
if originASN > 0 {
|
||||
h.asnBatch = append(h.asnBatch, asnOp{
|
||||
number: originASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
// Process AS path to queue peering operations
|
||||
if len(msg.Path) > 1 {
|
||||
for i := range len(msg.Path) - 1 {
|
||||
fromASN := msg.Path[i]
|
||||
toASN := msg.Path[i+1]
|
||||
|
||||
// Queue ASN operations
|
||||
h.asnBatch = append(h.asnBatch, asnOp{
|
||||
number: fromASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
h.asnBatch = append(h.asnBatch, asnOp{
|
||||
number: toASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
|
||||
// Queue peering operation
|
||||
h.peeringBatch = append(h.peeringBatch, peeringOp{
|
||||
fromASN: fromASN,
|
||||
toASN: toASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we need to flush
|
||||
if len(h.prefixBatch) >= batchSize || len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize {
|
||||
if len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize {
|
||||
h.flushBatchesLocked()
|
||||
}
|
||||
}
|
||||
@ -184,7 +158,7 @@ func (h *DBHandler) flushLoop() {
|
||||
|
||||
// flushBatchesLocked flushes all batches to the database (must be called with mutex held)
|
||||
func (h *DBHandler) flushBatchesLocked() {
|
||||
if len(h.prefixBatch) == 0 && len(h.asnBatch) == 0 && len(h.peeringBatch) == 0 {
|
||||
if len(h.asnBatch) == 0 && len(h.peeringBatch) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
@ -207,21 +181,6 @@ func (h *DBHandler) flushBatchesLocked() {
|
||||
asnCache[asn] = asnObj
|
||||
}
|
||||
|
||||
// Process prefixes (deduped)
|
||||
prefixMap := make(map[string]time.Time)
|
||||
for _, op := range h.prefixBatch {
|
||||
if existing, ok := prefixMap[op.prefix]; !ok || op.timestamp.After(existing) {
|
||||
prefixMap[op.prefix] = op.timestamp
|
||||
}
|
||||
}
|
||||
|
||||
for prefix, ts := range prefixMap {
|
||||
_, err := h.db.GetOrCreatePrefix(prefix, ts)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Process peerings (deduped)
|
||||
type peeringKey struct {
|
||||
from, to int
|
||||
@ -250,7 +209,6 @@ func (h *DBHandler) flushBatchesLocked() {
|
||||
}
|
||||
|
||||
// Clear batches
|
||||
h.prefixBatch = h.prefixBatch[:0]
|
||||
h.asnBatch = h.asnBatch[:0]
|
||||
h.peeringBatch = h.peeringBatch[:0]
|
||||
h.lastFlush = time.Now()
|
||||
|
@ -168,7 +168,7 @@ func (h *PrefixHandler) flushBatchLocked() {
|
||||
|
||||
// Apply updates to database
|
||||
for _, update := range prefixMap {
|
||||
// Get or create prefix
|
||||
// Get or create prefix (this maintains the prefixes table)
|
||||
prefix, err := h.db.GetOrCreatePrefix(update.prefix, update.timestamp)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to get/create prefix",
|
||||
|
Loading…
Reference in New Issue
Block a user