Compare commits
3 Commits
3aef3f9a07
...
eda90d96a9
Author | SHA1 | Date | |
---|---|---|---|
eda90d96a9 | |||
3c46087976 | |||
cea7c3dfd3 |
@ -15,6 +15,9 @@ const (
|
||||
|
||||
// dirPermissions for creating directories
|
||||
dirPermissions = 0750 // rwxr-x---
|
||||
|
||||
// defaultRouteExpirationMinutes is the default route expiration timeout in minutes
|
||||
defaultRouteExpirationMinutes = 5
|
||||
)
|
||||
|
||||
// Config holds configuration for the entire application
|
||||
@ -27,6 +30,10 @@ type Config struct {
|
||||
|
||||
// EnableBatchedDatabaseWrites enables batched database operations for better performance
|
||||
EnableBatchedDatabaseWrites bool
|
||||
|
||||
// RouteExpirationTimeout is how long a route can go without being refreshed before expiring
|
||||
// Default is 2 hours which is conservative for BGP (typical BGP hold time is 90-180 seconds)
|
||||
RouteExpirationTimeout time.Duration
|
||||
}
|
||||
|
||||
// New creates a new Config with default paths based on the OS
|
||||
@ -38,8 +45,9 @@ func New() (*Config, error) {
|
||||
|
||||
return &Config{
|
||||
StateDir: stateDir,
|
||||
MaxRuntime: 0, // Run forever by default
|
||||
EnableBatchedDatabaseWrites: true, // Enable batching by default for better performance
|
||||
MaxRuntime: 0, // Run forever by default
|
||||
EnableBatchedDatabaseWrites: true, // Enable batching by default
|
||||
RouteExpirationTimeout: defaultRouteExpirationMinutes * time.Minute, // For active route monitoring
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -4,6 +4,7 @@ package database
|
||||
import (
|
||||
"database/sql"
|
||||
_ "embed"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@ -387,5 +388,120 @@ func (d *Database) GetStats() (Stats, error) {
|
||||
stats.FileSizeBytes = fileInfo.Size()
|
||||
}
|
||||
|
||||
// Get live routes count
|
||||
err = d.db.QueryRow("SELECT COUNT(*) FROM live_routes").Scan(&stats.LiveRoutes)
|
||||
if err != nil {
|
||||
return stats, fmt.Errorf("failed to count live routes: %w", err)
|
||||
}
|
||||
|
||||
// Get prefix distribution
|
||||
stats.IPv4PrefixDistribution, stats.IPv6PrefixDistribution, err = d.GetPrefixDistribution()
|
||||
if err != nil {
|
||||
// Log but don't fail
|
||||
d.logger.Warn("Failed to get prefix distribution", "error", err)
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// UpsertLiveRoute inserts or updates a live route
|
||||
func (d *Database) UpsertLiveRoute(route *LiveRoute) error {
|
||||
query := `
|
||||
INSERT INTO live_routes (id, prefix, mask_length, ip_version, origin_asn, peer_ip, as_path, next_hop, last_updated)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(prefix, origin_asn, peer_ip) DO UPDATE SET
|
||||
mask_length = excluded.mask_length,
|
||||
ip_version = excluded.ip_version,
|
||||
as_path = excluded.as_path,
|
||||
next_hop = excluded.next_hop,
|
||||
last_updated = excluded.last_updated
|
||||
`
|
||||
|
||||
// Encode AS path as JSON
|
||||
pathJSON, err := json.Marshal(route.ASPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to encode AS path: %w", err)
|
||||
}
|
||||
|
||||
_, err = d.db.Exec(query,
|
||||
route.ID.String(),
|
||||
route.Prefix,
|
||||
route.MaskLength,
|
||||
route.IPVersion,
|
||||
route.OriginASN,
|
||||
route.PeerIP,
|
||||
string(pathJSON),
|
||||
route.NextHop,
|
||||
route.LastUpdated,
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// DeleteLiveRoute deletes a live route
|
||||
// If originASN is 0, deletes all routes for the prefix/peer combination
|
||||
func (d *Database) DeleteLiveRoute(prefix string, originASN int, peerIP string) error {
|
||||
var query string
|
||||
var err error
|
||||
|
||||
if originASN == 0 {
|
||||
// Delete all routes for this prefix from this peer
|
||||
query = `DELETE FROM live_routes WHERE prefix = ? AND peer_ip = ?`
|
||||
_, err = d.db.Exec(query, prefix, peerIP)
|
||||
} else {
|
||||
// Delete specific route
|
||||
query = `DELETE FROM live_routes WHERE prefix = ? AND origin_asn = ? AND peer_ip = ?`
|
||||
_, err = d.db.Exec(query, prefix, originASN, peerIP)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// GetPrefixDistribution returns the distribution of prefixes by mask length
|
||||
func (d *Database) GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error) {
|
||||
// IPv4 distribution
|
||||
query := `
|
||||
SELECT mask_length, COUNT(*) as count
|
||||
FROM live_routes
|
||||
WHERE ip_version = 4
|
||||
GROUP BY mask_length
|
||||
ORDER BY mask_length
|
||||
`
|
||||
rows, err := d.db.Query(query)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to query IPv4 distribution: %w", err)
|
||||
}
|
||||
defer func() { _ = rows.Close() }()
|
||||
|
||||
for rows.Next() {
|
||||
var dist PrefixDistribution
|
||||
if err := rows.Scan(&dist.MaskLength, &dist.Count); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to scan IPv4 distribution: %w", err)
|
||||
}
|
||||
ipv4 = append(ipv4, dist)
|
||||
}
|
||||
|
||||
// IPv6 distribution
|
||||
query = `
|
||||
SELECT mask_length, COUNT(*) as count
|
||||
FROM live_routes
|
||||
WHERE ip_version = 6
|
||||
GROUP BY mask_length
|
||||
ORDER BY mask_length
|
||||
`
|
||||
rows, err = d.db.Query(query)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to query IPv6 distribution: %w", err)
|
||||
}
|
||||
defer func() { _ = rows.Close() }()
|
||||
|
||||
for rows.Next() {
|
||||
var dist PrefixDistribution
|
||||
if err := rows.Scan(&dist.MaskLength, &dist.Count); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to scan IPv6 distribution: %w", err)
|
||||
}
|
||||
ipv6 = append(ipv6, dist)
|
||||
}
|
||||
|
||||
return ipv4, ipv6, nil
|
||||
}
|
||||
|
@ -6,12 +6,15 @@ import (
|
||||
|
||||
// Stats contains database statistics
|
||||
type Stats struct {
|
||||
ASNs int
|
||||
Prefixes int
|
||||
IPv4Prefixes int
|
||||
IPv6Prefixes int
|
||||
Peerings int
|
||||
FileSizeBytes int64
|
||||
ASNs int
|
||||
Prefixes int
|
||||
IPv4Prefixes int
|
||||
IPv6Prefixes int
|
||||
Peerings int
|
||||
FileSizeBytes int64
|
||||
LiveRoutes int
|
||||
IPv4PrefixDistribution []PrefixDistribution
|
||||
IPv6PrefixDistribution []PrefixDistribution
|
||||
}
|
||||
|
||||
// Store defines the interface for database operations
|
||||
@ -34,6 +37,11 @@ type Store interface {
|
||||
// Peer operations
|
||||
UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error
|
||||
|
||||
// Live route operations
|
||||
UpsertLiveRoute(route *LiveRoute) error
|
||||
DeleteLiveRoute(prefix string, originASN int, peerIP string) error
|
||||
GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error)
|
||||
|
||||
// Lifecycle
|
||||
Close() error
|
||||
}
|
||||
|
@ -45,3 +45,22 @@ type ASNPeering struct {
|
||||
FirstSeen time.Time `json:"first_seen"`
|
||||
LastSeen time.Time `json:"last_seen"`
|
||||
}
|
||||
|
||||
// LiveRoute represents a route in the live routing table
|
||||
type LiveRoute struct {
|
||||
ID uuid.UUID `json:"id"`
|
||||
Prefix string `json:"prefix"`
|
||||
MaskLength int `json:"mask_length"`
|
||||
IPVersion int `json:"ip_version"`
|
||||
OriginASN int `json:"origin_asn"`
|
||||
PeerIP string `json:"peer_ip"`
|
||||
ASPath []int `json:"as_path"`
|
||||
NextHop string `json:"next_hop"`
|
||||
LastUpdated time.Time `json:"last_updated"`
|
||||
}
|
||||
|
||||
// PrefixDistribution represents the distribution of prefixes by mask length
|
||||
type PrefixDistribution struct {
|
||||
MaskLength int `json:"mask_length"`
|
||||
Count int `json:"count"`
|
||||
}
|
||||
|
@ -68,4 +68,24 @@ CREATE INDEX IF NOT EXISTS idx_asns_number ON asns(number);
|
||||
-- Indexes for bgp_peers table
|
||||
CREATE INDEX IF NOT EXISTS idx_bgp_peers_asn ON bgp_peers(peer_asn);
|
||||
CREATE INDEX IF NOT EXISTS idx_bgp_peers_last_seen ON bgp_peers(last_seen);
|
||||
CREATE INDEX IF NOT EXISTS idx_bgp_peers_ip ON bgp_peers(peer_ip);
|
||||
CREATE INDEX IF NOT EXISTS idx_bgp_peers_ip ON bgp_peers(peer_ip);
|
||||
|
||||
-- Live routing table maintained by PrefixHandler
|
||||
CREATE TABLE IF NOT EXISTS live_routes (
|
||||
id TEXT PRIMARY KEY,
|
||||
prefix TEXT NOT NULL,
|
||||
mask_length INTEGER NOT NULL, -- CIDR mask length (0-32 for IPv4, 0-128 for IPv6)
|
||||
ip_version INTEGER NOT NULL, -- 4 or 6
|
||||
origin_asn INTEGER NOT NULL,
|
||||
peer_ip TEXT NOT NULL,
|
||||
as_path TEXT NOT NULL, -- JSON array
|
||||
next_hop TEXT NOT NULL,
|
||||
last_updated DATETIME NOT NULL,
|
||||
UNIQUE(prefix, origin_asn, peer_ip)
|
||||
);
|
||||
|
||||
-- Indexes for live_routes table
|
||||
CREATE INDEX IF NOT EXISTS idx_live_routes_prefix ON live_routes(prefix);
|
||||
CREATE INDEX IF NOT EXISTS idx_live_routes_mask_length ON live_routes(mask_length);
|
||||
CREATE INDEX IF NOT EXISTS idx_live_routes_ip_version_mask ON live_routes(ip_version, mask_length);
|
||||
CREATE INDEX IF NOT EXISTS idx_live_routes_last_updated ON live_routes(last_updated);
|
@ -40,18 +40,19 @@ type Dependencies struct {
|
||||
|
||||
// RouteWatch represents the main application instance
|
||||
type RouteWatch struct {
|
||||
db database.Store
|
||||
routingTable *routingtable.RoutingTable
|
||||
streamer *streamer.Streamer
|
||||
server *server.Server
|
||||
snapshotter *snapshotter.Snapshotter
|
||||
logger *logger.Logger
|
||||
maxRuntime time.Duration
|
||||
shutdown bool
|
||||
mu sync.Mutex
|
||||
config *config.Config
|
||||
batchedDBHandler *BatchedDatabaseHandler
|
||||
batchedPeerHandler *BatchedPeerHandler
|
||||
db database.Store
|
||||
routingTable *routingtable.RoutingTable
|
||||
streamer *streamer.Streamer
|
||||
server *server.Server
|
||||
snapshotter *snapshotter.Snapshotter
|
||||
logger *logger.Logger
|
||||
maxRuntime time.Duration
|
||||
shutdown bool
|
||||
mu sync.Mutex
|
||||
config *config.Config
|
||||
dbHandler *DBHandler
|
||||
peerHandler *PeerHandler
|
||||
prefixHandler *PrefixHandler
|
||||
}
|
||||
|
||||
// isTruthy returns true if the value is considered truthy
|
||||
@ -106,17 +107,19 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
|
||||
// Register database handler to process BGP UPDATE messages
|
||||
if rw.config.EnableBatchedDatabaseWrites {
|
||||
rw.logger.Info("Using batched database handlers for improved performance")
|
||||
rw.batchedDBHandler = NewBatchedDatabaseHandler(rw.db, rw.logger)
|
||||
rw.streamer.RegisterHandler(rw.batchedDBHandler)
|
||||
rw.dbHandler = NewDBHandler(rw.db, rw.logger)
|
||||
rw.streamer.RegisterHandler(rw.dbHandler)
|
||||
|
||||
rw.batchedPeerHandler = NewBatchedPeerHandler(rw.db, rw.logger)
|
||||
rw.streamer.RegisterHandler(rw.batchedPeerHandler)
|
||||
rw.peerHandler = NewPeerHandler(rw.db, rw.logger)
|
||||
rw.streamer.RegisterHandler(rw.peerHandler)
|
||||
|
||||
rw.prefixHandler = NewPrefixHandler(rw.db, rw.logger)
|
||||
rw.streamer.RegisterHandler(rw.prefixHandler)
|
||||
} else {
|
||||
dbHandler := NewDatabaseHandler(rw.db, rw.logger)
|
||||
rw.streamer.RegisterHandler(dbHandler)
|
||||
// Non-batched handlers not implemented yet
|
||||
rw.logger.Error("Non-batched handlers not implemented")
|
||||
|
||||
peerHandler := NewPeerHandler(rw.db, rw.logger)
|
||||
rw.streamer.RegisterHandler(peerHandler)
|
||||
return fmt.Errorf("non-batched handlers not implemented")
|
||||
}
|
||||
|
||||
// Register routing table handler to maintain in-memory routing table
|
||||
@ -159,18 +162,25 @@ func (rw *RouteWatch) Shutdown() {
|
||||
rw.mu.Unlock()
|
||||
|
||||
// Stop batched handlers first to flush remaining batches
|
||||
if rw.batchedDBHandler != nil {
|
||||
rw.logger.Info("Flushing batched database handler")
|
||||
rw.batchedDBHandler.Stop()
|
||||
if rw.dbHandler != nil {
|
||||
rw.logger.Info("Flushing database handler")
|
||||
rw.dbHandler.Stop()
|
||||
}
|
||||
if rw.batchedPeerHandler != nil {
|
||||
rw.logger.Info("Flushing batched peer handler")
|
||||
rw.batchedPeerHandler.Stop()
|
||||
if rw.peerHandler != nil {
|
||||
rw.logger.Info("Flushing peer handler")
|
||||
rw.peerHandler.Stop()
|
||||
}
|
||||
if rw.prefixHandler != nil {
|
||||
rw.logger.Info("Flushing prefix handler")
|
||||
rw.prefixHandler.Stop()
|
||||
}
|
||||
|
||||
// Stop services
|
||||
rw.streamer.Stop()
|
||||
|
||||
// Stop routing table expiration
|
||||
rw.routingTable.Stop()
|
||||
|
||||
// Stop HTTP server with a timeout
|
||||
const serverStopTimeout = 5 * time.Second
|
||||
stopCtx, cancel := context.WithTimeout(context.Background(), serverStopTimeout)
|
||||
|
@ -157,6 +157,24 @@ func (m *mockStore) GetStats() (database.Stats, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// UpsertLiveRoute mock implementation
|
||||
func (m *mockStore) UpsertLiveRoute(route *database.LiveRoute) error {
|
||||
// Simple mock - just return nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteLiveRoute mock implementation
|
||||
func (m *mockStore) DeleteLiveRoute(prefix string, originASN int, peerIP string) error {
|
||||
// Simple mock - just return nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetPrefixDistribution mock implementation
|
||||
func (m *mockStore) GetPrefixDistribution() (ipv4 []database.PrefixDistribution, ipv6 []database.PrefixDistribution, err error) {
|
||||
// Return empty distributions for now
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
func TestRouteWatchLiveFeed(t *testing.T) {
|
||||
// Disable snapshotter for tests
|
||||
t.Setenv("ROUTEWATCH_DISABLE_SNAPSHOTTER", "1")
|
||||
@ -175,8 +193,9 @@ func TestRouteWatchLiveFeed(t *testing.T) {
|
||||
|
||||
// Create test config with empty state dir (no snapshot loading)
|
||||
cfg := &config.Config{
|
||||
StateDir: "",
|
||||
MaxRuntime: 5 * time.Second,
|
||||
StateDir: "",
|
||||
MaxRuntime: 5 * time.Second,
|
||||
EnableBatchedDatabaseWrites: true,
|
||||
}
|
||||
|
||||
// Create routing table
|
||||
|
@ -1,47 +1,92 @@
|
||||
package routewatch
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.eeqj.de/sneak/routewatch/internal/database"
|
||||
"git.eeqj.de/sneak/routewatch/internal/logger"
|
||||
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
||||
)
|
||||
|
||||
const (
|
||||
// databaseHandlerQueueSize is the queue capacity for database operations
|
||||
databaseHandlerQueueSize = 200
|
||||
// dbHandlerQueueSize is the queue capacity for database operations
|
||||
dbHandlerQueueSize = 50000
|
||||
|
||||
// batchSize is the number of operations to batch together
|
||||
batchSize = 32000
|
||||
|
||||
// batchTimeout is the maximum time to wait before flushing a batch
|
||||
batchTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
// DatabaseHandler handles BGP messages and stores them in the database
|
||||
type DatabaseHandler struct {
|
||||
// DBHandler handles BGP messages and stores them in the database using batched operations
|
||||
type DBHandler struct {
|
||||
db database.Store
|
||||
logger *logger.Logger
|
||||
|
||||
// Batching
|
||||
mu sync.Mutex
|
||||
prefixBatch []prefixOp
|
||||
asnBatch []asnOp
|
||||
peeringBatch []peeringOp
|
||||
lastFlush time.Time
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewDatabaseHandler creates a new database handler
|
||||
func NewDatabaseHandler(
|
||||
type prefixOp struct {
|
||||
prefix string
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
type asnOp struct {
|
||||
number int
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
type peeringOp struct {
|
||||
fromASN int
|
||||
toASN int
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
// NewDBHandler creates a new batched database handler
|
||||
func NewDBHandler(
|
||||
db database.Store,
|
||||
logger *logger.Logger,
|
||||
) *DatabaseHandler {
|
||||
return &DatabaseHandler{
|
||||
db: db,
|
||||
logger: logger,
|
||||
) *DBHandler {
|
||||
h := &DBHandler{
|
||||
db: db,
|
||||
logger: logger,
|
||||
prefixBatch: make([]prefixOp, 0, batchSize),
|
||||
asnBatch: make([]asnOp, 0, batchSize),
|
||||
peeringBatch: make([]peeringOp, 0, batchSize),
|
||||
lastFlush: time.Now(),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Start the flush timer goroutine
|
||||
h.wg.Add(1)
|
||||
go h.flushLoop()
|
||||
|
||||
return h
|
||||
}
|
||||
|
||||
// WantsMessage returns true if this handler wants to process messages of the given type
|
||||
func (h *DatabaseHandler) WantsMessage(messageType string) bool {
|
||||
func (h *DBHandler) WantsMessage(messageType string) bool {
|
||||
// We only care about UPDATE messages for the database
|
||||
return messageType == "UPDATE"
|
||||
}
|
||||
|
||||
// QueueCapacity returns the desired queue capacity for this handler
|
||||
func (h *DatabaseHandler) QueueCapacity() int {
|
||||
// Database operations are slow, so use a smaller queue
|
||||
return databaseHandlerQueueSize
|
||||
func (h *DBHandler) QueueCapacity() int {
|
||||
// Batching allows us to use a larger queue
|
||||
return dbHandlerQueueSize
|
||||
}
|
||||
|
||||
// HandleMessage processes a RIS message and updates the database
|
||||
func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||
// HandleMessage processes a RIS message and queues database operations
|
||||
func (h *DBHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||
// Use the pre-parsed timestamp
|
||||
timestamp := msg.ParsedTimestamp
|
||||
|
||||
@ -51,105 +96,168 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||
originASN = msg.Path[len(msg.Path)-1]
|
||||
}
|
||||
|
||||
// Process announcements
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
// Queue operations for announcements
|
||||
for _, announcement := range msg.Announcements {
|
||||
for _, prefix := range announcement.Prefixes {
|
||||
// Get or create prefix
|
||||
_, err := h.db.GetOrCreatePrefix(prefix, timestamp)
|
||||
if err != nil {
|
||||
h.logger.Error(
|
||||
"Failed to get/create prefix",
|
||||
"prefix",
|
||||
prefix,
|
||||
"error",
|
||||
err,
|
||||
)
|
||||
// Queue prefix operation
|
||||
h.prefixBatch = append(h.prefixBatch, prefixOp{
|
||||
prefix: prefix,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
|
||||
continue
|
||||
// Queue origin ASN operation
|
||||
if originASN > 0 {
|
||||
h.asnBatch = append(h.asnBatch, asnOp{
|
||||
number: originASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
// Get or create origin ASN
|
||||
_, err = h.db.GetOrCreateASN(originASN, timestamp)
|
||||
if err != nil {
|
||||
h.logger.Error(
|
||||
"Failed to get/create ASN",
|
||||
"asn",
|
||||
originASN,
|
||||
"error",
|
||||
err,
|
||||
)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: Record the announcement in the announcements table
|
||||
// Process AS path to update peerings
|
||||
// Process AS path to queue peering operations
|
||||
if len(msg.Path) > 1 {
|
||||
for i := range len(msg.Path) - 1 {
|
||||
fromASN := msg.Path[i]
|
||||
toASN := msg.Path[i+1]
|
||||
|
||||
// Get or create both ASNs
|
||||
fromAS, err := h.db.GetOrCreateASN(fromASN, timestamp)
|
||||
if err != nil {
|
||||
h.logger.Error(
|
||||
"Failed to get/create from ASN",
|
||||
"asn",
|
||||
fromASN,
|
||||
"error",
|
||||
err,
|
||||
)
|
||||
// Queue ASN operations
|
||||
h.asnBatch = append(h.asnBatch, asnOp{
|
||||
number: fromASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
h.asnBatch = append(h.asnBatch, asnOp{
|
||||
number: toASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
toAS, err := h.db.GetOrCreateASN(toASN, timestamp)
|
||||
if err != nil {
|
||||
h.logger.Error(
|
||||
"Failed to get/create to ASN",
|
||||
"asn",
|
||||
toASN,
|
||||
"error",
|
||||
err,
|
||||
)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// Record the peering
|
||||
err = h.db.RecordPeering(
|
||||
fromAS.ID.String(),
|
||||
toAS.ID.String(),
|
||||
timestamp,
|
||||
)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to record peering",
|
||||
"from_asn", fromASN,
|
||||
"to_asn", toASN,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
// Queue peering operation
|
||||
h.peeringBatch = append(h.peeringBatch, peeringOp{
|
||||
fromASN: fromASN,
|
||||
toASN: toASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process withdrawals
|
||||
// Queue operations for withdrawals
|
||||
for _, prefix := range msg.Withdrawals {
|
||||
// Get prefix
|
||||
_, err := h.db.GetOrCreatePrefix(prefix, timestamp)
|
||||
h.prefixBatch = append(h.prefixBatch, prefixOp{
|
||||
prefix: prefix,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
// Check if we need to flush
|
||||
if len(h.prefixBatch) >= batchSize || len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize {
|
||||
h.flushBatchesLocked()
|
||||
}
|
||||
}
|
||||
|
||||
// flushLoop runs in a goroutine and periodically flushes batches
|
||||
func (h *DBHandler) flushLoop() {
|
||||
defer h.wg.Done()
|
||||
ticker := time.NewTicker(batchTimeout)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
h.mu.Lock()
|
||||
if time.Since(h.lastFlush) >= batchTimeout {
|
||||
h.flushBatchesLocked()
|
||||
}
|
||||
h.mu.Unlock()
|
||||
case <-h.stopCh:
|
||||
// Final flush
|
||||
h.mu.Lock()
|
||||
h.flushBatchesLocked()
|
||||
h.mu.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// flushBatchesLocked flushes all batches to the database (must be called with mutex held)
|
||||
func (h *DBHandler) flushBatchesLocked() {
|
||||
if len(h.prefixBatch) == 0 && len(h.asnBatch) == 0 && len(h.peeringBatch) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Process ASNs first (deduped)
|
||||
asnMap := make(map[int]time.Time)
|
||||
for _, op := range h.asnBatch {
|
||||
if existing, ok := asnMap[op.number]; !ok || op.timestamp.After(existing) {
|
||||
asnMap[op.number] = op.timestamp
|
||||
}
|
||||
}
|
||||
|
||||
asnCache := make(map[int]*database.ASN)
|
||||
for asn, ts := range asnMap {
|
||||
asnObj, err := h.db.GetOrCreateASN(asn, ts)
|
||||
if err != nil {
|
||||
h.logger.Error(
|
||||
"Failed to get prefix for withdrawal",
|
||||
"prefix",
|
||||
prefix,
|
||||
"error",
|
||||
err,
|
||||
)
|
||||
h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: Record the withdrawal in the announcements table as a withdrawal
|
||||
asnCache[asn] = asnObj
|
||||
}
|
||||
|
||||
// Process prefixes (deduped)
|
||||
prefixMap := make(map[string]time.Time)
|
||||
for _, op := range h.prefixBatch {
|
||||
if existing, ok := prefixMap[op.prefix]; !ok || op.timestamp.After(existing) {
|
||||
prefixMap[op.prefix] = op.timestamp
|
||||
}
|
||||
}
|
||||
|
||||
for prefix, ts := range prefixMap {
|
||||
_, err := h.db.GetOrCreatePrefix(prefix, ts)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Process peerings (deduped)
|
||||
type peeringKey struct {
|
||||
from, to int
|
||||
}
|
||||
peeringMap := make(map[peeringKey]time.Time)
|
||||
for _, op := range h.peeringBatch {
|
||||
key := peeringKey{from: op.fromASN, to: op.toASN}
|
||||
if existing, ok := peeringMap[key]; !ok || op.timestamp.After(existing) {
|
||||
peeringMap[key] = op.timestamp
|
||||
}
|
||||
}
|
||||
|
||||
for key, ts := range peeringMap {
|
||||
fromAS := asnCache[key.from]
|
||||
toAS := asnCache[key.to]
|
||||
if fromAS != nil && toAS != nil {
|
||||
err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), ts)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to record peering",
|
||||
"from_asn", key.from,
|
||||
"to_asn", key.to,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clear batches
|
||||
h.prefixBatch = h.prefixBatch[:0]
|
||||
h.asnBatch = h.asnBatch[:0]
|
||||
h.peeringBatch = h.peeringBatch[:0]
|
||||
h.lastFlush = time.Now()
|
||||
}
|
||||
|
||||
// Stop gracefully stops the handler and flushes remaining batches
|
||||
func (h *DBHandler) Stop() {
|
||||
close(h.stopCh)
|
||||
h.wg.Wait()
|
||||
}
|
||||
|
@ -1,272 +0,0 @@
|
||||
package routewatch
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.eeqj.de/sneak/routewatch/internal/database"
|
||||
"git.eeqj.de/sneak/routewatch/internal/logger"
|
||||
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
||||
)
|
||||
|
||||
const (
|
||||
// batchedDatabaseHandlerQueueSize is the queue capacity for database operations
|
||||
batchedDatabaseHandlerQueueSize = 1000
|
||||
|
||||
// batchSize is the number of operations to batch together
|
||||
batchSize = 500
|
||||
|
||||
// batchTimeout is the maximum time to wait before flushing a batch
|
||||
batchTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
// BatchedDatabaseHandler handles BGP messages and stores them in the database using batched operations
|
||||
type BatchedDatabaseHandler struct {
|
||||
db database.Store
|
||||
logger *logger.Logger
|
||||
|
||||
// Batching
|
||||
mu sync.Mutex
|
||||
prefixBatch []prefixOp
|
||||
asnBatch []asnOp
|
||||
peeringBatch []peeringOp
|
||||
lastFlush time.Time
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
type prefixOp struct {
|
||||
prefix string
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
type asnOp struct {
|
||||
number int
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
type peeringOp struct {
|
||||
fromASN int
|
||||
toASN int
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
// NewBatchedDatabaseHandler creates a new batched database handler
|
||||
func NewBatchedDatabaseHandler(
|
||||
db database.Store,
|
||||
logger *logger.Logger,
|
||||
) *BatchedDatabaseHandler {
|
||||
h := &BatchedDatabaseHandler{
|
||||
db: db,
|
||||
logger: logger,
|
||||
prefixBatch: make([]prefixOp, 0, batchSize),
|
||||
asnBatch: make([]asnOp, 0, batchSize),
|
||||
peeringBatch: make([]peeringOp, 0, batchSize),
|
||||
lastFlush: time.Now(),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Start the flush timer goroutine
|
||||
h.wg.Add(1)
|
||||
go h.flushLoop()
|
||||
|
||||
return h
|
||||
}
|
||||
|
||||
// WantsMessage returns true if this handler wants to process messages of the given type
|
||||
func (h *BatchedDatabaseHandler) WantsMessage(messageType string) bool {
|
||||
// We only care about UPDATE messages for the database
|
||||
return messageType == "UPDATE"
|
||||
}
|
||||
|
||||
// QueueCapacity returns the desired queue capacity for this handler
|
||||
func (h *BatchedDatabaseHandler) QueueCapacity() int {
|
||||
// Batching allows us to use a larger queue
|
||||
return batchedDatabaseHandlerQueueSize
|
||||
}
|
||||
|
||||
// HandleMessage processes a RIS message and queues database operations
|
||||
func (h *BatchedDatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||
// Use the pre-parsed timestamp
|
||||
timestamp := msg.ParsedTimestamp
|
||||
|
||||
// Get origin ASN from path (last element)
|
||||
var originASN int
|
||||
if len(msg.Path) > 0 {
|
||||
originASN = msg.Path[len(msg.Path)-1]
|
||||
}
|
||||
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
// Queue operations for announcements
|
||||
for _, announcement := range msg.Announcements {
|
||||
for _, prefix := range announcement.Prefixes {
|
||||
// Queue prefix operation
|
||||
h.prefixBatch = append(h.prefixBatch, prefixOp{
|
||||
prefix: prefix,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
|
||||
// Queue origin ASN operation
|
||||
if originASN > 0 {
|
||||
h.asnBatch = append(h.asnBatch, asnOp{
|
||||
number: originASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
// Process AS path to queue peering operations
|
||||
if len(msg.Path) > 1 {
|
||||
for i := range len(msg.Path) - 1 {
|
||||
fromASN := msg.Path[i]
|
||||
toASN := msg.Path[i+1]
|
||||
|
||||
// Queue ASN operations
|
||||
h.asnBatch = append(h.asnBatch, asnOp{
|
||||
number: fromASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
h.asnBatch = append(h.asnBatch, asnOp{
|
||||
number: toASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
|
||||
// Queue peering operation
|
||||
h.peeringBatch = append(h.peeringBatch, peeringOp{
|
||||
fromASN: fromASN,
|
||||
toASN: toASN,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Queue operations for withdrawals
|
||||
for _, prefix := range msg.Withdrawals {
|
||||
h.prefixBatch = append(h.prefixBatch, prefixOp{
|
||||
prefix: prefix,
|
||||
timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
// Check if we need to flush
|
||||
if len(h.prefixBatch) >= batchSize || len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize {
|
||||
h.flushBatchesLocked()
|
||||
}
|
||||
}
|
||||
|
||||
// flushLoop runs in a goroutine and periodically flushes batches
|
||||
func (h *BatchedDatabaseHandler) flushLoop() {
|
||||
defer h.wg.Done()
|
||||
ticker := time.NewTicker(batchTimeout)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
h.mu.Lock()
|
||||
if time.Since(h.lastFlush) >= batchTimeout {
|
||||
h.flushBatchesLocked()
|
||||
}
|
||||
h.mu.Unlock()
|
||||
case <-h.stopCh:
|
||||
// Final flush
|
||||
h.mu.Lock()
|
||||
h.flushBatchesLocked()
|
||||
h.mu.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// flushBatchesLocked flushes all batches to the database (must be called with mutex held)
|
||||
func (h *BatchedDatabaseHandler) flushBatchesLocked() {
|
||||
if len(h.prefixBatch) == 0 && len(h.asnBatch) == 0 && len(h.peeringBatch) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
// Process ASNs first (deduped)
|
||||
asnMap := make(map[int]time.Time)
|
||||
for _, op := range h.asnBatch {
|
||||
if existing, ok := asnMap[op.number]; !ok || op.timestamp.After(existing) {
|
||||
asnMap[op.number] = op.timestamp
|
||||
}
|
||||
}
|
||||
|
||||
asnCache := make(map[int]*database.ASN)
|
||||
for asn, ts := range asnMap {
|
||||
asnObj, err := h.db.GetOrCreateASN(asn, ts)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err)
|
||||
|
||||
continue
|
||||
}
|
||||
asnCache[asn] = asnObj
|
||||
}
|
||||
|
||||
// Process prefixes (deduped)
|
||||
prefixMap := make(map[string]time.Time)
|
||||
for _, op := range h.prefixBatch {
|
||||
if existing, ok := prefixMap[op.prefix]; !ok || op.timestamp.After(existing) {
|
||||
prefixMap[op.prefix] = op.timestamp
|
||||
}
|
||||
}
|
||||
|
||||
for prefix, ts := range prefixMap {
|
||||
_, err := h.db.GetOrCreatePrefix(prefix, ts)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Process peerings (deduped)
|
||||
type peeringKey struct {
|
||||
from, to int
|
||||
}
|
||||
peeringMap := make(map[peeringKey]time.Time)
|
||||
for _, op := range h.peeringBatch {
|
||||
key := peeringKey{from: op.fromASN, to: op.toASN}
|
||||
if existing, ok := peeringMap[key]; !ok || op.timestamp.After(existing) {
|
||||
peeringMap[key] = op.timestamp
|
||||
}
|
||||
}
|
||||
|
||||
for key, ts := range peeringMap {
|
||||
fromAS := asnCache[key.from]
|
||||
toAS := asnCache[key.to]
|
||||
if fromAS != nil && toAS != nil {
|
||||
err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), ts)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to record peering",
|
||||
"from_asn", key.from,
|
||||
"to_asn", key.to,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clear batches
|
||||
h.prefixBatch = h.prefixBatch[:0]
|
||||
h.asnBatch = h.asnBatch[:0]
|
||||
h.peeringBatch = h.peeringBatch[:0]
|
||||
h.lastFlush = time.Now()
|
||||
|
||||
h.logger.Debug("Flushed database batches",
|
||||
"duration", time.Since(start),
|
||||
"asns", len(asnMap),
|
||||
"prefixes", len(prefixMap),
|
||||
"peerings", len(peeringMap),
|
||||
)
|
||||
}
|
||||
|
||||
// Stop gracefully stops the handler and flushes remaining batches
|
||||
func (h *BatchedDatabaseHandler) Stop() {
|
||||
close(h.stopCh)
|
||||
h.wg.Wait()
|
||||
}
|
@ -2,6 +2,8 @@ package routewatch
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.eeqj.de/sneak/routewatch/internal/database"
|
||||
"git.eeqj.de/sneak/routewatch/internal/logger"
|
||||
@ -10,21 +12,50 @@ import (
|
||||
|
||||
const (
|
||||
// peerHandlerQueueSize is the queue capacity for peer tracking operations
|
||||
peerHandlerQueueSize = 500
|
||||
peerHandlerQueueSize = 50000
|
||||
|
||||
// peerBatchSize is the number of peer updates to batch together
|
||||
peerBatchSize = 500
|
||||
|
||||
// peerBatchTimeout is the maximum time to wait before flushing a batch
|
||||
peerBatchTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
// PeerHandler tracks BGP peers from all message types
|
||||
// PeerHandler tracks BGP peers from all message types using batched operations
|
||||
type PeerHandler struct {
|
||||
db database.Store
|
||||
logger *logger.Logger
|
||||
|
||||
// Batching
|
||||
mu sync.Mutex
|
||||
peerBatch []peerUpdate
|
||||
lastFlush time.Time
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewPeerHandler creates a new peer tracking handler
|
||||
type peerUpdate struct {
|
||||
peerIP string
|
||||
peerASN int
|
||||
messageType string
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
// NewPeerHandler creates a new batched peer tracking handler
|
||||
func NewPeerHandler(db database.Store, logger *logger.Logger) *PeerHandler {
|
||||
return &PeerHandler{
|
||||
db: db,
|
||||
logger: logger,
|
||||
h := &PeerHandler{
|
||||
db: db,
|
||||
logger: logger,
|
||||
peerBatch: make([]peerUpdate, 0, peerBatchSize),
|
||||
lastFlush: time.Now(),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Start the flush timer goroutine
|
||||
h.wg.Add(1)
|
||||
go h.flushLoop()
|
||||
|
||||
return h
|
||||
}
|
||||
|
||||
// WantsMessage returns true for all message types since we track peers from all messages
|
||||
@ -34,7 +65,7 @@ func (h *PeerHandler) WantsMessage(_ string) bool {
|
||||
|
||||
// QueueCapacity returns the desired queue capacity for this handler
|
||||
func (h *PeerHandler) QueueCapacity() int {
|
||||
// Peer tracking is lightweight but involves database ops, use moderate queue
|
||||
// Batching allows us to use a larger queue
|
||||
return peerHandlerQueueSize
|
||||
}
|
||||
|
||||
@ -48,13 +79,81 @@ func (h *PeerHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||
}
|
||||
}
|
||||
|
||||
// Update peer in database
|
||||
if err := h.db.UpdatePeer(msg.Peer, peerASN, msg.Type, msg.ParsedTimestamp); err != nil {
|
||||
h.logger.Error("Failed to update peer",
|
||||
"peer", msg.Peer,
|
||||
"peer_asn", peerASN,
|
||||
"message_type", msg.Type,
|
||||
"error", err,
|
||||
)
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
// Add to batch
|
||||
h.peerBatch = append(h.peerBatch, peerUpdate{
|
||||
peerIP: msg.Peer,
|
||||
peerASN: peerASN,
|
||||
messageType: msg.Type,
|
||||
timestamp: msg.ParsedTimestamp,
|
||||
})
|
||||
|
||||
// Check if we need to flush
|
||||
if len(h.peerBatch) >= peerBatchSize {
|
||||
h.flushBatchLocked()
|
||||
}
|
||||
}
|
||||
|
||||
// flushLoop runs in a goroutine and periodically flushes batches
|
||||
func (h *PeerHandler) flushLoop() {
|
||||
defer h.wg.Done()
|
||||
ticker := time.NewTicker(peerBatchTimeout)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
h.mu.Lock()
|
||||
if time.Since(h.lastFlush) >= peerBatchTimeout {
|
||||
h.flushBatchLocked()
|
||||
}
|
||||
h.mu.Unlock()
|
||||
case <-h.stopCh:
|
||||
// Final flush
|
||||
h.mu.Lock()
|
||||
h.flushBatchLocked()
|
||||
h.mu.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// flushBatchLocked flushes the peer batch to the database (must be called with mutex held)
|
||||
func (h *PeerHandler) flushBatchLocked() {
|
||||
if len(h.peerBatch) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Deduplicate by peer IP, keeping the latest update for each peer
|
||||
peerMap := make(map[string]peerUpdate)
|
||||
for _, update := range h.peerBatch {
|
||||
if existing, ok := peerMap[update.peerIP]; !ok || update.timestamp.After(existing.timestamp) {
|
||||
peerMap[update.peerIP] = update
|
||||
}
|
||||
}
|
||||
|
||||
// Apply updates
|
||||
for _, update := range peerMap {
|
||||
if err := h.db.UpdatePeer(update.peerIP, update.peerASN, update.messageType, update.timestamp); err != nil {
|
||||
h.logger.Error("Failed to update peer",
|
||||
"peer", update.peerIP,
|
||||
"peer_asn", update.peerASN,
|
||||
"message_type", update.messageType,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Clear batch
|
||||
h.peerBatch = h.peerBatch[:0]
|
||||
h.lastFlush = time.Now()
|
||||
}
|
||||
|
||||
// Stop gracefully stops the handler and flushes remaining batches
|
||||
func (h *PeerHandler) Stop() {
|
||||
close(h.stopCh)
|
||||
h.wg.Wait()
|
||||
}
|
||||
|
@ -1,170 +0,0 @@
|
||||
package routewatch
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.eeqj.de/sneak/routewatch/internal/database"
|
||||
"git.eeqj.de/sneak/routewatch/internal/logger"
|
||||
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
||||
)
|
||||
|
||||
const (
|
||||
// batchedPeerHandlerQueueSize is the queue capacity for peer tracking operations
|
||||
batchedPeerHandlerQueueSize = 2000
|
||||
|
||||
// peerBatchSize is the number of peer updates to batch together
|
||||
peerBatchSize = 500
|
||||
|
||||
// peerBatchTimeout is the maximum time to wait before flushing a batch
|
||||
peerBatchTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
// BatchedPeerHandler tracks BGP peers from all message types using batched operations
|
||||
type BatchedPeerHandler struct {
|
||||
db database.Store
|
||||
logger *logger.Logger
|
||||
|
||||
// Batching
|
||||
mu sync.Mutex
|
||||
peerBatch []peerUpdate
|
||||
lastFlush time.Time
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
type peerUpdate struct {
|
||||
peerIP string
|
||||
peerASN int
|
||||
messageType string
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
// NewBatchedPeerHandler creates a new batched peer tracking handler
|
||||
func NewBatchedPeerHandler(db database.Store, logger *logger.Logger) *BatchedPeerHandler {
|
||||
h := &BatchedPeerHandler{
|
||||
db: db,
|
||||
logger: logger,
|
||||
peerBatch: make([]peerUpdate, 0, peerBatchSize),
|
||||
lastFlush: time.Now(),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Start the flush timer goroutine
|
||||
h.wg.Add(1)
|
||||
go h.flushLoop()
|
||||
|
||||
return h
|
||||
}
|
||||
|
||||
// WantsMessage returns true for all message types since we track peers from all messages
|
||||
func (h *BatchedPeerHandler) WantsMessage(_ string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// QueueCapacity returns the desired queue capacity for this handler
|
||||
func (h *BatchedPeerHandler) QueueCapacity() int {
|
||||
// Batching allows us to use a larger queue
|
||||
return batchedPeerHandlerQueueSize
|
||||
}
|
||||
|
||||
// HandleMessage processes a message to track peer information
|
||||
func (h *BatchedPeerHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||
// Parse peer ASN from string
|
||||
peerASN := 0
|
||||
if msg.PeerASN != "" {
|
||||
if asn, err := strconv.Atoi(msg.PeerASN); err == nil {
|
||||
peerASN = asn
|
||||
}
|
||||
}
|
||||
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
// Add to batch
|
||||
h.peerBatch = append(h.peerBatch, peerUpdate{
|
||||
peerIP: msg.Peer,
|
||||
peerASN: peerASN,
|
||||
messageType: msg.Type,
|
||||
timestamp: msg.ParsedTimestamp,
|
||||
})
|
||||
|
||||
// Check if we need to flush
|
||||
if len(h.peerBatch) >= peerBatchSize {
|
||||
h.flushBatchLocked()
|
||||
}
|
||||
}
|
||||
|
||||
// flushLoop runs in a goroutine and periodically flushes batches
|
||||
func (h *BatchedPeerHandler) flushLoop() {
|
||||
defer h.wg.Done()
|
||||
ticker := time.NewTicker(peerBatchTimeout)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
h.mu.Lock()
|
||||
if time.Since(h.lastFlush) >= peerBatchTimeout {
|
||||
h.flushBatchLocked()
|
||||
}
|
||||
h.mu.Unlock()
|
||||
case <-h.stopCh:
|
||||
// Final flush
|
||||
h.mu.Lock()
|
||||
h.flushBatchLocked()
|
||||
h.mu.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// flushBatchLocked flushes the peer batch to the database (must be called with mutex held)
|
||||
func (h *BatchedPeerHandler) flushBatchLocked() {
|
||||
if len(h.peerBatch) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
// Deduplicate by peer IP, keeping the latest update for each peer
|
||||
peerMap := make(map[string]peerUpdate)
|
||||
for _, update := range h.peerBatch {
|
||||
if existing, ok := peerMap[update.peerIP]; !ok || update.timestamp.After(existing.timestamp) {
|
||||
peerMap[update.peerIP] = update
|
||||
}
|
||||
}
|
||||
|
||||
// Apply updates
|
||||
successCount := 0
|
||||
for _, update := range peerMap {
|
||||
if err := h.db.UpdatePeer(update.peerIP, update.peerASN, update.messageType, update.timestamp); err != nil {
|
||||
h.logger.Error("Failed to update peer",
|
||||
"peer", update.peerIP,
|
||||
"peer_asn", update.peerASN,
|
||||
"message_type", update.messageType,
|
||||
"error", err,
|
||||
)
|
||||
} else {
|
||||
successCount++
|
||||
}
|
||||
}
|
||||
|
||||
// Clear batch
|
||||
h.peerBatch = h.peerBatch[:0]
|
||||
h.lastFlush = time.Now()
|
||||
|
||||
h.logger.Debug("Flushed peer batch",
|
||||
"duration", time.Since(start),
|
||||
"total_updates", len(peerMap),
|
||||
"successful", successCount,
|
||||
)
|
||||
}
|
||||
|
||||
// Stop gracefully stops the handler and flushes remaining batches
|
||||
func (h *BatchedPeerHandler) Stop() {
|
||||
close(h.stopCh)
|
||||
h.wg.Wait()
|
||||
}
|
273
internal/routewatch/prefixhandler.go
Normal file
273
internal/routewatch/prefixhandler.go
Normal file
@ -0,0 +1,273 @@
|
||||
package routewatch
|
||||
|
||||
import (
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.eeqj.de/sneak/routewatch/internal/database"
|
||||
"git.eeqj.de/sneak/routewatch/internal/logger"
|
||||
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
// prefixHandlerQueueSize is the queue capacity for prefix tracking operations
|
||||
prefixHandlerQueueSize = 50000
|
||||
|
||||
// prefixBatchSize is the number of prefix updates to batch together
|
||||
prefixBatchSize = 2000
|
||||
|
||||
// prefixBatchTimeout is the maximum time to wait before flushing a batch
|
||||
prefixBatchTimeout = 5 * time.Second
|
||||
|
||||
// IP version constants
|
||||
ipv4Version = 4
|
||||
ipv6Version = 6
|
||||
)
|
||||
|
||||
// PrefixHandler tracks BGP prefixes and maintains a live routing table in the database.
|
||||
// Routes are added on announcement and deleted on withdrawal.
|
||||
type PrefixHandler struct {
|
||||
db database.Store
|
||||
logger *logger.Logger
|
||||
|
||||
// Batching
|
||||
mu sync.Mutex
|
||||
batch []prefixUpdate
|
||||
lastFlush time.Time
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
type prefixUpdate struct {
|
||||
prefix string
|
||||
originASN int
|
||||
peer string
|
||||
messageType string
|
||||
timestamp time.Time
|
||||
path []int
|
||||
}
|
||||
|
||||
// NewPrefixHandler creates a new batched prefix tracking handler
|
||||
func NewPrefixHandler(db database.Store, logger *logger.Logger) *PrefixHandler {
|
||||
h := &PrefixHandler{
|
||||
db: db,
|
||||
logger: logger,
|
||||
batch: make([]prefixUpdate, 0, prefixBatchSize),
|
||||
lastFlush: time.Now(),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Start the flush timer goroutine
|
||||
h.wg.Add(1)
|
||||
go h.flushLoop()
|
||||
|
||||
return h
|
||||
}
|
||||
|
||||
// WantsMessage returns true if this handler wants to process messages of the given type
|
||||
func (h *PrefixHandler) WantsMessage(messageType string) bool {
|
||||
// We only care about UPDATE messages for the routing table
|
||||
return messageType == "UPDATE"
|
||||
}
|
||||
|
||||
// QueueCapacity returns the desired queue capacity for this handler
|
||||
func (h *PrefixHandler) QueueCapacity() int {
|
||||
// Batching allows us to use a larger queue
|
||||
return prefixHandlerQueueSize
|
||||
}
|
||||
|
||||
// HandleMessage processes a message to track prefix information
|
||||
func (h *PrefixHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||
// Use the pre-parsed timestamp
|
||||
timestamp := msg.ParsedTimestamp
|
||||
|
||||
// Get origin ASN from path (last element)
|
||||
var originASN int
|
||||
if len(msg.Path) > 0 {
|
||||
originASN = msg.Path[len(msg.Path)-1]
|
||||
}
|
||||
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
// Process announcements
|
||||
for _, announcement := range msg.Announcements {
|
||||
for _, prefix := range announcement.Prefixes {
|
||||
h.batch = append(h.batch, prefixUpdate{
|
||||
prefix: prefix,
|
||||
originASN: originASN,
|
||||
peer: msg.Peer,
|
||||
messageType: "announcement",
|
||||
timestamp: timestamp,
|
||||
path: msg.Path,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Process withdrawals
|
||||
for _, prefix := range msg.Withdrawals {
|
||||
h.batch = append(h.batch, prefixUpdate{
|
||||
prefix: prefix,
|
||||
originASN: originASN, // Use the originASN from path if available
|
||||
peer: msg.Peer,
|
||||
messageType: "withdrawal",
|
||||
timestamp: timestamp,
|
||||
path: msg.Path,
|
||||
})
|
||||
}
|
||||
|
||||
// Check if we need to flush
|
||||
if len(h.batch) >= prefixBatchSize {
|
||||
h.flushBatchLocked()
|
||||
}
|
||||
}
|
||||
|
||||
// flushLoop runs in a goroutine and periodically flushes batches
|
||||
func (h *PrefixHandler) flushLoop() {
|
||||
defer h.wg.Done()
|
||||
ticker := time.NewTicker(prefixBatchTimeout)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
h.mu.Lock()
|
||||
if time.Since(h.lastFlush) >= prefixBatchTimeout {
|
||||
h.flushBatchLocked()
|
||||
}
|
||||
h.mu.Unlock()
|
||||
case <-h.stopCh:
|
||||
// Final flush
|
||||
h.mu.Lock()
|
||||
h.flushBatchLocked()
|
||||
h.mu.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// flushBatchLocked flushes the prefix batch to the database (must be called with mutex held)
|
||||
func (h *PrefixHandler) flushBatchLocked() {
|
||||
if len(h.batch) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Group updates by prefix to deduplicate
|
||||
// For each prefix, keep the latest update
|
||||
prefixMap := make(map[string]prefixUpdate)
|
||||
for _, update := range h.batch {
|
||||
key := update.prefix
|
||||
if existing, ok := prefixMap[key]; !ok || update.timestamp.After(existing.timestamp) {
|
||||
prefixMap[key] = update
|
||||
}
|
||||
}
|
||||
|
||||
// Apply updates to database
|
||||
for _, update := range prefixMap {
|
||||
// Get or create prefix
|
||||
prefix, err := h.db.GetOrCreatePrefix(update.prefix, update.timestamp)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to get/create prefix",
|
||||
"prefix", update.prefix,
|
||||
"error", err,
|
||||
)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// For announcements, get ASN info and create announcement record
|
||||
if update.messageType == "announcement" && update.originASN > 0 {
|
||||
h.processAnnouncement(prefix, update)
|
||||
} else if update.messageType == "withdrawal" {
|
||||
h.processWithdrawal(prefix, update)
|
||||
}
|
||||
}
|
||||
|
||||
// Clear batch
|
||||
h.batch = h.batch[:0]
|
||||
h.lastFlush = time.Now()
|
||||
}
|
||||
|
||||
// parseCIDR extracts the mask length and IP version from a prefix string
|
||||
func parseCIDR(prefix string) (maskLength int, ipVersion int, err error) {
|
||||
_, ipNet, err := net.ParseCIDR(prefix)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
ones, _ := ipNet.Mask.Size()
|
||||
if strings.Contains(prefix, ":") {
|
||||
return ones, ipv6Version, nil
|
||||
}
|
||||
|
||||
return ones, ipv4Version, nil
|
||||
}
|
||||
|
||||
// processAnnouncement handles storing an announcement in the database
|
||||
func (h *PrefixHandler) processAnnouncement(_ *database.Prefix, update prefixUpdate) {
|
||||
// Parse CIDR to get mask length
|
||||
maskLength, ipVersion, err := parseCIDR(update.prefix)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to parse CIDR",
|
||||
"prefix", update.prefix,
|
||||
"error", err,
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Create live route record
|
||||
liveRoute := &database.LiveRoute{
|
||||
ID: uuid.New(),
|
||||
Prefix: update.prefix,
|
||||
MaskLength: maskLength,
|
||||
IPVersion: ipVersion,
|
||||
OriginASN: update.originASN,
|
||||
PeerIP: update.peer,
|
||||
ASPath: update.path,
|
||||
NextHop: update.peer, // Using peer as next hop
|
||||
LastUpdated: update.timestamp,
|
||||
}
|
||||
|
||||
if err := h.db.UpsertLiveRoute(liveRoute); err != nil {
|
||||
h.logger.Error("Failed to upsert live route",
|
||||
"prefix", update.prefix,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// processWithdrawal handles removing a route from the live routing table
|
||||
func (h *PrefixHandler) processWithdrawal(_ *database.Prefix, update prefixUpdate) {
|
||||
// For withdrawals, we need to delete the route from live_routes
|
||||
// Since we have the origin ASN from the update, we can delete the specific route
|
||||
if update.originASN > 0 {
|
||||
if err := h.db.DeleteLiveRoute(update.prefix, update.originASN, update.peer); err != nil {
|
||||
h.logger.Error("Failed to delete live route",
|
||||
"prefix", update.prefix,
|
||||
"origin_asn", update.originASN,
|
||||
"peer", update.peer,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
} else {
|
||||
// If no origin ASN, just delete all routes for this prefix from this peer
|
||||
if err := h.db.DeleteLiveRoute(update.prefix, 0, update.peer); err != nil {
|
||||
h.logger.Error("Failed to delete live route (no origin ASN)",
|
||||
"prefix", update.prefix,
|
||||
"peer", update.peer,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop gracefully stops the handler and flushes remaining batches
|
||||
func (h *PrefixHandler) Stop() {
|
||||
close(h.stopCh)
|
||||
h.wg.Wait()
|
||||
}
|
@ -64,18 +64,26 @@ type RoutingTable struct {
|
||||
lastMetricsReset time.Time
|
||||
|
||||
// Configuration
|
||||
snapshotDir string
|
||||
snapshotDir string
|
||||
routeExpirationTimeout time.Duration
|
||||
logger *logger.Logger
|
||||
|
||||
// Expiration management
|
||||
stopExpiration chan struct{}
|
||||
}
|
||||
|
||||
// New creates a new routing table, loading from snapshot if available
|
||||
func New(cfg *config.Config, logger *logger.Logger) *RoutingTable {
|
||||
rt := &RoutingTable{
|
||||
routes: make(map[RouteKey]*Route),
|
||||
byPrefix: make(map[uuid.UUID]map[RouteKey]*Route),
|
||||
byOriginASN: make(map[uuid.UUID]map[RouteKey]*Route),
|
||||
byPeerASN: make(map[int]map[RouteKey]*Route),
|
||||
lastMetricsReset: time.Now(),
|
||||
snapshotDir: cfg.GetStateDir(),
|
||||
routes: make(map[RouteKey]*Route),
|
||||
byPrefix: make(map[uuid.UUID]map[RouteKey]*Route),
|
||||
byOriginASN: make(map[uuid.UUID]map[RouteKey]*Route),
|
||||
byPeerASN: make(map[int]map[RouteKey]*Route),
|
||||
lastMetricsReset: time.Now(),
|
||||
snapshotDir: cfg.GetStateDir(),
|
||||
routeExpirationTimeout: cfg.RouteExpirationTimeout,
|
||||
logger: logger,
|
||||
stopExpiration: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Try to load from snapshot
|
||||
@ -83,6 +91,9 @@ func New(cfg *config.Config, logger *logger.Logger) *RoutingTable {
|
||||
logger.Warn("Failed to load routing table from snapshot", "error", err)
|
||||
}
|
||||
|
||||
// Start expiration goroutine
|
||||
go rt.expireRoutesLoop()
|
||||
|
||||
return rt
|
||||
}
|
||||
|
||||
@ -522,3 +533,72 @@ func (rt *RoutingTable) loadFromSnapshot(logger *logger.Logger) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// expireRoutesLoop periodically removes expired routes
|
||||
func (rt *RoutingTable) expireRoutesLoop() {
|
||||
// Run every minute to check for expired routes
|
||||
ticker := time.NewTicker(1 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
rt.expireStaleRoutes()
|
||||
case <-rt.stopExpiration:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// expireStaleRoutes removes routes that haven't been updated recently
|
||||
func (rt *RoutingTable) expireStaleRoutes() {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
|
||||
now := time.Now().UTC()
|
||||
cutoffTime := now.Add(-rt.routeExpirationTimeout)
|
||||
expiredCount := 0
|
||||
|
||||
// Collect keys to delete (can't delete while iterating)
|
||||
var keysToDelete []RouteKey
|
||||
for key, route := range rt.routes {
|
||||
// Use AnnouncedAt as the last update time
|
||||
if route.AnnouncedAt.Before(cutoffTime) {
|
||||
keysToDelete = append(keysToDelete, key)
|
||||
}
|
||||
}
|
||||
|
||||
// Delete expired routes
|
||||
for _, key := range keysToDelete {
|
||||
route, exists := rt.routes[key]
|
||||
if !exists {
|
||||
continue
|
||||
}
|
||||
|
||||
rt.removeFromIndexes(key, route)
|
||||
delete(rt.routes, key)
|
||||
expiredCount++
|
||||
|
||||
// Update metrics
|
||||
if isIPv6(route.Prefix) {
|
||||
rt.ipv6Routes--
|
||||
} else {
|
||||
rt.ipv4Routes--
|
||||
}
|
||||
}
|
||||
|
||||
if expiredCount > 0 {
|
||||
rt.logger.Info("Expired stale routes",
|
||||
"count", expiredCount,
|
||||
"timeout", rt.routeExpirationTimeout,
|
||||
"remaining_routes", len(rt.routes),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Stop gracefully stops the routing table background tasks
|
||||
func (rt *RoutingTable) Stop() {
|
||||
if rt.stopExpiration != nil {
|
||||
close(rt.stopExpiration)
|
||||
}
|
||||
}
|
||||
|
@ -114,23 +114,25 @@ func (s *Server) handleRoot() http.HandlerFunc {
|
||||
func (s *Server) handleStatusJSON() http.HandlerFunc {
|
||||
// Stats represents the statistics response
|
||||
type Stats struct {
|
||||
Uptime string `json:"uptime"`
|
||||
TotalMessages uint64 `json:"total_messages"`
|
||||
TotalBytes uint64 `json:"total_bytes"`
|
||||
MessagesPerSec float64 `json:"messages_per_sec"`
|
||||
MbitsPerSec float64 `json:"mbits_per_sec"`
|
||||
Connected bool `json:"connected"`
|
||||
ASNs int `json:"asns"`
|
||||
Prefixes int `json:"prefixes"`
|
||||
IPv4Prefixes int `json:"ipv4_prefixes"`
|
||||
IPv6Prefixes int `json:"ipv6_prefixes"`
|
||||
Peerings int `json:"peerings"`
|
||||
DatabaseSizeBytes int64 `json:"database_size_bytes"`
|
||||
LiveRoutes int `json:"live_routes"`
|
||||
IPv4Routes int `json:"ipv4_routes"`
|
||||
IPv6Routes int `json:"ipv6_routes"`
|
||||
IPv4UpdatesPerSec float64 `json:"ipv4_updates_per_sec"`
|
||||
IPv6UpdatesPerSec float64 `json:"ipv6_updates_per_sec"`
|
||||
Uptime string `json:"uptime"`
|
||||
TotalMessages uint64 `json:"total_messages"`
|
||||
TotalBytes uint64 `json:"total_bytes"`
|
||||
MessagesPerSec float64 `json:"messages_per_sec"`
|
||||
MbitsPerSec float64 `json:"mbits_per_sec"`
|
||||
Connected bool `json:"connected"`
|
||||
ASNs int `json:"asns"`
|
||||
Prefixes int `json:"prefixes"`
|
||||
IPv4Prefixes int `json:"ipv4_prefixes"`
|
||||
IPv6Prefixes int `json:"ipv6_prefixes"`
|
||||
Peerings int `json:"peerings"`
|
||||
DatabaseSizeBytes int64 `json:"database_size_bytes"`
|
||||
LiveRoutes int `json:"live_routes"`
|
||||
IPv4Routes int `json:"ipv4_routes"`
|
||||
IPv6Routes int `json:"ipv6_routes"`
|
||||
IPv4UpdatesPerSec float64 `json:"ipv4_updates_per_sec"`
|
||||
IPv6UpdatesPerSec float64 `json:"ipv6_updates_per_sec"`
|
||||
IPv4PrefixDistribution []database.PrefixDistribution `json:"ipv4_prefix_distribution"`
|
||||
IPv6PrefixDistribution []database.PrefixDistribution `json:"ipv6_prefix_distribution"`
|
||||
}
|
||||
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
@ -145,7 +147,6 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
|
||||
errChan := make(chan error)
|
||||
|
||||
go func() {
|
||||
s.logger.Debug("Starting database stats query")
|
||||
dbStats, err := s.db.GetStats()
|
||||
if err != nil {
|
||||
s.logger.Debug("Database stats query failed", "error", err)
|
||||
@ -153,7 +154,6 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
|
||||
|
||||
return
|
||||
}
|
||||
s.logger.Debug("Database stats query completed")
|
||||
statsChan <- dbStats
|
||||
}()
|
||||
|
||||
@ -200,23 +200,25 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
|
||||
rtStats := s.routingTable.GetDetailedStats()
|
||||
|
||||
stats := Stats{
|
||||
Uptime: uptime,
|
||||
TotalMessages: metrics.TotalMessages,
|
||||
TotalBytes: metrics.TotalBytes,
|
||||
MessagesPerSec: metrics.MessagesPerSec,
|
||||
MbitsPerSec: metrics.BitsPerSec / bitsPerMegabit,
|
||||
Connected: metrics.Connected,
|
||||
ASNs: dbStats.ASNs,
|
||||
Prefixes: dbStats.Prefixes,
|
||||
IPv4Prefixes: dbStats.IPv4Prefixes,
|
||||
IPv6Prefixes: dbStats.IPv6Prefixes,
|
||||
Peerings: dbStats.Peerings,
|
||||
DatabaseSizeBytes: dbStats.FileSizeBytes,
|
||||
LiveRoutes: rtStats.TotalRoutes,
|
||||
IPv4Routes: rtStats.IPv4Routes,
|
||||
IPv6Routes: rtStats.IPv6Routes,
|
||||
IPv4UpdatesPerSec: rtStats.IPv4UpdatesRate,
|
||||
IPv6UpdatesPerSec: rtStats.IPv6UpdatesRate,
|
||||
Uptime: uptime,
|
||||
TotalMessages: metrics.TotalMessages,
|
||||
TotalBytes: metrics.TotalBytes,
|
||||
MessagesPerSec: metrics.MessagesPerSec,
|
||||
MbitsPerSec: metrics.BitsPerSec / bitsPerMegabit,
|
||||
Connected: metrics.Connected,
|
||||
ASNs: dbStats.ASNs,
|
||||
Prefixes: dbStats.Prefixes,
|
||||
IPv4Prefixes: dbStats.IPv4Prefixes,
|
||||
IPv6Prefixes: dbStats.IPv6Prefixes,
|
||||
Peerings: dbStats.Peerings,
|
||||
DatabaseSizeBytes: dbStats.FileSizeBytes,
|
||||
LiveRoutes: dbStats.LiveRoutes,
|
||||
IPv4Routes: rtStats.IPv4Routes,
|
||||
IPv6Routes: rtStats.IPv6Routes,
|
||||
IPv4UpdatesPerSec: rtStats.IPv4UpdatesRate,
|
||||
IPv6UpdatesPerSec: rtStats.IPv6UpdatesRate,
|
||||
IPv4PrefixDistribution: dbStats.IPv4PrefixDistribution,
|
||||
IPv6PrefixDistribution: dbStats.IPv6PrefixDistribution,
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
@ -246,24 +248,26 @@ func (s *Server) handleStats() http.HandlerFunc {
|
||||
|
||||
// StatsResponse represents the API statistics response
|
||||
type StatsResponse struct {
|
||||
Uptime string `json:"uptime"`
|
||||
TotalMessages uint64 `json:"total_messages"`
|
||||
TotalBytes uint64 `json:"total_bytes"`
|
||||
MessagesPerSec float64 `json:"messages_per_sec"`
|
||||
MbitsPerSec float64 `json:"mbits_per_sec"`
|
||||
Connected bool `json:"connected"`
|
||||
ASNs int `json:"asns"`
|
||||
Prefixes int `json:"prefixes"`
|
||||
IPv4Prefixes int `json:"ipv4_prefixes"`
|
||||
IPv6Prefixes int `json:"ipv6_prefixes"`
|
||||
Peerings int `json:"peerings"`
|
||||
DatabaseSizeBytes int64 `json:"database_size_bytes"`
|
||||
LiveRoutes int `json:"live_routes"`
|
||||
IPv4Routes int `json:"ipv4_routes"`
|
||||
IPv6Routes int `json:"ipv6_routes"`
|
||||
IPv4UpdatesPerSec float64 `json:"ipv4_updates_per_sec"`
|
||||
IPv6UpdatesPerSec float64 `json:"ipv6_updates_per_sec"`
|
||||
HandlerStats []HandlerStatsInfo `json:"handler_stats"`
|
||||
Uptime string `json:"uptime"`
|
||||
TotalMessages uint64 `json:"total_messages"`
|
||||
TotalBytes uint64 `json:"total_bytes"`
|
||||
MessagesPerSec float64 `json:"messages_per_sec"`
|
||||
MbitsPerSec float64 `json:"mbits_per_sec"`
|
||||
Connected bool `json:"connected"`
|
||||
ASNs int `json:"asns"`
|
||||
Prefixes int `json:"prefixes"`
|
||||
IPv4Prefixes int `json:"ipv4_prefixes"`
|
||||
IPv6Prefixes int `json:"ipv6_prefixes"`
|
||||
Peerings int `json:"peerings"`
|
||||
DatabaseSizeBytes int64 `json:"database_size_bytes"`
|
||||
LiveRoutes int `json:"live_routes"`
|
||||
IPv4Routes int `json:"ipv4_routes"`
|
||||
IPv6Routes int `json:"ipv6_routes"`
|
||||
IPv4UpdatesPerSec float64 `json:"ipv4_updates_per_sec"`
|
||||
IPv6UpdatesPerSec float64 `json:"ipv6_updates_per_sec"`
|
||||
HandlerStats []HandlerStatsInfo `json:"handler_stats"`
|
||||
IPv4PrefixDistribution []database.PrefixDistribution `json:"ipv4_prefix_distribution"`
|
||||
IPv6PrefixDistribution []database.PrefixDistribution `json:"ipv6_prefix_distribution"`
|
||||
}
|
||||
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
@ -287,7 +291,6 @@ func (s *Server) handleStats() http.HandlerFunc {
|
||||
errChan := make(chan error)
|
||||
|
||||
go func() {
|
||||
s.logger.Debug("Starting database stats query")
|
||||
dbStats, err := s.db.GetStats()
|
||||
if err != nil {
|
||||
s.logger.Debug("Database stats query failed", "error", err)
|
||||
@ -295,7 +298,6 @@ func (s *Server) handleStats() http.HandlerFunc {
|
||||
|
||||
return
|
||||
}
|
||||
s.logger.Debug("Database stats query completed")
|
||||
statsChan <- dbStats
|
||||
}()
|
||||
|
||||
@ -343,24 +345,26 @@ func (s *Server) handleStats() http.HandlerFunc {
|
||||
}
|
||||
|
||||
stats := StatsResponse{
|
||||
Uptime: uptime,
|
||||
TotalMessages: metrics.TotalMessages,
|
||||
TotalBytes: metrics.TotalBytes,
|
||||
MessagesPerSec: metrics.MessagesPerSec,
|
||||
MbitsPerSec: metrics.BitsPerSec / bitsPerMegabit,
|
||||
Connected: metrics.Connected,
|
||||
ASNs: dbStats.ASNs,
|
||||
Prefixes: dbStats.Prefixes,
|
||||
IPv4Prefixes: dbStats.IPv4Prefixes,
|
||||
IPv6Prefixes: dbStats.IPv6Prefixes,
|
||||
Peerings: dbStats.Peerings,
|
||||
DatabaseSizeBytes: dbStats.FileSizeBytes,
|
||||
LiveRoutes: rtStats.TotalRoutes,
|
||||
IPv4Routes: rtStats.IPv4Routes,
|
||||
IPv6Routes: rtStats.IPv6Routes,
|
||||
IPv4UpdatesPerSec: rtStats.IPv4UpdatesRate,
|
||||
IPv6UpdatesPerSec: rtStats.IPv6UpdatesRate,
|
||||
HandlerStats: handlerStatsInfo,
|
||||
Uptime: uptime,
|
||||
TotalMessages: metrics.TotalMessages,
|
||||
TotalBytes: metrics.TotalBytes,
|
||||
MessagesPerSec: metrics.MessagesPerSec,
|
||||
MbitsPerSec: metrics.BitsPerSec / bitsPerMegabit,
|
||||
Connected: metrics.Connected,
|
||||
ASNs: dbStats.ASNs,
|
||||
Prefixes: dbStats.Prefixes,
|
||||
IPv4Prefixes: dbStats.IPv4Prefixes,
|
||||
IPv6Prefixes: dbStats.IPv6Prefixes,
|
||||
Peerings: dbStats.Peerings,
|
||||
DatabaseSizeBytes: dbStats.FileSizeBytes,
|
||||
LiveRoutes: dbStats.LiveRoutes,
|
||||
IPv4Routes: rtStats.IPv4Routes,
|
||||
IPv6Routes: rtStats.IPv6Routes,
|
||||
IPv4UpdatesPerSec: rtStats.IPv4UpdatesRate,
|
||||
IPv6UpdatesPerSec: rtStats.IPv6UpdatesRate,
|
||||
HandlerStats: handlerStatsInfo,
|
||||
IPv4PrefixDistribution: dbStats.IPv4PrefixDistribution,
|
||||
IPv6PrefixDistribution: dbStats.IPv6PrefixDistribution,
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
@ -153,6 +153,22 @@
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="status-grid">
|
||||
<div class="status-card">
|
||||
<h2>IPv4 Prefix Distribution</h2>
|
||||
<div id="ipv4-prefix-distribution">
|
||||
<!-- Will be populated dynamically -->
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="status-card">
|
||||
<h2>IPv6 Prefix Distribution</h2>
|
||||
<div id="ipv6-prefix-distribution">
|
||||
<!-- Will be populated dynamically -->
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div id="handler-stats-container" class="status-grid">
|
||||
<!-- Handler stats will be dynamically added here -->
|
||||
</div>
|
||||
@ -170,6 +186,29 @@
|
||||
return num.toLocaleString();
|
||||
}
|
||||
|
||||
function updatePrefixDistribution(elementId, distribution) {
|
||||
const container = document.getElementById(elementId);
|
||||
container.innerHTML = '';
|
||||
|
||||
if (!distribution || distribution.length === 0) {
|
||||
container.innerHTML = '<div class="metric"><span class="metric-label">No data</span></div>';
|
||||
return;
|
||||
}
|
||||
|
||||
// Sort by mask length
|
||||
distribution.sort((a, b) => a.mask_length - b.mask_length);
|
||||
|
||||
distribution.forEach(item => {
|
||||
const metric = document.createElement('div');
|
||||
metric.className = 'metric';
|
||||
metric.innerHTML = `
|
||||
<span class="metric-label">/${item.mask_length}</span>
|
||||
<span class="metric-value">${formatNumber(item.count)}</span>
|
||||
`;
|
||||
container.appendChild(metric);
|
||||
});
|
||||
}
|
||||
|
||||
function updateHandlerStats(handlerStats) {
|
||||
const container = document.getElementById('handler-stats-container');
|
||||
container.innerHTML = '';
|
||||
@ -249,6 +288,10 @@
|
||||
// Update handler stats
|
||||
updateHandlerStats(data.handler_stats || []);
|
||||
|
||||
// Update prefix distribution
|
||||
updatePrefixDistribution('ipv4-prefix-distribution', data.ipv4_prefix_distribution);
|
||||
updatePrefixDistribution('ipv6-prefix-distribution', data.ipv6_prefix_distribution);
|
||||
|
||||
// Clear any errors
|
||||
document.getElementById('error').style.display = 'none';
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user