Compare commits

..

No commits in common. "main" and "optimize-sqlite-settings" have entirely different histories.

21 changed files with 271289 additions and 208194 deletions

View File

@ -21,7 +21,7 @@ clean:
rm -rf bin/ rm -rf bin/
run: build run: build
DEBUG=routewatch ./bin/routewatch 2>&1 | tee log.txt ./bin/routewatch
asupdate: asupdate:
@echo "Updating AS info data..." @echo "Updating AS info data..."

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,6 @@
package database package database
import ( import (
"context"
"time" "time"
) )
@ -27,7 +26,6 @@ type Store interface {
// Prefix operations // Prefix operations
GetOrCreatePrefix(prefix string, timestamp time.Time) (*Prefix, error) GetOrCreatePrefix(prefix string, timestamp time.Time) (*Prefix, error)
UpdatePrefixesBatch(prefixes map[string]time.Time) error
// Announcement operations // Announcement operations
RecordAnnouncement(announcement *Announcement) error RecordAnnouncement(announcement *Announcement) error
@ -37,7 +35,6 @@ type Store interface {
// Statistics // Statistics
GetStats() (Stats, error) GetStats() (Stats, error)
GetStatsContext(ctx context.Context) (Stats, error)
// Peer operations // Peer operations
UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error
@ -49,23 +46,14 @@ type Store interface {
DeleteLiveRoute(prefix string, originASN int, peerIP string) error DeleteLiveRoute(prefix string, originASN int, peerIP string) error
DeleteLiveRouteBatch(deletions []LiveRouteDeletion) error DeleteLiveRouteBatch(deletions []LiveRouteDeletion) error
GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error) GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error)
GetPrefixDistributionContext(ctx context.Context) (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error)
GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error) GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error)
GetLiveRouteCountsContext(ctx context.Context) (ipv4Count, ipv6Count int, err error)
// IP lookup operations // IP lookup operations
GetASInfoForIP(ip string) (*ASInfo, error) GetASInfoForIP(ip string) (*ASInfo, error)
GetASInfoForIPContext(ctx context.Context, ip string) (*ASInfo, error)
// AS and prefix detail operations // AS and prefix detail operations
GetASDetails(asn int) (*ASN, []LiveRoute, error) GetASDetails(asn int) (*ASN, []LiveRoute, error)
GetASDetailsContext(ctx context.Context, asn int) (*ASN, []LiveRoute, error)
GetASPeers(asn int) ([]ASPeer, error)
GetASPeersContext(ctx context.Context, asn int) ([]ASPeer, error)
GetPrefixDetails(prefix string) ([]LiveRoute, error) GetPrefixDetails(prefix string) ([]LiveRoute, error)
GetPrefixDetailsContext(ctx context.Context, prefix string) ([]LiveRoute, error)
GetRandomPrefixesByLength(maskLength, ipVersion, limit int) ([]LiveRoute, error)
GetRandomPrefixesByLengthContext(ctx context.Context, maskLength, ipVersion, limit int) ([]LiveRoute, error)
// Lifecycle // Lifecycle
Close() error Close() error

View File

