Compare commits

..

No commits in common. "eda90d96a9344988fc276ab5559793ad51faf920" and "3aef3f9a07cdb994907f8e459579b7307a8d0653" have entirely different histories.

15 changed files with 669 additions and 1034 deletions

View File

@ -15,9 +15,6 @@ const (
// dirPermissions for creating directories // dirPermissions for creating directories
dirPermissions = 0750 // rwxr-x--- dirPermissions = 0750 // rwxr-x---
// defaultRouteExpirationMinutes is the default route expiration timeout in minutes
defaultRouteExpirationMinutes = 5
) )
// Config holds configuration for the entire application // Config holds configuration for the entire application
@ -30,10 +27,6 @@ type Config struct {
// EnableBatchedDatabaseWrites enables batched database operations for better performance // EnableBatchedDatabaseWrites enables batched database operations for better performance
EnableBatchedDatabaseWrites bool EnableBatchedDatabaseWrites bool
// RouteExpirationTimeout is how long a route can go without being refreshed before expiring
// Default is 2 hours which is conservative for BGP (typical BGP hold time is 90-180 seconds)
RouteExpirationTimeout time.Duration
} }
// New creates a new Config with default paths based on the OS // New creates a new Config with default paths based on the OS
@ -46,8 +39,7 @@ func New() (*Config, error) {
return &Config{ return &Config{
StateDir: stateDir, StateDir: stateDir,
MaxRuntime: 0, // Run forever by default MaxRuntime: 0, // Run forever by default
EnableBatchedDatabaseWrites: true, // Enable batching by default EnableBatchedDatabaseWrites: true, // Enable batching by default for better performance
RouteExpirationTimeout: defaultRouteExpirationMinutes * time.Minute, // For active route monitoring
}, nil }, nil
} }

View File

@ -4,7 +4,6 @@ package database
import ( import (
"database/sql" "database/sql"
_ "embed" _ "embed"
"encoding/json"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
@ -388,120 +387,5 @@ func (d *Database) GetStats() (Stats, error) {
stats.FileSizeBytes = fileInfo.Size() stats.FileSizeBytes = fileInfo.Size()
} }
// Get live routes count
err = d.db.QueryRow("SELECT COUNT(*) FROM live_routes").Scan(&stats.LiveRoutes)
if err != nil {
return stats, fmt.Errorf("failed to count live routes: %w", err)
}
// Get prefix distribution
stats.IPv4PrefixDistribution, stats.IPv6PrefixDistribution, err = d.GetPrefixDistribution()
if err != nil {
// Log but don't fail
d.logger.Warn("Failed to get prefix distribution", "error", err)
}
return stats, nil return stats, nil
} }
// UpsertLiveRoute stores route in the live_routes table.
//
// A row is keyed by (prefix, origin_asn, peer_ip). When a row with the same
// key already exists, its mask length, IP version, AS path, next hop and
// last-updated time are overwritten; the existing id is left as it was.
// The AS path is serialized to JSON before being written.
//
// It returns a wrapped error when the AS path cannot be encoded, and the
// unmodified database error when the statement fails.
func (d *Database) UpsertLiveRoute(route *LiveRoute) error {
	// Encode first so that a bad AS path never reaches the database.
	encodedPath, err := json.Marshal(route.ASPath)
	if err != nil {
		return fmt.Errorf("failed to encode AS path: %w", err)
	}

	const upsert = `
INSERT INTO live_routes (id, prefix, mask_length, ip_version, origin_asn, peer_ip, as_path, next_hop, last_updated)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(prefix, origin_asn, peer_ip) DO UPDATE SET
mask_length = excluded.mask_length,
ip_version = excluded.ip_version,
as_path = excluded.as_path,
next_hop = excluded.next_hop,
last_updated = excluded.last_updated
`

	// Argument order must match the column list in the INSERT above.
	args := []any{
		route.ID.String(),
		route.Prefix,
		route.MaskLength,
		route.IPVersion,
		route.OriginASN,
		route.PeerIP,
		string(encodedPath),
		route.NextHop,
		route.LastUpdated,
	}

	_, err = d.db.Exec(upsert, args...)

	return err
}
// DeleteLiveRoute removes rows from the live_routes table.
//
// With a non-zero originASN only the row matching prefix, originASN and
// peerIP is removed. An originASN of 0 acts as a wildcard: every row for the
// prefix/peer combination is removed regardless of origin.
// The database error, if any, is returned unwrapped.
func (d *Database) DeleteLiveRoute(prefix string, originASN int, peerIP string) error {
	if originASN == 0 {
		// Wildcard origin: delete all routes for this prefix from this peer.
		_, err := d.db.Exec(
			`DELETE FROM live_routes WHERE prefix = ? AND peer_ip = ?`,
			prefix, peerIP,
		)

		return err
	}

	// Specific origin: delete only that route.
	_, err := d.db.Exec(
		`DELETE FROM live_routes WHERE prefix = ? AND origin_asn = ? AND peer_ip = ?`,
		prefix, originASN, peerIP,
	)

	return err
}
// GetPrefixDistribution returns the distribution of prefixes by mask length.
//
// The live_routes table is counted per mask length, once for IPv4 and once
// for IPv6; each result is ordered by ascending mask length. On any error
// both slices are returned as nil together with a wrapped error that names
// the address family being processed.
func (d *Database) GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error) {
	const (
		ipVersion4 = 4
		ipVersion6 = 6
	)

	ipv4, err = d.prefixDistributionForVersion(ipVersion4, "IPv4")
	if err != nil {
		return nil, nil, err
	}

	ipv6, err = d.prefixDistributionForVersion(ipVersion6, "IPv6")
	if err != nil {
		return nil, nil, err
	}

	return ipv4, ipv6, nil
}

// prefixDistributionForVersion counts live routes per mask length for a
// single IP version. label ("IPv4"/"IPv6") is only used in error messages.
func (d *Database) prefixDistributionForVersion(ipVersion int, label string) ([]PrefixDistribution, error) {
	const query = `
SELECT mask_length, COUNT(*) as count
FROM live_routes
WHERE ip_version = ?
GROUP BY mask_length
ORDER BY mask_length
`

	rows, err := d.db.Query(query, ipVersion)
	if err != nil {
		return nil, fmt.Errorf("failed to query %s distribution: %w", label, err)
	}
	// Each result set has its own variable, so this always closes the set
	// opened just above, including on the early-return paths below.
	defer func() { _ = rows.Close() }()

	var dists []PrefixDistribution

	for rows.Next() {
		var dist PrefixDistribution
		if err := rows.Scan(&dist.MaskLength, &dist.Count); err != nil {
			return nil, fmt.Errorf("failed to scan %s distribution: %w", label, err)
		}

		dists = append(dists, dist)
	}

	// rows.Next returning false can mean "done" or "failed"; without this
	// check a failed iteration would be reported as a shorter distribution.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate %s distribution: %w", label, err)
	}

	return dists, nil
}

View File

@ -12,9 +12,6 @@ type Stats struct {
IPv6Prefixes int IPv6Prefixes int
Peerings int Peerings int
FileSizeBytes int64 FileSizeBytes int64
LiveRoutes int
IPv4PrefixDistribution []PrefixDistribution
IPv6PrefixDistribution []PrefixDistribution
} }
// Store defines the interface for database operations // Store defines the interface for database operations
@ -37,11 +34,6 @@ type Store interface {
// Peer operations // Peer operations
UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error
// Live route operations
UpsertLiveRoute(route *LiveRoute) error
DeleteLiveRoute(prefix string, originASN int, peerIP string) error
GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error)
// Lifecycle // Lifecycle
Close() error Close() error
} }

View File

@ -45,22 +45,3 @@ type ASNPeering struct {
FirstSeen time.Time `json:"first_seen"` FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"` LastSeen time.Time `json:"last_seen"`
} }
// LiveRoute represents a route in the live routing table.
// Each field maps to the column named in its json tag.
type LiveRoute struct {
// ID identifies the row; it is written to the database in string form.
ID uuid.UUID `json:"id"`
// Prefix is the announced network prefix.
Prefix string `json:"prefix"`
// MaskLength is the CIDR mask length of Prefix.
MaskLength int `json:"mask_length"`
// IPVersion is 4 or 6.
IPVersion int `json:"ip_version"`
// OriginASN is the AS number originating the route.
OriginASN int `json:"origin_asn"`
// PeerIP is the address of the peer the route was learned from.
PeerIP string `json:"peer_ip"`
// ASPath is the AS path; it is JSON-encoded when the route is persisted.
ASPath []int `json:"as_path"`
// NextHop is the route's next hop.
NextHop string `json:"next_hop"`
// LastUpdated is when the route was last refreshed.
LastUpdated time.Time `json:"last_updated"`
}
// PrefixDistribution represents the distribution of prefixes by mask length.
// One value is a single bucket: a mask length and how many routes have it.
type PrefixDistribution struct {
// MaskLength is the CIDR mask length this bucket describes.
MaskLength int `json:"mask_length"`
// Count is the number of live routes with that mask length.
Count int `json:"count"`
}

