diff --git a/internal/routewatch/dbhandler.go b/internal/routewatch/dbhandler.go index 611b1f5..dfbdaf1 100644 --- a/internal/routewatch/dbhandler.go +++ b/internal/routewatch/dbhandler.go @@ -27,7 +27,6 @@ type DBHandler struct { // Batching mu sync.Mutex - prefixBatch []prefixOp asnBatch []asnOp peeringBatch []peeringOp lastFlush time.Time @@ -35,11 +34,6 @@ type DBHandler struct { wg sync.WaitGroup } -type prefixOp struct { - prefix string - timestamp time.Time -} - type asnOp struct { number int timestamp time.Time @@ -59,7 +53,6 @@ func NewDBHandler( h := &DBHandler{ db: db, logger: logger, - prefixBatch: make([]prefixOp, 0, batchSize), asnBatch: make([]asnOp, 0, batchSize), peeringBatch: make([]peeringOp, 0, batchSize), lastFlush: time.Now(), @@ -99,60 +92,41 @@ func (h *DBHandler) HandleMessage(msg *ristypes.RISMessage) { h.mu.Lock() defer h.mu.Unlock() - // Queue operations for announcements - for _, announcement := range msg.Announcements { - for _, prefix := range announcement.Prefixes { - // Queue prefix operation - h.prefixBatch = append(h.prefixBatch, prefixOp{ - prefix: prefix, - timestamp: timestamp, - }) - - // Queue origin ASN operation - if originASN > 0 { - h.asnBatch = append(h.asnBatch, asnOp{ - number: originASN, - timestamp: timestamp, - }) - } - - // Process AS path to queue peering operations - if len(msg.Path) > 1 { - for i := range len(msg.Path) - 1 { - fromASN := msg.Path[i] - toASN := msg.Path[i+1] - - // Queue ASN operations - h.asnBatch = append(h.asnBatch, asnOp{ - number: fromASN, - timestamp: timestamp, - }) - h.asnBatch = append(h.asnBatch, asnOp{ - number: toASN, - timestamp: timestamp, - }) - - // Queue peering operation - h.peeringBatch = append(h.peeringBatch, peeringOp{ - fromASN: fromASN, - toASN: toASN, - timestamp: timestamp, - }) - } - } - } - } - - // Queue operations for withdrawals - for _, prefix := range msg.Withdrawals { - h.prefixBatch = append(h.prefixBatch, prefixOp{ - prefix: prefix, + // Queue origin ASN operation + if originASN > 0 { + h.asnBatch = append(h.asnBatch, asnOp{ + number: originASN, timestamp: timestamp, }) } + // Process AS path to queue peering operations + if len(msg.Path) > 1 { + for i := range len(msg.Path) - 1 { + fromASN := msg.Path[i] + toASN := msg.Path[i+1] + + // Queue ASN operations + h.asnBatch = append(h.asnBatch, asnOp{ + number: fromASN, + timestamp: timestamp, + }) + h.asnBatch = append(h.asnBatch, asnOp{ + number: toASN, + timestamp: timestamp, + }) + + // Queue peering operation + h.peeringBatch = append(h.peeringBatch, peeringOp{ + fromASN: fromASN, + toASN: toASN, + timestamp: timestamp, + }) + } + } + // Check if we need to flush - if len(h.prefixBatch) >= batchSize || len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize { + if len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize { h.flushBatchesLocked() } } @@ -184,7 +158,7 @@ func (h *DBHandler) flushLoop() { // flushBatchesLocked flushes all batches to the database (must be called with mutex held) func (h *DBHandler) flushBatchesLocked() { - if len(h.prefixBatch) == 0 && len(h.asnBatch) == 0 && len(h.peeringBatch) == 0 { + if len(h.asnBatch) == 0 && len(h.peeringBatch) == 0 { return } @@ -207,21 +181,6 @@ func (h *DBHandler) flushBatchesLocked() { asnCache[asn] = asnObj } - // Process prefixes (deduped) - prefixMap := make(map[string]time.Time) - for _, op := range h.prefixBatch { - if existing, ok := prefixMap[op.prefix]; !ok || op.timestamp.After(existing) { - prefixMap[op.prefix] = op.timestamp - } - } - - for prefix, ts := range prefixMap { - _, err := h.db.GetOrCreatePrefix(prefix, ts) - if err != nil { - h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err) - } - } - // Process peerings (deduped) type peeringKey struct { from, to int @@ -250,7 +209,6 @@ func (h *DBHandler) flushBatchesLocked() { } // Clear batches - h.prefixBatch = h.prefixBatch[:0] h.asnBatch = h.asnBatch[:0] h.peeringBatch = h.peeringBatch[:0] h.lastFlush = time.Now() diff --git a/internal/routewatch/prefixhandler.go b/internal/routewatch/prefixhandler.go index f6ca240..1517602 100644 --- a/internal/routewatch/prefixhandler.go +++ b/internal/routewatch/prefixhandler.go @@ -168,7 +168,7 @@ func (h *PrefixHandler) flushBatchLocked() { // Apply updates to database for _, update := range prefixMap { - // Get or create prefix + // Get or create prefix (this maintains the prefixes table) prefix, err := h.db.GetOrCreatePrefix(update.prefix, update.timestamp) if err != nil { h.logger.Error("Failed to get/create prefix",