Compare commits

..

No commits in common. "main" and "optimize-sqlite-settings" have entirely different histories.

21 changed files with 271289 additions and 208194 deletions

View File

@ -21,7 +21,7 @@ clean:
rm -rf bin/
run: build
DEBUG=routewatch ./bin/routewatch 2>&1 | tee log.txt
./bin/routewatch
asupdate:
@echo "Updating AS info data..."

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,6 @@
package database
import (
"context"
"time"
)
@ -27,7 +26,6 @@ type Store interface {
// Prefix operations
GetOrCreatePrefix(prefix string, timestamp time.Time) (*Prefix, error)
UpdatePrefixesBatch(prefixes map[string]time.Time) error
// Announcement operations
RecordAnnouncement(announcement *Announcement) error
@ -37,7 +35,6 @@ type Store interface {
// Statistics
GetStats() (Stats, error)
GetStatsContext(ctx context.Context) (Stats, error)
// Peer operations
UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error
@ -49,23 +46,14 @@ type Store interface {
DeleteLiveRoute(prefix string, originASN int, peerIP string) error
DeleteLiveRouteBatch(deletions []LiveRouteDeletion) error
GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error)
GetPrefixDistributionContext(ctx context.Context) (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error)
GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error)
GetLiveRouteCountsContext(ctx context.Context) (ipv4Count, ipv6Count int, err error)
// IP lookup operations
GetASInfoForIP(ip string) (*ASInfo, error)
GetASInfoForIPContext(ctx context.Context, ip string) (*ASInfo, error)
// AS and prefix detail operations
GetASDetails(asn int) (*ASN, []LiveRoute, error)
GetASDetailsContext(ctx context.Context, asn int) (*ASN, []LiveRoute, error)
GetASPeers(asn int) ([]ASPeer, error)
GetASPeersContext(ctx context.Context, asn int) ([]ASPeer, error)
GetPrefixDetails(prefix string) ([]LiveRoute, error)
GetPrefixDetailsContext(ctx context.Context, prefix string) ([]LiveRoute, error)
GetRandomPrefixesByLength(maskLength, ipVersion, limit int) ([]LiveRoute, error)
GetRandomPrefixesByLengthContext(ctx context.Context, maskLength, ipVersion, limit int) ([]LiveRoute, error)
// Lifecycle
Close() error

View File

@ -8,7 +8,8 @@ import (
// ASN represents an Autonomous System Number
type ASN struct {
ASN int `json:"asn"`
ID uuid.UUID `json:"id"`
Number int `json:"number"`
Handle string `json:"handle"`
Description string `json:"description"`
FirstSeen time.Time `json:"first_seen"`
@ -28,8 +29,8 @@ type Prefix struct {
type Announcement struct {
ID uuid.UUID `json:"id"`
PrefixID uuid.UUID `json:"prefix_id"`
PeerASN int `json:"peer_asn"`
OriginASN int `json:"origin_asn"`
ASNID uuid.UUID `json:"asn_id"`
OriginASNID uuid.UUID `json:"origin_asn_id"`
Path string `json:"path"` // JSON-encoded AS path
NextHop string `json:"next_hop"`
Timestamp time.Time `json:"timestamp"`
@ -39,8 +40,8 @@ type Announcement struct {
// ASNPeering represents a peering relationship between two ASNs
type ASNPeering struct {
ID uuid.UUID `json:"id"`
ASA int `json:"as_a"`
ASB int `json:"as_b"`
FromASNID uuid.UUID `json:"from_asn_id"`
ToASNID uuid.UUID `json:"to_asn_id"`
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
}
@ -82,7 +83,6 @@ type LiveRouteDeletion struct {
Prefix string
OriginASN int
PeerIP string
IPVersion int
}
// PeerUpdate represents parameters for updating a peer

View File

@ -1,27 +1,16 @@
-- IMPORTANT: This is the ONLY place where schema changes should be made.
-- We do NOT support migrations. All schema changes MUST be in this file.
-- DO NOT make schema changes anywhere else in the codebase.
CREATE TABLE IF NOT EXISTS asns (
asn INTEGER PRIMARY KEY,
id TEXT PRIMARY KEY,
number INTEGER UNIQUE NOT NULL,
handle TEXT,
description TEXT,
first_seen DATETIME NOT NULL,
last_seen DATETIME NOT NULL
);
-- IPv4 prefixes table
CREATE TABLE IF NOT EXISTS prefixes_v4 (
id TEXT PRIMARY KEY,
prefix TEXT UNIQUE NOT NULL,
first_seen DATETIME NOT NULL,
last_seen DATETIME NOT NULL
);
-- IPv6 prefixes table
CREATE TABLE IF NOT EXISTS prefixes_v6 (
CREATE TABLE IF NOT EXISTS prefixes (
id TEXT PRIMARY KEY,
prefix TEXT UNIQUE NOT NULL,
ip_version INTEGER NOT NULL, -- 4 for IPv4, 6 for IPv6
first_seen DATETIME NOT NULL,
last_seen DATETIME NOT NULL
);
@ -29,14 +18,15 @@ CREATE TABLE IF NOT EXISTS prefixes_v6 (
CREATE TABLE IF NOT EXISTS announcements (
id TEXT PRIMARY KEY,
prefix_id TEXT NOT NULL,
peer_asn INTEGER NOT NULL,
origin_asn INTEGER NOT NULL,
asn_id TEXT NOT NULL,
origin_asn_id TEXT NOT NULL,
path TEXT NOT NULL,
next_hop TEXT,
timestamp DATETIME NOT NULL,
is_withdrawal BOOLEAN NOT NULL DEFAULT 0,
FOREIGN KEY (peer_asn) REFERENCES asns(asn),
FOREIGN KEY (origin_asn) REFERENCES asns(asn)
FOREIGN KEY (prefix_id) REFERENCES prefixes(id),
FOREIGN KEY (asn_id) REFERENCES asns(id),
FOREIGN KEY (origin_asn_id) REFERENCES asns(id)
);
CREATE TABLE IF NOT EXISTS peerings (
@ -58,71 +48,47 @@ CREATE TABLE IF NOT EXISTS bgp_peers (
last_message_type TEXT
);
-- Indexes for prefixes_v4 table
CREATE INDEX IF NOT EXISTS idx_prefixes_v4_prefix ON prefixes_v4(prefix);
-- Indexes for prefixes_v6 table
CREATE INDEX IF NOT EXISTS idx_prefixes_v6_prefix ON prefixes_v6(prefix);
CREATE INDEX IF NOT EXISTS idx_prefixes_ip_version ON prefixes(ip_version);
CREATE INDEX IF NOT EXISTS idx_prefixes_version_prefix ON prefixes(ip_version, prefix);
CREATE INDEX IF NOT EXISTS idx_announcements_timestamp ON announcements(timestamp);
CREATE INDEX IF NOT EXISTS idx_announcements_prefix_id ON announcements(prefix_id);
CREATE INDEX IF NOT EXISTS idx_announcements_peer_asn ON announcements(peer_asn);
CREATE INDEX IF NOT EXISTS idx_announcements_origin_asn ON announcements(origin_asn);
CREATE INDEX IF NOT EXISTS idx_announcements_asn_id ON announcements(asn_id);
CREATE INDEX IF NOT EXISTS idx_peerings_as_a ON peerings(as_a);
CREATE INDEX IF NOT EXISTS idx_peerings_as_b ON peerings(as_b);
CREATE INDEX IF NOT EXISTS idx_peerings_lookup ON peerings(as_a, as_b);
-- Additional indexes for prefixes table
CREATE INDEX IF NOT EXISTS idx_prefixes_prefix ON prefixes(prefix);
-- Indexes for asns table
CREATE INDEX IF NOT EXISTS idx_asns_asn ON asns(asn);
CREATE INDEX IF NOT EXISTS idx_asns_number ON asns(number);
-- Indexes for bgp_peers table
CREATE INDEX IF NOT EXISTS idx_bgp_peers_asn ON bgp_peers(peer_asn);
CREATE INDEX IF NOT EXISTS idx_bgp_peers_last_seen ON bgp_peers(last_seen);
CREATE INDEX IF NOT EXISTS idx_bgp_peers_ip ON bgp_peers(peer_ip);
-- IPv4 routing table maintained by PrefixHandler
CREATE TABLE IF NOT EXISTS live_routes_v4 (
-- Live routing table maintained by PrefixHandler
CREATE TABLE IF NOT EXISTS live_routes (
id TEXT PRIMARY KEY,
prefix TEXT NOT NULL,
mask_length INTEGER NOT NULL, -- CIDR mask length (0-32)
mask_length INTEGER NOT NULL, -- CIDR mask length (0-32 for IPv4, 0-128 for IPv6)
ip_version INTEGER NOT NULL, -- 4 or 6
origin_asn INTEGER NOT NULL,
peer_ip TEXT NOT NULL,
as_path TEXT NOT NULL, -- JSON array
next_hop TEXT NOT NULL,
last_updated DATETIME NOT NULL,
-- IPv4 range columns for fast lookups
ip_start INTEGER NOT NULL, -- Start of IPv4 range as 32-bit unsigned int
ip_end INTEGER NOT NULL, -- End of IPv4 range as 32-bit unsigned int
-- IPv4 range columns for fast lookups (NULL for IPv6)
v4_ip_start INTEGER, -- Start of IPv4 range as 32-bit unsigned int
v4_ip_end INTEGER, -- End of IPv4 range as 32-bit unsigned int
UNIQUE(prefix, origin_asn, peer_ip)
);
-- IPv6 routing table maintained by PrefixHandler
CREATE TABLE IF NOT EXISTS live_routes_v6 (
id TEXT PRIMARY KEY,
prefix TEXT NOT NULL,
mask_length INTEGER NOT NULL, -- CIDR mask length (0-128)
origin_asn INTEGER NOT NULL,
peer_ip TEXT NOT NULL,
as_path TEXT NOT NULL, -- JSON array
next_hop TEXT NOT NULL,
last_updated DATETIME NOT NULL,
-- Note: IPv6 doesn't use integer range columns
UNIQUE(prefix, origin_asn, peer_ip)
);
-- Indexes for live_routes_v4 table
CREATE INDEX IF NOT EXISTS idx_live_routes_v4_prefix ON live_routes_v4(prefix);
CREATE INDEX IF NOT EXISTS idx_live_routes_v4_mask_length ON live_routes_v4(mask_length);
CREATE INDEX IF NOT EXISTS idx_live_routes_v4_origin_asn ON live_routes_v4(origin_asn);
CREATE INDEX IF NOT EXISTS idx_live_routes_v4_last_updated ON live_routes_v4(last_updated);
-- Indexes for live_routes table
CREATE INDEX IF NOT EXISTS idx_live_routes_prefix ON live_routes(prefix);
CREATE INDEX IF NOT EXISTS idx_live_routes_mask_length ON live_routes(mask_length);
CREATE INDEX IF NOT EXISTS idx_live_routes_ip_version_mask ON live_routes(ip_version, mask_length);
CREATE INDEX IF NOT EXISTS idx_live_routes_last_updated ON live_routes(last_updated);
-- Indexes for IPv4 range queries
CREATE INDEX IF NOT EXISTS idx_live_routes_v4_ip_range ON live_routes_v4(ip_start, ip_end);
-- Index to optimize prefix distribution queries
CREATE INDEX IF NOT EXISTS idx_live_routes_v4_mask_prefix ON live_routes_v4(mask_length, prefix);
-- Indexes for live_routes_v6 table
CREATE INDEX IF NOT EXISTS idx_live_routes_v6_prefix ON live_routes_v6(prefix);
CREATE INDEX IF NOT EXISTS idx_live_routes_v6_mask_length ON live_routes_v6(mask_length);
CREATE INDEX IF NOT EXISTS idx_live_routes_v6_origin_asn ON live_routes_v6(origin_asn);
CREATE INDEX IF NOT EXISTS idx_live_routes_v6_last_updated ON live_routes_v6(last_updated);
-- Index to optimize prefix distribution queries
CREATE INDEX IF NOT EXISTS idx_live_routes_v6_mask_prefix ON live_routes_v6(mask_length, prefix);
CREATE INDEX IF NOT EXISTS idx_live_routes_ipv4_range ON live_routes(v4_ip_start, v4_ip_end) WHERE ip_version = 4;

View File

@ -8,7 +8,7 @@ import (
"git.eeqj.de/sneak/routewatch/internal/logger"
)
const slowQueryThreshold = 25 * time.Millisecond
const slowQueryThreshold = 50 * time.Millisecond
// logSlowQuery logs queries that take longer than slowQueryThreshold
func logSlowQuery(logger *logger.Logger, query string, start time.Time) {
@ -19,7 +19,6 @@ func logSlowQuery(logger *logger.Logger, query string, start time.Time) {
}
// queryRow wraps QueryRow with slow query logging
// nolint:unused // kept for consistency with other query wrappers
func (d *Database) queryRow(query string, args ...interface{}) *sql.Row {
start := time.Now()
defer logSlowQuery(d.logger, query, start)

View File

@ -61,7 +61,8 @@ func (m *mockStore) GetOrCreateASN(number int, timestamp time.Time) (*database.A
}
asn := &database.ASN{
ASN: number,
ID: uuid.New(),
Number: number,
FirstSeen: timestamp,
LastSeen: timestamp,
}
@ -71,37 +72,6 @@ func (m *mockStore) GetOrCreateASN(number int, timestamp time.Time) (*database.A
return asn, nil
}
// UpdatePrefixesBatch mock implementation
func (m *mockStore) UpdatePrefixesBatch(prefixes map[string]time.Time) error {
m.mu.Lock()
defer m.mu.Unlock()
for prefix, timestamp := range prefixes {
if p, exists := m.Prefixes[prefix]; exists {
p.LastSeen = timestamp
} else {
const (
ipVersionV4 = 4
ipVersionV6 = 6
)
ipVersion := ipVersionV4
if strings.Contains(prefix, ":") {
ipVersion = ipVersionV6
}
m.Prefixes[prefix] = &database.Prefix{
ID: uuid.New(),
Prefix: prefix,
IPVersion: ipVersion,
FirstSeen: timestamp,
LastSeen: timestamp,
}
}
}
return nil
}
// GetOrCreatePrefix mock implementation
func (m *mockStore) GetOrCreatePrefix(prefix string, timestamp time.Time) (*database.Prefix, error) {
m.mu.Lock()
@ -193,11 +163,6 @@ func (m *mockStore) GetStats() (database.Stats, error) {
}, nil
}
// GetStatsContext returns statistics about the mock store with context support
func (m *mockStore) GetStatsContext(ctx context.Context) (database.Stats, error) {
return m.GetStats()
}
// UpsertLiveRoute mock implementation
func (m *mockStore) UpsertLiveRoute(route *database.LiveRoute) error {
// Simple mock - just return nil
@ -216,22 +181,12 @@ func (m *mockStore) GetPrefixDistribution() (ipv4 []database.PrefixDistribution,
return nil, nil, nil
}
// GetPrefixDistributionContext mock implementation with context support
func (m *mockStore) GetPrefixDistributionContext(ctx context.Context) (ipv4 []database.PrefixDistribution, ipv6 []database.PrefixDistribution, err error) {
return m.GetPrefixDistribution()
}
// GetLiveRouteCounts mock implementation
func (m *mockStore) GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error) {
// Return mock counts
return m.RouteCount / 2, m.RouteCount / 2, nil
}
// GetLiveRouteCountsContext mock implementation with context support
func (m *mockStore) GetLiveRouteCountsContext(ctx context.Context) (ipv4Count, ipv6Count int, err error) {
return m.GetLiveRouteCounts()
}
// GetASInfoForIP mock implementation
func (m *mockStore) GetASInfoForIP(ip string) (*database.ASInfo, error) {
// Simple mock - return a test AS
@ -246,11 +201,6 @@ func (m *mockStore) GetASInfoForIP(ip string) (*database.ASInfo, error) {
}, nil
}
// GetASInfoForIPContext mock implementation with context support
func (m *mockStore) GetASInfoForIPContext(ctx context.Context, ip string) (*database.ASInfo, error) {
return m.GetASInfoForIP(ip)
}
// GetASDetails mock implementation
func (m *mockStore) GetASDetails(asn int) (*database.ASN, []database.LiveRoute, error) {
m.mu.Lock()
@ -265,43 +215,12 @@ func (m *mockStore) GetASDetails(asn int) (*database.ASN, []database.LiveRoute,
return nil, nil, database.ErrNoRoute
}
// GetASDetailsContext mock implementation with context support
func (m *mockStore) GetASDetailsContext(ctx context.Context, asn int) (*database.ASN, []database.LiveRoute, error) {
return m.GetASDetails(asn)
}
// GetPrefixDetails mock implementation
func (m *mockStore) GetPrefixDetails(prefix string) ([]database.LiveRoute, error) {
// Return empty routes for now
return []database.LiveRoute{}, nil
}
// GetPrefixDetailsContext mock implementation with context support
func (m *mockStore) GetPrefixDetailsContext(ctx context.Context, prefix string) ([]database.LiveRoute, error) {
return m.GetPrefixDetails(prefix)
}
func (m *mockStore) GetRandomPrefixesByLength(maskLength, ipVersion, limit int) ([]database.LiveRoute, error) {
// Return empty routes for now
return []database.LiveRoute{}, nil
}
// GetRandomPrefixesByLengthContext mock implementation with context support
func (m *mockStore) GetRandomPrefixesByLengthContext(ctx context.Context, maskLength, ipVersion, limit int) ([]database.LiveRoute, error) {
return m.GetRandomPrefixesByLength(maskLength, ipVersion, limit)
}
// GetASPeers mock implementation
func (m *mockStore) GetASPeers(asn int) ([]database.ASPeer, error) {
// Return empty peers for now
return []database.ASPeer{}, nil
}
// GetASPeersContext mock implementation with context support
func (m *mockStore) GetASPeersContext(ctx context.Context, asn int) ([]database.ASPeer, error) {
return m.GetASPeers(asn)
}
// UpsertLiveRouteBatch mock implementation
func (m *mockStore) UpsertLiveRouteBatch(routes []*database.LiveRoute) error {
m.mu.Lock()
@ -343,7 +262,8 @@ func (m *mockStore) GetOrCreateASNBatch(asns map[int]time.Time) error {
for number, timestamp := range asns {
if _, exists := m.ASNs[number]; !exists {
m.ASNs[number] = &database.ASN{
ASN: number,
ID: uuid.New(),
Number: number,
FirstSeen: timestamp,
LastSeen: timestamp,
}

View File

@ -11,14 +11,12 @@ import (
const (
// asHandlerQueueSize is the queue capacity for ASN operations
// DO NOT set this higher than 100000 without explicit instructions
asHandlerQueueSize = 100000
// asnBatchSize is the number of ASN operations to batch together
asnBatchSize = 30000
asnBatchSize = 10000
// asnBatchTimeout is the maximum time to wait before flushing a batch
// DO NOT reduce this timeout - larger batches are more efficient
asnBatchTimeout = 2 * time.Second
)

View File

@ -21,7 +21,7 @@ const (
pathExpirationTime = 30 * time.Minute
// peeringProcessInterval is how often to process AS paths into peerings
peeringProcessInterval = 30 * time.Second
peeringProcessInterval = 2 * time.Minute
// pathPruneInterval is how often to prune old AS paths
pathPruneInterval = 5 * time.Minute

View File

@ -15,14 +15,12 @@ import (
const (
// prefixHandlerQueueSize is the queue capacity for prefix tracking operations
// DO NOT set this higher than 100000 without explicit instructions
prefixHandlerQueueSize = 100000
// prefixBatchSize is the number of prefix updates to batch together
prefixBatchSize = 25000
prefixBatchSize = 5000
// prefixBatchTimeout is the maximum time to wait before flushing a batch
// DO NOT reduce this timeout - larger batches are more efficient
prefixBatchTimeout = 1 * time.Second
// IP version constants
@ -182,15 +180,9 @@ func (h *PrefixHandler) flushBatchLocked() {
var routesToUpsert []*database.LiveRoute
var routesToDelete []database.LiveRouteDeletion
// Collect unique prefixes to update
prefixesToUpdate := make(map[string]time.Time)
// Skip the prefix table updates entirely - just update live_routes
// The prefix table is not critical for routing lookups
for _, update := range prefixMap {
// Track prefix for both announcements and withdrawals
if _, exists := prefixesToUpdate[update.prefix]; !exists || update.timestamp.After(prefixesToUpdate[update.prefix]) {
prefixesToUpdate[update.prefix] = update.timestamp
}
if update.messageType == "announcement" && update.originASN > 0 {
// Create live route for batch upsert
route := h.createLiveRoute(update)
@ -198,20 +190,11 @@ func (h *PrefixHandler) flushBatchLocked() {
routesToUpsert = append(routesToUpsert, route)
}
} else if update.messageType == "withdrawal" {
// Parse CIDR to get IP version
_, ipVersion, err := parseCIDR(update.prefix)
if err != nil {
h.logger.Error("Failed to parse CIDR for withdrawal", "prefix", update.prefix, "error", err)
continue
}
// Create deletion record for batch delete
routesToDelete = append(routesToDelete, database.LiveRouteDeletion{
Prefix: update.prefix,
OriginASN: update.originASN,
PeerIP: update.peer,
IPVersion: ipVersion,
})
}
}
@ -234,13 +217,6 @@ func (h *PrefixHandler) flushBatchLocked() {
}
}
// Update prefix tables
if len(prefixesToUpdate) > 0 {
if err := h.db.UpdatePrefixesBatch(prefixesToUpdate); err != nil {
h.logger.Error("Failed to update prefix batch", "error", err, "count", len(prefixesToUpdate))
}
}
elapsed := time.Since(startTime)
h.logger.Debug("Flushed prefix batch",
"batch_size", batchSize,

View File

@ -15,7 +15,6 @@ import (
"git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/templates"
asinfo "git.eeqj.de/sneak/routewatch/pkg/asinfo"
"github.com/dustin/go-humanize"
"github.com/go-chi/chi/v5"
)
@ -80,8 +79,8 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
}
return func(w http.ResponseWriter, r *http.Request) {
// Create a 4 second timeout context for this request
ctx, cancel := context.WithTimeout(r.Context(), 4*time.Second)
// Create a 1 second timeout context for this request
ctx, cancel := context.WithTimeout(r.Context(), 1*time.Second)
defer cancel()
metrics := s.streamer.GetMetrics()
@ -91,7 +90,7 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
errChan := make(chan error)
go func() {
dbStats, err := s.db.GetStatsContext(ctx)
dbStats, err := s.db.GetStats()
if err != nil {
s.logger.Debug("Database stats query failed", "error", err)
errChan <- err
@ -125,7 +124,7 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
const bitsPerMegabit = 1000000.0
// Get route counts from database
ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCountsContext(ctx)
ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCounts()
if err != nil {
s.logger.Warn("Failed to get live route counts", "error", err)
// Continue with zero counts
@ -174,15 +173,14 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
func (s *Server) handleStats() http.HandlerFunc {
// HandlerStatsInfo represents handler statistics in the API response
type HandlerStatsInfo struct {
Name string `json:"name"`
QueueLength int `json:"queue_length"`
QueueCapacity int `json:"queue_capacity"`
QueueHighWaterMark int `json:"queue_high_water_mark"`
ProcessedCount uint64 `json:"processed_count"`
DroppedCount uint64 `json:"dropped_count"`
AvgProcessTimeMs float64 `json:"avg_process_time_ms"`
MinProcessTimeMs float64 `json:"min_process_time_ms"`
MaxProcessTimeMs float64 `json:"max_process_time_ms"`
Name string `json:"name"`
QueueLength int `json:"queue_length"`
QueueCapacity int `json:"queue_capacity"`
ProcessedCount uint64 `json:"processed_count"`
DroppedCount uint64 `json:"dropped_count"`
AvgProcessTimeMs float64 `json:"avg_process_time_ms"`
MinProcessTimeMs float64 `json:"min_process_time_ms"`
MaxProcessTimeMs float64 `json:"max_process_time_ms"`
}
// StatsResponse represents the API statistics response
@ -214,8 +212,8 @@ func (s *Server) handleStats() http.HandlerFunc {
}
return func(w http.ResponseWriter, r *http.Request) {
// Create a 4 second timeout context for this request
ctx, cancel := context.WithTimeout(r.Context(), 4*time.Second)
// Create a 1 second timeout context for this request
ctx, cancel := context.WithTimeout(r.Context(), 1*time.Second)
defer cancel()
// Check if context is already cancelled
@ -234,7 +232,7 @@ func (s *Server) handleStats() http.HandlerFunc {
errChan := make(chan error)
go func() {
dbStats, err := s.db.GetStatsContext(ctx)
dbStats, err := s.db.GetStats()
if err != nil {
s.logger.Debug("Database stats query failed", "error", err)
errChan <- err
@ -248,11 +246,12 @@ func (s *Server) handleStats() http.HandlerFunc {
select {
case <-ctx.Done():
s.logger.Error("Database stats timeout")
// Don't write response here - timeout middleware already handles it
http.Error(w, "Database timeout", http.StatusRequestTimeout)
return
case err := <-errChan:
s.logger.Error("Failed to get database stats", "error", err)
writeJSONError(w, http.StatusInternalServerError, err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
case dbStats = <-statsChan:
@ -267,7 +266,7 @@ func (s *Server) handleStats() http.HandlerFunc {
const bitsPerMegabit = 1000000.0
// Get route counts from database
ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCountsContext(ctx)
ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCounts()
if err != nil {
s.logger.Warn("Failed to get live route counts", "error", err)
// Continue with zero counts
@ -282,15 +281,14 @@ func (s *Server) handleStats() http.HandlerFunc {
const microsecondsPerMillisecond = 1000.0
for _, hs := range handlerStats {
handlerStatsInfo = append(handlerStatsInfo, HandlerStatsInfo{
Name: hs.Name,
QueueLength: hs.QueueLength,
QueueCapacity: hs.QueueCapacity,
QueueHighWaterMark: hs.QueueHighWaterMark,
ProcessedCount: hs.ProcessedCount,
DroppedCount: hs.DroppedCount,
AvgProcessTimeMs: float64(hs.AvgProcessTime.Microseconds()) / microsecondsPerMillisecond,
MinProcessTimeMs: float64(hs.MinProcessTime.Microseconds()) / microsecondsPerMillisecond,
MaxProcessTimeMs: float64(hs.MaxProcessTime.Microseconds()) / microsecondsPerMillisecond,
Name: hs.Name,
QueueLength: hs.QueueLength,
QueueCapacity: hs.QueueCapacity,
ProcessedCount: hs.ProcessedCount,
DroppedCount: hs.DroppedCount,
AvgProcessTimeMs: float64(hs.AvgProcessTime.Microseconds()) / microsecondsPerMillisecond,
MinProcessTimeMs: float64(hs.MinProcessTime.Microseconds()) / microsecondsPerMillisecond,
MaxProcessTimeMs: float64(hs.MaxProcessTime.Microseconds()) / microsecondsPerMillisecond,
})
}
@ -355,7 +353,7 @@ func (s *Server) handleIPLookup() http.HandlerFunc {
}
// Look up AS information for the IP
asInfo, err := s.db.GetASInfoForIPContext(r.Context(), ip)
asInfo, err := s.db.GetASInfoForIP(ip)
if err != nil {
// Check if it's an invalid IP error
if errors.Is(err, database.ErrInvalidIP) {
@ -386,7 +384,7 @@ func (s *Server) handleASDetailJSON() http.HandlerFunc {
return
}
asInfo, prefixes, err := s.db.GetASDetailsContext(r.Context(), asn)
asInfo, prefixes, err := s.db.GetASDetails(asn)
if err != nil {
if errors.Is(err, database.ErrNoRoute) {
writeJSONError(w, http.StatusNotFound, err.Error())
@ -439,7 +437,7 @@ func (s *Server) handlePrefixDetailJSON() http.HandlerFunc {
return
}
routes, err := s.db.GetPrefixDetailsContext(r.Context(), prefix)
routes, err := s.db.GetPrefixDetails(prefix)
if err != nil {
if errors.Is(err, database.ErrNoRoute) {
writeJSONError(w, http.StatusNotFound, err.Error())
@ -481,7 +479,7 @@ func (s *Server) handleASDetail() http.HandlerFunc {
return
}
asInfo, prefixes, err := s.db.GetASDetailsContext(r.Context(), asn)
asInfo, prefixes, err := s.db.GetASDetails(asn)
if err != nil {
if errors.Is(err, database.ErrNoRoute) {
http.Error(w, "AS not found", http.StatusNotFound)
@ -493,14 +491,6 @@ func (s *Server) handleASDetail() http.HandlerFunc {
return
}
// Get peers
peers, err := s.db.GetASPeersContext(r.Context(), asn)
if err != nil {
s.logger.Error("Failed to get AS peers", "error", err)
// Continue without peers rather than failing the whole request
peers = []database.ASPeer{}
}
// Group prefixes by IP version
const ipVersionV4 = 4
var ipv4Prefixes, ipv6Prefixes []database.LiveRoute
@ -557,8 +547,6 @@ func (s *Server) handleASDetail() http.HandlerFunc {
TotalCount int
IPv4Count int
IPv6Count int
Peers []database.ASPeer
PeerCount int
}{
ASN: asInfo,
IPv4Prefixes: ipv4Prefixes,
@ -566,16 +554,6 @@ func (s *Server) handleASDetail() http.HandlerFunc {
TotalCount: len(prefixes),
IPv4Count: len(ipv4Prefixes),
IPv6Count: len(ipv6Prefixes),
Peers: peers,
PeerCount: len(peers),
}
// Check if context is still valid before writing response
select {
case <-r.Context().Done():
// Request was cancelled, don't write response
return
default:
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
@ -605,7 +583,7 @@ func (s *Server) handlePrefixDetail() http.HandlerFunc {
return
}
routes, err := s.db.GetPrefixDetailsContext(r.Context(), prefix)
routes, err := s.db.GetPrefixDetails(prefix)
if err != nil {
if errors.Is(err, database.ErrNoRoute) {
http.Error(w, "Prefix not found", http.StatusNotFound)
@ -619,7 +597,7 @@ func (s *Server) handlePrefixDetail() http.HandlerFunc {
// Group by origin AS and collect unique AS info
type ASNInfo struct {
ASN int
Number int
Handle string
Description string
PeerCount int
@ -628,7 +606,7 @@ func (s *Server) handlePrefixDetail() http.HandlerFunc {
for _, route := range routes {
if _, exists := originMap[route.OriginASN]; !exists {
// Get AS info from database
asInfo, _, _ := s.db.GetASDetailsContext(r.Context(), route.OriginASN)
asInfo, _, _ := s.db.GetASDetails(route.OriginASN)
handle := ""
description := ""
if asInfo != nil {
@ -636,7 +614,7 @@ func (s *Server) handlePrefixDetail() http.HandlerFunc {
description = asInfo.Description
}
originMap[route.OriginASN] = &ASNInfo{
ASN: route.OriginASN,
Number: route.OriginASN,
Handle: handle,
Description: description,
PeerCount: 0,
@ -667,41 +645,12 @@ func (s *Server) handlePrefixDetail() http.HandlerFunc {
origins = append(origins, origin)
}
// Create enhanced routes with AS path handles
type ASPathEntry struct {
ASN int
Handle string
}
type EnhancedRoute struct {
database.LiveRoute
ASPathWithHandle []ASPathEntry
}
enhancedRoutes := make([]EnhancedRoute, len(routes))
for i, route := range routes {
enhancedRoute := EnhancedRoute{
LiveRoute: route,
ASPathWithHandle: make([]ASPathEntry, len(route.ASPath)),
}
// Look up handle for each AS in the path
for j, asn := range route.ASPath {
handle := asinfo.GetHandle(asn)
enhancedRoute.ASPathWithHandle[j] = ASPathEntry{
ASN: asn,
Handle: handle,
}
}
enhancedRoutes[i] = enhancedRoute
}
// Prepare template data
data := struct {
Prefix string
MaskLength int
IPVersion int
Routes []EnhancedRoute
Routes []database.LiveRoute
Origins []*ASNInfo
PeerCount int
OriginCount int
@ -709,20 +658,12 @@ func (s *Server) handlePrefixDetail() http.HandlerFunc {
Prefix: prefix,
MaskLength: maskLength,
IPVersion: ipVersion,
Routes: enhancedRoutes,
Routes: routes,
Origins: origins,
PeerCount: len(routes),
OriginCount: len(originMap),
}
// Check if context is still valid before writing response
select {
case <-r.Context().Done():
// Request was cancelled, don't write response
return
default:
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
tmpl := templates.PrefixDetailTemplate()
if err := tmpl.Execute(w, data); err != nil {
@ -761,123 +702,3 @@ func (s *Server) handleIPRedirect() http.HandlerFunc {
http.Redirect(w, r, "/prefix/"+url.QueryEscape(asInfo.Prefix), http.StatusSeeOther)
}
}
// handlePrefixLength shows a random sample of prefixes with the specified mask length
func (s *Server) handlePrefixLength() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
lengthStr := chi.URLParam(r, "length")
if lengthStr == "" {
http.Error(w, "Length parameter is required", http.StatusBadRequest)
return
}
maskLength, err := strconv.Atoi(lengthStr)
if err != nil {
http.Error(w, "Invalid mask length", http.StatusBadRequest)
return
}
// Determine IP version based on mask length
const (
maxIPv4MaskLength = 32
maxIPv6MaskLength = 128
)
var ipVersion int
if maskLength <= maxIPv4MaskLength {
ipVersion = 4
} else if maskLength <= maxIPv6MaskLength {
ipVersion = 6
} else {
http.Error(w, "Invalid mask length", http.StatusBadRequest)
return
}
// Get random sample of prefixes
const maxPrefixes = 500
prefixes, err := s.db.GetRandomPrefixesByLengthContext(r.Context(), maskLength, ipVersion, maxPrefixes)
if err != nil {
s.logger.Error("Failed to get prefixes by length", "error", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
// Sort prefixes for display
sort.Slice(prefixes, func(i, j int) bool {
// First compare by IP version
if prefixes[i].IPVersion != prefixes[j].IPVersion {
return prefixes[i].IPVersion < prefixes[j].IPVersion
}
// Then by prefix
return prefixes[i].Prefix < prefixes[j].Prefix
})
// Create enhanced prefixes with AS descriptions
type EnhancedPrefix struct {
database.LiveRoute
OriginASDescription string
Age string
}
enhancedPrefixes := make([]EnhancedPrefix, len(prefixes))
for i, prefix := range prefixes {
enhancedPrefixes[i] = EnhancedPrefix{
LiveRoute: prefix,
Age: formatAge(prefix.LastUpdated),
}
// Get AS description
if asInfo, ok := asinfo.Get(prefix.OriginASN); ok {
enhancedPrefixes[i].OriginASDescription = asInfo.Description
}
}
// Render template
data := map[string]interface{}{
"MaskLength": maskLength,
"IPVersion": ipVersion,
"Prefixes": enhancedPrefixes,
"Count": len(prefixes),
}
// Check if context is still valid before writing response
select {
case <-r.Context().Done():
// Request was cancelled, don't write response
return
default:
}
tmpl := templates.PrefixLengthTemplate()
if err := tmpl.Execute(w, data); err != nil {
s.logger.Error("Failed to render prefix length template", "error", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
}
}
}
// formatAge returns a human-readable age string
func formatAge(timestamp time.Time) string {
age := time.Since(timestamp)
const hoursPerDay = 24
if age < time.Minute {
return "< 1m"
} else if age < time.Hour {
minutes := int(age.Minutes())
return strconv.Itoa(minutes) + "m"
} else if age < hoursPerDay*time.Hour {
hours := int(age.Hours())
return strconv.Itoa(hours) + "h"
}
days := int(age.Hours() / hoursPerDay)
return strconv.Itoa(days) + "d"
}

View File

@ -108,7 +108,6 @@ type timeoutWriter struct {
http.ResponseWriter
mu sync.Mutex
written bool
header http.Header // cached header to prevent concurrent access
}
func (tw *timeoutWriter) Write(b []byte) (int, error) {
@ -134,18 +133,6 @@ func (tw *timeoutWriter) WriteHeader(statusCode int) {
}
func (tw *timeoutWriter) Header() http.Header {
tw.mu.Lock()
defer tw.mu.Unlock()
if tw.written {
// Return a copy to prevent modifications after timeout
if tw.header == nil {
tw.header = make(http.Header)
}
return tw.header
}
return tw.ResponseWriter.Header()
}
@ -166,7 +153,6 @@ func TimeoutMiddleware(timeout time.Duration) func(http.Handler) http.Handler {
tw := &timeoutWriter{
ResponseWriter: w,
header: make(http.Header),
}
done := make(chan struct{})
@ -192,12 +178,8 @@ func TimeoutMiddleware(timeout time.Duration) func(http.Handler) http.Handler {
tw.markWritten() // Prevent the handler from writing after timeout
execTime := time.Since(startTime)
// Write directly to the underlying writer since we've marked tw as written
// This is safe because markWritten() prevents the handler from writing
tw.mu.Lock()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusRequestTimeout)
tw.mu.Unlock()
response := map[string]interface{}{
"status": "error",
@ -217,68 +199,3 @@ func TimeoutMiddleware(timeout time.Duration) func(http.Handler) http.Handler {
})
}
}
// JSONValidationMiddleware ensures all JSON API responses are valid JSON
func JSONValidationMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Create a custom response writer to capture the response
rw := &responseWriter{
ResponseWriter: w,
body: &bytes.Buffer{},
statusCode: http.StatusOK,
}
// Serve the request
next.ServeHTTP(rw, r)
// Check if it's meant to be a JSON response
contentType := rw.Header().Get("Content-Type")
isJSON := contentType == "application/json" || contentType == ""
// If it's not JSON or has content, pass through
if !isJSON && rw.body.Len() > 0 {
w.WriteHeader(rw.statusCode)
_, _ = w.Write(rw.body.Bytes())
return
}
// For JSON responses, validate the JSON
if rw.body.Len() > 0 {
var testParse interface{}
if err := json.Unmarshal(rw.body.Bytes(), &testParse); err == nil {
// Valid JSON, write it out
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(rw.statusCode)
_, _ = w.Write(rw.body.Bytes())
return
}
}
// If we get here, either there's no body or invalid JSON
// Write a proper error response
w.Header().Set("Content-Type", "application/json")
// Determine appropriate status code
statusCode := rw.statusCode
if statusCode == http.StatusOK {
statusCode = http.StatusInternalServerError
}
w.WriteHeader(statusCode)
errorMsg := "Internal server error"
if statusCode == http.StatusRequestTimeout {
errorMsg = "Request timeout"
}
response := map[string]interface{}{
"status": "error",
"error": map[string]interface{}{
"msg": errorMsg,
"code": statusCode,
},
}
_ = json.NewEncoder(w).Encode(response)
})
}

View File

@ -16,24 +16,22 @@ func (s *Server) setupRoutes() {
r.Use(middleware.RealIP)
r.Use(middleware.Logger)
r.Use(middleware.Recoverer)
const requestTimeout = 8 * time.Second
const requestTimeout = 2 * time.Second
r.Use(TimeoutMiddleware(requestTimeout))
r.Use(JSONResponseMiddleware)
// Routes
r.Get("/", s.handleRoot())
r.Get("/status", s.handleStatusHTML())
r.Get("/status.json", JSONValidationMiddleware(s.handleStatusJSON()).ServeHTTP)
r.Get("/status.json", s.handleStatusJSON())
// AS and prefix detail pages
r.Get("/as/{asn}", s.handleASDetail())
r.Get("/prefix/{prefix}", s.handlePrefixDetail())
r.Get("/prefixlength/{length}", s.handlePrefixLength())
r.Get("/ip/{ip}", s.handleIPRedirect())
// API routes
r.Route("/api/v1", func(r chi.Router) {
r.Use(JSONValidationMiddleware)
r.Get("/stats", s.handleStats())
r.Get("/ip/{ip}", s.handleIPLookup())
r.Get("/as/{asn}", s.handleASDetailJSON())

View File

@ -42,7 +42,7 @@ func (s *Server) Start() error {
port = "8080"
}
const readHeaderTimeout = 40 * time.Second
const readHeaderTimeout = 10 * time.Second
s.srv = &http.Server{
Addr: ":" + port,
Handler: s.router,

View File

@ -7,9 +7,8 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"math/rand"
"net/http"
"os"
"sync"
"sync/atomic"
"time"
@ -20,20 +19,13 @@ import (
)
const (
risLiveURL = "https://ris-live.ripe.net/v1/stream/?format=json&" +
"client=https%3A%2F%2Fgit.eeqj.de%2Fsneak%2Froutewatch"
risLiveURL = "https://ris-live.ripe.net/v1/stream/?format=json"
metricsWindowSize = 60 // seconds for rolling average
metricsUpdateRate = time.Second
minBackoffDelay = 5 * time.Second
maxBackoffDelay = 320 * time.Second
metricsLogInterval = 10 * time.Second
bytesPerKB = 1024
bytesPerMB = 1024 * 1024
maxConcurrentHandlers = 800 // Maximum number of concurrent message handlers
// Backpressure constants
backpressureThreshold = 0.5 // Start dropping at 50% queue utilization
backpressureSlope = 2.0 // Slope for linear drop probability increase
)
// MessageHandler is an interface for handling RIS messages
@ -54,13 +46,12 @@ type RawMessageHandler func(line string)
// handlerMetrics tracks performance metrics for a handler
type handlerMetrics struct {
processedCount uint64 // Total messages processed
droppedCount uint64 // Total messages dropped
totalTime time.Duration // Total processing time (for average calculation)
minTime time.Duration // Minimum processing time
maxTime time.Duration // Maximum processing time
queueHighWaterMark int // Maximum queue length seen
mu sync.Mutex // Protects the metrics
processedCount uint64 // Total messages processed
droppedCount uint64 // Total messages dropped
totalTime time.Duration // Total processing time (for average calculation)
minTime time.Duration // Minimum processing time
maxTime time.Duration // Maximum processing time
mu sync.Mutex // Protects the metrics
}
// handlerInfo wraps a handler with its queue and metrics
@ -80,8 +71,7 @@ type Streamer struct {
cancel context.CancelFunc
running bool
metrics *metrics.Tracker
totalDropped uint64 // Total dropped messages across all handlers
random *rand.Rand // Random number generator for backpressure drops
totalDropped uint64 // Total dropped messages across all handlers
}
// New creates a new RIS streamer
@ -93,8 +83,6 @@ func New(logger *logger.Logger, metrics *metrics.Tracker) *Streamer {
},
handlers: make([]*handlerInfo, 0),
metrics: metrics,
//nolint:gosec // Non-cryptographic randomness is fine for backpressure
random: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}
@ -107,9 +95,6 @@ func (s *Streamer) RegisterHandler(handler MessageHandler) {
info := &handlerInfo{
handler: handler,
queue: make(chan *ristypes.RISMessage, handler.QueueCapacity()),
metrics: handlerMetrics{
minTime: time.Duration(math.MaxInt64), // Initialize to max so first value sets the floor
},
}
s.handlers = append(s.handlers, info)
@ -146,7 +131,9 @@ func (s *Streamer) Start() error {
}
go func() {
s.streamWithReconnect(ctx)
if err := s.stream(ctx); err != nil {
s.logger.Error("Streaming error", "error", err)
}
s.mu.Lock()
s.running = false
s.mu.Unlock()
@ -183,7 +170,7 @@ func (s *Streamer) runHandlerWorker(info *handlerInfo) {
info.metrics.totalTime += elapsed
// Update min time
if elapsed < info.metrics.minTime {
if info.metrics.minTime == 0 || elapsed < info.metrics.minTime {
info.metrics.minTime = elapsed
}
@ -215,15 +202,14 @@ func (s *Streamer) GetMetricsTracker() *metrics.Tracker {
// HandlerStats represents metrics for a single handler
type HandlerStats struct {
Name string
QueueLength int
QueueCapacity int
QueueHighWaterMark int
ProcessedCount uint64
DroppedCount uint64
AvgProcessTime time.Duration
MinProcessTime time.Duration
MaxProcessTime time.Duration
Name string
QueueLength int
QueueCapacity int
ProcessedCount uint64
DroppedCount uint64
AvgProcessTime time.Duration
MinProcessTime time.Duration
MaxProcessTime time.Duration
}
// GetHandlerStats returns current handler statistics
@ -237,14 +223,13 @@ func (s *Streamer) GetHandlerStats() []HandlerStats {
info.metrics.mu.Lock()
hs := HandlerStats{
Name: fmt.Sprintf("%T", info.handler),
QueueLength: len(info.queue),
QueueCapacity: cap(info.queue),
QueueHighWaterMark: info.metrics.queueHighWaterMark,
ProcessedCount: info.metrics.processedCount,
DroppedCount: info.metrics.droppedCount,
MinProcessTime: info.metrics.minTime,
MaxProcessTime: info.metrics.maxTime,
Name: fmt.Sprintf("%T", info.handler),
QueueLength: len(info.queue),
QueueCapacity: cap(info.queue),
ProcessedCount: info.metrics.processedCount,
DroppedCount: info.metrics.droppedCount,
MinProcessTime: info.metrics.minTime,
MaxProcessTime: info.metrics.maxTime,
}
// Calculate average time
@ -335,72 +320,6 @@ func (s *Streamer) updateMetrics(messageBytes int) {
s.metrics.RecordMessage(int64(messageBytes))
}
// streamWithReconnect handles streaming with automatic reconnection and exponential backoff
func (s *Streamer) streamWithReconnect(ctx context.Context) {
backoffDelay := minBackoffDelay
consecutiveFailures := 0
for {
select {
case <-ctx.Done():
s.logger.Info("Stream context cancelled, stopping reconnection attempts")
return
default:
}
// Attempt to stream
startTime := time.Now()
err := s.stream(ctx)
streamDuration := time.Since(startTime)
if err == nil {
// Clean exit (context cancelled)
return
}
// Log the error
s.logger.Error("Stream disconnected",
"error", err,
"consecutive_failures", consecutiveFailures+1,
"stream_duration", streamDuration)
s.metrics.SetConnected(false)
// Check if context is cancelled
if ctx.Err() != nil {
return
}
// If we streamed for more than 30 seconds, reset the backoff
// This indicates we had a successful connection that received data
if streamDuration > 30*time.Second {
s.logger.Info("Resetting backoff delay due to successful connection",
"stream_duration", streamDuration)
backoffDelay = minBackoffDelay
consecutiveFailures = 0
} else {
// Increment consecutive failures
consecutiveFailures++
}
// Wait with exponential backoff
s.logger.Info("Waiting before reconnection attempt",
"delay_seconds", backoffDelay.Seconds(),
"consecutive_failures", consecutiveFailures)
select {
case <-ctx.Done():
return
case <-time.After(backoffDelay):
// Double the backoff delay for next time, up to max
backoffDelay *= 2
if backoffDelay > maxBackoffDelay {
backoffDelay = maxBackoffDelay
}
}
}
}
func (s *Streamer) stream(ctx context.Context) error {
req, err := http.NewRequestWithContext(ctx, "GET", risLiveURL, nil)
if err != nil {
@ -471,13 +390,10 @@ func (s *Streamer) stream(ctx context.Context) error {
// Parse the message first
var wrapper ristypes.RISLiveMessage
if err := json.Unmarshal(line, &wrapper); err != nil {
// Log the error and return to trigger reconnection
s.logger.Error("Failed to parse JSON",
"error", err,
"line", string(line),
"line_length", len(line))
return fmt.Errorf("JSON parse error: %w", err)
// Output the raw line and panic on parse failure
fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err)
fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(line))
panic(fmt.Sprintf("JSON parse error: %v", err))
}
// Check if it's a ris_message wrapper
@ -527,43 +443,32 @@ func (s *Streamer) stream(ctx context.Context) error {
// Peer state changes - silently ignore
continue
default:
s.logger.Error("Unknown message type",
"type", msg.Type,
"line", string(line),
fmt.Fprintf(
os.Stderr,
"UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n",
msg.Type,
string(line),
)
panic(
fmt.Sprintf(
"Unknown RIS message type: %s",
msg.Type,
),
)
panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type))
}
// Dispatch to interested handlers
s.mu.RLock()
for _, info := range s.handlers {
if !info.handler.WantsMessage(msg.Type) {
continue
}
// Check if we should drop due to backpressure
if s.shouldDropForBackpressure(info) {
atomic.AddUint64(&info.metrics.droppedCount, 1)
atomic.AddUint64(&s.totalDropped, 1)
continue
}
// Try to queue the message
select {
case info.queue <- &msg:
// Message queued successfully
// Update high water mark if needed
queueLen := len(info.queue)
info.metrics.mu.Lock()
if queueLen > info.metrics.queueHighWaterMark {
info.metrics.queueHighWaterMark = queueLen
if info.handler.WantsMessage(msg.Type) {
select {
case info.queue <- &msg:
// Message queued successfully
default:
// Queue is full, drop the message
atomic.AddUint64(&info.metrics.droppedCount, 1)
atomic.AddUint64(&s.totalDropped, 1)
}
info.metrics.mu.Unlock()
default:
// Queue is full, drop the message
atomic.AddUint64(&info.metrics.droppedCount, 1)
atomic.AddUint64(&s.totalDropped, 1)
}
}
s.mu.RUnlock()
@ -575,25 +480,3 @@ func (s *Streamer) stream(ctx context.Context) error {
return nil
}
// shouldDropForBackpressure determines if a message should be dropped based on queue utilization
func (s *Streamer) shouldDropForBackpressure(info *handlerInfo) bool {
// Calculate queue utilization
queueLen := len(info.queue)
queueCap := cap(info.queue)
utilization := float64(queueLen) / float64(queueCap)
// No drops below threshold
if utilization < backpressureThreshold {
return false
}
// Calculate drop probability (0.0 at threshold, 1.0 at 100% full)
dropProbability := (utilization - backpressureThreshold) * backpressureSlope
if dropProbability > 1.0 {
dropProbability = 1.0
}
// Random drop based on probability
return s.random.Float64() < dropProbability
}

View File

@ -3,7 +3,7 @@
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AS{{.ASN.ASN}} - {{.ASN.Handle}} - RouteWatch</title>
<title>AS{{.ASN.Number}} - {{.ASN.Handle}} - RouteWatch</title>
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
@ -136,7 +136,7 @@
<div class="container">
<a href="/status" class="nav-link">← Back to Status</a>
<h1>AS{{.ASN.ASN}}{{if .ASN.Handle}} - {{.ASN.Handle}}{{end}}</h1>
<h1>AS{{.ASN.Number}}{{if .ASN.Handle}} - {{.ASN.Handle}}{{end}}</h1>
{{if .ASN.Description}}
<p class="subtitle">{{.ASN.Description}}</p>
{{end}}
@ -154,10 +154,6 @@
<div class="info-label">IPv6 Prefixes</div>
<div class="info-value">{{.IPv6Count}}</div>
</div>
<div class="info-card">
<div class="info-label">Peer ASNs</div>
<div class="info-value">{{.PeerCount}}</div>
</div>
<div class="info-card">
<div class="info-label">First Seen</div>
<div class="info-value">{{.ASN.FirstSeen.Format "2006-01-02"}}</div>
@ -227,44 +223,6 @@
<p>No prefixes announced by this AS</p>
</div>
{{end}}
{{if .Peers}}
<div class="prefix-section">
<div class="prefix-header">
<h2>Peer ASNs</h2>
<span class="prefix-count">{{.PeerCount}}</span>
</div>
<table class="prefix-table">
<thead>
<tr>
<th>ASN</th>
<th>Handle</th>
<th>Description</th>
<th>First Seen</th>
<th>Last Seen</th>
</tr>
</thead>
<tbody>
{{range .Peers}}
<tr>
<td><a href="/as/{{.ASN}}" class="prefix-link">AS{{.ASN}}</a></td>
<td>{{if .Handle}}{{.Handle}}{{else}}-{{end}}</td>
<td>{{if .Description}}{{.Description}}{{else}}-{{end}}</td>
<td>{{.FirstSeen.Format "2006-01-02"}}</td>
<td>{{.LastSeen.Format "2006-01-02"}}</td>
</tr>
{{end}}
</tbody>
</table>
</div>
{{else}}
<div class="prefix-section">
<h2>Peer ASNs</h2>
<div class="empty-state">
<p>No peering relationships found for this AS</p>
</div>
</div>
{{end}}
</div>
</body>
</html>

View File

@ -13,8 +13,7 @@
color: #333;
}
.container {
width: 90%;
max-width: 1600px;
max-width: 1200px;
margin: 0 auto;
background: white;
padding: 30px;
@ -92,7 +91,6 @@
.route-table td {
padding: 12px;
border-bottom: 1px solid #e0e0e0;
white-space: nowrap;
}
.route-table tr:hover {
background: #f8f9fa;
@ -116,13 +114,9 @@
font-family: monospace;
font-size: 13px;
color: #666;
max-width: 600px;
word-wrap: break-word;
white-space: normal !important;
line-height: 1.5;
}
.as-path .as-link {
font-weight: 600;
max-width: 300px;
overflow-x: auto;
white-space: nowrap;
}
.age {
color: #7f8c8d;
@ -174,7 +168,7 @@
font-size: 14px;
}
.as-path {
max-width: 100%;
max-width: 150px;
}
}
</style>
@ -207,7 +201,7 @@
<div class="origin-list">
{{range .Origins}}
<div class="origin-item">
<a href="/as/{{.ASN}}" class="as-link">AS{{.ASN}}</a>
<a href="/as/{{.Number}}" class="as-link">AS{{.Number}}</a>
{{if .Handle}} ({{.Handle}}){{end}}
<span style="color: #7f8c8d; margin-left: 10px;">{{.PeerCount}} peer{{if ne .PeerCount 1}}s{{end}}</span>
</div>
@ -240,7 +234,7 @@
<a href="/as/{{.OriginASN}}" class="as-link">AS{{.OriginASN}}</a>
</td>
<td class="peer-ip">{{.PeerIP}}</td>
<td class="as-path">{{range $i, $as := .ASPathWithHandle}}{{if $i}} → {{end}}<a href="/as/{{$as.ASN}}" class="as-link">{{if $as.Handle}}{{$as.Handle}}{{else}}AS{{$as.ASN}}{{end}}</a>{{end}}</td>
<td class="as-path">{{range $i, $as := .ASPath}}{{if $i}} → {{end}}{{$as}}{{end}}</td>
<td class="peer-ip">{{.NextHop}}</td>
<td>{{.LastUpdated.Format "2006-01-02 15:04:05"}}</td>
<td class="age">{{.LastUpdated | timeSince}}</td>

View File

@ -1,108 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Prefixes with /{{ .MaskLength }} - RouteWatch</title>
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
max-width: 1200px;
margin: 0 auto;
padding: 20px;
background: #f5f5f5;
}
h1 {
color: #333;
margin-bottom: 10px;
}
.subtitle {
color: #666;
margin-bottom: 30px;
}
.info-card {
background: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
margin-bottom: 20px;
}
table {
width: 100%;
border-collapse: collapse;
background: white;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
border-radius: 8px;
overflow: hidden;
}
th {
background: #f8f9fa;
padding: 12px;
text-align: left;
font-weight: 600;
color: #333;
border-bottom: 2px solid #dee2e6;
}
td {
padding: 12px;
border-bottom: 1px solid #eee;
}
tr:last-child td {
border-bottom: none;
}
tr:hover {
background: #f8f9fa;
}
a {
color: #0066cc;
text-decoration: none;
}
a:hover {
text-decoration: underline;
}
.prefix-link {
font-family: 'SF Mono', Monaco, 'Cascadia Mono', 'Roboto Mono', Consolas, 'Courier New', monospace;
}
.as-link {
white-space: nowrap;
}
.age {
color: #666;
font-size: 0.9em;
}
.back-link {
display: inline-block;
margin-bottom: 20px;
color: #0066cc;
}
</style>
</head>
<body>
<a href="/status" class="back-link">← Back to Status</a>
<h1>IPv{{ .IPVersion }} Prefixes with /{{ .MaskLength }}</h1>
<p class="subtitle">Showing {{ .Count }} randomly selected prefixes</p>
<table>
<thead>
<tr>
<th>Prefix</th>
<th>Age</th>
<th>Origin AS</th>
</tr>
</thead>
<tbody>
{{ range .Prefixes }}
<tr>
<td><a href="/prefix/{{ .Prefix | urlEncode }}" class="prefix-link">{{ .Prefix }}</a></td>
<td class="age">{{ .Age }}</td>
<td>
<a href="/as/{{ .OriginASN }}" class="as-link">
AS{{ .OriginASN }}{{ if .OriginASDescription }} ({{ .OriginASDescription }}){{ end }}
</a>
</td>
</tr>
{{ end }}
</tbody>
</table>
</body>
</html>

View File

@ -49,16 +49,6 @@
font-family: 'SF Mono', Monaco, 'Cascadia Mono', 'Roboto Mono', Consolas, 'Courier New', monospace;
color: #333;
}
.metric-value.metric-link {
text-decoration: underline;
text-decoration-style: dashed;
text-underline-offset: 2px;
cursor: pointer;
}
.metric-value.metric-link:hover {
color: #0066cc;
text-decoration-style: solid;
}
.connected {
color: #22c55e;
}
@ -241,7 +231,7 @@
metric.className = 'metric';
metric.innerHTML = `
<span class="metric-label">/${item.mask_length}</span>
<a href="/prefixlength/${item.mask_length}" class="metric-value metric-link">${formatNumber(item.count)}</a>
<span class="metric-value">${formatNumber(item.count)}</span>
`;
container.appendChild(metric);
});
@ -264,10 +254,6 @@
<span class="metric-label">Queue</span>
<span class="metric-value">${handler.queue_length}/${handler.queue_capacity}</span>
</div>
<div class="metric">
<span class="metric-label">High Water Mark</span>
<span class="metric-value">${handler.queue_high_water_mark}/${handler.queue_capacity} (${Math.round(handler.queue_high_water_mark * 100 / handler.queue_capacity)}%)</span>
</div>
<div class="metric">
<span class="metric-label">Processed</span>
<span class="metric-value">${formatNumber(handler.processed_count)}</span>
@ -290,39 +276,6 @@
});
}
function resetAllFields() {
// Reset all metric fields to '-'
document.getElementById('connected').textContent = '-';
document.getElementById('connected').className = 'metric-value';
document.getElementById('uptime').textContent = '-';
document.getElementById('go_version').textContent = '-';
document.getElementById('goroutines').textContent = '-';
document.getElementById('memory_usage').textContent = '-';
document.getElementById('total_messages').textContent = '-';
document.getElementById('messages_per_sec').textContent = '-';
document.getElementById('total_bytes').textContent = '-';
document.getElementById('mbits_per_sec').textContent = '-';
document.getElementById('asns').textContent = '-';
document.getElementById('prefixes').textContent = '-';
document.getElementById('ipv4_prefixes').textContent = '-';
document.getElementById('ipv6_prefixes').textContent = '-';
document.getElementById('peerings').textContent = '-';
document.getElementById('peers').textContent = '-';
document.getElementById('database_size').textContent = '-';
document.getElementById('live_routes').textContent = '-';
document.getElementById('ipv4_routes').textContent = '-';
document.getElementById('ipv6_routes').textContent = '-';
document.getElementById('ipv4_updates_per_sec').textContent = '-';
document.getElementById('ipv6_updates_per_sec').textContent = '-';
// Clear handler stats
document.getElementById('handler-stats-container').innerHTML = '';
// Clear prefix distributions
document.getElementById('ipv4-prefix-distribution').innerHTML = '<div class="metric"><span class="metric-label">No data</span></div>';
document.getElementById('ipv6-prefix-distribution').innerHTML = '<div class="metric"><span class="metric-label">No data</span></div>';
}
function updateStatus() {
fetch('/api/v1/stats')
.then(response => response.json())
@ -331,7 +284,6 @@
if (response.status === 'error') {
document.getElementById('error').textContent = 'Error: ' + response.error.msg;
document.getElementById('error').style.display = 'block';
resetAllFields();
return;
}
@ -378,13 +330,12 @@
.catch(error => {
document.getElementById('error').textContent = 'Error fetching status: ' + error;
document.getElementById('error').style.display = 'block';
resetAllFields();
});
}
// Update immediately and then every 2 seconds
// Update immediately and then every 500ms
updateStatus();
setInterval(updateStatus, 2000);
setInterval(updateStatus, 500);
</script>
</body>
</html>

View File

@ -18,15 +18,11 @@ var asDetailHTML string
//go:embed prefix_detail.html
var prefixDetailHTML string
//go:embed prefix_length.html
var prefixLengthHTML string
// Templates contains all parsed templates
type Templates struct {
Status *template.Template
ASDetail *template.Template
PrefixDetail *template.Template
PrefixLength *template.Template
}
var (
@ -103,12 +99,6 @@ func initTemplates() {
if err != nil {
panic("failed to parse prefix detail template: " + err.Error())
}
// Parse prefix length template
defaultTemplates.PrefixLength, err = template.New("prefixLength").Funcs(funcs).Parse(prefixLengthHTML)
if err != nil {
panic("failed to parse prefix length template: " + err.Error())
}
}
// Get returns the singleton Templates instance
@ -132,8 +122,3 @@ func ASDetailTemplate() *template.Template {
func PrefixDetailTemplate() *template.Template {
return Get().PrefixDetail
}
// PrefixLengthTemplate returns the parsed prefix length template
func PrefixLengthTemplate() *template.Template {
return Get().PrefixLength
}

477614
log.txt

File diff suppressed because it is too large Load Diff