Compare commits

..

No commits in common. "40d7f0185b05a12717a0ea3510f5b24a1aabadc7" and "2fc24bb937f7fa06f6ffd324e841e3d5809e118b" have entirely different histories.

16 changed files with 450 additions and 2228 deletions

View File

@ -62,7 +62,7 @@ func New(cfg *config.Config, logger *logger.Logger) (*Database, error) {
// Add connection parameters for go-sqlite3 // Add connection parameters for go-sqlite3
// Enable WAL mode and other performance optimizations // Enable WAL mode and other performance optimizations
dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=OFF&cache=shared", dbPath) dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=NORMAL&cache=shared", dbPath)
db, err := sql.Open("sqlite3", dsn) db, err := sql.Open("sqlite3", dsn)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to open database: %w", err) return nil, fmt.Errorf("failed to open database: %w", err)
@ -73,10 +73,9 @@ func New(cfg *config.Config, logger *logger.Logger) (*Database, error) {
} }
// Set connection pool parameters // Set connection pool parameters
// Multiple connections for better concurrency // Single connection to avoid locking issues with SQLite
const maxConns = 10 db.SetMaxOpenConns(1)
db.SetMaxOpenConns(maxConns) db.SetMaxIdleConns(1)
db.SetMaxIdleConns(maxConns)
db.SetConnMaxLifetime(0) db.SetConnMaxLifetime(0)
database := &Database{db: db, logger: logger, path: dbPath} database := &Database{db: db, logger: logger, path: dbPath}
@ -90,22 +89,19 @@ func New(cfg *config.Config, logger *logger.Logger) (*Database, error) {
// Initialize creates the database schema if it doesn't exist. // Initialize creates the database schema if it doesn't exist.
func (d *Database) Initialize() error { func (d *Database) Initialize() error {
// Set SQLite pragmas for extreme performance - prioritize speed over durability // Set SQLite pragmas for better performance
// WARNING: These settings trade durability for speed
pragmas := []string{ pragmas := []string{
"PRAGMA journal_mode=WAL", // Write-Ahead Logging "PRAGMA journal_mode=WAL", // Write-Ahead Logging
"PRAGMA synchronous=OFF", // Don't wait for disk writes "PRAGMA synchronous=OFF", // Don't wait for disk writes - RISKY but FAST
"PRAGMA cache_size=-8388608", // 8GB cache (negative = KB) "PRAGMA cache_size=-1048576", // 1GB cache (negative = KB)
"PRAGMA temp_store=MEMORY", // Use memory for temp tables "PRAGMA temp_store=MEMORY", // Use memory for temp tables
"PRAGMA mmap_size=10737418240", // 10GB memory-mapped I/O "PRAGMA mmap_size=536870912", // 512MB memory-mapped I/O
"PRAGMA page_size=8192", // 8KB pages for better performance "PRAGMA wal_autocheckpoint=10000", // Checkpoint every 10000 pages (less frequent)
"PRAGMA wal_autocheckpoint=100000", // Checkpoint every 100k pages (800MB) "PRAGMA wal_checkpoint(PASSIVE)", // Checkpoint now
"PRAGMA wal_checkpoint(TRUNCATE)", // Checkpoint and truncate WAL now "PRAGMA page_size=8192", // Larger page size for better performance
"PRAGMA busy_timeout=5000", // 5 second busy timeout "PRAGMA busy_timeout=30000", // 30 second busy timeout
"PRAGMA locking_mode=NORMAL", // Normal locking for multiple connections "PRAGMA optimize", // Run optimizer
"PRAGMA read_uncommitted=true", // Allow dirty reads
"PRAGMA analysis_limit=0", // Disable automatic ANALYZE
"PRAGMA threads=4", // Use multiple threads for sorting
"PRAGMA cache_spill=false", // Keep cache in memory, don't spill to disk
} }
for _, pragma := range pragmas { for _, pragma := range pragmas {
@ -115,17 +111,8 @@ func (d *Database) Initialize() error {
} }
err := d.exec(dbSchema) err := d.exec(dbSchema)
if err != nil {
return err return err
}
// Run VACUUM on startup to optimize database
d.logger.Info("Running VACUUM to optimize database (this may take a moment)")
if err := d.exec("VACUUM"); err != nil {
d.logger.Warn("Failed to VACUUM database", "error", err)
}
return nil
} }
// Close closes the database connection. // Close closes the database connection.
@ -143,241 +130,6 @@ func (d *Database) beginTx() (*loggingTx, error) {
return &loggingTx{Tx: tx, logger: d.logger}, nil return &loggingTx{Tx: tx, logger: d.logger}, nil
} }
// UpsertLiveRouteBatch inserts or updates multiple live routes in a single transaction
func (d *Database) UpsertLiveRouteBatch(routes []*LiveRoute) error {
if len(routes) == 0 {
return nil
}
tx, err := d.beginTx()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer func() {
if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
d.logger.Error("Failed to rollback transaction", "error", err)
}
}()
// Use prepared statement for better performance
query := `
INSERT INTO live_routes (id, prefix, mask_length, ip_version, origin_asn, peer_ip, as_path, next_hop,
last_updated, v4_ip_start, v4_ip_end)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(prefix, origin_asn, peer_ip) DO UPDATE SET
mask_length = excluded.mask_length,
ip_version = excluded.ip_version,
as_path = excluded.as_path,
next_hop = excluded.next_hop,
last_updated = excluded.last_updated,
v4_ip_start = excluded.v4_ip_start,
v4_ip_end = excluded.v4_ip_end
`
stmt, err := tx.Prepare(query)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
}
defer func() { _ = stmt.Close() }()
for _, route := range routes {
// Encode AS path as JSON
pathJSON, err := json.Marshal(route.ASPath)
if err != nil {
return fmt.Errorf("failed to encode AS path: %w", err)
}
// Convert v4_ip_start and v4_ip_end to interface{} for SQL NULL handling
var v4Start, v4End interface{}
if route.V4IPStart != nil {
v4Start = *route.V4IPStart
}
if route.V4IPEnd != nil {
v4End = *route.V4IPEnd
}
_, err = stmt.Exec(
route.ID.String(),
route.Prefix,
route.MaskLength,
route.IPVersion,
route.OriginASN,
route.PeerIP,
string(pathJSON),
route.NextHop,
route.LastUpdated,
v4Start,
v4End,
)
if err != nil {
return fmt.Errorf("failed to upsert route %s: %w", route.Prefix, err)
}
}
if err = tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}
// DeleteLiveRouteBatch deletes multiple live routes in a single transaction
func (d *Database) DeleteLiveRouteBatch(deletions []LiveRouteDeletion) error {
if len(deletions) == 0 {
return nil
}
tx, err := d.beginTx()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer func() {
if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
d.logger.Error("Failed to rollback transaction", "error", err)
}
}()
// Separate deletions by type and use prepared statements
var withOrigin []LiveRouteDeletion
var withoutOrigin []LiveRouteDeletion
for _, del := range deletions {
if del.OriginASN == 0 {
withoutOrigin = append(withoutOrigin, del)
} else {
withOrigin = append(withOrigin, del)
}
}
// Process deletions with origin ASN
if len(withOrigin) > 0 {
stmt, err := tx.Prepare(`DELETE FROM live_routes WHERE prefix = ? AND origin_asn = ? AND peer_ip = ?`)
if err != nil {
return fmt.Errorf("failed to prepare delete statement: %w", err)
}
defer func() { _ = stmt.Close() }()
for _, del := range withOrigin {
_, err = stmt.Exec(del.Prefix, del.OriginASN, del.PeerIP)
if err != nil {
return fmt.Errorf("failed to delete route %s: %w", del.Prefix, err)
}
}
}
// Process deletions without origin ASN
if len(withoutOrigin) > 0 {
stmt, err := tx.Prepare(`DELETE FROM live_routes WHERE prefix = ? AND peer_ip = ?`)
if err != nil {
return fmt.Errorf("failed to prepare delete statement: %w", err)
}
defer func() { _ = stmt.Close() }()
for _, del := range withoutOrigin {
_, err = stmt.Exec(del.Prefix, del.PeerIP)
if err != nil {
return fmt.Errorf("failed to delete route %s: %w", del.Prefix, err)
}
}
}
if err = tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}
// GetOrCreateASNBatch creates or updates multiple ASNs in a single transaction
func (d *Database) GetOrCreateASNBatch(asns map[int]time.Time) error {
if len(asns) == 0 {
return nil
}
tx, err := d.beginTx()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer func() {
if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
d.logger.Error("Failed to rollback transaction", "error", err)
}
}()
// Prepare statements
selectStmt, err := tx.Prepare(
"SELECT id, number, handle, description, first_seen, last_seen FROM asns WHERE number = ?")
if err != nil {
return fmt.Errorf("failed to prepare select statement: %w", err)
}
defer func() { _ = selectStmt.Close() }()
updateStmt, err := tx.Prepare("UPDATE asns SET last_seen = ? WHERE id = ?")
if err != nil {
return fmt.Errorf("failed to prepare update statement: %w", err)
}
defer func() { _ = updateStmt.Close() }()
insertStmt, err := tx.Prepare(
"INSERT INTO asns (id, number, handle, description, first_seen, last_seen) VALUES (?, ?, ?, ?, ?, ?)")
if err != nil {
return fmt.Errorf("failed to prepare insert statement: %w", err)
}
defer func() { _ = insertStmt.Close() }()
for number, timestamp := range asns {
var asn ASN
var idStr string
var handle, description sql.NullString
err = selectStmt.QueryRow(number).Scan(&idStr, &asn.Number, &handle, &description, &asn.FirstSeen, &asn.LastSeen)
if err == nil {
// ASN exists, update last_seen
asn.ID, _ = uuid.Parse(idStr)
_, err = updateStmt.Exec(timestamp, asn.ID.String())
if err != nil {
return fmt.Errorf("failed to update ASN %d: %w", number, err)
}
continue
}
if err == sql.ErrNoRows {
// ASN doesn't exist, create it
asn = ASN{
ID: generateUUID(),
Number: number,
FirstSeen: timestamp,
LastSeen: timestamp,
}
// Look up ASN info
if info, ok := asinfo.Get(number); ok {
asn.Handle = info.Handle
asn.Description = info.Description
}
_, err = insertStmt.Exec(asn.ID.String(), asn.Number, asn.Handle, asn.Description, asn.FirstSeen, asn.LastSeen)
if err != nil {
return fmt.Errorf("failed to insert ASN %d: %w", number, err)
}
continue
}
if err != nil {
return fmt.Errorf("failed to query ASN %d: %w", number, err)
}
}
if err = tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}
// GetOrCreateASN retrieves an existing ASN or creates a new one if it doesn't exist. // GetOrCreateASN retrieves an existing ASN or creates a new one if it doesn't exist.
func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error) { func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error) {
tx, err := d.beginTx() tx, err := d.beginTx()
@ -581,69 +333,6 @@ func (d *Database) RecordPeering(asA, asB int, timestamp time.Time) error {
return nil return nil
} }
// UpdatePeerBatch updates or creates multiple BGP peer records in a single transaction
func (d *Database) UpdatePeerBatch(peers map[string]PeerUpdate) error {
if len(peers) == 0 {
return nil
}
tx, err := d.beginTx()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer func() {
if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
d.logger.Error("Failed to rollback transaction", "error", err)
}
}()
// Prepare statements
checkStmt, err := tx.Prepare("SELECT EXISTS(SELECT 1 FROM bgp_peers WHERE peer_ip = ?)")
if err != nil {
return fmt.Errorf("failed to prepare check statement: %w", err)
}
defer func() { _ = checkStmt.Close() }()
updateStmt, err := tx.Prepare(
"UPDATE bgp_peers SET peer_asn = ?, last_seen = ?, last_message_type = ? WHERE peer_ip = ?")
if err != nil {
return fmt.Errorf("failed to prepare update statement: %w", err)
}
defer func() { _ = updateStmt.Close() }()
insertStmt, err := tx.Prepare(
"INSERT INTO bgp_peers (id, peer_ip, peer_asn, first_seen, last_seen, last_message_type) VALUES (?, ?, ?, ?, ?, ?)")
if err != nil {
return fmt.Errorf("failed to prepare insert statement: %w", err)
}
defer func() { _ = insertStmt.Close() }()
for _, update := range peers {
var exists bool
err = checkStmt.QueryRow(update.PeerIP).Scan(&exists)
if err != nil {
return fmt.Errorf("failed to check peer %s: %w", update.PeerIP, err)
}
if exists {
_, err = updateStmt.Exec(update.PeerASN, update.Timestamp, update.MessageType, update.PeerIP)
} else {
_, err = insertStmt.Exec(
generateUUID().String(), update.PeerIP, update.PeerASN,
update.Timestamp, update.Timestamp, update.MessageType)
}
if err != nil {
return fmt.Errorf("failed to update peer %s: %w", update.PeerIP, err)
}
}
if err = tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}
// UpdatePeer updates or creates a BGP peer record // UpdatePeer updates or creates a BGP peer record
func (d *Database) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error { func (d *Database) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error {
tx, err := d.beginTx() tx, err := d.beginTx()
@ -1054,115 +743,3 @@ func CalculateIPv4Range(cidr string) (start, end uint32, err error) {
return start, end, nil return start, end, nil
} }
// GetASDetails returns detailed information about an AS including prefixes
func (d *Database) GetASDetails(asn int) (*ASN, []LiveRoute, error) {
// Get AS information
var asnInfo ASN
var idStr string
var handle, description sql.NullString
err := d.db.QueryRow(
"SELECT id, number, handle, description, first_seen, last_seen FROM asns WHERE number = ?",
asn,
).Scan(&idStr, &asnInfo.Number, &handle, &description, &asnInfo.FirstSeen, &asnInfo.LastSeen)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil, fmt.Errorf("%w: AS%d", ErrNoRoute, asn)
}
return nil, nil, fmt.Errorf("failed to query AS: %w", err)
}
asnInfo.ID, _ = uuid.Parse(idStr)
asnInfo.Handle = handle.String
asnInfo.Description = description.String
// Get prefixes announced by this AS (unique prefixes with most recent update)
query := `
SELECT prefix, mask_length, ip_version, MAX(last_updated) as last_updated
FROM live_routes
WHERE origin_asn = ?
GROUP BY prefix, mask_length, ip_version
`
rows, err := d.db.Query(query, asn)
if err != nil {
return &asnInfo, nil, fmt.Errorf("failed to query prefixes: %w", err)
}
defer func() { _ = rows.Close() }()
var prefixes []LiveRoute
for rows.Next() {
var route LiveRoute
var lastUpdatedStr string
err := rows.Scan(&route.Prefix, &route.MaskLength, &route.IPVersion, &lastUpdatedStr)
if err != nil {
d.logger.Error("Failed to scan prefix row", "error", err, "asn", asn)
continue
}
// Parse the timestamp string
route.LastUpdated, err = time.Parse("2006-01-02 15:04:05-07:00", lastUpdatedStr)
if err != nil {
// Try without timezone
route.LastUpdated, err = time.Parse("2006-01-02 15:04:05", lastUpdatedStr)
if err != nil {
d.logger.Error("Failed to parse timestamp", "error", err, "timestamp", lastUpdatedStr)
continue
}
}
route.OriginASN = asn
prefixes = append(prefixes, route)
}
return &asnInfo, prefixes, nil
}
// GetPrefixDetails returns detailed information about a prefix
func (d *Database) GetPrefixDetails(prefix string) ([]LiveRoute, error) {
query := `
SELECT lr.origin_asn, lr.peer_ip, lr.as_path, lr.next_hop, lr.last_updated,
a.handle, a.description
FROM live_routes lr
LEFT JOIN asns a ON a.number = lr.origin_asn
WHERE lr.prefix = ?
ORDER BY lr.origin_asn, lr.peer_ip
`
rows, err := d.db.Query(query, prefix)
if err != nil {
return nil, fmt.Errorf("failed to query prefix details: %w", err)
}
defer func() { _ = rows.Close() }()
var routes []LiveRoute
for rows.Next() {
var route LiveRoute
var pathJSON string
var handle, description sql.NullString
err := rows.Scan(
&route.OriginASN, &route.PeerIP, &pathJSON, &route.NextHop,
&route.LastUpdated, &handle, &description,
)
if err != nil {
continue
}
// Decode AS path
if err := json.Unmarshal([]byte(pathJSON), &route.ASPath); err != nil {
route.ASPath = []int{}
}
route.Prefix = prefix
routes = append(routes, route)
}
if len(routes) == 0 {
return nil, fmt.Errorf("%w: %s", ErrNoRoute, prefix)
}
return routes, nil
}