View File

@ -69,23 +69,3 @@ CREATE INDEX IF NOT EXISTS idx_asns_number ON asns(number);
CREATE INDEX IF NOT EXISTS idx_bgp_peers_asn ON bgp_peers(peer_asn); CREATE INDEX IF NOT EXISTS idx_bgp_peers_asn ON bgp_peers(peer_asn);
CREATE INDEX IF NOT EXISTS idx_bgp_peers_last_seen ON bgp_peers(last_seen); CREATE INDEX IF NOT EXISTS idx_bgp_peers_last_seen ON bgp_peers(last_seen);
CREATE INDEX IF NOT EXISTS idx_bgp_peers_ip ON bgp_peers(peer_ip); CREATE INDEX IF NOT EXISTS idx_bgp_peers_ip ON bgp_peers(peer_ip);
-- Live routing table maintained by PrefixHandler
-- One row per (prefix, origin_asn, peer_ip); the UNIQUE constraint below is
-- the conflict target used when a route is upserted.
CREATE TABLE IF NOT EXISTS live_routes (
id TEXT PRIMARY KEY,
prefix TEXT NOT NULL,
mask_length INTEGER NOT NULL, -- CIDR mask length (0-32 for IPv4, 0-128 for IPv6)
ip_version INTEGER NOT NULL, -- 4 or 6
origin_asn INTEGER NOT NULL,
peer_ip TEXT NOT NULL,
as_path TEXT NOT NULL, -- JSON array
next_hop TEXT NOT NULL,
last_updated DATETIME NOT NULL,
UNIQUE(prefix, origin_asn, peer_ip)
);
-- Indexes for live_routes table
-- Lookup and delete by prefix.
CREATE INDEX IF NOT EXISTS idx_live_routes_prefix ON live_routes(prefix);
CREATE INDEX IF NOT EXISTS idx_live_routes_mask_length ON live_routes(mask_length);
-- Matches the per-IP-version GROUP BY mask_length distribution queries.
CREATE INDEX IF NOT EXISTS idx_live_routes_ip_version_mask ON live_routes(ip_version, mask_length);
-- NOTE(review): presumably used to find routes that have not been refreshed recently — confirm.
CREATE INDEX IF NOT EXISTS idx_live_routes_last_updated ON live_routes(last_updated);

View File

