Add live routing table with CIDR mask length tracking

- Added new live_routes table with mask_length column for tracking CIDR prefix lengths
- Updated PrefixHandler to maintain live routing table with additions and deletions
- Added route expiration functionality (5 minute timeout) to in-memory routing table
- Added prefix distribution stats showing count of prefixes by mask length
- Added IPv4/IPv6 prefix distribution cards to status page
- Updated database interface with UpsertLiveRoute, DeleteLiveRoute, and GetPrefixDistribution
- Set all handler queue depths to 50000 for consistency
- Doubled DBHandler batch size to 32000 for better throughput
- Fixed withdrawal handling to delete routes when origin ASN is available
This commit is contained in:
2025-07-28 01:51:42 +02:00
parent cea7c3dfd3
commit 3c46087976
13 changed files with 471 additions and 148 deletions

View File

@@ -178,6 +178,9 @@ func (rw *RouteWatch) Shutdown() {
// Stop services
rw.streamer.Stop()
// Stop routing table expiration
rw.routingTable.Stop()
// Stop HTTP server with a timeout
const serverStopTimeout = 5 * time.Second
stopCtx, cancel := context.WithTimeout(context.Background(), serverStopTimeout)

View File

@@ -157,6 +157,24 @@ func (m *mockStore) GetStats() (database.Stats, error) {
}, nil
}
// UpsertLiveRoute mock implementation
func (m *mockStore) UpsertLiveRoute(route *database.LiveRoute) error {
// Simple mock - just return nil
return nil
}
// DeleteLiveRoute mock implementation
func (m *mockStore) DeleteLiveRoute(prefix string, originASN int, peerIP string) error {
// Simple mock - just return nil
return nil
}
// GetPrefixDistribution mock implementation
func (m *mockStore) GetPrefixDistribution() (ipv4 []database.PrefixDistribution, ipv6 []database.PrefixDistribution, err error) {
// Return empty distributions for now
return nil, nil, nil
}
func TestRouteWatchLiveFeed(t *testing.T) {
// Disable snapshotter for tests
t.Setenv("ROUTEWATCH_DISABLE_SNAPSHOTTER", "1")

View File

@@ -11,10 +11,10 @@ import (
const (
// dbHandlerQueueSize is the queue capacity for database operations
dbHandlerQueueSize = 200000
dbHandlerQueueSize = 50000
// batchSize is the number of operations to batch together
batchSize = 16000
batchSize = 32000
// batchTimeout is the maximum time to wait before flushing a batch
batchTimeout = 5 * time.Second

View File

@@ -12,7 +12,7 @@ import (
const (
// peerHandlerQueueSize is the queue capacity for peer tracking operations
peerHandlerQueueSize = 2000
peerHandlerQueueSize = 50000
// peerBatchSize is the number of peer updates to batch together
peerBatchSize = 500

View File

@@ -1,13 +1,15 @@
package routewatch
import (
"encoding/json"
"net"
"strings"
"sync"
"time"
"git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger"
"git.eeqj.de/sneak/routewatch/internal/ristypes"
"github.com/google/uuid"
)
const (
@@ -19,9 +21,14 @@ const (
// prefixBatchTimeout is the maximum time to wait before flushing a batch
prefixBatchTimeout = 5 * time.Second
// IP version constants
ipv4Version = 4
ipv6Version = 6
)
// PrefixHandler tracks BGP prefixes and maintains a routing table in the database
// PrefixHandler tracks BGP prefixes and maintains a live routing table in the database.
// Routes are added on announcement and deleted on withdrawal.
type PrefixHandler struct {
db database.Store
logger *logger.Logger
@@ -185,80 +192,73 @@ func (h *PrefixHandler) flushBatchLocked() {
h.lastFlush = time.Now()
}
// parseCIDR extracts the mask length and IP version from a prefix string
func parseCIDR(prefix string) (maskLength int, ipVersion int, err error) {
_, ipNet, err := net.ParseCIDR(prefix)
if err != nil {
return 0, 0, err
}
ones, _ := ipNet.Mask.Size()
if strings.Contains(prefix, ":") {
return ones, ipv6Version, nil
}
return ones, ipv4Version, nil
}
// processAnnouncement handles storing an announcement in the database
func (h *PrefixHandler) processAnnouncement(prefix *database.Prefix, update prefixUpdate) {
// Get or create origin ASN
originASN, err := h.db.GetOrCreateASN(update.originASN, update.timestamp)
func (h *PrefixHandler) processAnnouncement(_ *database.Prefix, update prefixUpdate) {
// Parse CIDR to get mask length
maskLength, ipVersion, err := parseCIDR(update.prefix)
if err != nil {
h.logger.Error("Failed to get/create origin ASN",
"asn", update.originASN,
h.logger.Error("Failed to parse CIDR",
"prefix", update.prefix,
"error", err,
)
return
}
// Get or create peer ASN (first element in path if exists)
var peerASN *database.ASN
if len(update.path) > 0 {
peerASN, err = h.db.GetOrCreateASN(update.path[0], update.timestamp)
if err != nil {
h.logger.Error("Failed to get/create peer ASN",
"asn", update.path[0],
"error", err,
)
return
}
} else {
// If no path, use origin as peer
peerASN = originASN
// Create live route record
liveRoute := &database.LiveRoute{
ID: uuid.New(),
Prefix: update.prefix,
MaskLength: maskLength,
IPVersion: ipVersion,
OriginASN: update.originASN,
PeerIP: update.peer,
ASPath: update.path,
NextHop: update.peer, // Using peer as next hop
LastUpdated: update.timestamp,
}
// Encode AS path as JSON
pathJSON, err := json.Marshal(update.path)
if err != nil {
h.logger.Error("Failed to encode AS path",
"path", update.path,
"error", err,
)
return
}
// Create announcement record
announcement := &database.Announcement{
PrefixID: prefix.ID,
ASNID: peerASN.ID,
OriginASNID: originASN.ID,
Path: string(pathJSON),
NextHop: update.peer,
Timestamp: update.timestamp,
IsWithdrawal: false,
}
if err := h.db.RecordAnnouncement(announcement); err != nil {
h.logger.Error("Failed to record announcement",
if err := h.db.UpsertLiveRoute(liveRoute); err != nil {
h.logger.Error("Failed to upsert live route",
"prefix", update.prefix,
"error", err,
)
}
}
// processWithdrawal handles storing a withdrawal in the database
func (h *PrefixHandler) processWithdrawal(prefix *database.Prefix, update prefixUpdate) {
// For withdrawals, create a withdrawal record
announcement := &database.Announcement{
PrefixID: prefix.ID,
NextHop: update.peer,
Timestamp: update.timestamp,
IsWithdrawal: true,
}
if err := h.db.RecordAnnouncement(announcement); err != nil {
h.logger.Error("Failed to record withdrawal",
// processWithdrawal handles removing a route from the live routing table
func (h *PrefixHandler) processWithdrawal(_ *database.Prefix, update prefixUpdate) {
// For withdrawals, we need to delete the route from live_routes
// Since we have the origin ASN from the update, we can delete the specific route
if update.originASN > 0 {
if err := h.db.DeleteLiveRoute(update.prefix, update.originASN, update.peer); err != nil {
h.logger.Error("Failed to delete live route",
"prefix", update.prefix,
"origin_asn", update.originASN,
"peer", update.peer,
"error", err,
)
}
} else {
// If no origin ASN, log a warning
h.logger.Warn("Withdrawal without origin ASN",
"prefix", update.prefix,
"error", err,
"peer", update.peer,
)
}
}