View File

@ -22,7 +22,6 @@ type Stats struct {
type Store interface { type Store interface {
// ASN operations // ASN operations
GetOrCreateASN(number int, timestamp time.Time) (*ASN, error) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error)
GetOrCreateASNBatch(asns map[int]time.Time) error
// Prefix operations // Prefix operations
GetOrCreatePrefix(prefix string, timestamp time.Time) (*Prefix, error) GetOrCreatePrefix(prefix string, timestamp time.Time) (*Prefix, error)
@ -38,23 +37,16 @@ type Store interface {
// Peer operations // Peer operations
UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error
UpdatePeerBatch(peers map[string]PeerUpdate) error
// Live route operations // Live route operations
UpsertLiveRoute(route *LiveRoute) error UpsertLiveRoute(route *LiveRoute) error
UpsertLiveRouteBatch(routes []*LiveRoute) error
DeleteLiveRoute(prefix string, originASN int, peerIP string) error DeleteLiveRoute(prefix string, originASN int, peerIP string) error
DeleteLiveRouteBatch(deletions []LiveRouteDeletion) error
GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error) GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error)
GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error) GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error)
// IP lookup operations // IP lookup operations
GetASInfoForIP(ip string) (*ASInfo, error) GetASInfoForIP(ip string) (*ASInfo, error)
// AS and prefix detail operations
GetASDetails(asn int) (*ASN, []LiveRoute, error)
GetPrefixDetails(prefix string) ([]LiveRoute, error)
// Lifecycle // Lifecycle
Close() error Close() error
} }

View File

@ -77,18 +77,3 @@ type ASInfo struct {
LastUpdated time.Time `json:"last_updated"` LastUpdated time.Time `json:"last_updated"`
Age string `json:"age"` Age string `json:"age"`
} }
// LiveRouteDeletion represents parameters for deleting a live route
type LiveRouteDeletion struct {
Prefix string
OriginASN int
PeerIP string
}
// PeerUpdate represents parameters for updating a peer
type PeerUpdate struct {
PeerIP string
PeerASN int
MessageType string
Timestamp time.Time
}

View File

@ -139,10 +139,6 @@ func (rw *RouteWatch) Shutdown() {
rw.logger.Info("Flushing prefix handler") rw.logger.Info("Flushing prefix handler")
rw.prefixHandler.Stop() rw.prefixHandler.Stop()
} }
if rw.peeringHandler != nil {
rw.logger.Info("Flushing peering handler")
rw.peeringHandler.Stop()
}
// Stop services // Stop services
rw.streamer.Stop() rw.streamer.Stop()

View File

@ -201,84 +201,6 @@ func (m *mockStore) GetASInfoForIP(ip string) (*database.ASInfo, error) {
}, nil }, nil
} }
// GetASDetails mock implementation
func (m *mockStore) GetASDetails(asn int) (*database.ASN, []database.LiveRoute, error) {
m.mu.Lock()
defer m.mu.Unlock()
// Check if ASN exists
if asnInfo, exists := m.ASNs[asn]; exists {
// Return empty prefixes for now
return asnInfo, []database.LiveRoute{}, nil
}
return nil, nil, database.ErrNoRoute
}
// GetPrefixDetails mock implementation
func (m *mockStore) GetPrefixDetails(prefix string) ([]database.LiveRoute, error) {
// Return empty routes for now
return []database.LiveRoute{}, nil
}
// UpsertLiveRouteBatch mock implementation
func (m *mockStore) UpsertLiveRouteBatch(routes []*database.LiveRoute) error {
m.mu.Lock()
defer m.mu.Unlock()
for _, route := range routes {
// Track prefix
if _, exists := m.Prefixes[route.Prefix]; !exists {
m.Prefixes[route.Prefix] = &database.Prefix{
ID: uuid.New(),
Prefix: route.Prefix,
IPVersion: route.IPVersion,
FirstSeen: route.LastUpdated,
LastSeen: route.LastUpdated,
}
m.PrefixCount++
if route.IPVersion == 4 {
m.IPv4Prefixes++
} else {
m.IPv6Prefixes++
}
}
m.RouteCount++
}
return nil
}
// DeleteLiveRouteBatch mock implementation
func (m *mockStore) DeleteLiveRouteBatch(deletions []database.LiveRouteDeletion) error {
// Simple mock - just return nil
return nil
}
// GetOrCreateASNBatch mock implementation
func (m *mockStore) GetOrCreateASNBatch(asns map[int]time.Time) error {
m.mu.Lock()
defer m.mu.Unlock()
for number, timestamp := range asns {
if _, exists := m.ASNs[number]; !exists {
m.ASNs[number] = &database.ASN{
ID: uuid.New(),
Number: number,
FirstSeen: timestamp,
LastSeen: timestamp,
}
m.ASNCount++
}
}
return nil
}
// UpdatePeerBatch mock implementation
func (m *mockStore) UpdatePeerBatch(peers map[string]database.PeerUpdate) error {
// Simple mock - just return nil
return nil
}
func TestRouteWatchLiveFeed(t *testing.T) { func TestRouteWatchLiveFeed(t *testing.T) {
// Create mock database // Create mock database

View File

@ -144,9 +144,11 @@ func (h *ASHandler) flushBatchLocked() {
} }
} }
// Process all ASNs in a single batch transaction for asn, ts := range asnMap {
if err := h.db.GetOrCreateASNBatch(asnMap); err != nil { _, err := h.db.GetOrCreateASN(asn, ts)
h.logger.Error("Failed to process ASN batch", "error", err, "count", len(asnMap)) if err != nil {
h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err)
}
} }
// Clear batch // Clear batch

View File