@ -50,9 +50,8 @@ type RouteWatch struct {
shutdown bool shutdown bool
mu sync.Mutex mu sync.Mutex
config *config.Config config *config.Config
dbHandler *DBHandler batchedDBHandler *BatchedDatabaseHandler
peerHandler *PeerHandler batchedPeerHandler *BatchedPeerHandler
prefixHandler *PrefixHandler
} }
// isTruthy returns true if the value is considered truthy // isTruthy returns true if the value is considered truthy
@ -107,19 +106,17 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
// Register database handler to process BGP UPDATE messages // Register database handler to process BGP UPDATE messages
if rw.config.EnableBatchedDatabaseWrites { if rw.config.EnableBatchedDatabaseWrites {
rw.logger.Info("Using batched database handlers for improved performance") rw.logger.Info("Using batched database handlers for improved performance")
rw.dbHandler = NewDBHandler(rw.db, rw.logger) rw.batchedDBHandler = NewBatchedDatabaseHandler(rw.db, rw.logger)
rw.streamer.RegisterHandler(rw.dbHandler) rw.streamer.RegisterHandler(rw.batchedDBHandler)
rw.peerHandler = NewPeerHandler(rw.db, rw.logger) rw.batchedPeerHandler = NewBatchedPeerHandler(rw.db, rw.logger)
rw.streamer.RegisterHandler(rw.peerHandler) rw.streamer.RegisterHandler(rw.batchedPeerHandler)
rw.prefixHandler = NewPrefixHandler(rw.db, rw.logger)
rw.streamer.RegisterHandler(rw.prefixHandler)
} else { } else {
// Non-batched handlers not implemented yet dbHandler := NewDatabaseHandler(rw.db, rw.logger)
rw.logger.Error("Non-batched handlers not implemented") rw.streamer.RegisterHandler(dbHandler)
return fmt.Errorf("non-batched handlers not implemented") peerHandler := NewPeerHandler(rw.db, rw.logger)
rw.streamer.RegisterHandler(peerHandler)
} }
// Register routing table handler to maintain in-memory routing table // Register routing table handler to maintain in-memory routing table
@ -162,25 +159,18 @@ func (rw *RouteWatch) Shutdown() {
rw.mu.Unlock() rw.mu.Unlock()
// Stop batched handlers first to flush remaining batches // Stop batched handlers first to flush remaining batches
if rw.dbHandler != nil { if rw.batchedDBHandler != nil {
rw.logger.Info("Flushing database handler") rw.logger.Info("Flushing batched database handler")
rw.dbHandler.Stop() rw.batchedDBHandler.Stop()
} }
if rw.peerHandler != nil { if rw.batchedPeerHandler != nil {
rw.logger.Info("Flushing peer handler") rw.logger.Info("Flushing batched peer handler")
rw.peerHandler.Stop() rw.batchedPeerHandler.Stop()
}
if rw.prefixHandler != nil {
rw.logger.Info("Flushing prefix handler")
rw.prefixHandler.Stop()
} }
// Stop services // Stop services
rw.streamer.Stop() rw.streamer.Stop()
// Stop routing table expiration
rw.routingTable.Stop()
// Stop HTTP server with a timeout // Stop HTTP server with a timeout
const serverStopTimeout = 5 * time.Second const serverStopTimeout = 5 * time.Second
stopCtx, cancel := context.WithTimeout(context.Background(), serverStopTimeout) stopCtx, cancel := context.WithTimeout(context.Background(), serverStopTimeout)

View File

@ -157,24 +157,6 @@ func (m *mockStore) GetStats() (database.Stats, error) {
}, nil }, nil
} }
// UpsertLiveRoute is a no-op stand-in for tests: the route is discarded and
// the call always reports success.
func (*mockStore) UpsertLiveRoute(_ *database.LiveRoute) error {
	return nil
}
// DeleteLiveRoute is a no-op stand-in for tests: nothing is removed and the
// call always reports success.
func (*mockStore) DeleteLiveRoute(_ string, _ int, _ string) error {
	return nil
}
// GetPrefixDistribution is a stand-in for tests: it reports no buckets for
// either IP version and never fails.
func (*mockStore) GetPrefixDistribution() ([]database.PrefixDistribution, []database.PrefixDistribution, error) {
	var none []database.PrefixDistribution

	return none, none, nil
}
func TestRouteWatchLiveFeed(t *testing.T) { func TestRouteWatchLiveFeed(t *testing.T) {
// Disable snapshotter for tests // Disable snapshotter for tests
t.Setenv("ROUTEWATCH_DISABLE_SNAPSHOTTER", "1") t.Setenv("ROUTEWATCH_DISABLE_SNAPSHOTTER", "1")
@ -195,7 +177,6 @@ func TestRouteWatchLiveFeed(t *testing.T) {
cfg := &config.Config{ cfg := &config.Config{
StateDir: "", StateDir: "",
MaxRuntime: 5 * time.Second, MaxRuntime: 5 * time.Second,
EnableBatchedDatabaseWrites: true,
} }
// Create routing table // Create routing table

View File

@ -1,92 +1,47 @@
package routewatch package routewatch
import ( import (
"sync"
"time"
"git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger" "git.eeqj.de/sneak/routewatch/internal/logger"
"git.eeqj.de/sneak/routewatch/internal/ristypes" "git.eeqj.de/sneak/routewatch/internal/ristypes"
) )
const ( const (
// dbHandlerQueueSize is the queue capacity for database operations // databaseHandlerQueueSize is the queue capacity for database operations
dbHandlerQueueSize = 50000 databaseHandlerQueueSize = 200
// batchSize is the number of operations to batch together
batchSize = 32000
// batchTimeout is the maximum time to wait before flushing a batch
batchTimeout = 5 * time.Second
) )
// DBHandler handles BGP messages and stores them in the database using batched operations // DatabaseHandler handles BGP messages and stores them in the database
type DBHandler struct { type DatabaseHandler struct {
db database.Store db database.Store
logger *logger.Logger logger *logger.Logger
// Batching
mu sync.Mutex
prefixBatch []prefixOp
asnBatch []asnOp
peeringBatch []peeringOp
lastFlush time.Time
stopCh chan struct{}
wg sync.WaitGroup
} }
type prefixOp struct { // NewDatabaseHandler creates a new database handler
prefix string func NewDatabaseHandler(
timestamp time.Time
}
type asnOp struct {
number int
timestamp time.Time
}
type peeringOp struct {
fromASN int
toASN int
timestamp time.Time
}
// NewDBHandler creates a new batched database handler
func NewDBHandler(
db database.Store, db database.Store,
logger *logger.Logger, logger *logger.Logger,
) *DBHandler { ) *DatabaseHandler {
h := &DBHandler{ return &DatabaseHandler{
db: db, db: db,
logger: logger, logger: logger,
prefixBatch: make([]prefixOp, 0, batchSize),
asnBatch: make([]asnOp, 0, batchSize),
peeringBatch: make([]peeringOp, 0, batchSize),
lastFlush: time.Now(),
stopCh: make(chan struct{}),
} }
// Start the flush timer goroutine
h.wg.Add(1)
go h.flushLoop()
return h
} }
// WantsMessage returns true if this handler wants to process messages of the given type // WantsMessage returns true if this handler wants to process messages of the given type
func (h *DBHandler) WantsMessage(messageType string) bool { func (h *DatabaseHandler) WantsMessage(messageType string) bool {
// We only care about UPDATE messages for the database // We only care about UPDATE messages for the database
return messageType == "UPDATE" return messageType == "UPDATE"
} }
// QueueCapacity returns the desired queue capacity for this handler // QueueCapacity returns the desired queue capacity for this handler
func (h *DBHandler) QueueCapacity() int { func (h *DatabaseHandler) QueueCapacity() int {
// Batching allows us to use a larger queue // Database operations are slow, so use a smaller queue
return dbHandlerQueueSize return databaseHandlerQueueSize
} }
// HandleMessage processes a RIS message and queues database operations // HandleMessage processes a RIS message and updates the database
func (h *DBHandler) HandleMessage(msg *ristypes.RISMessage) { func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
// Use the pre-parsed timestamp // Use the pre-parsed timestamp
timestamp := msg.ParsedTimestamp timestamp := msg.ParsedTimestamp
@ -96,168 +51,105 @@ func (h *DBHandler) HandleMessage(msg *ristypes.RISMessage) {
originASN = msg.Path[len(msg.Path)-1] originASN = msg.Path[len(msg.Path)-1]
} }
h.mu.Lock() // Process announcements
defer h.mu.Unlock()
// Queue operations for announcements
for _, announcement := range msg.Announcements { for _, announcement := range msg.Announcements {
for _, prefix := range announcement.Prefixes { for _, prefix := range announcement.Prefixes {
// Queue prefix operation // Get or create prefix
h.prefixBatch = append(h.prefixBatch, prefixOp{ _, err := h.db.GetOrCreatePrefix(prefix, timestamp)
prefix: prefix, if err != nil {
timestamp: timestamp, h.logger.Error(
}) "Failed to get/create prefix",
"prefix",
prefix,
"error",
err,
)
// Queue origin ASN operation continue
if originASN > 0 {
h.asnBatch = append(h.asnBatch, asnOp{
number: originASN,
timestamp: timestamp,
})
} }
// Process AS path to queue peering operations // Get or create origin ASN
_, err = h.db.GetOrCreateASN(originASN, timestamp)
if err != nil {
h.logger.Error(
"Failed to get/create ASN",
"asn",
originASN,
"error",
err,
)
continue
}
// TODO: Record the announcement in the announcements table
// Process AS path to update peerings
if len(msg.Path) > 1 { if len(msg.Path) > 1 {
for i := range len(msg.Path) - 1 { for i := range len(msg.Path) - 1 {
fromASN := msg.Path[i] fromASN := msg.Path[i]
toASN := msg.Path[i+1] toASN := msg.Path[i+1]
// Queue ASN operations // Get or create both ASNs
h.asnBatch = append(h.asnBatch, asnOp{ fromAS, err := h.db.GetOrCreateASN(fromASN, timestamp)
number: fromASN,
timestamp: timestamp,
})
h.asnBatch = append(h.asnBatch, asnOp{
number: toASN,
timestamp: timestamp,
})
// Queue peering operation
h.peeringBatch = append(h.peeringBatch, peeringOp{
fromASN: fromASN,
toASN: toASN,
timestamp: timestamp,
})
}
}
}
}
// Queue operations for withdrawals
for _, prefix := range msg.Withdrawals {
h.prefixBatch = append(h.prefixBatch, prefixOp{
prefix: prefix,
timestamp: timestamp,
})
}
// Check if we need to flush
if len(h.prefixBatch) >= batchSize || len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize {
h.flushBatchesLocked()
}
}
// flushLoop runs in a goroutine and periodically flushes batches
func (h *DBHandler) flushLoop() {
defer h.wg.Done()
ticker := time.NewTicker(batchTimeout)
defer ticker.Stop()
for {
select {
case <-ticker.C:
h.mu.Lock()
if time.Since(h.lastFlush) >= batchTimeout {
h.flushBatchesLocked()
}
h.mu.Unlock()
case <-h.stopCh:
// Final flush
h.mu.Lock()
h.flushBatchesLocked()
h.mu.Unlock()
return
}
}
}
// flushBatchesLocked flushes all batches to the database (must be called with mutex held)
func (h *DBHandler) flushBatchesLocked() {
if len(h.prefixBatch) == 0 && len(h.asnBatch) == 0 && len(h.peeringBatch) == 0 {
return
}
// Process ASNs first (deduped)
asnMap := make(map[int]time.Time)
for _, op := range h.asnBatch {
if existing, ok := asnMap[op.number]; !ok || op.timestamp.After(existing) {
asnMap[op.number] = op.timestamp
}
}
asnCache := make(map[int]*database.ASN)
for asn, ts := range asnMap {
asnObj, err := h.db.GetOrCreateASN(asn, ts)
if err != nil { if err != nil {
h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err) h.logger.Error(
"Failed to get/create from ASN",
"asn",
fromASN,
"error",
err,
)
continue continue
} }
asnCache[asn] = asnObj
}
// Process prefixes (deduped) toAS, err := h.db.GetOrCreateASN(toASN, timestamp)
prefixMap := make(map[string]time.Time)
for _, op := range h.prefixBatch {
if existing, ok := prefixMap[op.prefix]; !ok || op.timestamp.After(existing) {
prefixMap[op.prefix] = op.timestamp
}
}
for prefix, ts := range prefixMap {
_, err := h.db.GetOrCreatePrefix(prefix, ts)
if err != nil { if err != nil {
h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err) h.logger.Error(
} "Failed to get/create to ASN",
"asn",
toASN,
"error",
err,
)
continue
} }
// Process peerings (deduped) // Record the peering
type peeringKey struct { err = h.db.RecordPeering(
from, to int fromAS.ID.String(),
} toAS.ID.String(),
peeringMap := make(map[peeringKey]time.Time) timestamp,
for _, op := range h.peeringBatch { )
key := peeringKey{from: op.fromASN, to: op.toASN}
if existing, ok := peeringMap[key]; !ok || op.timestamp.After(existing) {
peeringMap[key] = op.timestamp
}
}
for key, ts := range peeringMap {
fromAS := asnCache[key.from]
toAS := asnCache[key.to]
if fromAS != nil && toAS != nil {
err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), ts)
if err != nil { if err != nil {
h.logger.Error("Failed to record peering", h.logger.Error("Failed to record peering",
"from_asn", key.from, "from_asn", fromASN,
"to_asn", key.to, "to_asn", toASN,
"error", err, "error", err,
) )
} }
} }
} }
}
}
// Clear batches // Process withdrawals
h.prefixBatch = h.prefixBatch[:0] for _, prefix := range msg.Withdrawals {
h.asnBatch = h.asnBatch[:0] // Get prefix
h.peeringBatch = h.peeringBatch[:0] _, err := h.db.GetOrCreatePrefix(prefix, timestamp)
h.lastFlush = time.Now() if err != nil {
} h.logger.Error(
"Failed to get prefix for withdrawal",
"prefix",
prefix,
"error",
err,
)
// Stop gracefully stops the handler and flushes remaining batches continue
func (h *DBHandler) Stop() { }
close(h.stopCh)
h.wg.Wait() // TODO: Record the withdrawal in the announcements table as a withdrawal
}
} }

View File