@ -8,7 +8,8 @@ import (
// ASN represents an Autonomous System Number // ASN represents an Autonomous System Number
type ASN struct { type ASN struct {
ASN int `json:"asn"` ID uuid.UUID `json:"id"`
Number int `json:"number"`
Handle string `json:"handle"` Handle string `json:"handle"`
Description string `json:"description"` Description string `json:"description"`
FirstSeen time.Time `json:"first_seen"` FirstSeen time.Time `json:"first_seen"`
@ -28,8 +29,8 @@ type Prefix struct {
type Announcement struct { type Announcement struct {
ID uuid.UUID `json:"id"` ID uuid.UUID `json:"id"`
PrefixID uuid.UUID `json:"prefix_id"` PrefixID uuid.UUID `json:"prefix_id"`
PeerASN int `json:"peer_asn"` ASNID uuid.UUID `json:"asn_id"`
OriginASN int `json:"origin_asn"` OriginASNID uuid.UUID `json:"origin_asn_id"`
Path string `json:"path"` // JSON-encoded AS path Path string `json:"path"` // JSON-encoded AS path
NextHop string `json:"next_hop"` NextHop string `json:"next_hop"`
Timestamp time.Time `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
@ -39,8 +40,8 @@ type Announcement struct {
// ASNPeering represents a peering relationship between two ASNs // ASNPeering represents a peering relationship between two ASNs
type ASNPeering struct { type ASNPeering struct {
ID uuid.UUID `json:"id"` ID uuid.UUID `json:"id"`
ASA int `json:"as_a"` FromASNID uuid.UUID `json:"from_asn_id"`
ASB int `json:"as_b"` ToASNID uuid.UUID `json:"to_asn_id"`
FirstSeen time.Time `json:"first_seen"` FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"` LastSeen time.Time `json:"last_seen"`
} }
@ -82,7 +83,6 @@ type LiveRouteDeletion struct {
Prefix string Prefix string
OriginASN int OriginASN int
PeerIP string PeerIP string
IPVersion int
} }
// PeerUpdate represents parameters for updating a peer // PeerUpdate represents parameters for updating a peer

View File

@ -1,27 +1,16 @@
-- IMPORTANT: This is the ONLY place where schema changes should be made.
-- We do NOT support migrations. All schema changes MUST be in this file.
-- DO NOT make schema changes anywhere else in the codebase.
CREATE TABLE IF NOT EXISTS asns ( CREATE TABLE IF NOT EXISTS asns (
asn INTEGER PRIMARY KEY, id TEXT PRIMARY KEY,
number INTEGER UNIQUE NOT NULL,
handle TEXT, handle TEXT,
description TEXT, description TEXT,
first_seen DATETIME NOT NULL, first_seen DATETIME NOT NULL,
last_seen DATETIME NOT NULL last_seen DATETIME NOT NULL
); );
-- IPv4 prefixes table CREATE TABLE IF NOT EXISTS prefixes (
CREATE TABLE IF NOT EXISTS prefixes_v4 (
id TEXT PRIMARY KEY,
prefix TEXT UNIQUE NOT NULL,
first_seen DATETIME NOT NULL,
last_seen DATETIME NOT NULL
);
-- IPv6 prefixes table
CREATE TABLE IF NOT EXISTS prefixes_v6 (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
prefix TEXT UNIQUE NOT NULL, prefix TEXT UNIQUE NOT NULL,
ip_version INTEGER NOT NULL, -- 4 for IPv4, 6 for IPv6
first_seen DATETIME NOT NULL, first_seen DATETIME NOT NULL,
last_seen DATETIME NOT NULL last_seen DATETIME NOT NULL
); );
@ -29,14 +18,15 @@ CREATE TABLE IF NOT EXISTS prefixes_v6 (
CREATE TABLE IF NOT EXISTS announcements ( CREATE TABLE IF NOT EXISTS announcements (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
prefix_id TEXT NOT NULL, prefix_id TEXT NOT NULL,
peer_asn INTEGER NOT NULL, asn_id TEXT NOT NULL,
origin_asn INTEGER NOT NULL, origin_asn_id TEXT NOT NULL,
path TEXT NOT NULL, path TEXT NOT NULL,
next_hop TEXT, next_hop TEXT,
timestamp DATETIME NOT NULL, timestamp DATETIME NOT NULL,
is_withdrawal BOOLEAN NOT NULL DEFAULT 0, is_withdrawal BOOLEAN NOT NULL DEFAULT 0,
FOREIGN KEY (peer_asn) REFERENCES asns(asn), FOREIGN KEY (prefix_id) REFERENCES prefixes(id),
FOREIGN KEY (origin_asn) REFERENCES asns(asn) FOREIGN KEY (asn_id) REFERENCES asns(id),
FOREIGN KEY (origin_asn_id) REFERENCES asns(id)
); );
CREATE TABLE IF NOT EXISTS peerings ( CREATE TABLE IF NOT EXISTS peerings (
@ -58,71 +48,47 @@ CREATE TABLE IF NOT EXISTS bgp_peers (
last_message_type TEXT last_message_type TEXT
); );
-- Indexes for prefixes_v4 table CREATE INDEX IF NOT EXISTS idx_prefixes_ip_version ON prefixes(ip_version);
CREATE INDEX IF NOT EXISTS idx_prefixes_v4_prefix ON prefixes_v4(prefix); CREATE INDEX IF NOT EXISTS idx_prefixes_version_prefix ON prefixes(ip_version, prefix);
-- Indexes for prefixes_v6 table
CREATE INDEX IF NOT EXISTS idx_prefixes_v6_prefix ON prefixes_v6(prefix);
CREATE INDEX IF NOT EXISTS idx_announcements_timestamp ON announcements(timestamp); CREATE INDEX IF NOT EXISTS idx_announcements_timestamp ON announcements(timestamp);
CREATE INDEX IF NOT EXISTS idx_announcements_prefix_id ON announcements(prefix_id); CREATE INDEX IF NOT EXISTS idx_announcements_prefix_id ON announcements(prefix_id);
CREATE INDEX IF NOT EXISTS idx_announcements_peer_asn ON announcements(peer_asn); CREATE INDEX IF NOT EXISTS idx_announcements_asn_id ON announcements(asn_id);
CREATE INDEX IF NOT EXISTS idx_announcements_origin_asn ON announcements(origin_asn);
CREATE INDEX IF NOT EXISTS idx_peerings_as_a ON peerings(as_a); CREATE INDEX IF NOT EXISTS idx_peerings_as_a ON peerings(as_a);
CREATE INDEX IF NOT EXISTS idx_peerings_as_b ON peerings(as_b); CREATE INDEX IF NOT EXISTS idx_peerings_as_b ON peerings(as_b);
CREATE INDEX IF NOT EXISTS idx_peerings_lookup ON peerings(as_a, as_b); CREATE INDEX IF NOT EXISTS idx_peerings_lookup ON peerings(as_a, as_b);
-- Additional indexes for prefixes table
CREATE INDEX IF NOT EXISTS idx_prefixes_prefix ON prefixes(prefix);
-- Indexes for asns table -- Indexes for asns table
CREATE INDEX IF NOT EXISTS idx_asns_asn ON asns(asn); CREATE INDEX IF NOT EXISTS idx_asns_number ON asns(number);
-- Indexes for bgp_peers table -- Indexes for bgp_peers table
CREATE INDEX IF NOT EXISTS idx_bgp_peers_asn ON bgp_peers(peer_asn); CREATE INDEX IF NOT EXISTS idx_bgp_peers_asn ON bgp_peers(peer_asn);
CREATE INDEX IF NOT EXISTS idx_bgp_peers_last_seen ON bgp_peers(last_seen); CREATE INDEX IF NOT EXISTS idx_bgp_peers_last_seen ON bgp_peers(last_seen);
CREATE INDEX IF NOT EXISTS idx_bgp_peers_ip ON bgp_peers(peer_ip); CREATE INDEX IF NOT EXISTS idx_bgp_peers_ip ON bgp_peers(peer_ip);
-- IPv4 routing table maintained by PrefixHandler -- Live routing table maintained by PrefixHandler
CREATE TABLE IF NOT EXISTS live_routes_v4 ( CREATE TABLE IF NOT EXISTS live_routes (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
prefix TEXT NOT NULL, prefix TEXT NOT NULL,
mask_length INTEGER NOT NULL, -- CIDR mask length (0-32) mask_length INTEGER NOT NULL, -- CIDR mask length (0-32 for IPv4, 0-128 for IPv6)
ip_version INTEGER NOT NULL, -- 4 or 6
origin_asn INTEGER NOT NULL, origin_asn INTEGER NOT NULL,
peer_ip TEXT NOT NULL, peer_ip TEXT NOT NULL,
as_path TEXT NOT NULL, -- JSON array as_path TEXT NOT NULL, -- JSON array
next_hop TEXT NOT NULL, next_hop TEXT NOT NULL,
last_updated DATETIME NOT NULL, last_updated DATETIME NOT NULL,
-- IPv4 range columns for fast lookups -- IPv4 range columns for fast lookups (NULL for IPv6)
ip_start INTEGER NOT NULL, -- Start of IPv4 range as 32-bit unsigned int v4_ip_start INTEGER, -- Start of IPv4 range as 32-bit unsigned int
ip_end INTEGER NOT NULL, -- End of IPv4 range as 32-bit unsigned int v4_ip_end INTEGER, -- End of IPv4 range as 32-bit unsigned int
UNIQUE(prefix, origin_asn, peer_ip) UNIQUE(prefix, origin_asn, peer_ip)
); );
-- IPv6 routing table maintained by PrefixHandler -- Indexes for live_routes table
CREATE TABLE IF NOT EXISTS live_routes_v6 ( CREATE INDEX IF NOT EXISTS idx_live_routes_prefix ON live_routes(prefix);
id TEXT PRIMARY KEY, CREATE INDEX IF NOT EXISTS idx_live_routes_mask_length ON live_routes(mask_length);
prefix TEXT NOT NULL, CREATE INDEX IF NOT EXISTS idx_live_routes_ip_version_mask ON live_routes(ip_version, mask_length);
mask_length INTEGER NOT NULL, -- CIDR mask length (0-128) CREATE INDEX IF NOT EXISTS idx_live_routes_last_updated ON live_routes(last_updated);
origin_asn INTEGER NOT NULL,
peer_ip TEXT NOT NULL,
as_path TEXT NOT NULL, -- JSON array
next_hop TEXT NOT NULL,
last_updated DATETIME NOT NULL,
-- Note: IPv6 doesn't use integer range columns
UNIQUE(prefix, origin_asn, peer_ip)
);
-- Indexes for live_routes_v4 table
CREATE INDEX IF NOT EXISTS idx_live_routes_v4_prefix ON live_routes_v4(prefix);
CREATE INDEX IF NOT EXISTS idx_live_routes_v4_mask_length ON live_routes_v4(mask_length);
CREATE INDEX IF NOT EXISTS idx_live_routes_v4_origin_asn ON live_routes_v4(origin_asn);
CREATE INDEX IF NOT EXISTS idx_live_routes_v4_last_updated ON live_routes_v4(last_updated);
-- Indexes for IPv4 range queries -- Indexes for IPv4 range queries
CREATE INDEX IF NOT EXISTS idx_live_routes_v4_ip_range ON live_routes_v4(ip_start, ip_end); CREATE INDEX IF NOT EXISTS idx_live_routes_ipv4_range ON live_routes(v4_ip_start, v4_ip_end) WHERE ip_version = 4;
-- Index to optimize prefix distribution queries
CREATE INDEX IF NOT EXISTS idx_live_routes_v4_mask_prefix ON live_routes_v4(mask_length, prefix);
-- Indexes for live_routes_v6 table
CREATE INDEX IF NOT EXISTS idx_live_routes_v6_prefix ON live_routes_v6(prefix);
CREATE INDEX IF NOT EXISTS idx_live_routes_v6_mask_length ON live_routes_v6(mask_length);
CREATE INDEX IF NOT EXISTS idx_live_routes_v6_origin_asn ON live_routes_v6(origin_asn);
CREATE INDEX IF NOT EXISTS idx_live_routes_v6_last_updated ON live_routes_v6(last_updated);
-- Index to optimize prefix distribution queries
CREATE INDEX IF NOT EXISTS idx_live_routes_v6_mask_prefix ON live_routes_v6(mask_length, prefix);

View File

@ -8,7 +8,7 @@ import (
"git.eeqj.de/sneak/routewatch/internal/logger" "git.eeqj.de/sneak/routewatch/internal/logger"
) )
const slowQueryThreshold = 25 * time.Millisecond const slowQueryThreshold = 50 * time.Millisecond
// logSlowQuery logs queries that take longer than slowQueryThreshold // logSlowQuery logs queries that take longer than slowQueryThreshold
func logSlowQuery(logger *logger.Logger, query string, start time.Time) { func logSlowQuery(logger *logger.Logger, query string, start time.Time) {
@ -19,7 +19,6 @@ func logSlowQuery(logger *logger.Logger, query string, start time.Time) {
} }
// queryRow wraps QueryRow with slow query logging // queryRow wraps QueryRow with slow query logging
// nolint:unused // kept for consistency with other query wrappers
func (d *Database) queryRow(query string, args ...interface{}) *sql.Row { func (d *Database) queryRow(query string, args ...interface{}) *sql.Row {
start := time.Now() start := time.Now()
defer logSlowQuery(d.logger, query, start) defer logSlowQuery(d.logger, query, start)

View File

@ -61,7 +61,8 @@ func (m *mockStore) GetOrCreateASN(number int, timestamp time.Time) (*database.A
} }
asn := &database.ASN{ asn := &database.ASN{
ASN: number, ID: uuid.New(),
Number: number,
FirstSeen: timestamp, FirstSeen: timestamp,
LastSeen: timestamp, LastSeen: timestamp,
} }
@ -71,37 +72,6 @@ func (m *mockStore) GetOrCreateASN(number int, timestamp time.Time) (*database.A
return asn, nil return asn, nil
} }
// UpdatePrefixesBatch mock implementation
func (m *mockStore) UpdatePrefixesBatch(prefixes map[string]time.Time) error {
m.mu.Lock()
defer m.mu.Unlock()
for prefix, timestamp := range prefixes {
if p, exists := m.Prefixes[prefix]; exists {
p.LastSeen = timestamp
} else {
const (
ipVersionV4 = 4
ipVersionV6 = 6
)
ipVersion := ipVersionV4
if strings.Contains(prefix, ":") {
ipVersion = ipVersionV6
}
m.Prefixes[prefix] = &database.Prefix{
ID: uuid.New(),
Prefix: prefix,
IPVersion: ipVersion,
FirstSeen: timestamp,
LastSeen: timestamp,
}
}
}
return nil
}
// GetOrCreatePrefix mock implementation // GetOrCreatePrefix mock implementation
func (m *mockStore) GetOrCreatePrefix(prefix string, timestamp time.Time) (*database.Prefix, error) { func (m *mockStore) GetOrCreatePrefix(prefix string, timestamp time.Time) (*database.Prefix, error) {
m.mu.Lock() m.mu.Lock()
@ -193,11 +163,6 @@ func (m *mockStore) GetStats() (database.Stats, error) {
}, nil }, nil
} }
// GetStatsContext returns statistics about the mock store with context support
func (m *mockStore) GetStatsContext(ctx context.Context) (database.Stats, error) {
return m.GetStats()
}
// UpsertLiveRoute mock implementation // UpsertLiveRoute mock implementation
func (m *mockStore) UpsertLiveRoute(route *database.LiveRoute) error { func (m *mockStore) UpsertLiveRoute(route *database.LiveRoute) error {
// Simple mock - just return nil // Simple mock - just return nil
@ -216,22 +181,12 @@ func (m *mockStore) GetPrefixDistribution() (ipv4 []database.PrefixDistribution,
return nil, nil, nil return nil, nil, nil
} }
// GetPrefixDistributionContext mock implementation with context support
func (m *mockStore) GetPrefixDistributionContext(ctx context.Context) (ipv4 []database.PrefixDistribution, ipv6 []database.PrefixDistribution, err error) {
return m.GetPrefixDistribution()
}
// GetLiveRouteCounts mock implementation // GetLiveRouteCounts mock implementation
func (m *mockStore) GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error) { func (m *mockStore) GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error) {
// Return mock counts // Return mock counts
return m.RouteCount / 2, m.RouteCount / 2, nil return m.RouteCount / 2, m.RouteCount / 2, nil
} }
// GetLiveRouteCountsContext mock implementation with context support
func (m *mockStore) GetLiveRouteCountsContext(ctx context.Context) (ipv4Count, ipv6Count int, err error) {
return m.GetLiveRouteCounts()
}
// GetASInfoForIP mock implementation // GetASInfoForIP mock implementation
func (m *mockStore) GetASInfoForIP(ip string) (*database.ASInfo, error) { func (m *mockStore) GetASInfoForIP(ip string) (*database.ASInfo, error) {
// Simple mock - return a test AS // Simple mock - return a test AS
@ -246,11 +201,6 @@ func (m *mockStore) GetASInfoForIP(ip string) (*database.ASInfo, error) {
}, nil }, nil
} }
// GetASInfoForIPContext mock implementation with context support
func (m *mockStore) GetASInfoForIPContext(ctx context.Context, ip string) (*database.ASInfo, error) {
return m.GetASInfoForIP(ip)
}
// GetASDetails mock implementation // GetASDetails mock implementation
func (m *mockStore) GetASDetails(asn int) (*database.ASN, []database.LiveRoute, error) { func (m *mockStore) GetASDetails(asn int) (*database.ASN, []database.LiveRoute, error) {
m.mu.Lock() m.mu.Lock()
@ -265,43 +215,12 @@ func (m *mockStore) GetASDetails(asn int) (*database.ASN, []database.LiveRoute,
return nil, nil, database.ErrNoRoute return nil, nil, database.ErrNoRoute
} }
// GetASDetailsContext mock implementation with context support
func (m *mockStore) GetASDetailsContext(ctx context.Context, asn int) (*database.ASN, []database.LiveRoute, error) {
return m.GetASDetails(asn)
}
// GetPrefixDetails mock implementation // GetPrefixDetails mock implementation
func (m *mockStore) GetPrefixDetails(prefix string) ([]database.LiveRoute, error) { func (m *mockStore) GetPrefixDetails(prefix string) ([]database.LiveRoute, error) {
// Return empty routes for now // Return empty routes for now
return []database.LiveRoute{}, nil return []database.LiveRoute{}, nil
} }
// GetPrefixDetailsContext mock implementation with context support
func (m *mockStore) GetPrefixDetailsContext(ctx context.Context, prefix string) ([]database.LiveRoute, error) {
return m.GetPrefixDetails(prefix)
}
func (m *mockStore) GetRandomPrefixesByLength(maskLength, ipVersion, limit int) ([]database.LiveRoute, error) {
// Return empty routes for now
return []database.LiveRoute{}, nil
}
// GetRandomPrefixesByLengthContext mock implementation with context support
func (m *mockStore) GetRandomPrefixesByLengthContext(ctx context.Context, maskLength, ipVersion, limit int) ([]database.LiveRoute, error) {
return m.GetRandomPrefixesByLength(maskLength, ipVersion, limit)
}
// GetASPeers mock implementation
func (m *mockStore) GetASPeers(asn int) ([]database.ASPeer, error) {
// Return empty peers for now
return []database.ASPeer{}, nil
}
// GetASPeersContext mock implementation with context support
func (m *mockStore) GetASPeersContext(ctx context.Context, asn int) ([]database.ASPeer, error) {
return m.GetASPeers(asn)
}
// UpsertLiveRouteBatch mock implementation // UpsertLiveRouteBatch mock implementation
func (m *mockStore) UpsertLiveRouteBatch(routes []*database.LiveRoute) error { func (m *mockStore) UpsertLiveRouteBatch(routes []*database.LiveRoute) error {
m.mu.Lock() m.mu.Lock()
@ -343,7 +262,8 @@ func (m *mockStore) GetOrCreateASNBatch(asns map[int]time.Time) error {
for number, timestamp := range asns { for number, timestamp := range asns {
if _, exists := m.ASNs[number]; !exists { if _, exists := m.ASNs[number]; !exists {
m.ASNs[number] = &database.ASN{ m.ASNs[number] = &database.ASN{
ASN: number, ID: uuid.New(),
Number: number,
FirstSeen: timestamp, FirstSeen: timestamp,
LastSeen: timestamp, LastSeen: timestamp,
} }

View File

@ -11,14 +11,12 @@ import (
const ( const (
// asHandlerQueueSize is the queue capacity for ASN operations // asHandlerQueueSize is the queue capacity for ASN operations
// DO NOT set this higher than 100000 without explicit instructions
asHandlerQueueSize = 100000 asHandlerQueueSize = 100000
// asnBatchSize is the number of ASN operations to batch together // asnBatchSize is the number of ASN operations to batch together
asnBatchSize = 30000 asnBatchSize = 10000
// asnBatchTimeout is the maximum time to wait before flushing a batch // asnBatchTimeout is the maximum time to wait before flushing a batch
// DO NOT reduce this timeout - larger batches are more efficient
asnBatchTimeout = 2 * time.Second asnBatchTimeout = 2 * time.Second
) )

View File

@ -21,7 +21,7 @@ const (
pathExpirationTime = 30 * time.Minute pathExpirationTime = 30 * time.Minute
// peeringProcessInterval is how often to process AS paths into peerings // peeringProcessInterval is how often to process AS paths into peerings
peeringProcessInterval = 30 * time.Second peeringProcessInterval = 2 * time.Minute
// pathPruneInterval is how often to prune old AS paths // pathPruneInterval is how often to prune old AS paths
pathPruneInterval = 5 * time.Minute pathPruneInterval = 5 * time.Minute

View File

@ -15,14 +15,12 @@ import (
const ( const (
// prefixHandlerQueueSize is the queue capacity for prefix tracking operations // prefixHandlerQueueSize is the queue capacity for prefix tracking operations
// DO NOT set this higher than 100000 without explicit instructions
prefixHandlerQueueSize = 100000 prefixHandlerQueueSize = 100000
// prefixBatchSize is the number of prefix updates to batch together // prefixBatchSize is the number of prefix updates to batch together
prefixBatchSize = 25000 prefixBatchSize = 5000
// prefixBatchTimeout is the maximum time to wait before flushing a batch // prefixBatchTimeout is the maximum time to wait before flushing a batch
// DO NOT reduce this timeout - larger batches are more efficient
prefixBatchTimeout = 1 * time.Second prefixBatchTimeout = 1 * time.Second
// IP version constants // IP version constants
@ -182,15 +180,9 @@ func (h *PrefixHandler) flushBatchLocked() {
var routesToUpsert []*database.LiveRoute var routesToUpsert []*database.LiveRoute
var routesToDelete []database.LiveRouteDeletion var routesToDelete []database.LiveRouteDeletion
// Collect unique prefixes to update // Skip the prefix table updates entirely - just update live_routes
prefixesToUpdate := make(map[string]time.Time) // The prefix table is not critical for routing lookups
for _, update := range prefixMap { for _, update := range prefixMap {
// Track prefix for both announcements and withdrawals
if _, exists := prefixesToUpdate[update.prefix]; !exists || update.timestamp.After(prefixesToUpdate[update.prefix]) {
prefixesToUpdate[update.prefix] = update.timestamp
}
if update.messageType == "announcement" && update.originASN > 0 { if update.messageType == "announcement" && update.originASN > 0 {
// Create live route for batch upsert // Create live route for batch upsert
route := h.createLiveRoute(update) route := h.createLiveRoute(update)
@ -198,20 +190,11 @@ func (h *PrefixHandler) flushBatchLocked() {
routesToUpsert = append(routesToUpsert, route) routesToUpsert = append(routesToUpsert, route)
} }
} else if update.messageType == "withdrawal" { } else if update.messageType == "withdrawal" {
// Parse CIDR to get IP version
_, ipVersion, err := parseCIDR(update.prefix)
if err != nil {
h.logger.Error("Failed to parse CIDR for withdrawal", "prefix", update.prefix, "error", err)
continue
}
// Create deletion record for batch delete // Create deletion record for batch delete
routesToDelete = append(routesToDelete, database.LiveRouteDeletion{ routesToDelete = append(routesToDelete, database.LiveRouteDeletion{
Prefix: update.prefix, Prefix: update.prefix,
OriginASN: update.originASN, OriginASN: update.originASN,
PeerIP: update.peer, PeerIP: update.peer,
IPVersion: ipVersion,
}) })
} }
} }
@ -234,13 +217,6 @@ func (h *PrefixHandler) flushBatchLocked() {
} }
} }
// Update prefix tables
if len(prefixesToUpdate) > 0 {
if err := h.db.UpdatePrefixesBatch(prefixesToUpdate); err != nil {
h.logger.Error("Failed to update prefix batch", "error", err, "count", len(prefixesToUpdate))
}
}
elapsed := time.Since(startTime) elapsed := time.Since(startTime)
h.logger.Debug("Flushed prefix batch", h.logger.Debug("Flushed prefix batch",
"batch_size", batchSize, "batch_size", batchSize,

View File

@ -15,7 +15,6 @@ import (
"git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/templates" "git.eeqj.de/sneak/routewatch/internal/templates"
asinfo "git.eeqj.de/sneak/routewatch/pkg/asinfo"
"github.com/dustin/go-humanize" "github.com/dustin/go-humanize"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
) )
@ -80,8 +79,8 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
} }
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
// Create a 4 second timeout context for this request // Create a 1 second timeout context for this request
ctx, cancel := context.WithTimeout(r.Context(), 4*time.Second) ctx, cancel := context.WithTimeout(r.Context(), 1*time.Second)
defer cancel() defer cancel()
metrics := s.streamer.GetMetrics() metrics := s.streamer.GetMetrics()
@ -91,7 +90,7 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
errChan := make(chan error) errChan := make(chan error)
go func() { go func() {
dbStats, err := s.db.GetStatsContext(ctx) dbStats, err := s.db.GetStats()
if err != nil { if err != nil {
s.logger.Debug("Database stats query failed", "error", err) s.logger.Debug("Database stats query failed", "error", err)
errChan <- err errChan <- err
@ -125,7 +124,7 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
const bitsPerMegabit = 1000000.0 const bitsPerMegabit = 1000000.0
// Get route counts from database // Get route counts from database
ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCountsContext(ctx) ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCounts()
if err != nil { if err != nil {
s.logger.Warn("Failed to get live route counts", "error", err) s.logger.Warn("Failed to get live route counts", "error", err)
// Continue with zero counts // Continue with zero counts
@ -174,15 +173,14 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
func (s *Server) handleStats() http.HandlerFunc { func (s *Server) handleStats() http.HandlerFunc {
// HandlerStatsInfo represents handler statistics in the API response // HandlerStatsInfo represents handler statistics in the API response
type HandlerStatsInfo struct { type HandlerStatsInfo struct {
Name string `json:"name"` Name string `json:"name"`
QueueLength int `json:"queue_length"` QueueLength int `json:"queue_length"`
QueueCapacity int `json:"queue_capacity"` QueueCapacity int `json:"queue_capacity"`
QueueHighWaterMark int `json:"queue_high_water_mark"` ProcessedCount uint64 `json:"processed_count"`
ProcessedCount uint64 `json:"processed_count"` DroppedCount uint64 `json:"dropped_count"`
DroppedCount uint64 `json:"dropped_count"` AvgProcessTimeMs float64 `json:"avg_process_time_ms"`
AvgProcessTimeMs float64 `json:"avg_process_time_ms"` MinProcessTimeMs float64 `json:"min_process_time_ms"`
MinProcessTimeMs float64 `json:"min_process_time_ms"` MaxProcessTimeMs float64 `json:"max_process_time_ms"`
MaxProcessTimeMs float64 `json:"max_process_time_ms"`
} }
// StatsResponse represents the API statistics response // StatsResponse represents the API statistics response
@ -214,8 +212,8 @@ func (s *Server) handleStats() http.HandlerFunc {
} }
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
// Create a 4 second timeout context for this request // Create a 1 second timeout context for this request
ctx, cancel := context.WithTimeout(r.Context(), 4*time.Second) ctx, cancel := context.WithTimeout(r.Context(), 1*time.Second)
defer cancel() defer cancel()
// Check if context is already cancelled // Check if context is already cancelled
@ -234,7 +232,7 @@ func (s *Server) handleStats() http.HandlerFunc {
errChan := make(chan error) errChan := make(chan error)
go func() { go func() {
dbStats, err := s.db.GetStatsContext(ctx) dbStats, err := s.db.GetStats()
if err != nil { if err != nil {
s.logger.Debug("Database stats query failed", "error", err) s.logger.Debug("Database stats query failed", "error", err)
errChan <- err errChan <- err
@ -248,11 +246,12 @@ func (s *Server) handleStats() http.HandlerFunc {
select { select {
case <-ctx.Done(): case <-ctx.Done():
s.logger.Error("Database stats timeout") s.logger.Error("Database stats timeout")
// Don't write response here - timeout middleware already handles it http.Error(w, "Database timeout", http.StatusRequestTimeout)
return return
case err := <-errChan: case err := <-errChan:
s.logger.Error("Failed to get database stats", "error", err) s.logger.Error("Failed to get database stats", "error", err)
writeJSONError(w, http.StatusInternalServerError, err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
case dbStats = <-statsChan: case dbStats = <-statsChan:
@ -267,7 +266,7 @@ func (s *Server) handleStats() http.HandlerFunc {
const bitsPerMegabit = 1000000.0 const bitsPerMegabit = 1000000.0
// Get route counts from database // Get route counts from database
ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCountsContext(ctx) ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCounts()
if err != nil { if err != nil {
s.logger.Warn("Failed to get live route counts", "error", err) s.logger.Warn("Failed to get live route counts", "error", err)
// Continue with zero counts // Continue with zero counts
@ -282,15 +281,14 @@ func (s *Server) handleStats() http.HandlerFunc {
const microsecondsPerMillisecond = 1000.0 const microsecondsPerMillisecond = 1000.0
for _, hs := range handlerStats { for _, hs := range handlerStats {
handlerStatsInfo = append(handlerStatsInfo, HandlerStatsInfo{ handlerStatsInfo = append(handlerStatsInfo, HandlerStatsInfo{
Name: hs.Name, Name: hs.Name,
QueueLength: hs.QueueLength, QueueLength: hs.QueueLength,
QueueCapacity: hs.QueueCapacity, QueueCapacity: hs.QueueCapacity,
QueueHighWaterMark: hs.QueueHighWaterMark, ProcessedCount: hs.ProcessedCount,
ProcessedCount: hs.ProcessedCount, DroppedCount: hs.DroppedCount,
DroppedCount: hs.DroppedCount, AvgProcessTimeMs: float64(hs.AvgProcessTime.Microseconds()) / microsecondsPerMillisecond,
AvgProcessTimeMs: float64(hs.AvgProcessTime.Microseconds()) / microsecondsPerMillisecond, MinProcessTimeMs: float64(hs.MinProcessTime.Microseconds()) / microsecondsPerMillisecond,
MinProcessTimeMs: float64(hs.MinProcessTime.Microseconds()) / microsecondsPerMillisecond, MaxProcessTimeMs: float64(hs.MaxProcessTime.Microseconds()) / microsecondsPerMillisecond,
MaxProcessTimeMs: float64(hs.MaxProcessTime.Microseconds()) / microsecondsPerMillisecond,
}) })
} }
@ -355,7 +353,7 @@ func (s *Server) handleIPLookup() http.HandlerFunc {
} }
// Look up AS information for the IP // Look up AS information for the IP
asInfo, err := s.db.GetASInfoForIPContext(r.Context(), ip) asInfo, err := s.db.GetASInfoForIP(ip)
if err != nil { if err != nil {
// Check if it's an invalid IP error // Check if it's an invalid IP error
if errors.Is(err, database.ErrInvalidIP) { if errors.Is(err, database.ErrInvalidIP) {
@ -386,7 +384,7 @@ func (s *Server) handleASDetailJSON() http.HandlerFunc {
return return
} }
asInfo, prefixes, err := s.db.GetASDetailsContext(r.Context(), asn) asInfo, prefixes, err := s.db.GetASDetails(asn)
if err != nil { if err != nil {
if errors.Is(err, database.ErrNoRoute) { if errors.Is(err, database.ErrNoRoute) {
writeJSONError(w, http.StatusNotFound, err.Error()) writeJSONError(w, http.StatusNotFound, err.Error())
@ -439,7 +437,7 @@ func (s *Server) handlePrefixDetailJSON() http.HandlerFunc {
return return
} }
routes, err := s.db.GetPrefixDetailsContext(r.Context(), prefix) routes, err := s.db.GetPrefixDetails(prefix)
if err != nil { if err != nil {
if errors.Is(err, database.ErrNoRoute) { if errors.Is(err, database.ErrNoRoute) {
writeJSONError(w, http.StatusNotFound, err.Error()) writeJSONError(w, http.StatusNotFound, err.Error())
@ -481,7 +479,7 @@ func (s *Server) handleASDetail() http.HandlerFunc {
return return
} }
asInfo, prefixes, err := s.db.GetASDetailsContext(r.Context(), asn) asInfo, prefixes, err := s.db.GetASDetails(asn)
if err != nil { if err != nil {
if errors.Is(err, database.ErrNoRoute) { if errors.Is(err, database.ErrNoRoute) {
http.Error(w, "AS not found", http.StatusNotFound) http.Error(w, "AS not found", http.StatusNotFound)
@ -493,14 +491,6 @@ func (s *Server) handleASDetail() http.HandlerFunc {
return return
} }
// Get peers
peers, err := s.db.GetASPeersContext(r.Context(), asn)
if err != nil {
s.logger.Error("Failed to get AS peers", "error", err)
// Continue without peers rather than failing the whole request
peers = []database.ASPeer{}
}
// Group prefixes by IP version // Group prefixes by IP version
const ipVersionV4 = 4 const ipVersionV4 = 4
var ipv4Prefixes, ipv6Prefixes []database.LiveRoute var ipv4Prefixes, ipv6Prefixes []database.LiveRoute
@ -557,8 +547,6 @@ func (s *Server) handleASDetail() http.HandlerFunc {
TotalCount int TotalCount int
IPv4Count int IPv4Count int
IPv6Count int IPv6Count int
Peers []database.ASPeer
PeerCount int
}{ }{
ASN: asInfo, ASN: asInfo,
IPv4Prefixes: ipv4Prefixes, IPv4Prefixes: ipv4Prefixes,
@ -566,16 +554,6 @@ func (s *Server) handleASDetail() http.HandlerFunc {
TotalCount: len(prefixes), TotalCount: len(prefixes),
IPv4Count: len(ipv4Prefixes), IPv4Count: len(ipv4Prefixes),
IPv6Count: len(ipv6Prefixes), IPv6Count: len(ipv6Prefixes),
Peers: peers,
PeerCount: len(peers),
}
// Check if context is still valid before writing response
select {
case <-r.Context().Done():
// Request was cancelled, don't write response
return
default:
} }
w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Content-Type", "text/html; charset=utf-8")
@ -605,7 +583,7 @@ func (s *Server) handlePrefixDetail() http.HandlerFunc {
return return
} }
routes, err := s.db.GetPrefixDetailsContext(r.Context(), prefix) routes, err := s.db.GetPrefixDetails(prefix)
if err != nil { if err != nil {
if errors.Is(err, database.ErrNoRoute) { if errors.Is(err, database.ErrNoRoute) {
http.Error(w, "Prefix not found", http.StatusNotFound) http.Error(w, "Prefix not found", http.StatusNotFound)
@ -619,7 +597,7 @@ func (s *Server) handlePrefixDetail() http.HandlerFunc {
// Group by origin AS and collect unique AS info // Group by origin AS and collect unique AS info
type ASNInfo struct { type ASNInfo struct {
ASN int Number int
Handle string Handle string
Description string Description string
PeerCount int PeerCount int
@ -628,7 +606,7 @@ func (s *Server) handlePrefixDetail() http.HandlerFunc {
for _, route := range routes { for _, route := range routes {
if _, exists := originMap[route.OriginASN]; !exists { if _, exists := originMap[route.OriginASN]; !exists {
// Get AS info from database // Get AS info from database
asInfo, _, _ := s.db.GetASDetailsContext(r.Context(), route.OriginASN) asInfo, _, _ := s.db.GetASDetails(route.OriginASN)
handle := "" handle := ""
description := "" description := ""
if asInfo != nil { if asInfo != nil {
@ -636,7 +614,7 @@ func (s *Server) handlePrefixDetail() http.HandlerFunc {
description = asInfo.Description description = asInfo.Description
} }
originMap[route.OriginASN] = &ASNInfo{ originMap[route.OriginASN] = &ASNInfo{
ASN: route.OriginASN, Number: route.OriginASN,
Handle: handle, Handle: handle,
Description: description, Description: description,
PeerCount: 0, PeerCount: 0,
@ -667,41 +645,12 @@ func (s *Server) handlePrefixDetail() http.HandlerFunc {
origins = append(origins, origin) origins = append(origins, origin)
} }
// Create enhanced routes with AS path handles
type ASPathEntry struct {
ASN int
Handle string
}
type EnhancedRoute struct {
database.LiveRoute
ASPathWithHandle []ASPathEntry
}
enhancedRoutes := make([]EnhancedRoute, len(routes))
for i, route := range routes {
enhancedRoute := EnhancedRoute{
LiveRoute: route,
ASPathWithHandle: make([]ASPathEntry, len(route.ASPath)),
}
// Look up handle for each AS in the path
for j, asn := range route.ASPath {
handle := asinfo.GetHandle(asn)
enhancedRoute.ASPathWithHandle[j] = ASPathEntry{
ASN: asn,
Handle: handle,
}
}
enhancedRoutes[i] = enhancedRoute
}
// Prepare template data // Prepare template data
data := struct { data := struct {
Prefix string Prefix string
MaskLength int MaskLength int
IPVersion int IPVersion int
Routes []EnhancedRoute Routes []database.LiveRoute
Origins []*ASNInfo Origins []*ASNInfo
PeerCount int PeerCount int
OriginCount int OriginCount int
@ -709,20 +658,12 @@ func (s *Server) handlePrefixDetail() http.HandlerFunc {
Prefix: prefix, Prefix: prefix,
MaskLength: maskLength, MaskLength: maskLength,
IPVersion: ipVersion, IPVersion: ipVersion,
Routes: enhancedRoutes, Routes: routes,
Origins: origins, Origins: origins,
PeerCount: len(routes), PeerCount: len(routes),
OriginCount: len(originMap), OriginCount: len(originMap),
} }
// Check if context is still valid before writing response
select {
case <-r.Context().Done():
// Request was cancelled, don't write response
return
default:
}
w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Content-Type", "text/html; charset=utf-8")
tmpl := templates.PrefixDetailTemplate() tmpl := templates.PrefixDetailTemplate()
if err := tmpl.Execute(w, data); err != nil { if err := tmpl.Execute(w, data); err != nil {
@ -761,123 +702,3 @@ func (s *Server) handleIPRedirect() http.HandlerFunc {
http.Redirect(w, r, "/prefix/"+url.QueryEscape(asInfo.Prefix), http.StatusSeeOther) http.Redirect(w, r, "/prefix/"+url.QueryEscape(asInfo.Prefix), http.StatusSeeOther)
} }
} }
// handlePrefixLength shows a random sample of prefixes with the specified mask length
func (s *Server) handlePrefixLength() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
lengthStr := chi.URLParam(r, "length")
if lengthStr == "" {
http.Error(w, "Length parameter is required", http.StatusBadRequest)
return
}
maskLength, err := strconv.Atoi(lengthStr)
if err != nil {
http.Error(w, "Invalid mask length", http.StatusBadRequest)
return
}
// Determine IP version based on mask length
const (
maxIPv4MaskLength = 32
maxIPv6MaskLength = 128
)
var ipVersion int
if maskLength <= maxIPv4MaskLength {
ipVersion = 4
} else if maskLength <= maxIPv6MaskLength {
ipVersion = 6
} else {
http.Error(w, "Invalid mask length", http.StatusBadRequest)
return
}
// Get random sample of prefixes
const maxPrefixes = 500
prefixes, err := s.db.GetRandomPrefixesByLengthContext(r.Context(), maskLength, ipVersion, maxPrefixes)
if err != nil {
s.logger.Error("Failed to get prefixes by length", "error", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
// Sort prefixes for display
sort.Slice(prefixes, func(i, j int) bool {
// First compare by IP version
if prefixes[i].IPVersion != prefixes[j].IPVersion {
return prefixes[i].IPVersion < prefixes[j].IPVersion
}
// Then by prefix
return prefixes[i].Prefix < prefixes[j].Prefix
})
// Create enhanced prefixes with AS descriptions
type EnhancedPrefix struct {
database.LiveRoute
OriginASDescription string
Age string
}
enhancedPrefixes := make([]EnhancedPrefix, len(prefixes))
for i, prefix := range prefixes {
enhancedPrefixes[i] = EnhancedPrefix{
LiveRoute: prefix,
Age: formatAge(prefix.LastUpdated),
}
// Get AS description
if asInfo, ok := asinfo.Get(prefix.OriginASN); ok {
enhancedPrefixes[i].OriginASDescription = asInfo.Description
}
}
// Render template
data := map[string]interface{}{
"MaskLength": maskLength,
"IPVersion": ipVersion,
"Prefixes": enhancedPrefixes,
"Count": len(prefixes),
}
// Check if context is still valid before writing response
select {
case <-r.Context().Done():
// Request was cancelled, don't write response
return
default:
}
tmpl := templates.PrefixLengthTemplate()
if err := tmpl.Execute(w, data); err != nil {
s.logger.Error("Failed to render prefix length template", "error", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
}
}
}
// formatAge returns a human-readable age string
func formatAge(timestamp time.Time) string {
age := time.Since(timestamp)
const hoursPerDay = 24
if age < time.Minute {
return "< 1m"
} else if age < time.Hour {
minutes := int(age.Minutes())
return strconv.Itoa(minutes) + "m"
} else if age < hoursPerDay*time.Hour {
hours := int(age.Hours())
return strconv.Itoa(hours) + "h"
}
days := int(age.Hours() / hoursPerDay)
return strconv.Itoa(days) + "d"
}

View File

@ -108,7 +108,6 @@ type timeoutWriter struct {
http.ResponseWriter http.ResponseWriter
mu sync.Mutex mu sync.Mutex
written bool written bool
header http.Header // cached header to prevent concurrent access
} }
func (tw *timeoutWriter) Write(b []byte) (int, error) { func (tw *timeoutWriter) Write(b []byte) (int, error) {
@ -134,18 +133,6 @@ func (tw *timeoutWriter) WriteHeader(statusCode int) {
} }
func (tw *timeoutWriter) Header() http.Header { func (tw *timeoutWriter) Header() http.Header {
tw.mu.Lock()
defer tw.mu.Unlock()
if tw.written {
// Return a copy to prevent modifications after timeout
if tw.header == nil {
tw.header = make(http.Header)
}
return tw.header
}
return tw.ResponseWriter.Header() return tw.ResponseWriter.Header()
} }
@ -166,7 +153,6 @@ func TimeoutMiddleware(timeout time.Duration) func(http.Handler) http.Handler {
tw := &timeoutWriter{ tw := &timeoutWriter{
ResponseWriter: w, ResponseWriter: w,
header: make(http.Header),
} }
done := make(chan struct{}) done := make(chan struct{})
@ -192,12 +178,8 @@ func TimeoutMiddleware(timeout time.Duration) func(http.Handler) http.Handler {
tw.markWritten() // Prevent the handler from writing after timeout tw.markWritten() // Prevent the handler from writing after timeout
execTime := time.Since(startTime) execTime := time.Since(startTime)
// Write directly to the underlying writer since we've marked tw as written
// This is safe because markWritten() prevents the handler from writing
tw.mu.Lock()
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusRequestTimeout) w.WriteHeader(http.StatusRequestTimeout)
tw.mu.Unlock()
response := map[string]interface{}{ response := map[string]interface{}{
"status": "error", "status": "error",
@ -217,68 +199,3 @@ func TimeoutMiddleware(timeout time.Duration) func(http.Handler) http.Handler {
}) })
} }
} }
// JSONValidationMiddleware ensures all JSON API responses are valid JSON
func JSONValidationMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Create a custom response writer to capture the response
rw := &responseWriter{
ResponseWriter: w,
body: &bytes.Buffer{},
statusCode: http.StatusOK,
}
// Serve the request
next.ServeHTTP(rw, r)
// Check if it's meant to be a JSON response
contentType := rw.Header().Get("Content-Type")
isJSON := contentType == "application/json" || contentType == ""
// If it's not JSON or has content, pass through
if !isJSON && rw.body.Len() > 0 {
w.WriteHeader(rw.statusCode)
_, _ = w.Write(rw.body.Bytes())
return
}
// For JSON responses, validate the JSON
if rw.body.Len() > 0 {
var testParse interface{}
if err := json.Unmarshal(rw.body.Bytes(), &testParse); err == nil {
// Valid JSON, write it out
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(rw.statusCode)
_, _ = w.Write(rw.body.Bytes())
return
}
}
// If we get here, either there's no body or invalid JSON
// Write a proper error response
w.Header().Set("Content-Type", "application/json")
// Determine appropriate status code
statusCode := rw.statusCode
if statusCode == http.StatusOK {
statusCode = http.StatusInternalServerError
}
w.WriteHeader(statusCode)
errorMsg := "Internal server error"
if statusCode == http.StatusRequestTimeout {
errorMsg = "Request timeout"
}
response := map[string]interface{}{
"status": "error",
"error": map[string]interface{}{
"msg": errorMsg,
"code": statusCode,
},
}
_ = json.NewEncoder(w).Encode(response)
})
}

View File

@ -16,24 +16,22 @@ func (s *Server) setupRoutes() {
r.Use(middleware.RealIP) r.Use(middleware.RealIP)
r.Use(middleware.Logger) r.Use(middleware.Logger)
r.Use(middleware.Recoverer) r.Use(middleware.Recoverer)
const requestTimeout = 8 * time.Second const requestTimeout = 2 * time.Second
r.Use(TimeoutMiddleware(requestTimeout)) r.Use(TimeoutMiddleware(requestTimeout))
r.Use(JSONResponseMiddleware) r.Use(JSONResponseMiddleware)
// Routes // Routes
r.Get("/", s.handleRoot()) r.Get("/", s.handleRoot())
r.Get("/status", s.handleStatusHTML()) r.Get("/status", s.handleStatusHTML())
r.Get("/status.json", JSONValidationMiddleware(s.handleStatusJSON()).ServeHTTP) r.Get("/status.json", s.handleStatusJSON())
// AS and prefix detail pages // AS and prefix detail pages
r.Get("/as/{asn}", s.handleASDetail()) r.Get("/as/{asn}", s.handleASDetail())
r.Get("/prefix/{prefix}", s.handlePrefixDetail()) r.Get("/prefix/{prefix}", s.handlePrefixDetail())
r.Get("/prefixlength/{length}", s.handlePrefixLength())
r.Get("/ip/{ip}", s.handleIPRedirect()) r.Get("/ip/{ip}", s.handleIPRedirect())
// API routes // API routes
r.Route("/api/v1", func(r chi.Router) { r.Route("/api/v1", func(r chi.Router) {
r.Use(JSONValidationMiddleware)
r.Get("/stats", s.handleStats()) r.Get("/stats", s.handleStats())
r.Get("/ip/{ip}", s.handleIPLookup()) r.Get("/ip/{ip}", s.handleIPLookup())
r.Get("/as/{asn}", s.handleASDetailJSON()) r.Get("/as/{asn}", s.handleASDetailJSON())

View File

@ -42,7 +42,7 @@ func (s *Server) Start() error {
port = "8080" port = "8080"
} }
const readHeaderTimeout = 40 * time.Second const readHeaderTimeout = 10 * time.Second
s.srv = &http.Server{ s.srv = &http.Server{
Addr: ":" + port, Addr: ":" + port,
Handler: s.router, Handler: s.router,

View File

@ -7,9 +7,8 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"math"
"math/rand"
"net/http" "net/http"
"os"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -20,20 +19,13 @@ import (
) )
const ( const (
risLiveURL = "https://ris-live.ripe.net/v1/stream/?format=json&" + risLiveURL = "https://ris-live.ripe.net/v1/stream/?format=json"
"client=https%3A%2F%2Fgit.eeqj.de%2Fsneak%2Froutewatch"
metricsWindowSize = 60 // seconds for rolling average metricsWindowSize = 60 // seconds for rolling average
metricsUpdateRate = time.Second metricsUpdateRate = time.Second
minBackoffDelay = 5 * time.Second
maxBackoffDelay = 320 * time.Second
metricsLogInterval = 10 * time.Second metricsLogInterval = 10 * time.Second
bytesPerKB = 1024 bytesPerKB = 1024
bytesPerMB = 1024 * 1024 bytesPerMB = 1024 * 1024
maxConcurrentHandlers = 800 // Maximum number of concurrent message handlers maxConcurrentHandlers = 800 // Maximum number of concurrent message handlers
// Backpressure constants
backpressureThreshold = 0.5 // Start dropping at 50% queue utilization
backpressureSlope = 2.0 // Slope for linear drop probability increase
) )
// MessageHandler is an interface for handling RIS messages // MessageHandler is an interface for handling RIS messages
@ -54,13 +46,12 @@ type RawMessageHandler func(line string)
// handlerMetrics tracks performance metrics for a handler // handlerMetrics tracks performance metrics for a handler
type handlerMetrics struct { type handlerMetrics struct {
processedCount uint64 // Total messages processed processedCount uint64 // Total messages processed
droppedCount uint64 // Total messages dropped droppedCount uint64 // Total messages dropped
totalTime time.Duration // Total processing time (for average calculation) totalTime time.Duration // Total processing time (for average calculation)
minTime time.Duration // Minimum processing time minTime time.Duration // Minimum processing time
maxTime time.Duration // Maximum processing time maxTime time.Duration // Maximum processing time
queueHighWaterMark int // Maximum queue length seen mu sync.Mutex // Protects the metrics
mu sync.Mutex // Protects the metrics
} }
// handlerInfo wraps a handler with its queue and metrics // handlerInfo wraps a handler with its queue and metrics
@ -80,8 +71,7 @@ type Streamer struct {
cancel context.CancelFunc cancel context.CancelFunc
running bool running bool
metrics *metrics.Tracker metrics *metrics.Tracker
totalDropped uint64 // Total dropped messages across all handlers totalDropped uint64 // Total dropped messages across all handlers
random *rand.Rand // Random number generator for backpressure drops
} }
// New creates a new RIS streamer // New creates a new RIS streamer
@ -93,8 +83,6 @@ func New(logger *logger.Logger, metrics *metrics.Tracker) *Streamer {
}, },
handlers: make([]*handlerInfo, 0), handlers: make([]*handlerInfo, 0),
metrics: metrics, metrics: metrics,
//nolint:gosec // Non-cryptographic randomness is fine for backpressure
random: rand.New(rand.NewSource(time.Now().UnixNano())),
} }
} }
@ -107,9 +95,6 @@ func (s *Streamer) RegisterHandler(handler MessageHandler) {
info := &handlerInfo{ info := &handlerInfo{
handler: handler, handler: handler,
queue: make(chan *ristypes.RISMessage, handler.QueueCapacity()), queue: make(chan *ristypes.RISMessage, handler.QueueCapacity()),
metrics: handlerMetrics{
minTime: time.Duration(math.MaxInt64), // Initialize to max so first value sets the floor
},
} }
s.handlers = append(s.handlers, info) s.handlers = append(s.handlers, info)
@ -146,7 +131,9 @@ func (s *Streamer) Start() error {
} }
go func() { go func() {
s.streamWithReconnect(ctx) if err := s.stream(ctx); err != nil {
s.logger.Error("Streaming error", "error", err)
}
s.mu.Lock() s.mu.Lock()
s.running = false s.running = false
s.mu.Unlock() s.mu.Unlock()
@ -183,7 +170,7 @@ func (s *Streamer) runHandlerWorker(info *handlerInfo) {
info.metrics.totalTime += elapsed info.metrics.totalTime += elapsed
// Update min time // Update min time
if elapsed < info.metrics.minTime { if info.metrics.minTime == 0 || elapsed < info.metrics.minTime {
info.metrics.minTime = elapsed info.metrics.minTime = elapsed
} }
@ -215,15 +202,14 @@ func (s *Streamer) GetMetricsTracker() *metrics.Tracker {
// HandlerStats represents metrics for a single handler // HandlerStats represents metrics for a single handler
type HandlerStats struct { type HandlerStats struct {
Name string Name string
QueueLength int QueueLength int
QueueCapacity int QueueCapacity int
QueueHighWaterMark int ProcessedCount uint64
ProcessedCount uint64 DroppedCount uint64
DroppedCount uint64 AvgProcessTime time.Duration
AvgProcessTime time.Duration MinProcessTime time.Duration
MinProcessTime time.Duration MaxProcessTime time.Duration
MaxProcessTime time.Duration
} }
// GetHandlerStats returns current handler statistics // GetHandlerStats returns current handler statistics
@ -237,14 +223,13 @@ func (s *Streamer) GetHandlerStats() []HandlerStats {
info.metrics.mu.Lock() info.metrics.mu.Lock()
hs := HandlerStats{ hs := HandlerStats{
Name: fmt.Sprintf("%T", info.handler), Name: fmt.Sprintf("%T", info.handler),
QueueLength: len(info.queue), QueueLength: len(info.queue),
QueueCapacity: cap(info.queue), QueueCapacity: cap(info.queue),
QueueHighWaterMark: info.metrics.queueHighWaterMark, ProcessedCount: info.metrics.processedCount,
ProcessedCount: info.metrics.processedCount, DroppedCount: info.metrics.droppedCount,
DroppedCount: info.metrics.droppedCount, MinProcessTime: info.metrics.minTime,
MinProcessTime: info.metrics.minTime, MaxProcessTime: info.metrics.maxTime,
MaxProcessTime: info.metrics.maxTime,
} }
// Calculate average time // Calculate average time
@ -335,72 +320,6 @@ func (s *Streamer) updateMetrics(messageBytes int) {
s.metrics.RecordMessage(int64(messageBytes)) s.metrics.RecordMessage(int64(messageBytes))
} }
// streamWithReconnect handles streaming with automatic reconnection and exponential backoff
func (s *Streamer) streamWithReconnect(ctx context.Context) {
backoffDelay := minBackoffDelay
consecutiveFailures := 0
for {
select {
case <-ctx.Done():
s.logger.Info("Stream context cancelled, stopping reconnection attempts")
return
default:
}
// Attempt to stream
startTime := time.Now()
err := s.stream(ctx)
streamDuration := time.Since(startTime)
if err == nil {
// Clean exit (context cancelled)
return
}
// Log the error
s.logger.Error("Stream disconnected",
"error", err,
"consecutive_failures", consecutiveFailures+1,
"stream_duration", streamDuration)
s.metrics.SetConnected(false)
// Check if context is cancelled
if ctx.Err() != nil {
return
}
// If we streamed for more than 30 seconds, reset the backoff
// This indicates we had a successful connection that received data
if streamDuration > 30*time.Second {
s.logger.Info("Resetting backoff delay due to successful connection",
"stream_duration", streamDuration)
backoffDelay = minBackoffDelay
consecutiveFailures = 0
} else {
// Increment consecutive failures
consecutiveFailures++
}
// Wait with exponential backoff
s.logger.Info("Waiting before reconnection attempt",
"delay_seconds", backoffDelay.Seconds(),
"consecutive_failures", consecutiveFailures)
select {
case <-ctx.Done():
return
case <-time.After(backoffDelay):
// Double the backoff delay for next time, up to max
backoffDelay *= 2
if backoffDelay > maxBackoffDelay {
backoffDelay = maxBackoffDelay
}
}
}
}
func (s *Streamer) stream(ctx context.Context) error { func (s *Streamer) stream(ctx context.Context) error {
req, err := http.NewRequestWithContext(ctx, "GET", risLiveURL, nil) req, err := http.NewRequestWithContext(ctx, "GET", risLiveURL, nil)
if err != nil { if err != nil {
@ -471,13 +390,10 @@ func (s *Streamer) stream(ctx context.Context) error {
// Parse the message first // Parse the message first
var wrapper ristypes.RISLiveMessage var wrapper ristypes.RISLiveMessage
if err := json.Unmarshal(line, &wrapper); err != nil { if err := json.Unmarshal(line, &wrapper); err != nil {
// Log the error and return to trigger reconnection // Output the raw line and panic on parse failure
s.logger.Error("Failed to parse JSON", fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err)
"error", err, fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(line))
"line", string(line), panic(fmt.Sprintf("JSON parse error: %v", err))
"line_length", len(line))
return fmt.Errorf("JSON parse error: %w", err)
} }
// Check if it's a ris_message wrapper // Check if it's a ris_message wrapper
@ -527,43 +443,32 @@ func (s *Streamer) stream(ctx context.Context) error {
// Peer state changes - silently ignore // Peer state changes - silently ignore
continue continue
default: default:
s.logger.Error("Unknown message type", fmt.Fprintf(
"type", msg.Type, os.Stderr,
"line", string(line), "UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n",
msg.Type,
string(line),
)
panic(
fmt.Sprintf(
"Unknown RIS message type: %s",
msg.Type,
),
) )
panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type))
} }
// Dispatch to interested handlers // Dispatch to interested handlers
s.mu.RLock() s.mu.RLock()
for _, info := range s.handlers { for _, info := range s.handlers {
if !info.handler.WantsMessage(msg.Type) { if info.handler.WantsMessage(msg.Type) {
continue select {
} case info.queue <- &msg:
// Message queued successfully
// Check if we should drop due to backpressure default:
if s.shouldDropForBackpressure(info) { // Queue is full, drop the message
atomic.AddUint64(&info.metrics.droppedCount, 1) atomic.AddUint64(&info.metrics.droppedCount, 1)
atomic.AddUint64(&s.totalDropped, 1) atomic.AddUint64(&s.totalDropped, 1)
continue
}
// Try to queue the message
select {
case info.queue <- &msg:
// Message queued successfully
// Update high water mark if needed
queueLen := len(info.queue)
info.metrics.mu.Lock()
if queueLen > info.metrics.queueHighWaterMark {
info.metrics.queueHighWaterMark = queueLen
} }
info.metrics.mu.Unlock()
default:
// Queue is full, drop the message
atomic.AddUint64(&info.metrics.droppedCount, 1)
atomic.AddUint64(&s.totalDropped, 1)
} }
} }
s.mu.RUnlock() s.mu.RUnlock()
@ -575,25 +480,3 @@ func (s *Streamer) stream(ctx context.Context) error {
return nil return nil
} }
// shouldDropForBackpressure determines if a message should be dropped based on queue utilization
func (s *Streamer) shouldDropForBackpressure(info *handlerInfo) bool {
// Calculate queue utilization
queueLen := len(info.queue)
queueCap := cap(info.queue)
utilization := float64(queueLen) / float64(queueCap)
// No drops below threshold
if utilization < backpressureThreshold {
return false
}
// Calculate drop probability (0.0 at threshold, 1.0 at 100% full)
dropProbability := (utilization - backpressureThreshold) * backpressureSlope
if dropProbability > 1.0 {
dropProbability = 1.0
}
// Random drop based on probability
return s.random.Float64() < dropProbability
}

View File

@ -3,7 +3,7 @@
<head> <head>
<meta charset="UTF-8"> <meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AS{{.ASN.ASN}} - {{.ASN.Handle}} - RouteWatch</title> <title>AS{{.ASN.Number}} - {{.ASN.Handle}} - RouteWatch</title>
<style> <style>
body { body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
@ -136,7 +136,7 @@
<div class="container"> <div class="container">
<a href="/status" class="nav-link">← Back to Status</a> <a href="/status" class="nav-link">← Back to Status</a>
<h1>AS{{.ASN.ASN}}{{if .ASN.Handle}} - {{.ASN.Handle}}{{end}}</h1> <h1>AS{{.ASN.Number}}{{if .ASN.Handle}} - {{.ASN.Handle}}{{end}}</h1>
{{if .ASN.Description}} {{if .ASN.Description}}
<p class="subtitle">{{.ASN.Description}}</p> <p class="subtitle">{{.ASN.Description}}</p>
{{end}} {{end}}
@ -154,10 +154,6 @@
<div class="info-label">IPv6 Prefixes</div> <div class="info-label">IPv6 Prefixes</div>
<div class="info-value">{{.IPv6Count}}</div> <div class="info-value">{{.IPv6Count}}</div>
</div> </div>
<div class="info-card">
<div class="info-label">Peer ASNs</div>
<div class="info-value">{{.PeerCount}}</div>
</div>
<div class="info-card"> <div class="info-card">
<div class="info-label">First Seen</div> <div class="info-label">First Seen</div>
<div class="info-value">{{.ASN.FirstSeen.Format "2006-01-02"}}</div> <div class="info-value">{{.ASN.FirstSeen.Format "2006-01-02"}}</div>
@ -227,44 +223,6 @@
<p>No prefixes announced by this AS</p> <p>No prefixes announced by this AS</p>
</div> </div>
{{end}} {{end}}
{{if .Peers}}
<div class="prefix-section">
<div class="prefix-header">
<h2>Peer ASNs</h2>
<span class="prefix-count">{{.PeerCount}}</span>
</div>
<table class="prefix-table">
<thead>
<tr>
<th>ASN</th>
<th>Handle</th>
<th>Description</th>
<th>First Seen</th>
<th>Last Seen</th>
</tr>
</thead>
<tbody>
{{range .Peers}}
<tr>
<td><a href="/as/{{.ASN}}" class="prefix-link">AS{{.ASN}}</a></td>
<td>{{if .Handle}}{{.Handle}}{{else}}-{{end}}</td>
<td>{{if .Description}}{{.Description}}{{else}}-{{end}}</td>
<td>{{.FirstSeen.Format "2006-01-02"}}</td>
<td>{{.LastSeen.Format "2006-01-02"}}</td>
</tr>
{{end}}
</tbody>
</table>
</div>
{{else}}
<div class="prefix-section">
<h2>Peer ASNs</h2>
<div class="empty-state">
<p>No peering relationships found for this AS</p>
</div>
</div>
{{end}}
</div> </div>
</body> </body>
</html> </html>

View File

@ -13,8 +13,7 @@
color: #333; color: #333;
} }
.container { .container {
width: 90%; max-width: 1200px;
max-width: 1600px;
margin: 0 auto; margin: 0 auto;
background: white; background: white;
padding: 30px; padding: 30px;
@ -92,7 +91,6 @@
.route-table td { .route-table td {
padding: 12px; padding: 12px;
border-bottom: 1px solid #e0e0e0; border-bottom: 1px solid #e0e0e0;
white-space: nowrap;
} }
.route-table tr:hover { .route-table tr:hover {
background: #f8f9fa; background: #f8f9fa;
@ -116,13 +114,9 @@
font-family: monospace; font-family: monospace;
font-size: 13px; font-size: 13px;
color: #666; color: #666;
max-width: 600px; max-width: 300px;
word-wrap: break-word; overflow-x: auto;
white-space: normal !important; white-space: nowrap;
line-height: 1.5;
}
.as-path .as-link {
font-weight: 600;
} }
.age { .age {
color: #7f8c8d; color: #7f8c8d;
@ -174,7 +168,7 @@
font-size: 14px; font-size: 14px;
} }
.as-path { .as-path {
max-width: 100%; max-width: 150px;
} }
} }
</style> </style>
@ -207,7 +201,7 @@
<div class="origin-list"> <div class="origin-list">
{{range .Origins}} {{range .Origins}}
<div class="origin-item"> <div class="origin-item">
<a href="/as/{{.ASN}}" class="as-link">AS{{.ASN}}</a> <a href="/as/{{.Number}}" class="as-link">AS{{.Number}}</a>
{{if .Handle}} ({{.Handle}}){{end}} {{if .Handle}} ({{.Handle}}){{end}}
<span style="color: #7f8c8d; margin-left: 10px;">{{.PeerCount}} peer{{if ne .PeerCount 1}}s{{end}}</span> <span style="color: #7f8c8d; margin-left: 10px;">{{.PeerCount}} peer{{if ne .PeerCount 1}}s{{end}}</span>
</div> </div>
@ -240,7 +234,7 @@
<a href="/as/{{.OriginASN}}" class="as-link">AS{{.OriginASN}}</a> <a href="/as/{{.OriginASN}}" class="as-link">AS{{.OriginASN}}</a>
</td> </td>
<td class="peer-ip">{{.PeerIP}}</td> <td class="peer-ip">{{.PeerIP}}</td>
<td class="as-path">{{range $i, $as := .ASPathWithHandle}}{{if $i}} → {{end}}<a href="/as/{{$as.ASN}}" class="as-link">{{if $as.Handle}}{{$as.Handle}}{{else}}AS{{$as.ASN}}{{end}}</a>{{end}}</td> <td class="as-path">{{range $i, $as := .ASPath}}{{if $i}} → {{end}}{{$as}}{{end}}</td>
<td class="peer-ip">{{.NextHop}}</td> <td class="peer-ip">{{.NextHop}}</td>
<td>{{.LastUpdated.Format "2006-01-02 15:04:05"}}</td> <td>{{.LastUpdated.Format "2006-01-02 15:04:05"}}</td>
<td class="age">{{.LastUpdated | timeSince}}</td> <td class="age">{{.LastUpdated | timeSince}}</td>

View File

@ -1,108 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Prefixes with /{{ .MaskLength }} - RouteWatch</title>
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
max-width: 1200px;
margin: 0 auto;
padding: 20px;
background: #f5f5f5;
}
h1 {
color: #333;
margin-bottom: 10px;
}
.subtitle {
color: #666;
margin-bottom: 30px;
}
.info-card {
background: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
margin-bottom: 20px;
}
table {
width: 100%;
border-collapse: collapse;
background: white;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
border-radius: 8px;
overflow: hidden;
}
th {
background: #f8f9fa;
padding: 12px;
text-align: left;
font-weight: 600;
color: #333;
border-bottom: 2px solid #dee2e6;
}
td {
padding: 12px;
border-bottom: 1px solid #eee;
}
tr:last-child td {
border-bottom: none;
}
tr:hover {
background: #f8f9fa;
}
a {
color: #0066cc;
text-decoration: none;
}
a:hover {
text-decoration: underline;
}
.prefix-link {
font-family: 'SF Mono', Monaco, 'Cascadia Mono', 'Roboto Mono', Consolas, 'Courier New', monospace;
}
.as-link {
white-space: nowrap;
}
.age {
color: #666;
font-size: 0.9em;
}
.back-link {
display: inline-block;
margin-bottom: 20px;
color: #0066cc;
}
</style>
</head>
<body>
<a href="/status" class="back-link">← Back to Status</a>
<h1>IPv{{ .IPVersion }} Prefixes with /{{ .MaskLength }}</h1>
<p class="subtitle">Showing {{ .Count }} randomly selected prefixes</p>
<table>
<thead>
<tr>
<th>Prefix</th>
<th>Age</th>
<th>Origin AS</th>
</tr>
</thead>
<tbody>
{{ range .Prefixes }}
<tr>
<td><a href="/prefix/{{ .Prefix | urlEncode }}" class="prefix-link">{{ .Prefix }}</a></td>
<td class="age">{{ .Age }}</td>
<td>
<a href="/as/{{ .OriginASN }}" class="as-link">
AS{{ .OriginASN }}{{ if .OriginASDescription }} ({{ .OriginASDescription }}){{ end }}
</a>
</td>
</tr>
{{ end }}
</tbody>
</table>
</body>
</html>

View File

@ -49,16 +49,6 @@
font-family: 'SF Mono', Monaco, 'Cascadia Mono', 'Roboto Mono', Consolas, 'Courier New', monospace; font-family: 'SF Mono', Monaco, 'Cascadia Mono', 'Roboto Mono', Consolas, 'Courier New', monospace;
color: #333; color: #333;
} }
.metric-value.metric-link {
text-decoration: underline;
text-decoration-style: dashed;
text-underline-offset: 2px;
cursor: pointer;
}
.metric-value.metric-link:hover {
color: #0066cc;
text-decoration-style: solid;
}
.connected { .connected {
color: #22c55e; color: #22c55e;
} }
@ -241,7 +231,7 @@
metric.className = 'metric'; metric.className = 'metric';
metric.innerHTML = ` metric.innerHTML = `
<span class="metric-label">/${item.mask_length}</span> <span class="metric-label">/${item.mask_length}</span>
<a href="/prefixlength/${item.mask_length}" class="metric-value metric-link">${formatNumber(item.count)}</a> <span class="metric-value">${formatNumber(item.count)}</span>
`; `;
container.appendChild(metric); container.appendChild(metric);
}); });
@ -264,10 +254,6 @@
<span class="metric-label">Queue</span> <span class="metric-label">Queue</span>
<span class="metric-value">${handler.queue_length}/${handler.queue_capacity}</span> <span class="metric-value">${handler.queue_length}/${handler.queue_capacity}</span>
</div> </div>
<div class="metric">
<span class="metric-label">High Water Mark</span>
<span class="metric-value">${handler.queue_high_water_mark}/${handler.queue_capacity} (${Math.round(handler.queue_high_water_mark * 100 / handler.queue_capacity)}%)</span>
</div>
<div class="metric"> <div class="metric">
<span class="metric-label">Processed</span> <span class="metric-label">Processed</span>
<span class="metric-value">${formatNumber(handler.processed_count)}</span> <span class="metric-value">${formatNumber(handler.processed_count)}</span>
@ -290,39 +276,6 @@
}); });
} }
function resetAllFields() {
// Reset all metric fields to '-'
document.getElementById('connected').textContent = '-';
document.getElementById('connected').className = 'metric-value';
document.getElementById('uptime').textContent = '-';
document.getElementById('go_version').textContent = '-';
document.getElementById('goroutines').textContent = '-';
document.getElementById('memory_usage').textContent = '-';
document.getElementById('total_messages').textContent = '-';
document.getElementById('messages_per_sec').textContent = '-';
document.getElementById('total_bytes').textContent = '-';
document.getElementById('mbits_per_sec').textContent = '-';
document.getElementById('asns').textContent = '-';
document.getElementById('prefixes').textContent = '-';
document.getElementById('ipv4_prefixes').textContent = '-';
document.getElementById('ipv6_prefixes').textContent = '-';
document.getElementById('peerings').textContent = '-';
document.getElementById('peers').textContent = '-';
document.getElementById('database_size').textContent = '-';
document.getElementById('live_routes').textContent = '-';
document.getElementById('ipv4_routes').textContent = '-';
document.getElementById('ipv6_routes').textContent = '-';
document.getElementById('ipv4_updates_per_sec').textContent = '-';
document.getElementById('ipv6_updates_per_sec').textContent = '-';
// Clear handler stats
document.getElementById('handler-stats-container').innerHTML = '';
// Clear prefix distributions
document.getElementById('ipv4-prefix-distribution').innerHTML = '<div class="metric"><span class="metric-label">No data</span></div>';
document.getElementById('ipv6-prefix-distribution').innerHTML = '<div class="metric"><span class="metric-label">No data</span></div>';
}
function updateStatus() { function updateStatus() {
fetch('/api/v1/stats') fetch('/api/v1/stats')
.then(response => response.json()) .then(response => response.json())
@ -331,7 +284,6 @@
if (response.status === 'error') { if (response.status === 'error') {
document.getElementById('error').textContent = 'Error: ' + response.error.msg; document.getElementById('error').textContent = 'Error: ' + response.error.msg;
document.getElementById('error').style.display = 'block'; document.getElementById('error').style.display = 'block';
resetAllFields();
return; return;
} }
@ -378,13 +330,12 @@
.catch(error => { .catch(error => {
document.getElementById('error').textContent = 'Error fetching status: ' + error; document.getElementById('error').textContent = 'Error fetching status: ' + error;
document.getElementById('error').style.display = 'block'; document.getElementById('error').style.display = 'block';
resetAllFields();
}); });
} }
// Update immediately and then every 2 seconds // Update immediately and then every 500ms
updateStatus(); updateStatus();
setInterval(updateStatus, 2000); setInterval(updateStatus, 500);
</script> </script>
</body> </body>
</html> </html>

View File

@ -18,15 +18,11 @@ var asDetailHTML string
//go:embed prefix_detail.html //go:embed prefix_detail.html
var prefixDetailHTML string var prefixDetailHTML string
//go:embed prefix_length.html
var prefixLengthHTML string
// Templates contains all parsed templates // Templates contains all parsed templates
type Templates struct { type Templates struct {
Status *template.Template Status *template.Template
ASDetail *template.Template ASDetail *template.Template
PrefixDetail *template.Template PrefixDetail *template.Template
PrefixLength *template.Template
} }
var ( var (
@ -103,12 +99,6 @@ func initTemplates() {
if err != nil { if err != nil {
panic("failed to parse prefix detail template: " + err.Error()) panic("failed to parse prefix detail template: " + err.Error())
} }
// Parse prefix length template
defaultTemplates.PrefixLength, err = template.New("prefixLength").Funcs(funcs).Parse(prefixLengthHTML)
if err != nil {
panic("failed to parse prefix length template: " + err.Error())
}
} }
// Get returns the singleton Templates instance // Get returns the singleton Templates instance
@ -132,8 +122,3 @@ func ASDetailTemplate() *template.Template {
func PrefixDetailTemplate() *template.Template { func PrefixDetailTemplate() *template.Template {
return Get().PrefixDetail return Get().PrefixDetail
} }
// PrefixLengthTemplate returns the parsed prefix length template
func PrefixLengthTemplate() *template.Template {
return Get().PrefixLength
}

477614
log.txt

File diff suppressed because it is too large Load Diff