@ -4,8 +4,6 @@ import (
"context" "context"
"os" "os"
"os/signal" "os/signal"
"runtime"
"strings"
"syscall" "syscall"
"time" "time"
@ -16,53 +14,19 @@ import (
const ( const (
// shutdownTimeout is the maximum time allowed for graceful shutdown // shutdownTimeout is the maximum time allowed for graceful shutdown
shutdownTimeout = 60 * time.Second shutdownTimeout = 60 * time.Second
// debugInterval is how often to log debug stats
debugInterval = 60 * time.Second
// bytesPerMB is bytes per megabyte
bytesPerMB = 1024 * 1024
) )
// logDebugStats logs goroutine count and memory usage
func logDebugStats(logger *logger.Logger) {
// Only run if DEBUG env var contains "routewatch"
debugEnv := os.Getenv("DEBUG")
if !strings.Contains(debugEnv, "routewatch") {
return
}
ticker := time.NewTicker(debugInterval)
defer ticker.Stop()
for range ticker.C {
var m runtime.MemStats
runtime.ReadMemStats(&m)
logger.Debug("System stats",
"goroutines", runtime.NumGoroutine(),
"alloc_mb", m.Alloc/bytesPerMB,
"total_alloc_mb", m.TotalAlloc/bytesPerMB,
"sys_mb", m.Sys/bytesPerMB,
"num_gc", m.NumGC,
"heap_alloc_mb", m.HeapAlloc/bytesPerMB,
"heap_sys_mb", m.HeapSys/bytesPerMB,
"heap_idle_mb", m.HeapIdle/bytesPerMB,
"heap_inuse_mb", m.HeapInuse/bytesPerMB,
"heap_released_mb", m.HeapReleased/bytesPerMB,
"stack_inuse_mb", m.StackInuse/bytesPerMB,
)
}
}
// CLIEntry is the main entry point for the CLI // CLIEntry is the main entry point for the CLI
func CLIEntry() { func CLIEntry() {
app := fx.New( app := fx.New(
getModule(), getModule(),
fx.StopTimeout(shutdownTimeout), // Allow 60 seconds for graceful shutdown fx.StopTimeout(shutdownTimeout), // Allow 60 seconds for graceful shutdown
fx.Invoke(func(lc fx.Lifecycle, rw *RouteWatch, logger *logger.Logger, shutdowner fx.Shutdowner) { fx.Invoke(func(lc fx.Lifecycle, rw *RouteWatch, logger *logger.Logger) {
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error { OnStart: func(_ context.Context) error {
// Start debug stats logging go func() {
go logDebugStats(logger) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle shutdown signals // Handle shutdown signals
sigCh := make(chan os.Signal, 1) sigCh := make(chan os.Signal, 1)
@ -71,12 +35,9 @@ func CLIEntry() {
go func() { go func() {
<-sigCh <-sigCh
logger.Info("Received shutdown signal") logger.Info("Received shutdown signal")
if err := shutdowner.Shutdown(); err != nil { cancel()
logger.Error("Failed to shutdown gracefully", "error", err)
}
}() }()
go func() {
if err := rw.Run(ctx); err != nil { if err := rw.Run(ctx); err != nil {
logger.Error("RouteWatch error", "error", err) logger.Error("RouteWatch error", "error", err)
} }

View File

@ -135,22 +135,18 @@ func (h *PeerHandler) flushBatchLocked() {
} }
} }
// Convert to database format // Apply updates
dbPeerMap := make(map[string]database.PeerUpdate) for _, update := range peerMap {
for peerIP, update := range peerMap { if err := h.db.UpdatePeer(update.peerIP, update.peerASN, update.messageType, update.timestamp); err != nil {
dbPeerMap[peerIP] = database.PeerUpdate{ h.logger.Error("Failed to update peer",
PeerIP: update.peerIP, "peer", update.peerIP,
PeerASN: update.peerASN, "peer_asn", update.peerASN,
MessageType: update.messageType, "message_type", update.messageType,
Timestamp: update.timestamp, "error", err,
)
} }
} }
// Process all peers in a single batch transaction
if err := h.db.UpdatePeerBatch(dbPeerMap); err != nil {
h.logger.Error("Failed to process peer batch", "error", err, "count", len(dbPeerMap))
}
// Clear batch // Clear batch
h.peerBatch = h.peerBatch[:0] h.peerBatch = h.peerBatch[:0]
h.lastFlush = time.Now() h.lastFlush = time.Now()

View File

@ -18,10 +18,10 @@ const (
prefixHandlerQueueSize = 100000 prefixHandlerQueueSize = 100000
// prefixBatchSize is the number of prefix updates to batch together // prefixBatchSize is the number of prefix updates to batch together
prefixBatchSize = 5000 prefixBatchSize = 25000
// prefixBatchTimeout is the maximum time to wait before flushing a batch // prefixBatchTimeout is the maximum time to wait before flushing a batch
prefixBatchTimeout = 1 * time.Second prefixBatchTimeout = 2 * time.Second
// IP version constants // IP version constants
ipv4Version = 4 ipv4Version = 4
@ -163,9 +163,6 @@ func (h *PrefixHandler) flushBatchLocked() {
return return
} }
startTime := time.Now()
batchSize := len(h.batch)
// Group updates by prefix to deduplicate // Group updates by prefix to deduplicate
// For each prefix, keep the latest update // For each prefix, keep the latest update
prefixMap := make(map[string]prefixUpdate) prefixMap := make(map[string]prefixUpdate)
@ -176,55 +173,27 @@ func (h *PrefixHandler) flushBatchLocked() {
} }
} }
// Collect routes to upsert and delete // Apply updates to database
var routesToUpsert []*database.LiveRoute
var routesToDelete []database.LiveRouteDeletion
// Skip the prefix table updates entirely - just update live_routes
// The prefix table is not critical for routing lookups
for _, update := range prefixMap { for _, update := range prefixMap {
if update.messageType == "announcement" && update.originASN > 0 { // Get or create prefix (this maintains the prefixes table)
// Create live route for batch upsert prefix, err := h.db.GetOrCreatePrefix(update.prefix, update.timestamp)
route := h.createLiveRoute(update) if err != nil {
if route != nil { h.logger.Error("Failed to get/create prefix",
routesToUpsert = append(routesToUpsert, route) "prefix", update.prefix,
} "error", err,
} else if update.messageType == "withdrawal" {
// Create deletion record for batch delete
routesToDelete = append(routesToDelete, database.LiveRouteDeletion{
Prefix: update.prefix,
OriginASN: update.originASN,
PeerIP: update.peer,
})
}
}
// Process batch operations
successCount := 0
if len(routesToUpsert) > 0 {
if err := h.db.UpsertLiveRouteBatch(routesToUpsert); err != nil {
h.logger.Error("Failed to upsert route batch", "error", err, "count", len(routesToUpsert))
} else {
successCount += len(routesToUpsert)
}
}
if len(routesToDelete) > 0 {
if err := h.db.DeleteLiveRouteBatch(routesToDelete); err != nil {
h.logger.Error("Failed to delete route batch", "error", err, "count", len(routesToDelete))
} else {
successCount += len(routesToDelete)
}
}
elapsed := time.Since(startTime)
h.logger.Debug("Flushed prefix batch",
"batch_size", batchSize,
"unique_prefixes", len(prefixMap),
"success", successCount,
"duration_ms", elapsed.Milliseconds(),
) )
continue
}
// For announcements, get ASN info and create announcement record
if update.messageType == "announcement" && update.originASN > 0 {
h.processAnnouncement(prefix, update)
} else if update.messageType == "withdrawal" {
h.processWithdrawal(prefix, update)
}
}
// Clear batch // Clear batch
h.batch = h.batch[:0] h.batch = h.batch[:0]
h.lastFlush = time.Now() h.lastFlush = time.Now()
@ -246,7 +215,6 @@ func parseCIDR(prefix string) (maskLength int, ipVersion int, err error) {
} }
// processAnnouncement handles storing an announcement in the database // processAnnouncement handles storing an announcement in the database
// nolint:unused // kept for potential future use
func (h *PrefixHandler) processAnnouncement(_ *database.Prefix, update prefixUpdate) { func (h *PrefixHandler) processAnnouncement(_ *database.Prefix, update prefixUpdate) {
// Parse CIDR to get mask length // Parse CIDR to get mask length
maskLength, ipVersion, err := parseCIDR(update.prefix) maskLength, ipVersion, err := parseCIDR(update.prefix)
@ -303,143 +271,7 @@ func (h *PrefixHandler) processAnnouncement(_ *database.Prefix, update prefixUpd
} }
} }
// createLiveRoute creates a LiveRoute from a prefix update
func (h *PrefixHandler) createLiveRoute(update prefixUpdate) *database.LiveRoute {
// Parse CIDR to get mask length
maskLength, ipVersion, err := parseCIDR(update.prefix)
if err != nil {
h.logger.Error("Failed to parse CIDR",
"prefix", update.prefix,
"error", err,
)
return nil
}
// Track route update metrics
if h.metrics != nil {
if ipVersion == ipv4Version {
h.metrics.RecordIPv4Update()
} else {
h.metrics.RecordIPv6Update()
}
}
// Create live route record
liveRoute := &database.LiveRoute{
ID: uuid.New(),
Prefix: update.prefix,
MaskLength: maskLength,
IPVersion: ipVersion,
OriginASN: update.originASN,
PeerIP: update.peer,
ASPath: update.path,
NextHop: update.peer, // Using peer as next hop
LastUpdated: update.timestamp,
}
// For IPv4, calculate the IP range
if ipVersion == ipv4Version {
start, end, err := database.CalculateIPv4Range(update.prefix)
if err == nil {
liveRoute.V4IPStart = &start
liveRoute.V4IPEnd = &end
} else {
h.logger.Error("Failed to calculate IPv4 range",
"prefix", update.prefix,
"error", err,
)
}
}
return liveRoute
}
// processAnnouncementDirect handles storing an announcement directly without prefix table
// nolint:unused // kept for potential future use
func (h *PrefixHandler) processAnnouncementDirect(update prefixUpdate) {
// Parse CIDR to get mask length
maskLength, ipVersion, err := parseCIDR(update.prefix)
if err != nil {
h.logger.Error("Failed to parse CIDR",
"prefix", update.prefix,
"error", err,
)
return
}
// Track route update metrics
if h.metrics != nil {
if ipVersion == ipv4Version {
h.metrics.RecordIPv4Update()
} else {
h.metrics.RecordIPv6Update()
}
}
// Create live route record
liveRoute := &database.LiveRoute{
ID: uuid.New(),
Prefix: update.prefix,
MaskLength: maskLength,
IPVersion: ipVersion,
OriginASN: update.originASN,
PeerIP: update.peer,
ASPath: update.path,
NextHop: update.peer, // Using peer as next hop
LastUpdated: update.timestamp,
}
// For IPv4, calculate the IP range
if ipVersion == ipv4Version {
start, end, err := database.CalculateIPv4Range(update.prefix)
if err == nil {
liveRoute.V4IPStart = &start
liveRoute.V4IPEnd = &end
} else {
h.logger.Error("Failed to calculate IPv4 range",
"prefix", update.prefix,
"error", err,
)
}
}
if err := h.db.UpsertLiveRoute(liveRoute); err != nil {
h.logger.Error("Failed to upsert live route",
"prefix", update.prefix,
"error", err,
)
}
}
// processWithdrawalDirect handles removing a route directly without prefix table
// nolint:unused // kept for potential future use
func (h *PrefixHandler) processWithdrawalDirect(update prefixUpdate) {
// For withdrawals, we need to delete the route from live_routes
if update.originASN > 0 {
if err := h.db.DeleteLiveRoute(update.prefix, update.originASN, update.peer); err != nil {
h.logger.Error("Failed to delete live route",
"prefix", update.prefix,
"origin_asn", update.originASN,
"peer", update.peer,
"error", err,
)
}
} else {
// If no origin ASN, just delete all routes for this prefix from this peer
if err := h.db.DeleteLiveRoute(update.prefix, 0, update.peer); err != nil {
h.logger.Error("Failed to delete live route (no origin ASN)",
"prefix", update.prefix,
"peer", update.peer,
"error", err,
)
}
}
}
// processWithdrawal handles removing a route from the live routing table // processWithdrawal handles removing a route from the live routing table
// nolint:unused // kept for potential future use
func (h *PrefixHandler) processWithdrawal(_ *database.Prefix, update prefixUpdate) { func (h *PrefixHandler) processWithdrawal(_ *database.Prefix, update prefixUpdate) {
// For withdrawals, we need to delete the route from live_routes // For withdrawals, we need to delete the route from live_routes
// Since we have the origin ASN from the update, we can delete the specific route // Since we have the origin ASN from the update, we can delete the specific route

View File

@ -1,704 +0,0 @@
package server
import (
"bytes"
"context"
"encoding/json"
"errors"
"net"
"net/http"
"net/url"
"runtime"
"sort"
"strconv"
"time"
"git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/templates"
"github.com/dustin/go-humanize"
"github.com/go-chi/chi/v5"
)
// handleRoot returns a handler that redirects to /status
func (s *Server) handleRoot() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/status", http.StatusSeeOther)
}
}
// writeJSONError writes a standardized JSON error response
func writeJSONError(w http.ResponseWriter, statusCode int, message string) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
_ = json.NewEncoder(w).Encode(map[string]interface{}{
"status": "error",
"error": map[string]interface{}{
"msg": message,
"code": statusCode,
},
})
}
// writeJSONSuccess writes a standardized JSON success response
func writeJSONSuccess(w http.ResponseWriter, data interface{}) error {
w.Header().Set("Content-Type", "application/json")
return json.NewEncoder(w).Encode(map[string]interface{}{
"status": "ok",
"data": data,
})
}
// handleStatusJSON returns a handler that serves JSON statistics
func (s *Server) handleStatusJSON() http.HandlerFunc {
// Stats represents the statistics response
type Stats struct {
Uptime string `json:"uptime"`
TotalMessages uint64 `json:"total_messages"`
TotalBytes uint64 `json:"total_bytes"`
MessagesPerSec float64 `json:"messages_per_sec"`
MbitsPerSec float64 `json:"mbits_per_sec"`
Connected bool `json:"connected"`
GoVersion string `json:"go_version"`
Goroutines int `json:"goroutines"`
MemoryUsage string `json:"memory_usage"`
ASNs int `json:"asns"`
Prefixes int `json:"prefixes"`
IPv4Prefixes int `json:"ipv4_prefixes"`
IPv6Prefixes int `json:"ipv6_prefixes"`
Peerings int `json:"peerings"`
Peers int `json:"peers"`
DatabaseSizeBytes int64 `json:"database_size_bytes"`
LiveRoutes int `json:"live_routes"`
IPv4Routes int `json:"ipv4_routes"`
IPv6Routes int `json:"ipv6_routes"`
IPv4UpdatesPerSec float64 `json:"ipv4_updates_per_sec"`
IPv6UpdatesPerSec float64 `json:"ipv6_updates_per_sec"`
IPv4PrefixDistribution []database.PrefixDistribution `json:"ipv4_prefix_distribution"`
IPv6PrefixDistribution []database.PrefixDistribution `json:"ipv6_prefix_distribution"`
}
return func(w http.ResponseWriter, r *http.Request) {
// Create a 1 second timeout context for this request
ctx, cancel := context.WithTimeout(r.Context(), 1*time.Second)
defer cancel()
metrics := s.streamer.GetMetrics()
// Get database stats with timeout
statsChan := make(chan database.Stats)
errChan := make(chan error)
go func() {
dbStats, err := s.db.GetStats()
if err != nil {
s.logger.Debug("Database stats query failed", "error", err)
errChan <- err
return
}
statsChan <- dbStats
}()
var dbStats database.Stats
select {
case <-ctx.Done():
s.logger.Error("Database stats timeout in status.json")
writeJSONError(w, http.StatusRequestTimeout, "Database timeout")
return
case err := <-errChan:
s.logger.Error("Failed to get database stats", "error", err)
writeJSONError(w, http.StatusInternalServerError, err.Error())
return
case dbStats = <-statsChan:
// Success
}
uptime := time.Since(metrics.ConnectedSince).Truncate(time.Second).String()
if metrics.ConnectedSince.IsZero() {
uptime = "0s"
}
const bitsPerMegabit = 1000000.0
// Get route counts from database
ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCounts()
if err != nil {
s.logger.Warn("Failed to get live route counts", "error", err)
// Continue with zero counts
}
// Get route update metrics
routeMetrics := s.streamer.GetMetricsTracker().GetRouteMetrics()
// Get memory stats
var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
stats := Stats{
Uptime: uptime,
TotalMessages: metrics.TotalMessages,
TotalBytes: metrics.TotalBytes,
MessagesPerSec: metrics.MessagesPerSec,
MbitsPerSec: metrics.BitsPerSec / bitsPerMegabit,
Connected: metrics.Connected,
GoVersion: runtime.Version(),
Goroutines: runtime.NumGoroutine(),
MemoryUsage: humanize.Bytes(memStats.Alloc),
ASNs: dbStats.ASNs,
Prefixes: dbStats.Prefixes,
IPv4Prefixes: dbStats.IPv4Prefixes,
IPv6Prefixes: dbStats.IPv6Prefixes,
Peerings: dbStats.Peerings,
Peers: dbStats.Peers,
DatabaseSizeBytes: dbStats.FileSizeBytes,
LiveRoutes: dbStats.LiveRoutes,
IPv4Routes: ipv4Routes,
IPv6Routes: ipv6Routes,
IPv4UpdatesPerSec: routeMetrics.IPv4UpdatesPerSec,
IPv6UpdatesPerSec: routeMetrics.IPv6UpdatesPerSec,
IPv4PrefixDistribution: dbStats.IPv4PrefixDistribution,
IPv6PrefixDistribution: dbStats.IPv6PrefixDistribution,
}
if err := writeJSONSuccess(w, stats); err != nil {
s.logger.Error("Failed to encode stats", "error", err)
}
}
}
// handleStats returns a handler that serves API v1 statistics
func (s *Server) handleStats() http.HandlerFunc {
// HandlerStatsInfo represents handler statistics in the API response
type HandlerStatsInfo struct {
Name string `json:"name"`
QueueLength int `json:"queue_length"`
QueueCapacity int `json:"queue_capacity"`
ProcessedCount uint64 `json:"processed_count"`
DroppedCount uint64 `json:"dropped_count"`
AvgProcessTimeMs float64 `json:"avg_process_time_ms"`
MinProcessTimeMs float64 `json:"min_process_time_ms"`
MaxProcessTimeMs float64 `json:"max_process_time_ms"`
}
// StatsResponse represents the API statistics response
type StatsResponse struct {
Uptime string `json:"uptime"`
TotalMessages uint64 `json:"total_messages"`
TotalBytes uint64 `json:"total_bytes"`
MessagesPerSec float64 `json:"messages_per_sec"`
MbitsPerSec float64 `json:"mbits_per_sec"`
Connected bool `json:"connected"`
GoVersion string `json:"go_version"`
Goroutines int `json:"goroutines"`
MemoryUsage string `json:"memory_usage"`
ASNs int `json:"asns"`
Prefixes int `json:"prefixes"`
IPv4Prefixes int `json:"ipv4_prefixes"`
IPv6Prefixes int `json:"ipv6_prefixes"`
Peerings int `json:"peerings"`
Peers int `json:"peers"`
DatabaseSizeBytes int64 `json:"database_size_bytes"`
LiveRoutes int `json:"live_routes"`
IPv4Routes int `json:"ipv4_routes"`
IPv6Routes int `json:"ipv6_routes"`
IPv4UpdatesPerSec float64 `json:"ipv4_updates_per_sec"`
IPv6UpdatesPerSec float64 `json:"ipv6_updates_per_sec"`
HandlerStats []HandlerStatsInfo `json:"handler_stats"`
IPv4PrefixDistribution []database.PrefixDistribution `json:"ipv4_prefix_distribution"`
IPv6PrefixDistribution []database.PrefixDistribution `json:"ipv6_prefix_distribution"`
}
return func(w http.ResponseWriter, r *http.Request) {
// Create a 1 second timeout context for this request
ctx, cancel := context.WithTimeout(r.Context(), 1*time.Second)
defer cancel()
// Check if context is already cancelled
select {
case <-ctx.Done():
http.Error(w, "Request timeout", http.StatusRequestTimeout)
return
default:
}
metrics := s.streamer.GetMetrics()
// Get database stats with timeout
statsChan := make(chan database.Stats)
errChan := make(chan error)
go func() {
dbStats, err := s.db.GetStats()
if err != nil {
s.logger.Debug("Database stats query failed", "error", err)
errChan <- err
return
}
statsChan <- dbStats
}()
var dbStats database.Stats
select {
case <-ctx.Done():
s.logger.Error("Database stats timeout")
http.Error(w, "Database timeout", http.StatusRequestTimeout)
return
case err := <-errChan:
s.logger.Error("Failed to get database stats", "error", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
case dbStats = <-statsChan:
// Success
}
uptime := time.Since(metrics.ConnectedSince).Truncate(time.Second).String()
if metrics.ConnectedSince.IsZero() {
uptime = "0s"
}
const bitsPerMegabit = 1000000.0
// Get route counts from database
ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCounts()
if err != nil {
s.logger.Warn("Failed to get live route counts", "error", err)
// Continue with zero counts
}
// Get route update metrics
routeMetrics := s.streamer.GetMetricsTracker().GetRouteMetrics()
// Get handler stats
handlerStats := s.streamer.GetHandlerStats()
handlerStatsInfo := make([]HandlerStatsInfo, 0, len(handlerStats))
const microsecondsPerMillisecond = 1000.0
for _, hs := range handlerStats {
handlerStatsInfo = append(handlerStatsInfo, HandlerStatsInfo{
Name: hs.Name,
QueueLength: hs.QueueLength,
QueueCapacity: hs.QueueCapacity,
ProcessedCount: hs.ProcessedCount,
DroppedCount: hs.DroppedCount,
AvgProcessTimeMs: float64(hs.AvgProcessTime.Microseconds()) / microsecondsPerMillisecond,
MinProcessTimeMs: float64(hs.MinProcessTime.Microseconds()) / microsecondsPerMillisecond,
MaxProcessTimeMs: float64(hs.MaxProcessTime.Microseconds()) / microsecondsPerMillisecond,
})
}
// Get memory stats
var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
stats := StatsResponse{
Uptime: uptime,
TotalMessages: metrics.TotalMessages,
TotalBytes: metrics.TotalBytes,
MessagesPerSec: metrics.MessagesPerSec,
MbitsPerSec: metrics.BitsPerSec / bitsPerMegabit,
Connected: metrics.Connected,
GoVersion: runtime.Version(),
Goroutines: runtime.NumGoroutine(),
MemoryUsage: humanize.Bytes(memStats.Alloc),
ASNs: dbStats.ASNs,
Prefixes: dbStats.Prefixes,
IPv4Prefixes: dbStats.IPv4Prefixes,
IPv6Prefixes: dbStats.IPv6Prefixes,
Peerings: dbStats.Peerings,
Peers: dbStats.Peers,
DatabaseSizeBytes: dbStats.FileSizeBytes,
LiveRoutes: dbStats.LiveRoutes,
IPv4Routes: ipv4Routes,
IPv6Routes: ipv6Routes,
IPv4UpdatesPerSec: routeMetrics.IPv4UpdatesPerSec,
IPv6UpdatesPerSec: routeMetrics.IPv6UpdatesPerSec,
HandlerStats: handlerStatsInfo,
IPv4PrefixDistribution: dbStats.IPv4PrefixDistribution,
IPv6PrefixDistribution: dbStats.IPv6PrefixDistribution,
}
if err := writeJSONSuccess(w, stats); err != nil {
s.logger.Error("Failed to encode stats", "error", err)
}
}
}
// handleStatusHTML returns a handler that serves the HTML status page
func (s *Server) handleStatusHTML() http.HandlerFunc {
return func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
tmpl := templates.StatusTemplate()
if err := tmpl.Execute(w, nil); err != nil {
s.logger.Error("Failed to render template", "error", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
}
}
}
// handleIPLookup returns a handler that looks up AS information for an IP address
func (s *Server) handleIPLookup() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ip := chi.URLParam(r, "ip")
if ip == "" {
writeJSONError(w, http.StatusBadRequest, "IP parameter is required")
return
}
// Look up AS information for the IP
asInfo, err := s.db.GetASInfoForIP(ip)
if err != nil {
// Check if it's an invalid IP error
if errors.Is(err, database.ErrInvalidIP) {
writeJSONError(w, http.StatusBadRequest, err.Error())
} else {
// All other errors (including ErrNoRoute) are 404
writeJSONError(w, http.StatusNotFound, err.Error())
}
return
}
// Return successful response
if err := writeJSONSuccess(w, asInfo); err != nil {
s.logger.Error("Failed to encode AS info", "error", err)
}
}
}
// handleASDetailJSON returns AS details as JSON
func (s *Server) handleASDetailJSON() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
asnStr := chi.URLParam(r, "asn")
asn, err := strconv.Atoi(asnStr)
if err != nil {
writeJSONError(w, http.StatusBadRequest, "Invalid ASN")
return
}
asInfo, prefixes, err := s.db.GetASDetails(asn)
if err != nil {
if errors.Is(err, database.ErrNoRoute) {
writeJSONError(w, http.StatusNotFound, err.Error())
} else {
writeJSONError(w, http.StatusInternalServerError, err.Error())
}
return
}
// Group prefixes by IP version
const ipVersionV4 = 4
var ipv4Prefixes, ipv6Prefixes []database.LiveRoute
for _, p := range prefixes {
if p.IPVersion == ipVersionV4 {
ipv4Prefixes = append(ipv4Prefixes, p)
} else {
ipv6Prefixes = append(ipv6Prefixes, p)
}
}
response := map[string]interface{}{
"asn": asInfo,
"ipv4_prefixes": ipv4Prefixes,
"ipv6_prefixes": ipv6Prefixes,
"total_count": len(prefixes),
}
if err := writeJSONSuccess(w, response); err != nil {
s.logger.Error("Failed to encode AS details", "error", err)
}
}
}
// handlePrefixDetailJSON returns prefix details as JSON
func (s *Server) handlePrefixDetailJSON() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
prefixParam := chi.URLParam(r, "prefix")
if prefixParam == "" {
writeJSONError(w, http.StatusBadRequest, "Prefix parameter is required")
return
}
// URL decode the prefix parameter
prefix, err := url.QueryUnescape(prefixParam)
if err != nil {
writeJSONError(w, http.StatusBadRequest, "Invalid prefix parameter")
return
}
routes, err := s.db.GetPrefixDetails(prefix)
if err != nil {
if errors.Is(err, database.ErrNoRoute) {
writeJSONError(w, http.StatusNotFound, err.Error())
} else {
writeJSONError(w, http.StatusInternalServerError, err.Error())
}
return
}
// Group by origin AS
originMap := make(map[int][]database.LiveRoute)
for _, route := range routes {
originMap[route.OriginASN] = append(originMap[route.OriginASN], route)
}
response := map[string]interface{}{
"prefix": prefix,
"routes": routes,
"origins": originMap,
"peer_count": len(routes),
"origin_count": len(originMap),
}
if err := writeJSONSuccess(w, response); err != nil {
s.logger.Error("Failed to encode prefix details", "error", err)
}
}
}
// handleASDetail returns a handler that serves the AS detail HTML page
func (s *Server) handleASDetail() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
asnStr := chi.URLParam(r, "asn")
asn, err := strconv.Atoi(asnStr)
if err != nil {
http.Error(w, "Invalid ASN", http.StatusBadRequest)
return
}
asInfo, prefixes, err := s.db.GetASDetails(asn)
if err != nil {
if errors.Is(err, database.ErrNoRoute) {
http.Error(w, "AS not found", http.StatusNotFound)
} else {
s.logger.Error("Failed to get AS details", "error", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
}
return
}
// Group prefixes by IP version
const ipVersionV4 = 4
var ipv4Prefixes, ipv6Prefixes []database.LiveRoute
for _, p := range prefixes {
if p.IPVersion == ipVersionV4 {
ipv4Prefixes = append(ipv4Prefixes, p)
} else {
ipv6Prefixes = append(ipv6Prefixes, p)
}
}
// Sort prefixes by network address
sort.Slice(ipv4Prefixes, func(i, j int) bool {
// Parse the prefixes to compare network addresses
ipI, netI, _ := net.ParseCIDR(ipv4Prefixes[i].Prefix)
ipJ, netJ, _ := net.ParseCIDR(ipv4Prefixes[j].Prefix)
// Compare by network address first
cmp := bytes.Compare(ipI.To4(), ipJ.To4())
if cmp != 0 {
return cmp < 0
}
// If network addresses are equal, compare by mask length
onesI, _ := netI.Mask.Size()
onesJ, _ := netJ.Mask.Size()
return onesI < onesJ
})
sort.Slice(ipv6Prefixes, func(i, j int) bool {
// Parse the prefixes to compare network addresses
ipI, netI, _ := net.ParseCIDR(ipv6Prefixes[i].Prefix)
ipJ, netJ, _ := net.ParseCIDR(ipv6Prefixes[j].Prefix)
// Compare by network address first
cmp := bytes.Compare(ipI.To16(), ipJ.To16())
if cmp != 0 {
return cmp < 0
}
// If network addresses are equal, compare by mask length
onesI, _ := netI.Mask.Size()
onesJ, _ := netJ.Mask.Size()
return onesI < onesJ
})
// Prepare template data
data := struct {
ASN *database.ASN
IPv4Prefixes []database.LiveRoute
IPv6Prefixes []database.LiveRoute
TotalCount int
IPv4Count int
IPv6Count int
}{
ASN: asInfo,
IPv4Prefixes: ipv4Prefixes,
IPv6Prefixes: ipv6Prefixes,
TotalCount: len(prefixes),
IPv4Count: len(ipv4Prefixes),
IPv6Count: len(ipv6Prefixes),
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
tmpl := templates.ASDetailTemplate()
if err := tmpl.Execute(w, data); err != nil {
s.logger.Error("Failed to render AS detail template", "error", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
}
}
}
// handlePrefixDetail returns a handler that serves the prefix detail HTML page
func (s *Server) handlePrefixDetail() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
prefixParam := chi.URLParam(r, "prefix")
if prefixParam == "" {
http.Error(w, "Prefix parameter is required", http.StatusBadRequest)
return
}
// URL decode the prefix parameter
prefix, err := url.QueryUnescape(prefixParam)
if err != nil {
http.Error(w, "Invalid prefix parameter", http.StatusBadRequest)
return
}
routes, err := s.db.GetPrefixDetails(prefix)
if err != nil {
if errors.Is(err, database.ErrNoRoute) {
http.Error(w, "Prefix not found", http.StatusNotFound)
} else {
s.logger.Error("Failed to get prefix details", "error", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
}
return
}
// Group by origin AS and collect unique AS info
type ASNInfo struct {
Number int
Handle string
Description string
PeerCount int
}
originMap := make(map[int]*ASNInfo)
for _, route := range routes {
if _, exists := originMap[route.OriginASN]; !exists {
// Get AS info from database
asInfo, _, _ := s.db.GetASDetails(route.OriginASN)
handle := ""
description := ""
if asInfo != nil {
handle = asInfo.Handle
description = asInfo.Description
}
originMap[route.OriginASN] = &ASNInfo{
Number: route.OriginASN,
Handle: handle,
Description: description,
PeerCount: 0,
}
}
originMap[route.OriginASN].PeerCount++
}
// Get the first route to extract some common info
var maskLength, ipVersion int
if len(routes) > 0 {
// Parse CIDR to get mask length and IP version
_, ipNet, err := net.ParseCIDR(prefix)
if err == nil {
ones, _ := ipNet.Mask.Size()
maskLength = ones
if ipNet.IP.To4() != nil {
ipVersion = 4
} else {
ipVersion = 6
}
}
}
// Convert origin map to sorted slice
var origins []*ASNInfo
for _, origin := range originMap {
origins = append(origins, origin)
}
// Prepare template data
data := struct {
Prefix string
MaskLength int
IPVersion int
Routes []database.LiveRoute
Origins []*ASNInfo
PeerCount int
OriginCount int
}{
Prefix: prefix,
MaskLength: maskLength,
IPVersion: ipVersion,
Routes: routes,
Origins: origins,
PeerCount: len(routes),
OriginCount: len(originMap),
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
tmpl := templates.PrefixDetailTemplate()
if err := tmpl.Execute(w, data); err != nil {
s.logger.Error("Failed to render prefix detail template", "error", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
}
}
}
// handleIPRedirect looks up the prefix containing the IP and redirects to its detail page
func (s *Server) handleIPRedirect() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ip := chi.URLParam(r, "ip")
if ip == "" {
http.Error(w, "IP parameter is required", http.StatusBadRequest)
return
}
// Look up AS information for the IP (which includes the prefix)
asInfo, err := s.db.GetASInfoForIP(ip)
if err != nil {
if errors.Is(err, database.ErrInvalidIP) {
http.Error(w, "Invalid IP address", http.StatusBadRequest)
} else if errors.Is(err, database.ErrNoRoute) {
http.Error(w, "No route found for this IP", http.StatusNotFound)
} else {
s.logger.Error("Failed to look up IP", "error", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
}
return
}
// Redirect to the prefix detail page (URL encode the prefix)
http.Redirect(w, r, "/prefix/"+url.QueryEscape(asInfo.Prefix), http.StatusSeeOther)
}
}

View File

@ -1,42 +0,0 @@
package server
import (
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
)
// setupRoutes configures the HTTP routes
func (s *Server) setupRoutes() {
r := chi.NewRouter()
// Middleware
r.Use(middleware.RequestID)
r.Use(middleware.RealIP)
r.Use(middleware.Logger)
r.Use(middleware.Recoverer)
const requestTimeout = 2 * time.Second
r.Use(TimeoutMiddleware(requestTimeout))
r.Use(JSONResponseMiddleware)
// Routes
r.Get("/", s.handleRoot())
r.Get("/status", s.handleStatusHTML())
r.Get("/status.json", s.handleStatusJSON())
// AS and prefix detail pages
r.Get("/as/{asn}", s.handleASDetail())
r.Get("/prefix/{prefix}", s.handlePrefixDetail())
r.Get("/ip/{ip}", s.handleIPRedirect())
// API routes
r.Route("/api/v1", func(r chi.Router) {
r.Get("/stats", s.handleStats())
r.Get("/ip/{ip}", s.handleIPLookup())
r.Get("/as/{asn}", s.handleASDetailJSON())
r.Get("/prefix/{prefix}", s.handlePrefixDetailJSON())
})
s.router = r
}

View File

@ -3,14 +3,20 @@ package server
import ( import (
"context" "context"
"encoding/json"
"errors"
"net/http" "net/http"
"os" "os"
"runtime"
"time" "time"
"git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/logger" "git.eeqj.de/sneak/routewatch/internal/logger"
"git.eeqj.de/sneak/routewatch/internal/streamer" "git.eeqj.de/sneak/routewatch/internal/streamer"
"git.eeqj.de/sneak/routewatch/internal/templates"
"github.com/dustin/go-humanize"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
) )
// Server provides HTTP endpoints for status monitoring // Server provides HTTP endpoints for status monitoring
@ -35,6 +41,33 @@ func New(db database.Store, streamer *streamer.Streamer, logger *logger.Logger)
return s return s
} }
// setupRoutes configures the HTTP routes
func (s *Server) setupRoutes() {
r := chi.NewRouter()
// Middleware
r.Use(middleware.RequestID)
r.Use(middleware.RealIP)
r.Use(middleware.Logger)
r.Use(middleware.Recoverer)
const requestTimeout = 2 * time.Second
r.Use(TimeoutMiddleware(requestTimeout))
r.Use(JSONResponseMiddleware)
// Routes
r.Get("/", s.handleRoot())
r.Get("/status", s.handleStatusHTML())
r.Get("/status.json", s.handleStatusJSON())
// API routes
r.Route("/api/v1", func(r chi.Router) {
r.Get("/stats", s.handleStats())
r.Get("/ip/{ip}", s.handleIPLookup())
})
s.router = r
}
// Start starts the HTTP server // Start starts the HTTP server
func (s *Server) Start() error { func (s *Server) Start() error {
port := os.Getenv("PORT") port := os.Getenv("PORT")
@ -70,3 +103,357 @@ func (s *Server) Stop(ctx context.Context) error {
return s.srv.Shutdown(ctx) return s.srv.Shutdown(ctx)
} }
// handleRoot returns a handler that redirects to /status
func (s *Server) handleRoot() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/status", http.StatusSeeOther)
}
}
// writeJSONError writes a standardized JSON error response
func writeJSONError(w http.ResponseWriter, statusCode int, message string) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
_ = json.NewEncoder(w).Encode(map[string]interface{}{
"status": "error",
"error": map[string]interface{}{
"msg": message,
"code": statusCode,
},
})
}
// writeJSONSuccess writes a standardized JSON success response
func writeJSONSuccess(w http.ResponseWriter, data interface{}) error {
w.Header().Set("Content-Type", "application/json")
return json.NewEncoder(w).Encode(map[string]interface{}{
"status": "ok",
"data": data,
})
}
// handleStatusJSON returns a handler that serves JSON statistics
func (s *Server) handleStatusJSON() http.HandlerFunc {
// Stats represents the statistics response
type Stats struct {
Uptime string `json:"uptime"`
TotalMessages uint64 `json:"total_messages"`
TotalBytes uint64 `json:"total_bytes"`
MessagesPerSec float64 `json:"messages_per_sec"`
MbitsPerSec float64 `json:"mbits_per_sec"`
Connected bool `json:"connected"`
GoVersion string `json:"go_version"`
Goroutines int `json:"goroutines"`
MemoryUsage string `json:"memory_usage"`
ASNs int `json:"asns"`
Prefixes int `json:"prefixes"`
IPv4Prefixes int `json:"ipv4_prefixes"`
IPv6Prefixes int `json:"ipv6_prefixes"`
Peerings int `json:"peerings"`
Peers int `json:"peers"`
DatabaseSizeBytes int64 `json:"database_size_bytes"`
LiveRoutes int `json:"live_routes"`
IPv4Routes int `json:"ipv4_routes"`
IPv6Routes int `json:"ipv6_routes"`
IPv4UpdatesPerSec float64 `json:"ipv4_updates_per_sec"`
IPv6UpdatesPerSec float64 `json:"ipv6_updates_per_sec"`
IPv4PrefixDistribution []database.PrefixDistribution `json:"ipv4_prefix_distribution"`
IPv6PrefixDistribution []database.PrefixDistribution `json:"ipv6_prefix_distribution"`
}
return func(w http.ResponseWriter, r *http.Request) {
// Create a 1 second timeout context for this request
ctx, cancel := context.WithTimeout(r.Context(), 1*time.Second)
defer cancel()
metrics := s.streamer.GetMetrics()
// Get database stats with timeout
statsChan := make(chan database.Stats)
errChan := make(chan error)
go func() {
dbStats, err := s.db.GetStats()
if err != nil {
s.logger.Debug("Database stats query failed", "error", err)
errChan <- err
return
}
statsChan <- dbStats
}()
var dbStats database.Stats
select {
case <-ctx.Done():
s.logger.Error("Database stats timeout in status.json")
writeJSONError(w, http.StatusRequestTimeout, "Database timeout")
return
case err := <-errChan:
s.logger.Error("Failed to get database stats", "error", err)
writeJSONError(w, http.StatusInternalServerError, err.Error())
return
case dbStats = <-statsChan:
// Success
}
uptime := time.Since(metrics.ConnectedSince).Truncate(time.Second).String()
if metrics.ConnectedSince.IsZero() {
uptime = "0s"
}
const bitsPerMegabit = 1000000.0
// Get route counts from database
ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCounts()
if err != nil {
s.logger.Warn("Failed to get live route counts", "error", err)
// Continue with zero counts
}
// Get route update metrics
routeMetrics := s.streamer.GetMetricsTracker().GetRouteMetrics()
// Get memory stats
var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
stats := Stats{
Uptime: uptime,
TotalMessages: metrics.TotalMessages,
TotalBytes: metrics.TotalBytes,
MessagesPerSec: metrics.MessagesPerSec,
MbitsPerSec: metrics.BitsPerSec / bitsPerMegabit,
Connected: metrics.Connected,
GoVersion: runtime.Version(),
Goroutines: runtime.NumGoroutine(),
MemoryUsage: humanize.Bytes(memStats.Alloc),
ASNs: dbStats.ASNs,
Prefixes: dbStats.Prefixes,
IPv4Prefixes: dbStats.IPv4Prefixes,
IPv6Prefixes: dbStats.IPv6Prefixes,
Peerings: dbStats.Peerings,
Peers: dbStats.Peers,
DatabaseSizeBytes: dbStats.FileSizeBytes,
LiveRoutes: dbStats.LiveRoutes,
IPv4Routes: ipv4Routes,
IPv6Routes: ipv6Routes,
IPv4UpdatesPerSec: routeMetrics.IPv4UpdatesPerSec,
IPv6UpdatesPerSec: routeMetrics.IPv6UpdatesPerSec,
IPv4PrefixDistribution: dbStats.IPv4PrefixDistribution,
IPv6PrefixDistribution: dbStats.IPv6PrefixDistribution,
}
if err := writeJSONSuccess(w, stats); err != nil {
s.logger.Error("Failed to encode stats", "error", err)
}
}
}
// handleStats returns a handler that serves API v1 statistics
func (s *Server) handleStats() http.HandlerFunc {
// HandlerStatsInfo represents handler statistics in the API response
type HandlerStatsInfo struct {
Name string `json:"name"`
QueueLength int `json:"queue_length"`
QueueCapacity int `json:"queue_capacity"`
ProcessedCount uint64 `json:"processed_count"`
DroppedCount uint64 `json:"dropped_count"`
AvgProcessTimeMs float64 `json:"avg_process_time_ms"`
MinProcessTimeMs float64 `json:"min_process_time_ms"`
MaxProcessTimeMs float64 `json:"max_process_time_ms"`
}
// StatsResponse represents the API statistics response
type StatsResponse struct {
Uptime string `json:"uptime"`
TotalMessages uint64 `json:"total_messages"`
TotalBytes uint64 `json:"total_bytes"`
MessagesPerSec float64 `json:"messages_per_sec"`
MbitsPerSec float64 `json:"mbits_per_sec"`
Connected bool `json:"connected"`
GoVersion string `json:"go_version"`
Goroutines int `json:"goroutines"`
MemoryUsage string `json:"memory_usage"`
ASNs int `json:"asns"`
Prefixes int `json:"prefixes"`
IPv4Prefixes int `json:"ipv4_prefixes"`
IPv6Prefixes int `json:"ipv6_prefixes"`
Peerings int `json:"peerings"`
Peers int `json:"peers"`
DatabaseSizeBytes int64 `json:"database_size_bytes"`
LiveRoutes int `json:"live_routes"`
IPv4Routes int `json:"ipv4_routes"`
IPv6Routes int `json:"ipv6_routes"`
IPv4UpdatesPerSec float64 `json:"ipv4_updates_per_sec"`
IPv6UpdatesPerSec float64 `json:"ipv6_updates_per_sec"`
HandlerStats []HandlerStatsInfo `json:"handler_stats"`
IPv4PrefixDistribution []database.PrefixDistribution `json:"ipv4_prefix_distribution"`
IPv6PrefixDistribution []database.PrefixDistribution `json:"ipv6_prefix_distribution"`
}
return func(w http.ResponseWriter, r *http.Request) {
// Create a 1 second timeout context for this request
ctx, cancel := context.WithTimeout(r.Context(), 1*time.Second)
defer cancel()
// Check if context is already cancelled
select {
case <-ctx.Done():
http.Error(w, "Request timeout", http.StatusRequestTimeout)
return
default:
}
metrics := s.streamer.GetMetrics()
// Get database stats with timeout
statsChan := make(chan database.Stats)
errChan := make(chan error)
go func() {
dbStats, err := s.db.GetStats()
if err != nil {
s.logger.Debug("Database stats query failed", "error", err)
errChan <- err
return
}
statsChan <- dbStats
}()
var dbStats database.Stats
select {
case <-ctx.Done():
s.logger.Error("Database stats timeout")
http.Error(w, "Database timeout", http.StatusRequestTimeout)
return
case err := <-errChan:
s.logger.Error("Failed to get database stats", "error", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
case dbStats = <-statsChan:
// Success
}
uptime := time.Since(metrics.ConnectedSince).Truncate(time.Second).String()
if metrics.ConnectedSince.IsZero() {
uptime = "0s"
}
const bitsPerMegabit = 1000000.0
// Get route counts from database
ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCounts()
if err != nil {
s.logger.Warn("Failed to get live route counts", "error", err)
// Continue with zero counts
}
// Get route update metrics
routeMetrics := s.streamer.GetMetricsTracker().GetRouteMetrics()
// Get handler stats
handlerStats := s.streamer.GetHandlerStats()
handlerStatsInfo := make([]HandlerStatsInfo, 0, len(handlerStats))
const microsecondsPerMillisecond = 1000.0
for _, hs := range handlerStats {
handlerStatsInfo = append(handlerStatsInfo, HandlerStatsInfo{
Name: hs.Name,
QueueLength: hs.QueueLength,
QueueCapacity: hs.QueueCapacity,
ProcessedCount: hs.ProcessedCount,
DroppedCount: hs.DroppedCount,
AvgProcessTimeMs: float64(hs.AvgProcessTime.Microseconds()) / microsecondsPerMillisecond,
MinProcessTimeMs: float64(hs.MinProcessTime.Microseconds()) / microsecondsPerMillisecond,
MaxProcessTimeMs: float64(hs.MaxProcessTime.Microseconds()) / microsecondsPerMillisecond,
})
}
// Get memory stats
var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
stats := StatsResponse{
Uptime: uptime,
TotalMessages: metrics.TotalMessages,
TotalBytes: metrics.TotalBytes,
MessagesPerSec: metrics.MessagesPerSec,
MbitsPerSec: metrics.BitsPerSec / bitsPerMegabit,
Connected: metrics.Connected,
GoVersion: runtime.Version(),
Goroutines: runtime.NumGoroutine(),
MemoryUsage: humanize.Bytes(memStats.Alloc),
ASNs: dbStats.ASNs,
Prefixes: dbStats.Prefixes,
IPv4Prefixes: dbStats.IPv4Prefixes,
IPv6Prefixes: dbStats.IPv6Prefixes,
Peerings: dbStats.Peerings,
Peers: dbStats.Peers,
DatabaseSizeBytes: dbStats.FileSizeBytes,
LiveRoutes: dbStats.LiveRoutes,
IPv4Routes: ipv4Routes,
IPv6Routes: ipv6Routes,
IPv4UpdatesPerSec: routeMetrics.IPv4UpdatesPerSec,
IPv6UpdatesPerSec: routeMetrics.IPv6UpdatesPerSec,
HandlerStats: handlerStatsInfo,
IPv4PrefixDistribution: dbStats.IPv4PrefixDistribution,
IPv6PrefixDistribution: dbStats.IPv6PrefixDistribution,
}
if err := writeJSONSuccess(w, stats); err != nil {
s.logger.Error("Failed to encode stats", "error", err)
}
}
}
// handleStatusHTML returns a handler that serves the HTML status page
func (s *Server) handleStatusHTML() http.HandlerFunc {
return func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
tmpl := templates.StatusTemplate()
if err := tmpl.Execute(w, nil); err != nil {
s.logger.Error("Failed to render template", "error", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
}
}
}
// handleIPLookup returns a handler that looks up AS information for an IP address
func (s *Server) handleIPLookup() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ip := chi.URLParam(r, "ip")
if ip == "" {
writeJSONError(w, http.StatusBadRequest, "IP parameter is required")
return
}
// Look up AS information for the IP
asInfo, err := s.db.GetASInfoForIP(ip)
if err != nil {
// Check if it's an invalid IP error
if errors.Is(err, database.ErrInvalidIP) {
writeJSONError(w, http.StatusBadRequest, err.Error())
} else {
// All other errors (including ErrNoRoute) are 404
writeJSONError(w, http.StatusNotFound, err.Error())
}
return
}
// Return successful response
if err := writeJSONSuccess(w, asInfo); err != nil {
s.logger.Error("Failed to encode AS info", "error", err)
}
}
}