@ -0,0 +1,272 @@
package routewatch
import (
"sync"
"time"
"git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger"
"git.eeqj.de/sneak/routewatch/internal/ristypes"
)
const (
// batchedDatabaseHandlerQueueSize is the queue capacity for database operations;
// it is the value reported by BatchedDatabaseHandler.QueueCapacity.
batchedDatabaseHandlerQueueSize = 1000
// batchSize is the number of operations to batch together.
// It is the initial capacity of each batch slice, and a flush is triggered
// as soon as any one of the batches reaches this length.
batchSize = 500
// batchTimeout is the maximum time to wait before flushing a batch.
// It is the period of the background flush ticker.
batchTimeout = 5 * time.Second
)
// BatchedDatabaseHandler handles BGP messages and stores them in the database using batched operations.
// Operations are queued in memory by HandleMessage and written out either
// when a batch grows to batchSize or periodically by a background goroutine.
type BatchedDatabaseHandler struct {
// db is the store that batches are flushed to.
db database.Store
// logger receives flush diagnostics and store errors.
logger *logger.Logger
// Batching
// mu is held while the batch slices and lastFlush are read or modified.
mu sync.Mutex
// prefixBatch holds queued prefix sightings (announcements and withdrawals).
prefixBatch []prefixOp
// asnBatch holds queued AS number sightings.
asnBatch []asnOp
// peeringBatch holds queued adjacent-AS pairs taken from AS paths.
peeringBatch []peeringOp
// lastFlush is when the batches were last written out.
lastFlush time.Time
// stopCh is closed by Stop to tell the flush goroutine to do a final flush and exit.
stopCh chan struct{}
// wg tracks the flush goroutine so Stop can wait for it.
wg sync.WaitGroup
}
// prefixOp is a queued "prefix seen at timestamp" operation.
type prefixOp struct {
prefix string
timestamp time.Time
}
// asnOp is a queued "AS number seen at timestamp" operation.
type asnOp struct {
number int
timestamp time.Time
}
// peeringOp is a queued "fromASN and toASN were adjacent in an AS path at
// timestamp" operation.
type peeringOp struct {
fromASN int
toASN int
timestamp time.Time
}
// NewBatchedDatabaseHandler builds a handler that queues database work and
// writes it to db in batches.
//
// The returned handler already has its periodic flush goroutine running;
// call Stop to flush what is still queued and end that goroutine.
func NewBatchedDatabaseHandler(
	db database.Store,
	logger *logger.Logger,
) *BatchedDatabaseHandler {
	handler := new(BatchedDatabaseHandler)
	handler.db = db
	handler.logger = logger

	// Pre-size every batch to the flush threshold.
	handler.prefixBatch = make([]prefixOp, 0, batchSize)
	handler.asnBatch = make([]asnOp, 0, batchSize)
	handler.peeringBatch = make([]peeringOp, 0, batchSize)

	handler.lastFlush = time.Now()
	handler.stopCh = make(chan struct{})

	// Register the goroutine before starting it so Stop can wait on it.
	handler.wg.Add(1)
	go handler.flushLoop()

	return handler
}
// WantsMessage reports whether this handler should receive messages of the
// given type. Only "UPDATE" messages are accepted.
func (h *BatchedDatabaseHandler) WantsMessage(messageType string) bool {
	switch messageType {
	case "UPDATE":
		return true
	default:
		return false
	}
}
// QueueCapacity reports how many messages this handler would like buffered
// for it. Batching allows a larger queue than per-message writes would.
func (h *BatchedDatabaseHandler) QueueCapacity() int {
	const capacity = batchedDatabaseHandlerQueueSize

	return capacity
}
// HandleMessage processes a RIS message and queues database operations.
//
// Nothing is written directly; operations are appended to the in-memory
// batches under h.mu:
//   - one prefix operation per announced prefix and per withdrawn prefix;
//   - if at least one prefix was announced: the origin ASN (last element of
//     the AS path, when greater than zero) and, for every adjacent pair in
//     the path, both ASNs and the peering between them.
//
// The AS-path operations are queued once per message rather than once per
// announced prefix. They depend only on the path, and the flush deduplicates
// them anyway, so repeating them per prefix only inflated the batches and
// forced early flushes.
//
// If any batch has reached batchSize after queuing, all batches are flushed
// before returning (while still holding h.mu).
func (h *BatchedDatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
	// Use the pre-parsed timestamp
	timestamp := msg.ParsedTimestamp

	h.mu.Lock()
	defer h.mu.Unlock()

	// Queue a prefix operation for every announced prefix.
	announced := 0
	for _, announcement := range msg.Announcements {
		for _, prefix := range announcement.Prefixes {
			h.prefixBatch = append(h.prefixBatch, prefixOp{
				prefix:    prefix,
				timestamp: timestamp,
			})
			announced++
		}
	}

	// Path-derived operations are only recorded for messages that actually
	// announce something, matching the previous behaviour.
	if announced > 0 && len(msg.Path) > 0 {
		// Origin ASN is the last element of the path.
		if originASN := msg.Path[len(msg.Path)-1]; originASN > 0 {
			h.asnBatch = append(h.asnBatch, asnOp{
				number:    originASN,
				timestamp: timestamp,
			})
		}

		// Walk adjacent pairs; a single-element path yields no iterations.
		for i := range len(msg.Path) - 1 {
			fromASN := msg.Path[i]
			toASN := msg.Path[i+1]

			h.asnBatch = append(h.asnBatch,
				asnOp{number: fromASN, timestamp: timestamp},
				asnOp{number: toASN, timestamp: timestamp},
			)

			h.peeringBatch = append(h.peeringBatch, peeringOp{
				fromASN:   fromASN,
				toASN:     toASN,
				timestamp: timestamp,
			})
		}
	}

	// Queue operations for withdrawals
	for _, prefix := range msg.Withdrawals {
		h.prefixBatch = append(h.prefixBatch, prefixOp{
			prefix:    prefix,
			timestamp: timestamp,
		})
	}

	// Check if we need to flush
	if len(h.prefixBatch) >= batchSize || len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize {
		h.flushBatchesLocked()
	}
}
// flushLoop is the body of the background flush goroutine.
//
// Every batchTimeout it flushes the batches if at least batchTimeout has
// passed since the last flush. When stopCh is closed it flushes once more
// unconditionally and returns, marking the goroutine done on h.wg.
func (h *BatchedDatabaseHandler) flushLoop() {
	defer h.wg.Done()

	tick := time.NewTicker(batchTimeout)
	defer tick.Stop()

	// flush takes the lock and writes the batches out; unless force is set
	// it does so only when the last flush is at least batchTimeout old.
	flush := func(force bool) {
		h.mu.Lock()
		defer h.mu.Unlock()

		if force || time.Since(h.lastFlush) >= batchTimeout {
			h.flushBatchesLocked()
		}
	}

	for {
		select {
		case <-h.stopCh:
			// Final flush
			flush(true)

			return
		case <-tick.C:
			flush(false)
		}
	}
}
// flushBatchesLocked writes every queued operation to the database and then
// empties the batches. The caller must hold h.mu.
//
// Each batch is collapsed first, so an ASN, prefix or peering that was queued
// many times produces a single store call carrying its newest timestamp.
// ASNs are written first because recording a peering needs the IDs of both
// ASN records; a peering is skipped when either of its ASNs could not be
// resolved. Store errors are logged and do not abort the flush.
func (h *BatchedDatabaseHandler) flushBatchesLocked() {
	if len(h.prefixBatch)+len(h.asnBatch)+len(h.peeringBatch) == 0 {
		return
	}

	began := time.Now()

	// Newest timestamp seen for each AS number.
	latestASN := make(map[int]time.Time)
	for _, queued := range h.asnBatch {
		if seen, dup := latestASN[queued.number]; dup && !queued.timestamp.After(seen) {
			continue
		}
		latestASN[queued.number] = queued.timestamp
	}

	// Resolve every ASN up front; the records are reused for peerings below.
	resolved := make(map[int]*database.ASN)
	for number, seenAt := range latestASN {
		record, err := h.db.GetOrCreateASN(number, seenAt)
		if err != nil {
			h.logger.Error("Failed to get/create ASN", "asn", number, "error", err)

			continue
		}
		resolved[number] = record
	}

	// Newest timestamp seen for each prefix.
	latestPrefix := make(map[string]time.Time)
	for _, queued := range h.prefixBatch {
		if seen, dup := latestPrefix[queued.prefix]; dup && !queued.timestamp.After(seen) {
			continue
		}
		latestPrefix[queued.prefix] = queued.timestamp
	}

	for prefix, seenAt := range latestPrefix {
		if _, err := h.db.GetOrCreatePrefix(prefix, seenAt); err != nil {
			h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err)
		}
	}

	// Newest timestamp seen for each directed (from, to) ASN pair.
	type asnPair struct {
		from, to int
	}

	latestPeering := make(map[asnPair]time.Time)
	for _, queued := range h.peeringBatch {
		pair := asnPair{from: queued.fromASN, to: queued.toASN}
		if seen, dup := latestPeering[pair]; dup && !queued.timestamp.After(seen) {
			continue
		}
		latestPeering[pair] = queued.timestamp
	}

	for pair, seenAt := range latestPeering {
		fromAS, toAS := resolved[pair.from], resolved[pair.to]
		if fromAS == nil || toAS == nil {
			// One side failed to resolve above; nothing to link.
			continue
		}

		if err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), seenAt); err != nil {
			h.logger.Error("Failed to record peering",
				"from_asn", pair.from,
				"to_asn", pair.to,
				"error", err,
			)
		}
	}

	// Truncate in place so the backing arrays are reused by the next batch.
	h.prefixBatch = h.prefixBatch[:0]
	h.asnBatch = h.asnBatch[:0]
	h.peeringBatch = h.peeringBatch[:0]
	h.lastFlush = time.Now()

	h.logger.Debug("Flushed database batches",
		"duration", time.Since(began),
		"asns", len(latestASN),
		"prefixes", len(latestPrefix),
		"peerings", len(latestPeering),
	)
}
// Stop gracefully stops the handler and flushes remaining batches.
// Closing stopCh makes the flush goroutine perform one final flush and
// return; Stop then blocks until that goroutine has finished.
// NOTE(review): a second call would close an already-closed channel, which
// panics — confirm callers invoke Stop at most once.
func (h *BatchedDatabaseHandler) Stop() {
close(h.stopCh)
h.wg.Wait()
}

