Compare commits


No commits in common. "main" and "fix-min-time-calculation" have entirely different histories.

17 changed files with 178692 additions and 207617 deletions

File diff suppressed because it is too large

View File

@@ -27,7 +27,6 @@ type Store interface {
     // Prefix operations
     GetOrCreatePrefix(prefix string, timestamp time.Time) (*Prefix, error)
-    UpdatePrefixesBatch(prefixes map[string]time.Time) error
 
     // Announcement operations
     RecordAnnouncement(announcement *Announcement) error
@@ -60,8 +59,6 @@ type Store interface {
     // AS and prefix detail operations
     GetASDetails(asn int) (*ASN, []LiveRoute, error)
     GetASDetailsContext(ctx context.Context, asn int) (*ASN, []LiveRoute, error)
-    GetASPeers(asn int) ([]ASPeer, error)
-    GetASPeersContext(ctx context.Context, asn int) ([]ASPeer, error)
     GetPrefixDetails(prefix string) ([]LiveRoute, error)
     GetPrefixDetailsContext(ctx context.Context, prefix string) ([]LiveRoute, error)
     GetRandomPrefixesByLength(maskLength, ipVersion, limit int) ([]LiveRoute, error)

View File

@@ -8,7 +8,8 @@ import (
 // ASN represents an Autonomous System Number
 type ASN struct {
-    ASN int `json:"asn"`
+    ID uuid.UUID `json:"id"`
+    Number int `json:"number"`
     Handle string `json:"handle"`
     Description string `json:"description"`
     FirstSeen time.Time `json:"first_seen"`
@@ -28,8 +29,8 @@ type Prefix struct {
 type Announcement struct {
     ID uuid.UUID `json:"id"`
     PrefixID uuid.UUID `json:"prefix_id"`
-    PeerASN int `json:"peer_asn"`
-    OriginASN int `json:"origin_asn"`
+    ASNID uuid.UUID `json:"asn_id"`
+    OriginASNID uuid.UUID `json:"origin_asn_id"`
     Path string `json:"path"` // JSON-encoded AS path
     NextHop string `json:"next_hop"`
     Timestamp time.Time `json:"timestamp"`
@@ -39,8 +40,8 @@ type Announcement struct {
 // ASNPeering represents a peering relationship between two ASNs
 type ASNPeering struct {
     ID uuid.UUID `json:"id"`
-    ASA int `json:"as_a"`
-    ASB int `json:"as_b"`
+    FromASNID uuid.UUID `json:"from_asn_id"`
+    ToASNID uuid.UUID `json:"to_asn_id"`
     FirstSeen time.Time `json:"first_seen"`
     LastSeen time.Time `json:"last_seen"`
 }
@@ -82,7 +83,6 @@ type LiveRouteDeletion struct {
     Prefix string
     OriginASN int
     PeerIP string
-    IPVersion int
 }
 
 // PeerUpdate represents parameters for updating a peer
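Note: Announcement.Path stores the AS path as a JSON-encoded array. As a hedged illustration of the round-trip (the ASNs below are from the RFC 5398 documentation range, not from the codebase), encoding and decoding such a path looks like this:

package main

import (
    "encoding/json"
    "fmt"
)

func main() {
    // Encode an AS path into the JSON string shape stored in Announcement.Path.
    path := []int{64496, 64499, 64511}
    encoded, err := json.Marshal(path)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(encoded)) // [64496,64499,64511]

    // Decode it back when reading an announcement out of the store.
    var decoded []int
    if err := json.Unmarshal(encoded, &decoded); err != nil {
        panic(err)
    }
    fmt.Println(decoded) // [64496 64499 64511]
}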

View File

@@ -1,27 +1,16 @@
--- IMPORTANT: This is the ONLY place where schema changes should be made.
--- We do NOT support migrations. All schema changes MUST be in this file.
--- DO NOT make schema changes anywhere else in the codebase.
 CREATE TABLE IF NOT EXISTS asns (
-    asn INTEGER PRIMARY KEY,
+    id TEXT PRIMARY KEY,
+    number INTEGER UNIQUE NOT NULL,
     handle TEXT,
     description TEXT,
     first_seen DATETIME NOT NULL,
     last_seen DATETIME NOT NULL
 );
 
--- IPv4 prefixes table
-CREATE TABLE IF NOT EXISTS prefixes_v4 (
-    id TEXT PRIMARY KEY,
-    prefix TEXT UNIQUE NOT NULL,
-    first_seen DATETIME NOT NULL,
-    last_seen DATETIME NOT NULL
-);
-
--- IPv6 prefixes table
-CREATE TABLE IF NOT EXISTS prefixes_v6 (
+CREATE TABLE IF NOT EXISTS prefixes (
     id TEXT PRIMARY KEY,
     prefix TEXT UNIQUE NOT NULL,
+    ip_version INTEGER NOT NULL, -- 4 for IPv4, 6 for IPv6
     first_seen DATETIME NOT NULL,
     last_seen DATETIME NOT NULL
 );
@@ -29,14 +18,15 @@ CREATE TABLE IF NOT EXISTS prefixes_v6 (
 CREATE TABLE IF NOT EXISTS announcements (
     id TEXT PRIMARY KEY,
     prefix_id TEXT NOT NULL,
-    peer_asn INTEGER NOT NULL,
-    origin_asn INTEGER NOT NULL,
+    asn_id TEXT NOT NULL,
+    origin_asn_id TEXT NOT NULL,
     path TEXT NOT NULL,
     next_hop TEXT,
     timestamp DATETIME NOT NULL,
     is_withdrawal BOOLEAN NOT NULL DEFAULT 0,
-    FOREIGN KEY (peer_asn) REFERENCES asns(asn),
-    FOREIGN KEY (origin_asn) REFERENCES asns(asn)
+    FOREIGN KEY (prefix_id) REFERENCES prefixes(id),
+    FOREIGN KEY (asn_id) REFERENCES asns(id),
+    FOREIGN KEY (origin_asn_id) REFERENCES asns(id)
 );
 
 CREATE TABLE IF NOT EXISTS peerings (
@@ -58,71 +48,49 @@ CREATE TABLE IF NOT EXISTS bgp_peers (
     last_message_type TEXT
 );
 
--- Indexes for prefixes_v4 table
-CREATE INDEX IF NOT EXISTS idx_prefixes_v4_prefix ON prefixes_v4(prefix);
--- Indexes for prefixes_v6 table
-CREATE INDEX IF NOT EXISTS idx_prefixes_v6_prefix ON prefixes_v6(prefix);
+CREATE INDEX IF NOT EXISTS idx_prefixes_ip_version ON prefixes(ip_version);
+CREATE INDEX IF NOT EXISTS idx_prefixes_version_prefix ON prefixes(ip_version, prefix);
 
 CREATE INDEX IF NOT EXISTS idx_announcements_timestamp ON announcements(timestamp);
 CREATE INDEX IF NOT EXISTS idx_announcements_prefix_id ON announcements(prefix_id);
-CREATE INDEX IF NOT EXISTS idx_announcements_peer_asn ON announcements(peer_asn);
-CREATE INDEX IF NOT EXISTS idx_announcements_origin_asn ON announcements(origin_asn);
+CREATE INDEX IF NOT EXISTS idx_announcements_asn_id ON announcements(asn_id);
 
 CREATE INDEX IF NOT EXISTS idx_peerings_as_a ON peerings(as_a);
 CREATE INDEX IF NOT EXISTS idx_peerings_as_b ON peerings(as_b);
 CREATE INDEX IF NOT EXISTS idx_peerings_lookup ON peerings(as_a, as_b);
 
--- Additional indexes for prefixes table
-CREATE INDEX IF NOT EXISTS idx_prefixes_prefix ON prefixes(prefix);
 -- Indexes for asns table
-CREATE INDEX IF NOT EXISTS idx_asns_asn ON asns(asn);
+CREATE INDEX IF NOT EXISTS idx_asns_number ON asns(number);
 
 -- Indexes for bgp_peers table
 CREATE INDEX IF NOT EXISTS idx_bgp_peers_asn ON bgp_peers(peer_asn);
 CREATE INDEX IF NOT EXISTS idx_bgp_peers_last_seen ON bgp_peers(last_seen);
 CREATE INDEX IF NOT EXISTS idx_bgp_peers_ip ON bgp_peers(peer_ip);
 
--- IPv4 routing table maintained by PrefixHandler
-CREATE TABLE IF NOT EXISTS live_routes_v4 (
+-- Live routing table maintained by PrefixHandler
+CREATE TABLE IF NOT EXISTS live_routes (
     id TEXT PRIMARY KEY,
     prefix TEXT NOT NULL,
-    mask_length INTEGER NOT NULL, -- CIDR mask length (0-32)
+    mask_length INTEGER NOT NULL, -- CIDR mask length (0-32 for IPv4, 0-128 for IPv6)
+    ip_version INTEGER NOT NULL, -- 4 or 6
     origin_asn INTEGER NOT NULL,
     peer_ip TEXT NOT NULL,
     as_path TEXT NOT NULL, -- JSON array
     next_hop TEXT NOT NULL,
     last_updated DATETIME NOT NULL,
-    -- IPv4 range columns for fast lookups
-    ip_start INTEGER NOT NULL, -- Start of IPv4 range as 32-bit unsigned int
-    ip_end INTEGER NOT NULL, -- End of IPv4 range as 32-bit unsigned int
+    -- IPv4 range columns for fast lookups (NULL for IPv6)
+    v4_ip_start INTEGER, -- Start of IPv4 range as 32-bit unsigned int
+    v4_ip_end INTEGER, -- End of IPv4 range as 32-bit unsigned int
     UNIQUE(prefix, origin_asn, peer_ip)
 );
 
--- IPv6 routing table maintained by PrefixHandler
-CREATE TABLE IF NOT EXISTS live_routes_v6 (
-    id TEXT PRIMARY KEY,
-    prefix TEXT NOT NULL,
-    mask_length INTEGER NOT NULL, -- CIDR mask length (0-128)
-    origin_asn INTEGER NOT NULL,
-    peer_ip TEXT NOT NULL,
-    as_path TEXT NOT NULL, -- JSON array
-    next_hop TEXT NOT NULL,
-    last_updated DATETIME NOT NULL,
-    -- Note: IPv6 doesn't use integer range columns
-    UNIQUE(prefix, origin_asn, peer_ip)
-);
-
--- Indexes for live_routes_v4 table
-CREATE INDEX IF NOT EXISTS idx_live_routes_v4_prefix ON live_routes_v4(prefix);
-CREATE INDEX IF NOT EXISTS idx_live_routes_v4_mask_length ON live_routes_v4(mask_length);
-CREATE INDEX IF NOT EXISTS idx_live_routes_v4_origin_asn ON live_routes_v4(origin_asn);
-CREATE INDEX IF NOT EXISTS idx_live_routes_v4_last_updated ON live_routes_v4(last_updated);
+-- Indexes for live_routes table
+CREATE INDEX IF NOT EXISTS idx_live_routes_prefix ON live_routes(prefix);
+CREATE INDEX IF NOT EXISTS idx_live_routes_mask_length ON live_routes(mask_length);
+CREATE INDEX IF NOT EXISTS idx_live_routes_ip_version_mask ON live_routes(ip_version, mask_length);
+CREATE INDEX IF NOT EXISTS idx_live_routes_last_updated ON live_routes(last_updated);
 -- Indexes for IPv4 range queries
-CREATE INDEX IF NOT EXISTS idx_live_routes_v4_ip_range ON live_routes_v4(ip_start, ip_end);
--- Index to optimize prefix distribution queries
-CREATE INDEX IF NOT EXISTS idx_live_routes_v4_mask_prefix ON live_routes_v4(mask_length, prefix);
-
--- Indexes for live_routes_v6 table
-CREATE INDEX IF NOT EXISTS idx_live_routes_v6_prefix ON live_routes_v6(prefix);
-CREATE INDEX IF NOT EXISTS idx_live_routes_v6_mask_length ON live_routes_v6(mask_length);
-CREATE INDEX IF NOT EXISTS idx_live_routes_v6_origin_asn ON live_routes_v6(origin_asn);
-CREATE INDEX IF NOT EXISTS idx_live_routes_v6_last_updated ON live_routes_v6(last_updated);
--- Index to optimize prefix distribution queries
-CREATE INDEX IF NOT EXISTS idx_live_routes_v6_mask_prefix ON live_routes_v6(mask_length, prefix);
+CREATE INDEX IF NOT EXISTS idx_live_routes_ipv4_range ON live_routes(v4_ip_start, v4_ip_end) WHERE ip_version = 4;
-- Index to optimize COUNT(DISTINCT prefix) queries
+CREATE INDEX IF NOT EXISTS idx_live_routes_ip_mask_prefix ON live_routes(ip_version, mask_length, prefix);
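Note: the consolidated live_routes table keeps v4_ip_start/v4_ip_end so IPv4 containment lookups can use the partial idx_live_routes_ipv4_range index. A minimal sketch of how those two values can be derived from a CIDR string (an illustration, not the project's actual helper):

package main

import (
    "encoding/binary"
    "fmt"
    "net"
)

// v4Range converts an IPv4 CIDR prefix into the inclusive [start, end]
// 32-bit range that v4_ip_start/v4_ip_end would hold.
func v4Range(cidr string) (uint32, uint32, error) {
    _, ipnet, err := net.ParseCIDR(cidr)
    if err != nil {
        return 0, 0, err
    }
    ip4 := ipnet.IP.To4()
    if ip4 == nil {
        return 0, 0, fmt.Errorf("not an IPv4 prefix: %s", cidr)
    }
    start := binary.BigEndian.Uint32(ip4)
    ones, bits := ipnet.Mask.Size()
    // Setting every host bit yields the last address in the prefix.
    end := start | (uint32(1)<<uint(bits-ones) - 1)
    return start, end, nil
}

func main() {
    start, end, _ := v4Range("10.0.0.0/8")
    // A containment probe against the partial index would then look like:
    //   ... WHERE ip_version = 4 AND v4_ip_start <= ? AND v4_ip_end >= ?
    fmt.Printf("v4_ip_start=%d v4_ip_end=%d\n", start, end) // 167772160, 184549375
}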

View File

@@ -8,7 +8,7 @@ import (
     "git.eeqj.de/sneak/routewatch/internal/logger"
 )
 
-const slowQueryThreshold = 25 * time.Millisecond
+const slowQueryThreshold = 50 * time.Millisecond
 
 // logSlowQuery logs queries that take longer than slowQueryThreshold
 func logSlowQuery(logger *logger.Logger, query string, start time.Time) {
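The diff only shows the signature; a plausible body, assuming the same structured key/value logging style used elsewhere in this diff (the log level and field names here are assumptions, not the project's actual code):

// logSlowQuery logs queries that take longer than slowQueryThreshold.
// Hypothetical body for illustration only.
func logSlowQuery(logger *logger.Logger, query string, start time.Time) {
    if elapsed := time.Since(start); elapsed > slowQueryThreshold {
        logger.Warn("slow query", "query", query, "duration", elapsed)
    }
}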

View File

@@ -61,7 +61,8 @@ func (m *mockStore) GetOrCreateASN(number int, timestamp time.Time) (*database.A
     }
 
     asn := &database.ASN{
-        ASN: number,
+        ID: uuid.New(),
+        Number: number,
         FirstSeen: timestamp,
         LastSeen: timestamp,
     }
@@ -71,37 +72,6 @@ func (m *mockStore) GetOrCreateASN(number int, timestamp time.Time) (*database.A
     return asn, nil
 }
 
-// UpdatePrefixesBatch mock implementation
-func (m *mockStore) UpdatePrefixesBatch(prefixes map[string]time.Time) error {
-    m.mu.Lock()
-    defer m.mu.Unlock()
-
-    for prefix, timestamp := range prefixes {
-        if p, exists := m.Prefixes[prefix]; exists {
-            p.LastSeen = timestamp
-        } else {
-            const (
-                ipVersionV4 = 4
-                ipVersionV6 = 6
-            )
-            ipVersion := ipVersionV4
-            if strings.Contains(prefix, ":") {
-                ipVersion = ipVersionV6
-            }
-            m.Prefixes[prefix] = &database.Prefix{
-                ID: uuid.New(),
-                Prefix: prefix,
-                IPVersion: ipVersion,
-                FirstSeen: timestamp,
-                LastSeen: timestamp,
-            }
-        }
-    }
-
-    return nil
-}
-
 // GetOrCreatePrefix mock implementation
 func (m *mockStore) GetOrCreatePrefix(prefix string, timestamp time.Time) (*database.Prefix, error) {
     m.mu.Lock()
@@ -291,17 +261,6 @@ func (m *mockStore) GetRandomPrefixesByLengthContext(ctx context.Context, maskLe
     return m.GetRandomPrefixesByLength(maskLength, ipVersion, limit)
 }
 
-// GetASPeers mock implementation
-func (m *mockStore) GetASPeers(asn int) ([]database.ASPeer, error) {
-    // Return empty peers for now
-    return []database.ASPeer{}, nil
-}
-
-// GetASPeersContext mock implementation with context support
-func (m *mockStore) GetASPeersContext(ctx context.Context, asn int) ([]database.ASPeer, error) {
-    return m.GetASPeers(asn)
-}
-
 // UpsertLiveRouteBatch mock implementation
 func (m *mockStore) UpsertLiveRouteBatch(routes []*database.LiveRoute) error {
     m.mu.Lock()
@@ -343,7 +302,8 @@ func (m *mockStore) GetOrCreateASNBatch(asns map[int]time.Time) error {
     for number, timestamp := range asns {
         if _, exists := m.ASNs[number]; !exists {
             m.ASNs[number] = &database.ASN{
-                ASN: number,
+                ID: uuid.New(),
+                Number: number,
                 FirstSeen: timestamp,
                 LastSeen: timestamp,
             }

View File

@@ -21,7 +21,7 @@ const (
     pathExpirationTime = 30 * time.Minute
 
     // peeringProcessInterval is how often to process AS paths into peerings
-    peeringProcessInterval = 30 * time.Second
+    peeringProcessInterval = 2 * time.Minute
 
     // pathPruneInterval is how often to prune old AS paths
     pathPruneInterval = 5 * time.Minute

View File

@@ -19,7 +19,7 @@ const (
     prefixHandlerQueueSize = 100000
 
     // prefixBatchSize is the number of prefix updates to batch together
-    prefixBatchSize = 25000
+    prefixBatchSize = 20000
 
     // prefixBatchTimeout is the maximum time to wait before flushing a batch
     // DO NOT reduce this timeout - larger batches are more efficient
@@ -182,15 +182,9 @@ func (h *PrefixHandler) flushBatchLocked() {
     var routesToUpsert []*database.LiveRoute
     var routesToDelete []database.LiveRouteDeletion
 
-    // Collect unique prefixes to update
-    prefixesToUpdate := make(map[string]time.Time)
+    // Skip the prefix table updates entirely - just update live_routes
+    // The prefix table is not critical for routing lookups
     for _, update := range prefixMap {
-        // Track prefix for both announcements and withdrawals
-        if _, exists := prefixesToUpdate[update.prefix]; !exists || update.timestamp.After(prefixesToUpdate[update.prefix]) {
-            prefixesToUpdate[update.prefix] = update.timestamp
-        }
-
         if update.messageType == "announcement" && update.originASN > 0 {
             // Create live route for batch upsert
             route := h.createLiveRoute(update)
@@ -198,20 +192,11 @@ func (h *PrefixHandler) flushBatchLocked() {
                 routesToUpsert = append(routesToUpsert, route)
             }
         } else if update.messageType == "withdrawal" {
-            // Parse CIDR to get IP version
-            _, ipVersion, err := parseCIDR(update.prefix)
-            if err != nil {
-                h.logger.Error("Failed to parse CIDR for withdrawal", "prefix", update.prefix, "error", err)
-                continue
-            }
-
             // Create deletion record for batch delete
             routesToDelete = append(routesToDelete, database.LiveRouteDeletion{
                 Prefix: update.prefix,
                 OriginASN: update.originASN,
                 PeerIP: update.peer,
-                IPVersion: ipVersion,
             })
         }
     }
@@ -234,13 +219,6 @@ func (h *PrefixHandler) flushBatchLocked() {
         }
     }
 
-    // Update prefix tables
-    if len(prefixesToUpdate) > 0 {
-        if err := h.db.UpdatePrefixesBatch(prefixesToUpdate); err != nil {
-            h.logger.Error("Failed to update prefix batch", "error", err, "count", len(prefixesToUpdate))
-        }
-    }
-
     elapsed := time.Since(startTime)
     h.logger.Debug("Flushed prefix batch",
         "batch_size", batchSize,

View File

@@ -80,8 +80,8 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
     }
 
     return func(w http.ResponseWriter, r *http.Request) {
-        // Create a 4 second timeout context for this request
-        ctx, cancel := context.WithTimeout(r.Context(), 4*time.Second)
+        // Create a 1 second timeout context for this request
+        ctx, cancel := context.WithTimeout(r.Context(), 1*time.Second)
         defer cancel()
 
         metrics := s.streamer.GetMetrics()
@@ -174,15 +174,14 @@
 func (s *Server) handleStats() http.HandlerFunc {
     // HandlerStatsInfo represents handler statistics in the API response
     type HandlerStatsInfo struct {
         Name string `json:"name"`
         QueueLength int `json:"queue_length"`
         QueueCapacity int `json:"queue_capacity"`
-        QueueHighWaterMark int `json:"queue_high_water_mark"`
         ProcessedCount uint64 `json:"processed_count"`
         DroppedCount uint64 `json:"dropped_count"`
         AvgProcessTimeMs float64 `json:"avg_process_time_ms"`
         MinProcessTimeMs float64 `json:"min_process_time_ms"`
         MaxProcessTimeMs float64 `json:"max_process_time_ms"`
     }
 
     // StatsResponse represents the API statistics response
@@ -214,8 +213,8 @@
     }
 
     return func(w http.ResponseWriter, r *http.Request) {
-        // Create a 4 second timeout context for this request
-        ctx, cancel := context.WithTimeout(r.Context(), 4*time.Second)
+        // Create a 1 second timeout context for this request
+        ctx, cancel := context.WithTimeout(r.Context(), 1*time.Second)
         defer cancel()
 
         // Check if context is already cancelled
@@ -252,7 +251,7 @@
             return
         case err := <-errChan:
             s.logger.Error("Failed to get database stats", "error", err)
-            writeJSONError(w, http.StatusInternalServerError, err.Error())
+            http.Error(w, err.Error(), http.StatusInternalServerError)
             return
         case dbStats = <-statsChan:
@@ -282,15 +281,14 @@
         const microsecondsPerMillisecond = 1000.0
         for _, hs := range handlerStats {
             handlerStatsInfo = append(handlerStatsInfo, HandlerStatsInfo{
                 Name: hs.Name,
                 QueueLength: hs.QueueLength,
                 QueueCapacity: hs.QueueCapacity,
-                QueueHighWaterMark: hs.QueueHighWaterMark,
                 ProcessedCount: hs.ProcessedCount,
                 DroppedCount: hs.DroppedCount,
                 AvgProcessTimeMs: float64(hs.AvgProcessTime.Microseconds()) / microsecondsPerMillisecond,
                 MinProcessTimeMs: float64(hs.MinProcessTime.Microseconds()) / microsecondsPerMillisecond,
                 MaxProcessTimeMs: float64(hs.MaxProcessTime.Microseconds()) / microsecondsPerMillisecond,
             })
         }
@@ -493,14 +491,6 @@ func (s *Server) handleASDetail() http.HandlerFunc {
             return
         }
 
-        // Get peers
-        peers, err := s.db.GetASPeersContext(r.Context(), asn)
-        if err != nil {
-            s.logger.Error("Failed to get AS peers", "error", err)
-            // Continue without peers rather than failing the whole request
-            peers = []database.ASPeer{}
-        }
-
         // Group prefixes by IP version
         const ipVersionV4 = 4
         var ipv4Prefixes, ipv6Prefixes []database.LiveRoute
@@ -557,8 +547,6 @@
             TotalCount int
             IPv4Count int
             IPv6Count int
-            Peers []database.ASPeer
-            PeerCount int
         }{
             ASN: asInfo,
             IPv4Prefixes: ipv4Prefixes,
@@ -566,8 +554,6 @@
             TotalCount: len(prefixes),
             IPv4Count: len(ipv4Prefixes),
             IPv6Count: len(ipv6Prefixes),
-            Peers: peers,
-            PeerCount: len(peers),
         }
 
         // Check if context is still valid before writing response
@@ -619,7 +605,7 @@ func (s *Server) handlePrefixDetail() http.HandlerFunc {
         // Group by origin AS and collect unique AS info
         type ASNInfo struct {
-            ASN int
+            Number int
             Handle string
             Description string
             PeerCount int
@@ -636,7 +622,7 @@
                 description = asInfo.Description
             }
             originMap[route.OriginASN] = &ASNInfo{
-                ASN: route.OriginASN,
+                Number: route.OriginASN,
                 Handle: handle,
                 Description: description,
                 PeerCount: 0,
@@ -669,7 +655,7 @@
         // Create enhanced routes with AS path handles
         type ASPathEntry struct {
-            ASN int
+            Number int
             Handle string
         }
         type EnhancedRoute struct {
@@ -688,7 +674,7 @@
             for j, asn := range route.ASPath {
                 handle := asinfo.GetHandle(asn)
                 enhancedRoute.ASPathWithHandle[j] = ASPathEntry{
-                    ASN: asn,
+                    Number: asn,
                     Handle: handle,
                 }
             }

View File

@@ -217,68 +217,3 @@ func TimeoutMiddleware(timeout time.Duration) func(http.Handler) http.Handler {
         })
     }
 }
-
-// JSONValidationMiddleware ensures all JSON API responses are valid JSON
-func JSONValidationMiddleware(next http.Handler) http.Handler {
-    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-        // Create a custom response writer to capture the response
-        rw := &responseWriter{
-            ResponseWriter: w,
-            body: &bytes.Buffer{},
-            statusCode: http.StatusOK,
-        }
-
-        // Serve the request
-        next.ServeHTTP(rw, r)
-
-        // Check if it's meant to be a JSON response
-        contentType := rw.Header().Get("Content-Type")
-        isJSON := contentType == "application/json" || contentType == ""
-
-        // If it's not JSON or has content, pass through
-        if !isJSON && rw.body.Len() > 0 {
-            w.WriteHeader(rw.statusCode)
-            _, _ = w.Write(rw.body.Bytes())
-            return
-        }
-
-        // For JSON responses, validate the JSON
-        if rw.body.Len() > 0 {
-            var testParse interface{}
-            if err := json.Unmarshal(rw.body.Bytes(), &testParse); err == nil {
-                // Valid JSON, write it out
-                w.Header().Set("Content-Type", "application/json")
-                w.WriteHeader(rw.statusCode)
-                _, _ = w.Write(rw.body.Bytes())
-                return
-            }
-        }
-
-        // If we get here, either there's no body or invalid JSON
-        // Write a proper error response
-        w.Header().Set("Content-Type", "application/json")
-
-        // Determine appropriate status code
-        statusCode := rw.statusCode
-        if statusCode == http.StatusOK {
-            statusCode = http.StatusInternalServerError
-        }
-        w.WriteHeader(statusCode)
-
-        errorMsg := "Internal server error"
-        if statusCode == http.StatusRequestTimeout {
-            errorMsg = "Request timeout"
-        }
-
-        response := map[string]interface{}{
-            "status": "error",
-            "error": map[string]interface{}{
-                "msg": errorMsg,
-                "code": statusCode,
-            },
-        }
-        _ = json.NewEncoder(w).Encode(response)
-    })
-}
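The TimeoutMiddleware whose closing braces survive at the top of this hunk keeps the signature shown in the hunk header. A minimal sketch of a middleware with that signature, assuming it simply bounds each request's context (an illustration, not necessarily the project's implementation):

package main

import (
    "context"
    "net/http"
    "time"
)

// TimeoutMiddleware bounds each request's context by the given timeout.
// Handlers that honor r.Context() — as the handlers in this diff do via
// context.WithTimeout and cancellation checks — abort when it expires.
func TimeoutMiddleware(timeout time.Duration) func(http.Handler) http.Handler {
    return func(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            ctx, cancel := context.WithTimeout(r.Context(), timeout)
            defer cancel()
            next.ServeHTTP(w, r.WithContext(ctx))
        })
    }
}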

View File

@@ -16,14 +16,14 @@ func (s *Server) setupRoutes() {
     r.Use(middleware.RealIP)
     r.Use(middleware.Logger)
     r.Use(middleware.Recoverer)
-    const requestTimeout = 8 * time.Second
+    const requestTimeout = 2 * time.Second
     r.Use(TimeoutMiddleware(requestTimeout))
     r.Use(JSONResponseMiddleware)
 
     // Routes
     r.Get("/", s.handleRoot())
     r.Get("/status", s.handleStatusHTML())
-    r.Get("/status.json", JSONValidationMiddleware(s.handleStatusJSON()).ServeHTTP)
+    r.Get("/status.json", s.handleStatusJSON())
 
     // AS and prefix detail pages
     r.Get("/as/{asn}", s.handleASDetail())
@@ -33,7 +33,6 @@ func (s *Server) setupRoutes() {
     // API routes
     r.Route("/api/v1", func(r chi.Router) {
-        r.Use(JSONValidationMiddleware)
         r.Get("/stats", s.handleStats())
         r.Get("/ip/{ip}", s.handleIPLookup())
         r.Get("/as/{asn}", s.handleASDetailJSON())

View File

@@ -42,7 +42,7 @@ func (s *Server) Start() error {
         port = "8080"
     }
 
-    const readHeaderTimeout = 40 * time.Second
+    const readHeaderTimeout = 10 * time.Second
     s.srv = &http.Server{
         Addr: ":" + port,
         Handler: s.router,

View File

@@ -8,7 +8,6 @@ import (
     "encoding/json"
     "fmt"
     "math"
-    "math/rand"
     "net/http"
     "sync"
     "sync/atomic"
@@ -30,10 +29,6 @@ const (
     bytesPerKB = 1024
     bytesPerMB = 1024 * 1024
     maxConcurrentHandlers = 800 // Maximum number of concurrent message handlers
-
-    // Backpressure constants
-    backpressureThreshold = 0.5 // Start dropping at 50% queue utilization
-    backpressureSlope = 2.0 // Slope for linear drop probability increase
 )
 
 // MessageHandler is an interface for handling RIS messages
@@ -54,13 +49,12 @@ type RawMessageHandler func(line string)
 // handlerMetrics tracks performance metrics for a handler
 type handlerMetrics struct {
     processedCount uint64 // Total messages processed
     droppedCount uint64 // Total messages dropped
     totalTime time.Duration // Total processing time (for average calculation)
     minTime time.Duration // Minimum processing time
     maxTime time.Duration // Maximum processing time
-    queueHighWaterMark int // Maximum queue length seen
     mu sync.Mutex // Protects the metrics
 }
 
 // handlerInfo wraps a handler with its queue and metrics
@@ -80,8 +74,7 @@ type Streamer struct {
     cancel context.CancelFunc
     running bool
     metrics *metrics.Tracker
     totalDropped uint64 // Total dropped messages across all handlers
-    random *rand.Rand // Random number generator for backpressure drops
 }
 
 // New creates a new RIS streamer
@@ -93,8 +86,6 @@ func New(logger *logger.Logger, metrics *metrics.Tracker) *Streamer {
         },
         handlers: make([]*handlerInfo, 0),
         metrics: metrics,
-        //nolint:gosec // Non-cryptographic randomness is fine for backpressure
-        random: rand.New(rand.NewSource(time.Now().UnixNano())),
     }
 }
@@ -215,15 +206,14 @@ func (s *Streamer) GetMetricsTracker() *metrics.Tracker {
 // HandlerStats represents metrics for a single handler
 type HandlerStats struct {
     Name string
     QueueLength int
     QueueCapacity int
-    QueueHighWaterMark int
     ProcessedCount uint64
     DroppedCount uint64
     AvgProcessTime time.Duration
     MinProcessTime time.Duration
     MaxProcessTime time.Duration
 }
 
 // GetHandlerStats returns current handler statistics
@@ -237,14 +227,13 @@ func (s *Streamer) GetHandlerStats() []HandlerStats {
         info.metrics.mu.Lock()
         hs := HandlerStats{
             Name: fmt.Sprintf("%T", info.handler),
             QueueLength: len(info.queue),
             QueueCapacity: cap(info.queue),
-            QueueHighWaterMark: info.metrics.queueHighWaterMark,
             ProcessedCount: info.metrics.processedCount,
             DroppedCount: info.metrics.droppedCount,
             MinProcessTime: info.metrics.minTime,
             MaxProcessTime: info.metrics.maxTime,
         }
 
         // Calculate average time
@@ -537,33 +526,15 @@ func (s *Streamer) stream(ctx context.Context) error {
             // Dispatch to interested handlers
             s.mu.RLock()
             for _, info := range s.handlers {
-                if !info.handler.WantsMessage(msg.Type) {
-                    continue
-                }
-
-                // Check if we should drop due to backpressure
-                if s.shouldDropForBackpressure(info) {
-                    atomic.AddUint64(&info.metrics.droppedCount, 1)
-                    atomic.AddUint64(&s.totalDropped, 1)
-                    continue
-                }
-
-                // Try to queue the message
-                select {
-                case info.queue <- &msg:
-                    // Message queued successfully
-                    // Update high water mark if needed
-                    queueLen := len(info.queue)
-                    info.metrics.mu.Lock()
-                    if queueLen > info.metrics.queueHighWaterMark {
-                        info.metrics.queueHighWaterMark = queueLen
-                    }
-                    info.metrics.mu.Unlock()
-                default:
-                    // Queue is full, drop the message
-                    atomic.AddUint64(&info.metrics.droppedCount, 1)
-                    atomic.AddUint64(&s.totalDropped, 1)
+                if info.handler.WantsMessage(msg.Type) {
+                    select {
+                    case info.queue <- &msg:
+                        // Message queued successfully
+                    default:
+                        // Queue is full, drop the message
+                        atomic.AddUint64(&info.metrics.droppedCount, 1)
+                        atomic.AddUint64(&s.totalDropped, 1)
+                    }
                 }
             }
             s.mu.RUnlock()
@@ -575,25 +546,3 @@
 
     return nil
 }
-
-// shouldDropForBackpressure determines if a message should be dropped based on queue utilization
-func (s *Streamer) shouldDropForBackpressure(info *handlerInfo) bool {
-    // Calculate queue utilization
-    queueLen := len(info.queue)
-    queueCap := cap(info.queue)
-    utilization := float64(queueLen) / float64(queueCap)
-
-    // No drops below threshold
-    if utilization < backpressureThreshold {
-        return false
-    }
-
-    // Calculate drop probability (0.0 at threshold, 1.0 at 100% full)
-    dropProbability := (utilization - backpressureThreshold) * backpressureSlope
-    if dropProbability > 1.0 {
-        dropProbability = 1.0
-    }
-
-    // Random drop based on probability
-    return s.random.Float64() < dropProbability
-}
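For reference, the removed backpressure curve starts dropping at backpressureThreshold (50% queue utilization) and ramps linearly with backpressureSlope: at 75% utilization the drop probability is (0.75 − 0.5) × 2.0 = 0.5, reaching 1.0 at a full queue. The replacement code keeps only the drop-on-full behavior of the select/default branch shown above.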

View File

@@ -3,7 +3,7 @@
 <head>
     <meta charset="UTF-8">
     <meta name="viewport" content="width=device-width, initial-scale=1.0">
-    <title>AS{{.ASN.ASN}} - {{.ASN.Handle}} - RouteWatch</title>
+    <title>AS{{.ASN.Number}} - {{.ASN.Handle}} - RouteWatch</title>
     <style>
         body {
             font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
@@ -136,7 +136,7 @@
     <div class="container">
         <a href="/status" class="nav-link">← Back to Status</a>
 
-        <h1>AS{{.ASN.ASN}}{{if .ASN.Handle}} - {{.ASN.Handle}}{{end}}</h1>
+        <h1>AS{{.ASN.Number}}{{if .ASN.Handle}} - {{.ASN.Handle}}{{end}}</h1>
         {{if .ASN.Description}}
         <p class="subtitle">{{.ASN.Description}}</p>
         {{end}}
@@ -154,10 +154,6 @@
                 <div class="info-label">IPv6 Prefixes</div>
                 <div class="info-value">{{.IPv6Count}}</div>
             </div>
-            <div class="info-card">
-                <div class="info-label">Peer ASNs</div>
-                <div class="info-value">{{.PeerCount}}</div>
-            </div>
             <div class="info-card">
                 <div class="info-label">First Seen</div>
                 <div class="info-value">{{.ASN.FirstSeen.Format "2006-01-02"}}</div>
@@ -227,44 +223,6 @@
             <p>No prefixes announced by this AS</p>
         </div>
         {{end}}
-
-        {{if .Peers}}
-        <div class="prefix-section">
-            <div class="prefix-header">
-                <h2>Peer ASNs</h2>
-                <span class="prefix-count">{{.PeerCount}}</span>
-            </div>
-            <table class="prefix-table">
-                <thead>
-                    <tr>
-                        <th>ASN</th>
-                        <th>Handle</th>
-                        <th>Description</th>
-                        <th>First Seen</th>
-                        <th>Last Seen</th>
-                    </tr>
-                </thead>
-                <tbody>
-                    {{range .Peers}}
-                    <tr>
-                        <td><a href="/as/{{.ASN}}" class="prefix-link">AS{{.ASN}}</a></td>
-                        <td>{{if .Handle}}{{.Handle}}{{else}}-{{end}}</td>
-                        <td>{{if .Description}}{{.Description}}{{else}}-{{end}}</td>
-                        <td>{{.FirstSeen.Format "2006-01-02"}}</td>
-                        <td>{{.LastSeen.Format "2006-01-02"}}</td>
-                    </tr>
-                    {{end}}
-                </tbody>
-            </table>
-        </div>
-        {{else}}
-        <div class="prefix-section">
-            <h2>Peer ASNs</h2>
-            <div class="empty-state">
-                <p>No peering relationships found for this AS</p>
-            </div>
-        </div>
-        {{end}}
     </div>
 </body>
 </html>

View File

@@ -207,7 +207,7 @@
             <div class="origin-list">
                 {{range .Origins}}
                 <div class="origin-item">
-                    <a href="/as/{{.ASN}}" class="as-link">AS{{.ASN}}</a>
+                    <a href="/as/{{.Number}}" class="as-link">AS{{.Number}}</a>
                     {{if .Handle}} ({{.Handle}}){{end}}
                     <span style="color: #7f8c8d; margin-left: 10px;">{{.PeerCount}} peer{{if ne .PeerCount 1}}s{{end}}</span>
                 </div>
@@ -240,7 +240,7 @@
                         <a href="/as/{{.OriginASN}}" class="as-link">AS{{.OriginASN}}</a>
                     </td>
                     <td class="peer-ip">{{.PeerIP}}</td>
-                    <td class="as-path">{{range $i, $as := .ASPathWithHandle}}{{if $i}} → {{end}}<a href="/as/{{$as.ASN}}" class="as-link">{{if $as.Handle}}{{$as.Handle}}{{else}}AS{{$as.ASN}}{{end}}</a>{{end}}</td>
+                    <td class="as-path">{{range $i, $as := .ASPathWithHandle}}{{if $i}} → {{end}}<a href="/as/{{$as.Number}}" class="as-link">{{if $as.Handle}}{{$as.Handle}}{{else}}AS{{$as.Number}}{{end}}</a>{{end}}</td>
                     <td class="peer-ip">{{.NextHop}}</td>
                     <td>{{.LastUpdated.Format "2006-01-02 15:04:05"}}</td>
                     <td class="age">{{.LastUpdated | timeSince}}</td>

View File

@@ -264,10 +264,6 @@
                 <span class="metric-label">Queue</span>
                 <span class="metric-value">${handler.queue_length}/${handler.queue_capacity}</span>
             </div>
-            <div class="metric">
-                <span class="metric-label">High Water Mark</span>
-                <span class="metric-value">${handler.queue_high_water_mark}/${handler.queue_capacity} (${Math.round(handler.queue_high_water_mark * 100 / handler.queue_capacity)}%)</span>
-            </div>
             <div class="metric">
                 <span class="metric-label">Processed</span>
                 <span class="metric-value">${formatNumber(handler.processed_count)}</span>
@@ -290,39 +286,6 @@
             });
         }
 
-        function resetAllFields() {
-            // Reset all metric fields to '-'
-            document.getElementById('connected').textContent = '-';
-            document.getElementById('connected').className = 'metric-value';
-            document.getElementById('uptime').textContent = '-';
-            document.getElementById('go_version').textContent = '-';
-            document.getElementById('goroutines').textContent = '-';
-            document.getElementById('memory_usage').textContent = '-';
-            document.getElementById('total_messages').textContent = '-';
-            document.getElementById('messages_per_sec').textContent = '-';
-            document.getElementById('total_bytes').textContent = '-';
-            document.getElementById('mbits_per_sec').textContent = '-';
-            document.getElementById('asns').textContent = '-';
-            document.getElementById('prefixes').textContent = '-';
-            document.getElementById('ipv4_prefixes').textContent = '-';
-            document.getElementById('ipv6_prefixes').textContent = '-';
-            document.getElementById('peerings').textContent = '-';
-            document.getElementById('peers').textContent = '-';
-            document.getElementById('database_size').textContent = '-';
-            document.getElementById('live_routes').textContent = '-';
-            document.getElementById('ipv4_routes').textContent = '-';
-            document.getElementById('ipv6_routes').textContent = '-';
-            document.getElementById('ipv4_updates_per_sec').textContent = '-';
-            document.getElementById('ipv6_updates_per_sec').textContent = '-';
-
-            // Clear handler stats
-            document.getElementById('handler-stats-container').innerHTML = '';
-
-            // Clear prefix distributions
-            document.getElementById('ipv4-prefix-distribution').innerHTML = '<div class="metric"><span class="metric-label">No data</span></div>';
-            document.getElementById('ipv6-prefix-distribution').innerHTML = '<div class="metric"><span class="metric-label">No data</span></div>';
-        }
-
         function updateStatus() {
             fetch('/api/v1/stats')
                 .then(response => response.json())
@@ -331,7 +294,6 @@
                     if (response.status === 'error') {
                         document.getElementById('error').textContent = 'Error: ' + response.error.msg;
                         document.getElementById('error').style.display = 'block';
-                        resetAllFields();
                         return;
                     }
@@ -378,13 +340,12 @@
                 .catch(error => {
                     document.getElementById('error').textContent = 'Error fetching status: ' + error;
                    document.getElementById('error').style.display = 'block';
-                    resetAllFields();
                 });
         }
 
-        // Update immediately and then every 2 seconds
+        // Update immediately and then every 500ms
         updateStatus();
-        setInterval(updateStatus, 2000);
+        setInterval(updateStatus, 500);
     </script>
 </body>
 </html>

log.txt

File diff suppressed because it is too large