View File

@ -1,228 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AS{{.ASN.Number}} - {{.ASN.Handle}} - RouteWatch</title>
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
margin: 0;
padding: 20px;
background: #f5f5f5;
color: #333;
}
.container {
max-width: 1200px;
margin: 0 auto;
background: white;
padding: 30px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
h1 {
margin: 0 0 10px 0;
color: #2c3e50;
}
.subtitle {
color: #7f8c8d;
margin-bottom: 30px;
}
.info-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
gap: 20px;
margin-bottom: 30px;
}
.info-card {
background: #f8f9fa;
padding: 20px;
border-radius: 6px;
border-left: 4px solid #3498db;
}
.info-label {
font-size: 14px;
color: #7f8c8d;
margin-bottom: 5px;
}
.info-value {
font-size: 24px;
font-weight: bold;
color: #2c3e50;
}
.prefix-section {
margin-top: 30px;
}
.prefix-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 20px;
}
.prefix-header h2 {
margin: 0;
color: #2c3e50;
}
.prefix-count {
background: #e74c3c;
color: white;
padding: 5px 12px;
border-radius: 20px;
font-size: 14px;
font-weight: bold;
}
.prefix-table {
width: 100%;
border-collapse: collapse;
background: white;
border: 1px solid #e0e0e0;
border-radius: 6px;
overflow: hidden;
}
.prefix-table th {
background: #34495e;
color: white;
padding: 12px;
text-align: left;
font-weight: 600;
}
.prefix-table td {
padding: 12px;
border-bottom: 1px solid #e0e0e0;
}
.prefix-table tr:hover {
background: #f8f9fa;
}
.prefix-table tr:last-child td {
border-bottom: none;
}
.prefix-link {
color: #3498db;
text-decoration: none;
font-family: monospace;
}
.prefix-link:hover {
text-decoration: underline;
}
.age {
color: #7f8c8d;
font-size: 14px;
}
.nav-link {
display: inline-block;
margin-bottom: 20px;
color: #3498db;
text-decoration: none;
}
.nav-link:hover {
text-decoration: underline;
}
.empty-state {
text-align: center;
padding: 40px;
color: #7f8c8d;
}
@media (max-width: 768px) {
.container {
padding: 20px;
}
.info-grid {
grid-template-columns: 1fr;
}
}
</style>
</head>
<body>
<div class="container">
<a href="/status" class="nav-link">← Back to Status</a>
<h1>AS{{.ASN.Number}}{{if .ASN.Handle}} - {{.ASN.Handle}}{{end}}</h1>
{{if .ASN.Description}}
<p class="subtitle">{{.ASN.Description}}</p>
{{end}}
<div class="info-grid">
<div class="info-card">
<div class="info-label">Total Prefixes</div>
<div class="info-value">{{.TotalCount}}</div>
</div>
<div class="info-card">
<div class="info-label">IPv4 Prefixes</div>
<div class="info-value">{{.IPv4Count}}</div>
</div>
<div class="info-card">
<div class="info-label">IPv6 Prefixes</div>
<div class="info-value">{{.IPv6Count}}</div>
</div>
<div class="info-card">
<div class="info-label">First Seen</div>
<div class="info-value">{{.ASN.FirstSeen.Format "2006-01-02"}}</div>
</div>
</div>
{{if .IPv4Prefixes}}
<div class="prefix-section">
<div class="prefix-header">
<h2>IPv4 Prefixes</h2>
<span class="prefix-count">{{.IPv4Count}}</span>
</div>
<table class="prefix-table">
<thead>
<tr>
<th>Prefix</th>
<th>Mask Length</th>
<th>Last Updated</th>
<th>Age</th>
</tr>
</thead>
<tbody>
{{range .IPv4Prefixes}}
<tr>
<td><a href="/prefix/{{.Prefix | urlEncode}}" class="prefix-link">{{.Prefix}}</a></td>
<td>/{{.MaskLength}}</td>
<td>{{.LastUpdated.Format "2006-01-02 15:04:05"}}</td>
<td class="age">{{.LastUpdated | timeSince}}</td>
</tr>
{{end}}
</tbody>
</table>
</div>
{{end}}
{{if .IPv6Prefixes}}
<div class="prefix-section">
<div class="prefix-header">
<h2>IPv6 Prefixes</h2>
<span class="prefix-count">{{.IPv6Count}}</span>
</div>
<table class="prefix-table">
<thead>
<tr>
<th>Prefix</th>
<th>Mask Length</th>
<th>Last Updated</th>
<th>Age</th>
</tr>
</thead>
<tbody>
{{range .IPv6Prefixes}}
<tr>
<td><a href="/prefix/{{.Prefix | urlEncode}}" class="prefix-link">{{.Prefix}}</a></td>
<td>/{{.MaskLength}}</td>
<td>{{.LastUpdated.Format "2006-01-02 15:04:05"}}</td>
<td class="age">{{.LastUpdated | timeSince}}</td>
</tr>
{{end}}
</tbody>
</table>
</div>
{{end}}
{{if eq .TotalCount 0}}
<div class="empty-state">
<p>No prefixes announced by this AS</p>
</div>
{{end}}
</div>
</body>
</html>