View File

@ -2,8 +2,6 @@ package routewatch
import ( import (
"strconv" "strconv"
"sync"
"time"
"git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger" "git.eeqj.de/sneak/routewatch/internal/logger"
@ -12,50 +10,21 @@ import (
const ( const (
// peerHandlerQueueSize is the queue capacity for peer tracking operations // peerHandlerQueueSize is the queue capacity for peer tracking operations
peerHandlerQueueSize = 50000 peerHandlerQueueSize = 500
// peerBatchSize is the number of peer updates to batch together
peerBatchSize = 500
// peerBatchTimeout is the maximum time to wait before flushing a batch
peerBatchTimeout = 5 * time.Second
) )
// PeerHandler tracks BGP peers from all message types using batched operations // PeerHandler tracks BGP peers from all message types
type PeerHandler struct { type PeerHandler struct {
db database.Store db database.Store
logger *logger.Logger logger *logger.Logger
// Batching
mu sync.Mutex
peerBatch []peerUpdate
lastFlush time.Time
stopCh chan struct{}
wg sync.WaitGroup
} }
type peerUpdate struct { // NewPeerHandler creates a new peer tracking handler
peerIP string
peerASN int
messageType string
timestamp time.Time
}
// NewPeerHandler creates a new batched peer tracking handler
func NewPeerHandler(db database.Store, logger *logger.Logger) *PeerHandler { func NewPeerHandler(db database.Store, logger *logger.Logger) *PeerHandler {
h := &PeerHandler{ return &PeerHandler{
db: db, db: db,
logger: logger, logger: logger,
peerBatch: make([]peerUpdate, 0, peerBatchSize),
lastFlush: time.Now(),
stopCh: make(chan struct{}),
} }
// Start the flush timer goroutine
h.wg.Add(1)
go h.flushLoop()
return h
} }
// WantsMessage returns true for all message types since we track peers from all messages // WantsMessage returns true for all message types since we track peers from all messages
@ -65,7 +34,7 @@ func (h *PeerHandler) WantsMessage(_ string) bool {
// QueueCapacity returns the desired queue capacity for this handler // QueueCapacity returns the desired queue capacity for this handler
func (h *PeerHandler) QueueCapacity() int { func (h *PeerHandler) QueueCapacity() int {
// Batching allows us to use a larger queue // Peer tracking is lightweight but involves database ops, use moderate queue
return peerHandlerQueueSize return peerHandlerQueueSize
} }
@ -79,81 +48,13 @@ func (h *PeerHandler) HandleMessage(msg *ristypes.RISMessage) {
} }
} }
h.mu.Lock() // Update peer in database
defer h.mu.Unlock() if err := h.db.UpdatePeer(msg.Peer, peerASN, msg.Type, msg.ParsedTimestamp); err != nil {
// Add to batch
h.peerBatch = append(h.peerBatch, peerUpdate{
peerIP: msg.Peer,
peerASN: peerASN,
messageType: msg.Type,
timestamp: msg.ParsedTimestamp,
})
// Check if we need to flush
if len(h.peerBatch) >= peerBatchSize {
h.flushBatchLocked()
}
}
// flushLoop runs in a goroutine and periodically flushes batches
func (h *PeerHandler) flushLoop() {
defer h.wg.Done()
ticker := time.NewTicker(peerBatchTimeout)
defer ticker.Stop()
for {
select {
case <-ticker.C:
h.mu.Lock()
if time.Since(h.lastFlush) >= peerBatchTimeout {
h.flushBatchLocked()
}
h.mu.Unlock()
case <-h.stopCh:
// Final flush
h.mu.Lock()
h.flushBatchLocked()
h.mu.Unlock()
return
}
}
}
// flushBatchLocked flushes the peer batch to the database (must be called with mutex held)
func (h *PeerHandler) flushBatchLocked() {
if len(h.peerBatch) == 0 {
return
}
// Deduplicate by peer IP, keeping the latest update for each peer
peerMap := make(map[string]peerUpdate)
for _, update := range h.peerBatch {
if existing, ok := peerMap[update.peerIP]; !ok || update.timestamp.After(existing.timestamp) {
peerMap[update.peerIP] = update
}
}
// Apply updates
for _, update := range peerMap {
if err := h.db.UpdatePeer(update.peerIP, update.peerASN, update.messageType, update.timestamp); err != nil {
h.logger.Error("Failed to update peer", h.logger.Error("Failed to update peer",
"peer", update.peerIP, "peer", msg.Peer,
"peer_asn", update.peerASN, "peer_asn", peerASN,
"message_type", update.messageType, "message_type", msg.Type,
"error", err, "error", err,
) )
} }
}
// Clear batch
h.peerBatch = h.peerBatch[:0]
h.lastFlush = time.Now()
}
// Stop gracefully stops the handler and flushes remaining batches
func (h *PeerHandler) Stop() {
close(h.stopCh)
h.wg.Wait()
} }

View File

@ -0,0 +1,170 @@
package routewatch
import (
"strconv"
"sync"
"time"
"git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger"
"git.eeqj.de/sneak/routewatch/internal/ristypes"
)
const (
// batchedPeerHandlerQueueSize is the queue capacity for peer tracking operations;
// it is the value reported by BatchedPeerHandler.QueueCapacity.
batchedPeerHandlerQueueSize = 2000
// peerBatchSize is the number of peer updates to batch together.
// It is the initial capacity of the batch, and reaching it triggers a flush.
peerBatchSize = 500
// peerBatchTimeout is the maximum time to wait before flushing a batch.
// It is the period of the background flush ticker.
peerBatchTimeout = 5 * time.Second
)
// BatchedPeerHandler tracks BGP peers from all message types using batched operations.
// Peer observations are buffered in memory and written to the database in
// groups, either when the buffer reaches peerBatchSize or from a periodic
// background flush.
type BatchedPeerHandler struct {
	// db receives the UpdatePeer calls issued during a flush.
	db database.Store
	// logger is used for flush errors and per-flush debug output.
	logger *logger.Logger
	// Batching
	// mu is held while peerBatch and lastFlush are read or modified.
	mu sync.Mutex
	// peerBatch holds the updates buffered since the last flush.
	peerBatch []peerUpdate
	// lastFlush is set when the handler is created and after every non-empty flush.
	lastFlush time.Time
	// stopCh is closed by Stop to make the flush loop do a final flush and exit.
	stopCh chan struct{}
	// wg tracks the flush loop goroutine so Stop can wait for it.
	wg sync.WaitGroup
}
// peerUpdate is a single buffered peer observation waiting to be written to
// the database.
type peerUpdate struct {
	// peerIP is taken from the message's Peer field; a batch is deduplicated on it.
	peerIP string
	// peerASN is the message's PeerASN parsed as an integer, or 0 when that
	// field is empty or not a valid integer.
	peerASN int
	// messageType is the message's Type field.
	messageType string
	// timestamp is the message's ParsedTimestamp and decides which update
	// for a peer is kept when a batch is flushed.
	timestamp time.Time
}
// NewBatchedPeerHandler builds a BatchedPeerHandler that writes peers to db
// and logs through logger. The returned handler already has its background
// flush loop running; call Stop to flush pending updates and end that loop.
func NewBatchedPeerHandler(db database.Store, logger *logger.Logger) *BatchedPeerHandler {
	handler := new(BatchedPeerHandler)
	handler.db = db
	handler.logger = logger
	handler.peerBatch = make([]peerUpdate, 0, peerBatchSize)
	handler.stopCh = make(chan struct{})
	handler.lastFlush = time.Now()

	// Register the flush loop with the wait group before launching it so
	// that Stop can wait for it to finish.
	handler.wg.Add(1)
	go handler.flushLoop()

	return handler
}
// WantsMessage returns true for all message types since we track peers from all messages.
// The message type argument is ignored.
func (h *BatchedPeerHandler) WantsMessage(_ string) bool {
	return true
}
// QueueCapacity returns the desired queue capacity for this handler, which is
// the batchedPeerHandlerQueueSize constant.
func (h *BatchedPeerHandler) QueueCapacity() int {
	// Batching allows us to use a larger queue
	return batchedPeerHandlerQueueSize
}
// HandleMessage records the peer seen in msg. The observation is appended to
// the in-memory batch under the handler's mutex; once the batch holds
// peerBatchSize entries it is flushed to the database before returning.
func (h *BatchedPeerHandler) HandleMessage(msg *ristypes.RISMessage) {
	// PeerASN arrives as a string. An empty or non-numeric value fails to
	// parse and is recorded as ASN 0.
	var asn int
	if parsed, err := strconv.Atoi(msg.PeerASN); err == nil {
		asn = parsed
	}

	entry := peerUpdate{
		peerIP:      msg.Peer,
		peerASN:     asn,
		messageType: msg.Type,
		timestamp:   msg.ParsedTimestamp,
	}

	h.mu.Lock()
	defer h.mu.Unlock()

	h.peerBatch = append(h.peerBatch, entry)
	if len(h.peerBatch) < peerBatchSize {
		return
	}

	// Batch is full: write it out now instead of waiting for the timer.
	h.flushBatchLocked()
}
// flushLoop is the background goroutine started by NewBatchedPeerHandler.
// Every peerBatchTimeout it flushes the pending batch, provided at least that
// much time has passed since the previous flush. When stopCh is closed it
// performs one last flush and returns, marking the wait group done.
func (h *BatchedPeerHandler) flushLoop() {
	defer h.wg.Done()

	tick := time.NewTicker(peerBatchTimeout)
	defer tick.Stop()

	for {
		select {
		case <-h.stopCh:
			// Shutdown requested: write out whatever is still buffered.
			h.mu.Lock()
			h.flushBatchLocked()
			h.mu.Unlock()

			return
		case <-tick.C:
			h.mu.Lock()
			// HandleMessage may have flushed a full batch recently; only
			// flush on the timer when the last flush is old enough.
			if idle := time.Since(h.lastFlush); idle >= peerBatchTimeout {
				h.flushBatchLocked()
			}
			h.mu.Unlock()
		}
	}
}
// flushBatchLocked flushes the peer batch to the database (must be called with mutex held).
//
// The batch is first reduced to one update per peer IP, then each remaining
// update is written with UpdatePeer. A failed write is logged and does not
// stop the remaining writes. Afterwards the batch is emptied (keeping its
// capacity), lastFlush is reset, and a debug line with the flush duration and
// counts is emitted. An empty batch returns immediately without touching
// lastFlush.
func (h *BatchedPeerHandler) flushBatchLocked() {
	if len(h.peerBatch) == 0 {
		return
	}

	start := time.Now()

	// Deduplicate by peer IP, keeping the latest update for each peer.
	// The batch is in arrival order, so an entry replaces the stored one
	// unless it is strictly older; when two updates carry the same timestamp
	// the one that arrived later wins.
	peerMap := make(map[string]peerUpdate, len(h.peerBatch))
	for _, update := range h.peerBatch {
		if existing, ok := peerMap[update.peerIP]; ok && update.timestamp.Before(existing.timestamp) {
			continue
		}
		peerMap[update.peerIP] = update
	}

	// Apply updates
	successCount := 0
	for _, update := range peerMap {
		if err := h.db.UpdatePeer(update.peerIP, update.peerASN, update.messageType, update.timestamp); err != nil {
			h.logger.Error("Failed to update peer",
				"peer", update.peerIP,
				"peer_asn", update.peerASN,
				"message_type", update.messageType,
				"error", err,
			)

			continue
		}
		successCount++
	}

	// Clear batch
	h.peerBatch = h.peerBatch[:0]
	h.lastFlush = time.Now()

	h.logger.Debug("Flushed peer batch",
		"duration", time.Since(start),
		"total_updates", len(peerMap),
		"successful", successCount,
	)
}
// Stop gracefully stops the handler and flushes remaining batches.
// It closes stopCh, which makes the flush loop perform a final flush and
// return, and then blocks until that goroutine has finished.
// NOTE(review): stopCh is closed unconditionally, so a second call to Stop
// would panic on the already-closed channel — confirm callers stop only once.
func (h *BatchedPeerHandler) Stop() {
	close(h.stopCh)
	h.wg.Wait()
}

View File