View File

@ -1,253 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{{.Prefix}} - RouteWatch</title>
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
margin: 0;
padding: 20px;
background: #f5f5f5;
color: #333;
}
.container {
max-width: 1200px;
margin: 0 auto;
background: white;
padding: 30px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
h1 {
margin: 0 0 10px 0;
color: #2c3e50;
font-family: monospace;
font-size: 28px;
}
.subtitle {
color: #7f8c8d;
margin-bottom: 30px;
}
.info-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 20px;
margin-bottom: 30px;
}
.info-card {
background: #f8f9fa;
padding: 20px;
border-radius: 6px;
border-left: 4px solid #3498db;
}
.info-label {
font-size: 14px;
color: #7f8c8d;
margin-bottom: 5px;
}
.info-value {
font-size: 24px;
font-weight: bold;
color: #2c3e50;
}
.routes-section {
margin-top: 30px;
}
.routes-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 20px;
}
.routes-header h2 {
margin: 0;
color: #2c3e50;
}
.route-count {
background: #e74c3c;
color: white;
padding: 5px 12px;
border-radius: 20px;
font-size: 14px;
font-weight: bold;
}
.route-table {
width: 100%;
border-collapse: collapse;
background: white;
border: 1px solid #e0e0e0;
border-radius: 6px;
overflow: hidden;
}
.route-table th {
background: #34495e;
color: white;
padding: 12px;
text-align: left;
font-weight: 600;
}
.route-table td {
padding: 12px;
border-bottom: 1px solid #e0e0e0;
}
.route-table tr:hover {
background: #f8f9fa;
}
.route-table tr:last-child td {
border-bottom: none;
}
.as-link {
color: #3498db;
text-decoration: none;
}
.as-link:hover {
text-decoration: underline;
}
.peer-ip {
font-family: monospace;
font-size: 14px;
color: #555;
}
.as-path {
font-family: monospace;
font-size: 13px;
color: #666;
max-width: 300px;
overflow-x: auto;
white-space: nowrap;
}
.age {
color: #7f8c8d;
font-size: 14px;
}
.nav-link {
display: inline-block;
margin-bottom: 20px;
color: #3498db;
text-decoration: none;
}
.nav-link:hover {
text-decoration: underline;
}
.origins-section {
margin-top: 30px;
background: #f8f9fa;
padding: 20px;
border-radius: 6px;
}
.origins-section h3 {
margin-top: 0;
color: #2c3e50;
}
.origin-list {
display: flex;
flex-wrap: wrap;
gap: 10px;
}
.origin-item {
background: white;
padding: 10px 15px;
border-radius: 4px;
border: 1px solid #e0e0e0;
}
.empty-state {
text-align: center;
padding: 40px;
color: #7f8c8d;
}
@media (max-width: 768px) {
.container {
padding: 20px;
}
.info-grid {
grid-template-columns: 1fr;
}
.route-table {
font-size: 14px;
}
.as-path {
max-width: 150px;
}
}
</style>
</head>
<body>
<div class="container">
<a href="/status" class="nav-link">← Back to Status</a>
<h1>{{.Prefix}}</h1>
<p class="subtitle">IPv{{.IPVersion}} Prefix{{if .MaskLength}} • /{{.MaskLength}}{{end}}</p>
<div class="info-grid">
<div class="info-card">
<div class="info-label">Seen from Peers</div>
<div class="info-value">{{.PeerCount}}</div>
</div>
<div class="info-card">
<div class="info-label">Origin ASNs</div>
<div class="info-value">{{.OriginCount}}</div>
</div>
<div class="info-card">
<div class="info-label">IP Version</div>
<div class="info-value">IPv{{.IPVersion}}</div>
</div>
</div>
{{if .Origins}}
<div class="origins-section">
<h3>Origin ASNs</h3>
<div class="origin-list">
{{range .Origins}}
<div class="origin-item">
<a href="/as/{{.Number}}" class="as-link">AS{{.Number}}</a>
{{if .Handle}} ({{.Handle}}){{end}}
<span style="color: #7f8c8d; margin-left: 10px;">{{.PeerCount}} peer{{if ne .PeerCount 1}}s{{end}}</span>
</div>
{{end}}
</div>
</div>
{{end}}
{{if .Routes}}
<div class="routes-section">
<div class="routes-header">
<h2>Route Details</h2>
<span class="route-count">{{.PeerCount}} route{{if ne .PeerCount 1}}s{{end}}</span>
</div>
<table class="route-table">
<thead>
<tr>
<th>Origin AS</th>
<th>Peer IP</th>
<th>AS Path</th>
<th>Next Hop</th>
<th>Last Updated</th>
<th>Age</th>
</tr>
</thead>
<tbody>
{{range .Routes}}
<tr>
<td>
<a href="/as/{{.OriginASN}}" class="as-link">AS{{.OriginASN}}</a>
</td>
<td class="peer-ip">{{.PeerIP}}</td>
<td class="as-path">{{range $i, $as := .ASPath}}{{if $i}} → {{end}}{{$as}}{{end}}</td>
<td class="peer-ip">{{.NextHop}}</td>
<td>{{.LastUpdated.Format "2006-01-02 15:04:05"}}</td>
<td class="age">{{.LastUpdated | timeSince}}</td>
</tr>
{{end}}
</tbody>
</table>
</div>
{{else}}
<div class="empty-state">
<p>No routes found for this prefix</p>
</div>
{{end}}
</div>
</body>
</html>