@ -1,273 +0,0 @@
package routewatch
import (
"net"
"strings"
"sync"
"time"
"git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger"
"git.eeqj.de/sneak/routewatch/internal/ristypes"
"github.com/google/uuid"
)
const (
	// prefixHandlerQueueSize is the queue capacity for prefix tracking operations;
	// it is the value returned by QueueCapacity.
	prefixHandlerQueueSize = 50000
	// prefixBatchSize is the number of prefix updates to batch together. It is
	// the initial capacity of the pending batch and the length at which
	// HandleMessage flushes immediately.
	prefixBatchSize = 2000
	// prefixBatchTimeout is the maximum time to wait before flushing a batch. It
	// is the period of the flush loop's ticker and the minimum age of the last
	// flush before a timer-driven flush is performed.
	prefixBatchTimeout = 5 * time.Second
	// IP version constants returned by parseCIDR
	ipv4Version = 4
	ipv6Version = 6
)
// PrefixHandler tracks BGP prefixes and maintains a live routing table in the database.
// Routes are added on announcement and deleted on withdrawal.
// Updates are buffered in memory and applied in batches.
type PrefixHandler struct {
	// db is the store that prefixes and live routes are written to.
	db database.Store
	// logger is used to report database and parse errors.
	logger *logger.Logger
	// Batching
	// mu is held while batch and lastFlush are read or modified.
	mu sync.Mutex
	// batch holds the updates buffered since the last flush.
	batch []prefixUpdate
	// lastFlush is set when the handler is created and after every non-empty flush.
	lastFlush time.Time
	// stopCh is closed by Stop to make the flush loop do a final flush and exit.
	stopCh chan struct{}
	// wg tracks the flush loop goroutine so Stop can wait for it.
	wg sync.WaitGroup
}
// prefixUpdate is a single buffered announcement or withdrawal waiting to be
// applied to the database.
type prefixUpdate struct {
	// prefix is the prefix string as it appeared in the message.
	prefix string
	// originASN is the last element of the message's path, or 0 when the path is empty.
	originASN int
	// peer is the message's Peer field.
	peer string
	// messageType is either "announcement" or "withdrawal".
	messageType string
	// timestamp is the message's ParsedTimestamp.
	timestamp time.Time
	// path is the message's Path; the slice is shared with the message, not copied.
	path []int
}
// NewPrefixHandler builds a PrefixHandler that writes to db and logs through
// logger. The returned handler already has its background flush loop running;
// call Stop to flush pending updates and end that loop.
func NewPrefixHandler(db database.Store, logger *logger.Logger) *PrefixHandler {
	handler := new(PrefixHandler)
	handler.db = db
	handler.logger = logger
	handler.batch = make([]prefixUpdate, 0, prefixBatchSize)
	handler.stopCh = make(chan struct{})
	handler.lastFlush = time.Now()

	// Register the flush loop with the wait group before launching it so
	// that Stop can wait for it to finish.
	handler.wg.Add(1)
	go handler.flushLoop()

	return handler
}
// WantsMessage returns true if this handler wants to process messages of the given type.
// Only the exact type string "UPDATE" is accepted.
func (h *PrefixHandler) WantsMessage(messageType string) bool {
	// We only care about UPDATE messages for the routing table
	return messageType == "UPDATE"
}
// QueueCapacity returns the desired queue capacity for this handler, which is
// the prefixHandlerQueueSize constant.
func (h *PrefixHandler) QueueCapacity() int {
	// Batching allows us to use a larger queue
	return prefixHandlerQueueSize
}
// HandleMessage queues every announced and withdrawn prefix in msg as a
// prefixUpdate. The updates are appended to the in-memory batch under the
// handler's mutex; once the batch holds at least prefixBatchSize entries it is
// flushed to the database before returning.
func (h *PrefixHandler) HandleMessage(msg *ristypes.RISMessage) {
	// The origin is the final hop of the AS path; it stays 0 when the
	// message carries no path.
	origin := 0
	if hops := len(msg.Path); hops > 0 {
		origin = msg.Path[hops-1]
	}

	// newUpdate fills in the fields shared by every update from this message.
	newUpdate := func(prefix, kind string) prefixUpdate {
		return prefixUpdate{
			prefix:      prefix,
			originASN:   origin,
			peer:        msg.Peer,
			messageType: kind,
			timestamp:   msg.ParsedTimestamp,
			path:        msg.Path,
		}
	}

	h.mu.Lock()
	defer h.mu.Unlock()

	for _, announcement := range msg.Announcements {
		for _, prefix := range announcement.Prefixes {
			h.batch = append(h.batch, newUpdate(prefix, "announcement"))
		}
	}

	// NOTE(review): withdrawals reuse the origin derived from this message's
	// path, which may not be the origin of the route being withdrawn —
	// confirm this matches what DeleteLiveRoute expects.
	for _, prefix := range msg.Withdrawals {
		h.batch = append(h.batch, newUpdate(prefix, "withdrawal"))
	}

	// The size check runs once per message, so a large message can push the
	// batch past prefixBatchSize before it is flushed.
	if len(h.batch) >= prefixBatchSize {
		h.flushBatchLocked()
	}
}
// flushLoop is the background goroutine started by NewPrefixHandler. Every
// prefixBatchTimeout it flushes the pending batch, provided at least that
// much time has passed since the previous flush. When stopCh is closed it
// performs one last flush and returns, marking the wait group done.
func (h *PrefixHandler) flushLoop() {
	defer h.wg.Done()

	tick := time.NewTicker(prefixBatchTimeout)
	defer tick.Stop()

	for {
		select {
		case <-h.stopCh:
			// Shutdown requested: write out whatever is still buffered.
			h.mu.Lock()
			h.flushBatchLocked()
			h.mu.Unlock()

			return
		case <-tick.C:
			h.mu.Lock()
			// HandleMessage may have flushed a full batch recently; only
			// flush on the timer when the last flush is old enough.
			if idle := time.Since(h.lastFlush); idle >= prefixBatchTimeout {
				h.flushBatchLocked()
			}
			h.mu.Unlock()
		}
	}
}
// flushBatchLocked flushes the prefix batch to the database (must be called with mutex held).
//
// The batch is reduced to the most recent update per (prefix, peer) pair and
// each survivor is applied: the prefix row is fetched or created, then an
// announcement with a positive origin ASN is stored as a live route and a
// withdrawal removes one. Announcements without an origin ASN are dropped.
// Errors are logged and do not stop the remaining updates. Afterwards the
// batch is emptied (keeping its capacity) and lastFlush is reset. An empty
// batch returns immediately without touching lastFlush.
func (h *PrefixHandler) flushBatchLocked() {
	if len(h.batch) == 0 {
		return
	}

	// Live routes are stored and deleted per peer, so updates may only
	// supersede each other when both prefix and peer match. Keying on the
	// prefix alone would discard routes that other peers sent for the same
	// prefix within one batch.
	type batchKey struct {
		prefix string
		peer   string
	}

	// The batch is in arrival order, so an entry replaces the stored one
	// unless it is strictly older; on equal timestamps the later arrival
	// (e.g. a withdrawal following an announcement) wins.
	latest := make(map[batchKey]prefixUpdate, len(h.batch))
	for _, update := range h.batch {
		key := batchKey{prefix: update.prefix, peer: update.peer}
		if existing, ok := latest[key]; ok && update.timestamp.Before(existing.timestamp) {
			continue
		}
		latest[key] = update
	}

	// Apply updates to database
	for _, update := range latest {
		// Get or create prefix
		prefix, err := h.db.GetOrCreatePrefix(update.prefix, update.timestamp)
		if err != nil {
			h.logger.Error("Failed to get/create prefix",
				"prefix", update.prefix,
				"error", err,
			)

			continue
		}

		switch {
		case update.messageType == "announcement" && update.originASN > 0:
			h.processAnnouncement(prefix, update)
		case update.messageType == "withdrawal":
			h.processWithdrawal(prefix, update)
		}
	}

	// Clear batch
	h.batch = h.batch[:0]
	h.lastFlush = time.Now()
}
// parseCIDR extracts the mask length and IP version from a prefix string
func parseCIDR(prefix string) (maskLength int, ipVersion int, err error) {
_, ipNet, err := net.ParseCIDR(prefix)
if err != nil {
return 0, 0, err
}
ones, _ := ipNet.Mask.Size()
if strings.Contains(prefix, ":") {
return ones, ipv6Version, nil
}
return ones, ipv4Version, nil
}
// processAnnouncement stores an announced route as a live route. The prefix
// row passed as the first argument is not used. A prefix that fails CIDR
// parsing is logged and skipped; a failed upsert is logged as well.
func (h *PrefixHandler) processAnnouncement(_ *database.Prefix, update prefixUpdate) {
	maskLength, ipVersion, err := parseCIDR(update.prefix)
	if err != nil {
		h.logger.Error("Failed to parse CIDR",
			"prefix", update.prefix,
			"error", err,
		)

		return
	}

	// Each upsert is given a freshly generated ID.
	err = h.db.UpsertLiveRoute(&database.LiveRoute{
		ID:          uuid.New(),
		Prefix:      update.prefix,
		MaskLength:  maskLength,
		IPVersion:   ipVersion,
		OriginASN:   update.originASN,
		PeerIP:      update.peer,
		ASPath:      update.path,
		NextHop:     update.peer, // Using peer as next hop
		LastUpdated: update.timestamp,
	})
	if err == nil {
		return
	}

	h.logger.Error("Failed to upsert live route",
		"prefix", update.prefix,
		"error", err,
	)
}
// processWithdrawal removes a withdrawn route from the live routing table.
// The prefix row passed as the first argument is not used. With a positive
// origin ASN the delete is issued for that origin; otherwise origin 0 is
// passed, which is intended to remove all routes for the prefix from this
// peer. Delete errors are logged, not returned.
func (h *PrefixHandler) processWithdrawal(_ *database.Prefix, update prefixUpdate) {
	if update.originASN <= 0 {
		// No usable origin: fall back to deleting by prefix and peer only.
		if err := h.db.DeleteLiveRoute(update.prefix, 0, update.peer); err != nil {
			h.logger.Error("Failed to delete live route (no origin ASN)",
				"prefix", update.prefix,
				"peer", update.peer,
				"error", err,
			)
		}

		return
	}

	// Origin known: delete the specific route.
	err := h.db.DeleteLiveRoute(update.prefix, update.originASN, update.peer)
	if err == nil {
		return
	}

	h.logger.Error("Failed to delete live route",
		"prefix", update.prefix,
		"origin_asn", update.originASN,
		"peer", update.peer,
		"error", err,
	)
}
// Stop gracefully stops the handler and flushes remaining batches.
// It closes stopCh, which makes the flush loop perform a final flush and
// return, and then blocks until that goroutine has finished.
// NOTE(review): stopCh is closed unconditionally, so a second call to Stop
// would panic on the already-closed channel — confirm callers stop only once.
func (h *PrefixHandler) Stop() {
	close(h.stopCh)
	h.wg.Wait()
}

View File

@ -65,11 +65,6 @@ type RoutingTable struct {
// Configuration // Configuration
snapshotDir string snapshotDir string
routeExpirationTimeout time.Duration
logger *logger.Logger
// Expiration management
stopExpiration chan struct{}
} }
// New creates a new routing table, loading from snapshot if available // New creates a new routing table, loading from snapshot if available
@ -81,9 +76,6 @@ func New(cfg *config.Config, logger *logger.Logger) *RoutingTable {
byPeerASN: make(map[int]map[RouteKey]*Route), byPeerASN: make(map[int]map[RouteKey]*Route),
lastMetricsReset: time.Now(), lastMetricsReset: time.Now(),
snapshotDir: cfg.GetStateDir(), snapshotDir: cfg.GetStateDir(),
routeExpirationTimeout: cfg.RouteExpirationTimeout,
logger: logger,
stopExpiration: make(chan struct{}),
} }
// Try to load from snapshot // Try to load from snapshot
@ -91,9 +83,6 @@ func New(cfg *config.Config, logger *logger.Logger) *RoutingTable {
logger.Warn("Failed to load routing table from snapshot", "error", err) logger.Warn("Failed to load routing table from snapshot", "error", err)
} }
// Start expiration goroutine
go rt.expireRoutesLoop()
return rt return rt
} }
@ -533,72 +522,3 @@ func (rt *RoutingTable) loadFromSnapshot(logger *logger.Logger) error {
return nil return nil
} }
// expireRoutesLoop runs until stopExpiration is closed, calling
// expireStaleRoutes once a minute to drop routes that have gone stale.
func (rt *RoutingTable) expireRoutesLoop() {
	// How often the table is swept for expired routes.
	const sweepInterval = time.Minute

	sweep := time.NewTicker(sweepInterval)
	defer sweep.Stop()

	for {
		select {
		case <-rt.stopExpiration:
			return
		case <-sweep.C:
			rt.expireStaleRoutes()
		}
	}
}
// expireStaleRoutes removes routes that haven't been updated recently.
//
// A route is stale when its AnnouncedAt is more than routeExpirationTimeout
// in the past. Each stale route is removed from the secondary indexes and the
// main table, and the matching IPv4/IPv6 route counter is decremented. The
// table's mutex is held for the whole sweep. When anything was removed, one
// summary line is logged. A non-positive timeout disables expiration, so an
// unset configuration value cannot wipe the table on every sweep.
func (rt *RoutingTable) expireStaleRoutes() {
	rt.mu.Lock()
	defer rt.mu.Unlock()

	if rt.routeExpirationTimeout <= 0 {
		return
	}

	cutoffTime := time.Now().UTC().Add(-rt.routeExpirationTimeout)
	expiredCount := 0

	// Go allows deleting map entries while ranging over the map, so stale
	// routes are removed in a single pass without first collecting their
	// keys into a slice while the lock is held.
	for key, route := range rt.routes {
		// Use AnnouncedAt as the last update time
		if !route.AnnouncedAt.Before(cutoffTime) {
			continue
		}

		rt.removeFromIndexes(key, route)
		delete(rt.routes, key)
		expiredCount++

		// Update metrics
		if isIPv6(route.Prefix) {
			rt.ipv6Routes--
		} else {
			rt.ipv4Routes--
		}
	}

	if expiredCount == 0 {
		return
	}

	rt.logger.Info("Expired stale routes",
		"count", expiredCount,
		"timeout", rt.routeExpirationTimeout,
		"remaining_routes", len(rt.routes),
	)
}
// Stop gracefully stops the routing table background tasks.
// It closes stopExpiration when that channel is non-nil, which is the signal
// expireRoutesLoop returns on. It does not wait for that goroutine to exit.
// NOTE(review): the channel is not cleared after closing, so a second call
// would panic on the already-closed channel — confirm callers stop only once.
func (rt *RoutingTable) Stop() {
	if rt.stopExpiration != nil {
		close(rt.stopExpiration)
	}
}