View File

@ -4,25 +4,15 @@ package templates
import ( import (
_ "embed" _ "embed"
"html/template" "html/template"
"net/url"
"sync" "sync"
"time"
) )
//go:embed status.html //go:embed status.html
var statusHTML string var statusHTML string
//go:embed as_detail.html
var asDetailHTML string
//go:embed prefix_detail.html
var prefixDetailHTML string
// Templates contains all parsed templates // Templates contains all parsed templates
type Templates struct { type Templates struct {
Status *template.Template Status *template.Template
ASDetail *template.Template
PrefixDetail *template.Template
} }
var ( var (
@ -32,73 +22,17 @@ var (
once sync.Once once sync.Once
) )
const (
hoursPerDay = 24
daysPerMonth = 30
)
// timeSince returns a human-readable duration since the given time
func timeSince(t time.Time) string {
duration := time.Since(t)
if duration < time.Minute {
return "just now"
}
if duration < time.Hour {
minutes := int(duration.Minutes())
if minutes == 1 {
return "1 minute ago"
}
return duration.Truncate(time.Minute).String() + " ago"
}
if duration < hoursPerDay*time.Hour {
hours := int(duration.Hours())
if hours == 1 {
return "1 hour ago"
}
return duration.Truncate(time.Hour).String() + " ago"
}
days := int(duration.Hours() / hoursPerDay)
if days == 1 {
return "1 day ago"
}
if days < daysPerMonth {
return duration.Truncate(hoursPerDay*time.Hour).String() + " ago"
}
return t.Format("2006-01-02")
}
// initTemplates parses all embedded templates // initTemplates parses all embedded templates
func initTemplates() { func initTemplates() {
var err error var err error
defaultTemplates = &Templates{} defaultTemplates = &Templates{}
// Create common template functions
funcs := template.FuncMap{
"timeSince": timeSince,
"urlEncode": url.QueryEscape,
}
// Parse status template // Parse status template
defaultTemplates.Status, err = template.New("status").Parse(statusHTML) defaultTemplates.Status, err = template.New("status").Parse(statusHTML)
if err != nil { if err != nil {
panic("failed to parse status template: " + err.Error()) panic("failed to parse status template: " + err.Error())
} }
// Parse AS detail template
defaultTemplates.ASDetail, err = template.New("asDetail").Funcs(funcs).Parse(asDetailHTML)
if err != nil {
panic("failed to parse AS detail template: " + err.Error())
}
// Parse prefix detail template
defaultTemplates.PrefixDetail, err = template.New("prefixDetail").Funcs(funcs).Parse(prefixDetailHTML)
if err != nil {
panic("failed to parse prefix detail template: " + err.Error())
}
} }
// Get returns the singleton Templates instance // Get returns the singleton Templates instance
@ -112,13 +46,3 @@ func Get() *Templates {
func StatusTemplate() *template.Template { func StatusTemplate() *template.Template {
return Get().Status return Get().Status
} }
// ASDetailTemplate returns the parsed AS detail template
func ASDetailTemplate() *template.Template {
return Get().ASDetail
}
// PrefixDetailTemplate returns the parsed prefix detail template
func PrefixDetailTemplate() *template.Template {
return Get().PrefixDetail
}

125
log.txt
View File

@ -1,125 +0,0 @@
[Fx] PROVIDE fx.Lifecycle <= go.uber.org/fx.New.func1()
[Fx] PROVIDE fx.Shutdowner <= go.uber.org/fx.(*App).shutdowner-fm()
[Fx] PROVIDE fx.DotGraph <= go.uber.org/fx.(*App).dotGraph-fm()
[Fx] PROVIDE *logger.Logger <= git.eeqj.de/sneak/routewatch/internal/logger.New()
[Fx] PROVIDE *config.Config <= git.eeqj.de/sneak/routewatch/internal/config.New()
[Fx] PROVIDE *metrics.Tracker <= git.eeqj.de/sneak/routewatch/internal/metrics.New()
[Fx] PROVIDE database.Store <= fx.Annotate(git.eeqj.de/sneak/routewatch/internal/database.New(), fx.As([[database.Store]])
[Fx] PROVIDE *streamer.Streamer <= git.eeqj.de/sneak/routewatch/internal/streamer.New()
[Fx] PROVIDE *server.Server <= git.eeqj.de/sneak/routewatch/internal/server.New()
[Fx] PROVIDE *routewatch.RouteWatch <= git.eeqj.de/sneak/routewatch/internal/routewatch.New()
[Fx] INVOKE git.eeqj.de/sneak/routewatch/internal/routewatch.CLIEntry.func1()
[Fx] BEFORE RUN provide: go.uber.org/fx.New.func1()
[Fx] RUN provide: go.uber.org/fx.New.func1() in 6.292µs
[Fx] BEFORE RUN provide: git.eeqj.de/sneak/routewatch/internal/config.New()
[Fx] RUN provide: git.eeqj.de/sneak/routewatch/internal/config.New() in 6.458µs
[Fx] BEFORE RUN provide: git.eeqj.de/sneak/routewatch/internal/logger.New()
[Fx] RUN provide: git.eeqj.de/sneak/routewatch/internal/logger.New() in 4.417µs
[Fx] BEFORE RUN provide: fx.Annotate(git.eeqj.de/sneak/routewatch/internal/database.New(), fx.As([[database.Store]])
{"time":"2025-07-28T17:12:49.977025+02:00","level":"INFO","msg":"Opening database","source":"database.go:55","func":"database.New","path":"/Users/user/Library/Application Support/berlin.sneak.app.routewatch/db.sqlite"}
{"time":"2025-07-28T17:12:50.21775+02:00","level":"DEBUG","msg":"Slow query","source":"slowquery.go:17","func":"database.logSlowQuery","query":"PRAGMA wal_checkpoint(TRUNCATE)","duration":115272875}
{"time":"2025-07-28T17:12:50.217936+02:00","level":"INFO","msg":"Running VACUUM to optimize database (this may take a moment)","source":"database.go:122","func":"database.(*Database).Initialize"}
{"time":"2025-07-28T17:12:59.531431+02:00","level":"DEBUG","msg":"Slow query","source":"slowquery.go:17","func":"database.logSlowQuery","query":"VACUUM","duration":9313432750}
[Fx] RUN provide: fx.Annotate(git.eeqj.de/sneak/routewatch/internal/database.New(), fx.As([[database.Store]]) in 9.554516041s
[Fx] BEFORE RUN provide: git.eeqj.de/sneak/routewatch/internal/metrics.New()
[Fx] RUN provide: git.eeqj.de/sneak/routewatch/internal/metrics.New() in 38.292µs
[Fx] BEFORE RUN provide: git.eeqj.de/sneak/routewatch/internal/streamer.New()
[Fx] RUN provide: git.eeqj.de/sneak/routewatch/internal/streamer.New() in 4.5µs
[Fx] BEFORE RUN provide: git.eeqj.de/sneak/routewatch/internal/server.New()
[Fx] RUN provide: git.eeqj.de/sneak/routewatch/internal/server.New() in 60.125µs
[Fx] BEFORE RUN provide: git.eeqj.de/sneak/routewatch/internal/routewatch.New()
[Fx] RUN provide: git.eeqj.de/sneak/routewatch/internal/routewatch.New() in 3.083µs
[Fx] BEFORE RUN provide: go.uber.org/fx.(*App).shutdowner-fm()
[Fx] RUN provide: go.uber.org/fx.(*App).shutdowner-fm() in 7.167µs
[Fx] HOOK OnStart git.eeqj.de/sneak/routewatch/internal/routewatch.CLIEntry.func1.1() executing (caller: git.eeqj.de/sneak/routewatch/internal/routewatch.CLIEntry.func1)
[Fx] HOOK OnStart git.eeqj.de/sneak/routewatch/internal/routewatch.CLIEntry.func1.1() called by git.eeqj.de/sneak/routewatch/internal/routewatch.CLIEntry.func1 ran successfully in 162.25µs
[Fx] RUNNING
{"time":"2025-07-28T17:12:59.53194+02:00","level":"INFO","msg":"Starting RouteWatch","source":"app.go:64","func":"routewatch.(*RouteWatch).Run"}
{"time":"2025-07-28T17:12:59.531973+02:00","level":"INFO","msg":"Using batched database handlers for improved performance","source":"app.go:76","func":"routewatch.(*RouteWatch).Run"}
{"time":"2025-07-28T17:12:59.533095+02:00","level":"INFO","msg":"Starting HTTP server","source":"server.go:52","func":"server.(*Server).Start","port":"8080"}
2025/07/28 17:13:00 [akrotiri/R2eLWiud8V-000001] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56646 - 200 3622B in 324.489792ms
2025/07/28 17:13:00 [akrotiri/R2eLWiud8V-000002] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56646 - 200 3622B in 323.379916ms
{"time":"2025-07-28T17:13:00.934924+02:00","level":"INFO","msg":"Connected to RIS Live stream","source":"streamer.go:343","func":"streamer.(*Streamer).stream"}
2025/07/28 17:13:01 [akrotiri/R2eLWiud8V-000003] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56646 - 200 3655B in 340.457292ms
{"time":"2025-07-28T17:13:01.836921+02:00","level":"INFO","msg":"BGP session opened","source":"streamer.go:428","func":"streamer.(*Streamer).stream","peer":"193.107.13.3","peer_asn":"47787"}
{"time":"2025-07-28T17:13:02.283639+02:00","level":"ERROR","msg":"Database stats timeout","source":"handlers.go:248","func":"server.(*Server).setupRoutes.func1.(*Server).handleStats.1"}
2025/07/28 17:13:02 [akrotiri/R2eLWiud8V-000004] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56646 - 408 17B in 1.000718708s
{"time":"2025-07-28T17:13:02.805274+02:00","level":"ERROR","msg":"Database stats timeout","source":"handlers.go:248","func":"server.(*Server).setupRoutes.func1.(*Server).handleStats.1"}
2025/07/28 17:13:02 [akrotiri/R2eLWiud8V-000005] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56649 - 408 17B in 1.00114125s
{"time":"2025-07-28T17:13:02.87048+02:00","level":"DEBUG","msg":"Slow query","source":"slowquery.go:17","func":"database.logSlowQuery","query":"SELECT COUNT(*) FROM asns","duration":1587465542}
{"time":"2025-07-28T17:13:02.871544+02:00","level":"DEBUG","msg":"Slow query","source":"slowquery.go:17","func":"database.logSlowQuery","query":"SELECT COUNT(*) FROM asns","duration":568061250}
{"time":"2025-07-28T17:13:02.871628+02:00","level":"DEBUG","msg":"Slow query","source":"slowquery.go:17","func":"database.logSlowQuery","query":"SELECT COUNT(*) FROM asns","duration":587415667}
{"time":"2025-07-28T17:13:02.871894+02:00","level":"ERROR","msg":"Failed to process ASN batch","source":"ashandler.go:149","func":"routewatch.(*ASHandler).flushBatchLocked","error":"failed to update ASN 23634: database table is locked","count":483}
{"time":"2025-07-28T17:13:02.883924+02:00","level":"DEBUG","msg":"Slow query","source":"slowquery.go:17","func":"database.logSlowQuery","query":"SELECT COUNT(*) FROM asns","duration":1079722417}
{"time":"2025-07-28T17:13:02.928747+02:00","level":"DEBUG","msg":"Slow query","source":"slowquery.go:17","func":"database.logSlowQuery","query":"SELECT COUNT(*) FROM asns","duration":122623584}
{"time":"2025-07-28T17:13:02.94791+02:00","level":"ERROR","msg":"Failed to process peer batch","source":"peerhandler.go:151","func":"routewatch.(*PeerHandler).flushBatchLocked","error":"failed to update peer 2001:7f8:24::8d: database table is locked","count":276}
{"time":"2025-07-28T17:13:02.948072+02:00","level":"DEBUG","msg":"Database stats query failed","source":"handlers.go:237","func":"server.(*Server).setupRoutes.func1.(*Server).handleStats.1.1","error":"failed to count live routes: database table is locked: live_routes"}
{"time":"2025-07-28T17:13:02.948115+02:00","level":"DEBUG","msg":"Database stats query failed","source":"handlers.go:237","func":"server.(*Server).setupRoutes.func1.(*Server).handleStats.1.1","error":"failed to count live routes: database table is locked: live_routes"}
{"time":"2025-07-28T17:13:02.948089+02:00","level":"ERROR","msg":"Failed to get database stats","source":"handlers.go:253","func":"server.(*Server).setupRoutes.func1.(*Server).handleStats.1","error":"failed to count live routes: database table is locked: live_routes"}
2025/07/28 17:13:02 [akrotiri/R2eLWiud8V-000006] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56650 - 500 67B in 663.970667ms
{"time":"2025-07-28T17:13:02.948473+02:00","level":"DEBUG","msg":"Slow query","source":"slowquery.go:17","func":"database.logSlowQuery","query":"SELECT COUNT(*) FROM asns","duration":142440958}
{"time":"2025-07-28T17:13:02.9485+02:00","level":"DEBUG","msg":"Database stats query failed","source":"handlers.go:237","func":"server.(*Server).setupRoutes.func1.(*Server).handleStats.1.1","error":"failed to count live routes: database table is locked: live_routes"}
{"time":"2025-07-28T17:13:02.948509+02:00","level":"ERROR","msg":"Failed to get database stats","source":"handlers.go:253","func":"server.(*Server).setupRoutes.func1.(*Server).handleStats.1","error":"failed to count live routes: database table is locked: live_routes"}
2025/07/28 17:13:02 [akrotiri/R2eLWiud8V-000007] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56651 - 500 67B in 645.064042ms
{"time":"2025-07-28T17:13:02.948853+02:00","level":"ERROR","msg":"Failed to process ASN batch","source":"ashandler.go:149","func":"routewatch.(*ASHandler).flushBatchLocked","error":"failed to update ASN 49432: database table is locked","count":503}
{"time":"2025-07-28T17:13:02.961369+02:00","level":"DEBUG","msg":"Database stats query failed","source":"handlers.go:237","func":"server.(*Server).setupRoutes.func1.(*Server).handleStats.1.1","error":"failed to count live routes: database table is locked: live_routes"}
{"time":"2025-07-28T17:13:02.987269+02:00","level":"DEBUG","msg":"Database stats query failed","source":"handlers.go:237","func":"server.(*Server).setupRoutes.func1.(*Server).handleStats.1.1","error":"failed to count live routes: database table is locked: live_routes"}
{"time":"2025-07-28T17:13:02.987286+02:00","level":"ERROR","msg":"Failed to get database stats","source":"handlers.go:253","func":"server.(*Server).setupRoutes.func1.(*Server).handleStats.1","error":"failed to count live routes: database table is locked: live_routes"}
{"time":"2025-07-28T17:13:02.987269+02:00","level":"DEBUG","msg":"Database stats query failed","source":"handlers.go:237","func":"server.(*Server).setupRoutes.func1.(*Server).handleStats.1.1","error":"failed to count live routes: database table is locked: live_routes"}
{"time":"2025-07-28T17:13:02.987299+02:00","level":"ERROR","msg":"Failed to get database stats","source":"handlers.go:253","func":"server.(*Server).setupRoutes.func1.(*Server).handleStats.1","error":"failed to count live routes: database table is locked: live_routes"}
2025/07/28 17:13:02 [akrotiri/R2eLWiud8V-000009] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56653 - 500 67B in 181.208959ms
2025/07/28 17:13:02 [akrotiri/R2eLWiud8V-000008] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56652 - 500 67B in 181.296125ms
{"time":"2025-07-28T17:13:03.124495+02:00","level":"DEBUG","msg":"Flushed prefix batch","source":"prefixhandler.go:221","func":"routewatch.(*PrefixHandler).flushBatchLocked","batch_size":5000,"unique_prefixes":1731,"success":1731,"duration_ms":1852}
{"time":"2025-07-28T17:13:03.178923+02:00","level":"DEBUG","msg":"Flushed prefix batch","source":"prefixhandler.go:221","func":"routewatch.(*PrefixHandler).flushBatchLocked","batch_size":5000,"unique_prefixes":1107,"success":1107,"duration_ms":54}
{"time":"2025-07-28T17:13:03.577435+02:00","level":"ERROR","msg":"Failed to process ASN batch","source":"ashandler.go:149","func":"routewatch.(*ASHandler).flushBatchLocked","error":"failed to update ASN 138583: database table is locked","count":563}
2025/07/28 17:13:03 [akrotiri/R2eLWiud8V-000010] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56650 - 200 3679B in 321.707916ms
{"time":"2025-07-28T17:13:03.931676+02:00","level":"DEBUG","msg":"Slow query","source":"slowquery.go:17","func":"database.logSlowQuery","query":"SELECT COUNT(*) FROM asns","duration":135058125}
{"time":"2025-07-28T17:13:03.944156+02:00","level":"ERROR","msg":"Failed to process ASN batch","source":"ashandler.go:149","func":"routewatch.(*ASHandler).flushBatchLocked","error":"failed to update ASN 265315: database table is locked","count":486}
{"time":"2025-07-28T17:13:03.948721+02:00","level":"INFO","msg":"BGP session opened","source":"streamer.go:428","func":"streamer.(*Streamer).stream","peer":"196.60.8.170","peer_asn":"327781"}
{"time":"2025-07-28T17:13:04.067453+02:00","level":"DEBUG","msg":"Slow query","source":"slowquery.go:17","func":"database.logSlowQuery","query":"DELETE FROM live_routes WHERE prefix = ? AND peer_ip = ?","duration":115385375}
{"time":"2025-07-28T17:13:04.068433+02:00","level":"ERROR","msg":"Failed to process ASN batch","source":"ashandler.go:149","func":"routewatch.(*ASHandler).flushBatchLocked","error":"failed to update ASN 3491: database table is locked","count":512}
{"time":"2025-07-28T17:13:04.10943+02:00","level":"WARN","msg":"BGP notification","source":"streamer.go:436","func":"streamer.(*Streamer).stream","peer":"80.81.192.113","peer_asn":"35320"}
{"time":"2025-07-28T17:13:04.159034+02:00","level":"DEBUG","msg":"Slow query","source":"slowquery.go:17","func":"database.logSlowQuery","query":"DELETE FROM live_routes WHERE prefix = ? AND peer_ip = ?","duration":51181709}
{"time":"2025-07-28T17:13:04.159666+02:00","level":"DEBUG","msg":"Flushed prefix batch","source":"prefixhandler.go:221","func":"routewatch.(*PrefixHandler).flushBatchLocked","batch_size":5021,"unique_prefixes":1552,"success":1552,"duration_ms":649}
2025/07/28 17:13:04 [akrotiri/R2eLWiud8V-000011] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56650 - 200 3685B in 449.669375ms
{"time":"2025-07-28T17:13:04.246417+02:00","level":"ERROR","msg":"Failed to process ASN batch","source":"ashandler.go:149","func":"routewatch.(*ASHandler).flushBatchLocked","error":"failed to update ASN 141216: database table is locked","count":510}
{"time":"2025-07-28T17:13:04.396504+02:00","level":"DEBUG","msg":"Slow query","source":"slowquery.go:17","func":"database.logSlowQuery","query":"SELECT COUNT(*) FROM asns","duration":94807500}
{"time":"2025-07-28T17:13:04.397204+02:00","level":"ERROR","msg":"Failed to process ASN batch","source":"ashandler.go:149","func":"routewatch.(*ASHandler).flushBatchLocked","error":"failed to update ASN 28329: database table is locked","count":500}
{"time":"2025-07-28T17:13:04.416404+02:00","level":"DEBUG","msg":"Database stats query failed","source":"handlers.go:237","func":"server.(*Server).setupRoutes.func1.(*Server).handleStats.1.1","error":"failed to count live routes: database table is locked: live_routes"}
{"time":"2025-07-28T17:13:04.416423+02:00","level":"ERROR","msg":"Failed to get database stats","source":"handlers.go:253","func":"server.(*Server).setupRoutes.func1.(*Server).handleStats.1","error":"failed to count live routes: database table is locked: live_routes"}
2025/07/28 17:13:04 [akrotiri/R2eLWiud8V-000012] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56650 - 500 67B in 114.792667ms
{"time":"2025-07-28T17:13:04.419385+02:00","level":"DEBUG","msg":"Flushed prefix batch","source":"prefixhandler.go:221","func":"routewatch.(*PrefixHandler).flushBatchLocked","batch_size":5001,"unique_prefixes":1311,"success":1311,"duration_ms":259}
{"time":"2025-07-28T17:13:04.427118+02:00","level":"ERROR","msg":"Failed to process peer batch","source":"peerhandler.go:151","func":"routewatch.(*PeerHandler).flushBatchLocked","error":"failed to update peer 2001:7f8:4::f2d7:1: database table is locked","count":558}
{"time":"2025-07-28T17:13:04.640385+02:00","level":"ERROR","msg":"Failed to process ASN batch","source":"ashandler.go:149","func":"routewatch.(*ASHandler).flushBatchLocked","error":"failed to update ASN 9002: database table is locked","count":588}
{"time":"2025-07-28T17:13:04.643686+02:00","level":"DEBUG","msg":"Flushed prefix batch","source":"prefixhandler.go:221","func":"routewatch.(*PrefixHandler).flushBatchLocked","batch_size":5000,"unique_prefixes":1884,"success":1884,"duration_ms":224}
{"time":"2025-07-28T17:13:04.796451+02:00","level":"ERROR","msg":"Failed to process ASN batch","source":"ashandler.go:149","func":"routewatch.(*ASHandler).flushBatchLocked","error":"failed to update ASN 16276: database table is locked","count":472}
{"time":"2025-07-28T17:13:04.79782+02:00","level":"DEBUG","msg":"Flushed prefix batch","source":"prefixhandler.go:221","func":"routewatch.(*PrefixHandler).flushBatchLocked","batch_size":5007,"unique_prefixes":1111,"success":1111,"duration_ms":153}
{"time":"2025-07-28T17:13:04.825693+02:00","level":"DEBUG","msg":"Database stats query failed","source":"handlers.go:237","func":"server.(*Server).setupRoutes.func1.(*Server).handleStats.1.1","error":"failed to count live routes: database table is locked: live_routes"}
{"time":"2025-07-28T17:13:04.825713+02:00","level":"ERROR","msg":"Failed to get database stats","source":"handlers.go:253","func":"server.(*Server).setupRoutes.func1.(*Server).handleStats.1","error":"failed to count live routes: database table is locked: live_routes"}
2025/07/28 17:13:04 [akrotiri/R2eLWiud8V-000013] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56650 - 500 67B in 19.796125ms
{"time":"2025-07-28T17:13:05.014116+02:00","level":"ERROR","msg":"Failed to process ASN batch","source":"ashandler.go:149","func":"routewatch.(*ASHandler).flushBatchLocked","error":"failed to update ASN 134484: database table is locked","count":576}
{"time":"2025-07-28T17:13:05.014915+02:00","level":"DEBUG","msg":"Flushed prefix batch","source":"prefixhandler.go:221","func":"routewatch.(*PrefixHandler).flushBatchLocked","batch_size":5000,"unique_prefixes":1797,"success":1797,"duration_ms":216}
{"time":"2025-07-28T17:13:05.179676+02:00","level":"DEBUG","msg":"Flushed prefix batch","source":"prefixhandler.go:221","func":"routewatch.(*PrefixHandler).flushBatchLocked","batch_size":5062,"unique_prefixes":1485,"success":1485,"duration_ms":164}
2025/07/28 17:13:05 [akrotiri/R2eLWiud8V-000014] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56650 - 200 3780B in 320.984667ms
{"time":"2025-07-28T17:13:05.818964+02:00","level":"DEBUG","msg":"Flushed prefix batch","source":"prefixhandler.go:221","func":"routewatch.(*PrefixHandler).flushBatchLocked","batch_size":5000,"unique_prefixes":1357,"success":1357,"duration_ms":147}
{"time":"2025-07-28T17:13:06.16192+02:00","level":"DEBUG","msg":"Slow query","source":"slowquery.go:17","func":"database.logSlowQuery","query":"UPDATE asns SET last_seen = ? WHERE id = ?","duration":50454000}
2025/07/28 17:13:06 [akrotiri/R2eLWiud8V-000015] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56650 - 200 3785B in 317.421541ms
{"time":"2025-07-28T17:13:06.459642+02:00","level":"DEBUG","msg":"Slow query","source":"slowquery.go:17","func":"database.logSlowQuery","query":"\n\t\tINSERT INTO live_routes (id, prefix, mask_length, ip_version, origin_asn, peer_ip, as_path, next_hop, \n\t\t\tlast_updated, v4_ip_start, v4_ip_end)\n\t\tVALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)\n\t\tON CONFLICT(prefix, origin_asn, peer_ip) DO UPDATE SET\n\t\t\tmask_length = excluded.mask_length,\n\t\t\tip_version = excluded.ip_version,\n\t\t\tas_path = excluded.as_path,\n\t\t\tnext_hop = excluded.next_hop,\n\t\t\tlast_updated = excluded.last_updated,\n\t\t\tv4_ip_start = excluded.v4_ip_start,\n\t\t\tv4_ip_end = excluded.v4_ip_end\n\t","duration":86616625}
{"time":"2025-07-28T17:13:06.45968+02:00","level":"ERROR","msg":"Failed to upsert route batch","source":"prefixhandler.go:206","func":"routewatch.(*PrefixHandler).flushBatchLocked","error":"failed to upsert route 14.232.144.0/20: database table is locked: live_routes","count":1582}
{"time":"2025-07-28T17:13:06.460464+02:00","level":"ERROR","msg":"Failed to delete route batch","source":"prefixhandler.go:214","func":"routewatch.(*PrefixHandler).flushBatchLocked","error":"failed to delete route 2a06:de02:5bb:0:0:0:0:0/48: database table is locked: live_routes","count":68}
{"time":"2025-07-28T17:13:06.460474+02:00","level":"DEBUG","msg":"Flushed prefix batch","source":"prefixhandler.go:221","func":"routewatch.(*PrefixHandler).flushBatchLocked","batch_size":5000,"unique_prefixes":1650,"success":0,"duration_ms":109}
{"time":"2025-07-28T17:13:06.549698+02:00","level":"ERROR","msg":"Failed to process ASN batch","source":"ashandler.go:149","func":"routewatch.(*ASHandler).flushBatchLocked","error":"failed to update ASN 58453: database table is locked","count":508}
2025/07/28 17:13:06 [akrotiri/R2eLWiud8V-000016] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56650 - 200 3785B in 319.24825ms
{"time":"2025-07-28T17:13:06.914088+02:00","level":"DEBUG","msg":"Flushed prefix batch","source":"prefixhandler.go:221","func":"routewatch.(*PrefixHandler).flushBatchLocked","batch_size":5000,"unique_prefixes":2065,"success":2065,"duration_ms":213}
2025/07/28 17:13:07 [akrotiri/R2eLWiud8V-000017] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56650 - 200 3787B in 326.68525ms
{"time":"2025-07-28T17:13:07.510509+02:00","level":"ERROR","msg":"Failed to upsert route batch","source":"prefixhandler.go:206","func":"routewatch.(*PrefixHandler).flushBatchLocked","error":"failed to upsert route 189.28.84.0/24: database table is locked: live_routes","count":1795}
{"time":"2025-07-28T17:13:07.511781+02:00","level":"ERROR","msg":"Failed to delete route batch","source":"prefixhandler.go:214","func":"routewatch.(*PrefixHandler).flushBatchLocked","error":"failed to delete route 2406:7f40:8300:0:0:0:0:0/40: database table is locked: live_routes","count":91}
{"time":"2025-07-28T17:13:07.511764+02:00","level":"ERROR","msg":"Failed to process ASN batch","source":"ashandler.go:149","func":"routewatch.(*ASHandler).flushBatchLocked","error":"failed to update ASN 45192: database table is locked","count":589}
{"time":"2025-07-28T17:13:07.5118+02:00","level":"DEBUG","msg":"Flushed prefix batch","source":"prefixhandler.go:221","func":"routewatch.(*PrefixHandler).flushBatchLocked","batch_size":5000,"unique_prefixes":1886,"success":0,"duration_ms":31}
2025/07/28 17:13:07 [akrotiri/R2eLWiud8V-000018] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56650 - 200 3786B in 322.780666ms
{"time":"2025-07-28T17:13:08.182061+02:00","level":"WARN","msg":"BGP notification","source":"streamer.go:436","func":"streamer.(*Streamer).stream","peer":"193.239.118.249","peer_asn":"41255"}
{"time":"2025-07-28T17:13:08.200973+02:00","level":"ERROR","msg":"Failed to upsert route batch","source":"prefixhandler.go:206","func":"routewatch.(*PrefixHandler).flushBatchLocked","error":"failed to upsert route 158.172.251.0/24: database table is locked","count":1061}
2025/07/28 17:13:08 [akrotiri/R2eLWiud8V-000019] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56650 - 200 3790B in 331.584125ms
{"time":"2025-07-28T17:13:08.201058+02:00","level":"ERROR","msg":"Failed to delete route batch","source":"prefixhandler.go:214","func":"routewatch.(*PrefixHandler).flushBatchLocked","error":"failed to delete route 2a06:de02:4a0:0:0:0:0:0/48: database table is locked","count":88}
{"time":"2025-07-28T17:13:08.201066+02:00","level":"DEBUG","msg":"Flushed prefix batch","source":"prefixhandler.go:221","func":"routewatch.(*PrefixHandler).flushBatchLocked","batch_size":5001,"unique_prefixes":1149,"success":0,"duration_ms":16}
2025/07/28 17:13:08 [akrotiri/R2eLWiud8V-000020] "GET http://127.0.0.1:8080/api/v1/stats HTTP/1.1" from 127.0.0.1:56650 - 200 3786B in 319.2075ms