View File

@ -131,8 +131,6 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
IPv6Routes int `json:"ipv6_routes"` IPv6Routes int `json:"ipv6_routes"`
IPv4UpdatesPerSec float64 `json:"ipv4_updates_per_sec"` IPv4UpdatesPerSec float64 `json:"ipv4_updates_per_sec"`
IPv6UpdatesPerSec float64 `json:"ipv6_updates_per_sec"` IPv6UpdatesPerSec float64 `json:"ipv6_updates_per_sec"`
IPv4PrefixDistribution []database.PrefixDistribution `json:"ipv4_prefix_distribution"`
IPv6PrefixDistribution []database.PrefixDistribution `json:"ipv6_prefix_distribution"`
} }
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
@ -147,6 +145,7 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
errChan := make(chan error) errChan := make(chan error)
go func() { go func() {
s.logger.Debug("Starting database stats query")
dbStats, err := s.db.GetStats() dbStats, err := s.db.GetStats()
if err != nil { if err != nil {
s.logger.Debug("Database stats query failed", "error", err) s.logger.Debug("Database stats query failed", "error", err)
@ -154,6 +153,7 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
return return
} }
s.logger.Debug("Database stats query completed")
statsChan <- dbStats statsChan <- dbStats
}() }()
@ -212,13 +212,11 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
IPv6Prefixes: dbStats.IPv6Prefixes, IPv6Prefixes: dbStats.IPv6Prefixes,
Peerings: dbStats.Peerings, Peerings: dbStats.Peerings,
DatabaseSizeBytes: dbStats.FileSizeBytes, DatabaseSizeBytes: dbStats.FileSizeBytes,
LiveRoutes: dbStats.LiveRoutes, LiveRoutes: rtStats.TotalRoutes,
IPv4Routes: rtStats.IPv4Routes, IPv4Routes: rtStats.IPv4Routes,
IPv6Routes: rtStats.IPv6Routes, IPv6Routes: rtStats.IPv6Routes,
IPv4UpdatesPerSec: rtStats.IPv4UpdatesRate, IPv4UpdatesPerSec: rtStats.IPv4UpdatesRate,
IPv6UpdatesPerSec: rtStats.IPv6UpdatesRate, IPv6UpdatesPerSec: rtStats.IPv6UpdatesRate,
IPv4PrefixDistribution: dbStats.IPv4PrefixDistribution,
IPv6PrefixDistribution: dbStats.IPv6PrefixDistribution,
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
@ -266,8 +264,6 @@ func (s *Server) handleStats() http.HandlerFunc {
IPv4UpdatesPerSec float64 `json:"ipv4_updates_per_sec"` IPv4UpdatesPerSec float64 `json:"ipv4_updates_per_sec"`
IPv6UpdatesPerSec float64 `json:"ipv6_updates_per_sec"` IPv6UpdatesPerSec float64 `json:"ipv6_updates_per_sec"`
HandlerStats []HandlerStatsInfo `json:"handler_stats"` HandlerStats []HandlerStatsInfo `json:"handler_stats"`
IPv4PrefixDistribution []database.PrefixDistribution `json:"ipv4_prefix_distribution"`
IPv6PrefixDistribution []database.PrefixDistribution `json:"ipv6_prefix_distribution"`
} }
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
@ -291,6 +287,7 @@ func (s *Server) handleStats() http.HandlerFunc {
errChan := make(chan error) errChan := make(chan error)
go func() { go func() {
s.logger.Debug("Starting database stats query")
dbStats, err := s.db.GetStats() dbStats, err := s.db.GetStats()
if err != nil { if err != nil {
s.logger.Debug("Database stats query failed", "error", err) s.logger.Debug("Database stats query failed", "error", err)
@ -298,6 +295,7 @@ func (s *Server) handleStats() http.HandlerFunc {
return return
} }
s.logger.Debug("Database stats query completed")
statsChan <- dbStats statsChan <- dbStats
}() }()
@ -357,14 +355,12 @@ func (s *Server) handleStats() http.HandlerFunc {
IPv6Prefixes: dbStats.IPv6Prefixes, IPv6Prefixes: dbStats.IPv6Prefixes,
Peerings: dbStats.Peerings, Peerings: dbStats.Peerings,
DatabaseSizeBytes: dbStats.FileSizeBytes, DatabaseSizeBytes: dbStats.FileSizeBytes,
LiveRoutes: dbStats.LiveRoutes, LiveRoutes: rtStats.TotalRoutes,
IPv4Routes: rtStats.IPv4Routes, IPv4Routes: rtStats.IPv4Routes,
IPv6Routes: rtStats.IPv6Routes, IPv6Routes: rtStats.IPv6Routes,
IPv4UpdatesPerSec: rtStats.IPv4UpdatesRate, IPv4UpdatesPerSec: rtStats.IPv4UpdatesRate,
IPv6UpdatesPerSec: rtStats.IPv6UpdatesRate, IPv6UpdatesPerSec: rtStats.IPv6UpdatesRate,
HandlerStats: handlerStatsInfo, HandlerStats: handlerStatsInfo,
IPv4PrefixDistribution: dbStats.IPv4PrefixDistribution,
IPv6PrefixDistribution: dbStats.IPv6PrefixDistribution,
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")

View File

@ -153,22 +153,6 @@
</div> </div>
</div> </div>
<div class="status-grid">
<div class="status-card">
<h2>IPv4 Prefix Distribution</h2>
<div id="ipv4-prefix-distribution">
<!-- Will be populated dynamically -->
</div>
</div>
<div class="status-card">
<h2>IPv6 Prefix Distribution</h2>
<div id="ipv6-prefix-distribution">
<!-- Will be populated dynamically -->
</div>
</div>
</div>
<div id="handler-stats-container" class="status-grid"> <div id="handler-stats-container" class="status-grid">
<!-- Handler stats will be dynamically added here --> <!-- Handler stats will be dynamically added here -->
</div> </div>
@ -186,29 +170,6 @@
return num.toLocaleString(); return num.toLocaleString();
} }
// updatePrefixDistribution re-renders a prefix distribution list.
//   elementId    - id of the container element whose contents are replaced
//   distribution - array of items with mask_length and count fields; when it
//                  is missing or empty a single "No data" row is shown
// Note: the array is sorted in place, so the caller's array is reordered.
function updatePrefixDistribution(elementId, distribution) {
    const container = document.getElementById(elementId);
    // Remove whatever the container currently holds
    container.innerHTML = '';
    if (!distribution || distribution.length === 0) {
        container.innerHTML = '<div class="metric"><span class="metric-label">No data</span></div>';
        return;
    }
    // Sort by mask length
    distribution.sort((a, b) => a.mask_length - b.mask_length);
    // One "metric" row per mask length: label is /<mask_length>, value is
    // the count passed through formatNumber
    distribution.forEach(item => {
        const metric = document.createElement('div');
        metric.className = 'metric';
        metric.innerHTML = `
<span class="metric-label">/${item.mask_length}</span>
<span class="metric-value">${formatNumber(item.count)}</span>
`;
        container.appendChild(metric);
    });
}
function updateHandlerStats(handlerStats) { function updateHandlerStats(handlerStats) {
const container = document.getElementById('handler-stats-container'); const container = document.getElementById('handler-stats-container');
container.innerHTML = ''; container.innerHTML = '';
@ -288,10 +249,6 @@
// Update handler stats // Update handler stats
updateHandlerStats(data.handler_stats || []); updateHandlerStats(data.handler_stats || []);
// Update prefix distribution
updatePrefixDistribution('ipv4-prefix-distribution', data.ipv4_prefix_distribution);
updatePrefixDistribution('ipv6-prefix-distribution', data.ipv6_prefix_distribution);
// Clear any errors // Clear any errors
document.getElementById('error').style.display = 'none'; document.getElementById('error').style.display = 'none';
}) })