Compare commits

...

12 Commits

Author SHA1 Message Date
6d46bbad5b Fix nil pointer dereference in GetPrefixDistributionContext
- Use separate variables for IPv4 and IPv6 query results
- Add nil checks before closing rows to prevent panic
- Prevents crash when database queries timeout or fail
2025-07-28 22:04:22 +02:00
9518519208 Fix prefix links on prefix length page with URL encoding
- Add urlEncode template function to properly encode prefix URLs
- Move prefix_length.html to embedded templates with function map
- Prevents broken links for prefixes containing slashes
2025-07-28 22:00:27 +02:00
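
Note: the templates.go diff below registers prefix_length.html with a function map (template.New("prefixLength").Funcs(funcs).Parse(prefixLengthHTML)), but the urlEncode helper itself is not part of this changeset. A minimal sketch of what such a helper could look like, assuming it simply wraps url.QueryEscape so that a prefix like "10.0.0.0/8" renders as "10.0.0.0%2F8" inside an href (names here are illustrative, not taken from the repository):

    package templates

    import (
        "html/template"
        "net/url"
    )

    // funcs is the template function map; the real map may carry additional helpers.
    var funcs = template.FuncMap{
        // urlEncode escapes a CIDR prefix (including the "/") for use in a URL path segment.
        "urlEncode": url.QueryEscape,
    }
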
7d39bd18bc Fix concurrent map write panic in timeout middleware
- Add thread-safe header wrapper in timeoutWriter
- Check context cancellation before writing responses in handlers
- Protect header access after timeout with mutex
- Prevents race condition when requests timeout while handlers are still running
2025-07-28 21:54:58 +02:00
e0a4c8642e Add context cancellation support to database operations
- Add context-aware versions of all read operations in the database
- Update handlers to use context from HTTP requests
- Allows database queries to be cancelled when HTTP requests timeout
- Prevents database operations from continuing after client disconnects
2025-07-28 19:27:55 +02:00
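
Note: as the database.go diff below shows, every read path follows the same delegation pattern: the existing method keeps its signature and forwards to a new *Context variant with context.Background(), while HTTP handlers pass r.Context() so a timed-out or disconnected request cancels the query. A minimal sketch of the shape, using a hypothetical CountWidgets method rather than any real method from this repository:

    package database

    import (
        "context"
        "database/sql"
    )

    // Database is reduced here to the single field this sketch needs.
    type Database struct {
        db *sql.DB
    }

    // CountWidgets keeps the old context-free signature for existing callers.
    func (d *Database) CountWidgets() (int, error) {
        return d.CountWidgetsContext(context.Background())
    }

    // CountWidgetsContext aborts the query when ctx is cancelled or times out.
    func (d *Database) CountWidgetsContext(ctx context.Context) (int, error) {
        var n int
        err := d.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM widgets").Scan(&n)
        return n, err
    }
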
0196251906 Fix race condition crash in timeout middleware
- Remove duplicate http.Error call when context times out
- The timeout middleware already handles writing the response
- Prevents "concurrent write to websocket connection" panic
2025-07-28 19:07:30 +02:00
62ed5e08aa Improve prefix count link styling on status page
- Add dashed underline to prefix count links to indicate they are clickable
- Change to solid blue underline on hover for better UX
- Remove inline styles and use CSS classes instead
2025-07-28 19:05:45 +02:00
5fb3fc0381 Fix prefix length page to show unique prefixes only
- Change GetRandomPrefixesByLength to return unique prefixes instead of all routes
- Use CTE to first select random unique prefixes, then join to get their latest route info
- This ensures each prefix appears only once in the list
2025-07-28 19:04:19 +02:00
9a63553f8d Add index to optimize COUNT(DISTINCT prefix) queries
- Add compound index on (ip_version, mask_length, prefix) to speed up prefix distribution queries
- This index will significantly improve performance of COUNT(DISTINCT prefix) operations
- Note: Existing databases will need to manually create this index or recreate the database
2025-07-28 19:01:45 +02:00
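
Note: for a database that already exists, the index added to the schema (see the schema diff below) can be applied in place; the statement is idempotent thanks to IF NOT EXISTS, and the database file path here is only a placeholder:

    sqlite3 /path/to/routewatch.db \
      "CREATE INDEX IF NOT EXISTS idx_live_routes_ip_mask_prefix ON live_routes(ip_version, mask_length, prefix);"
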
ba13c76c53 Fix prefix distribution bug and add prefix length pages
- Fix GetPrefixDistribution to count unique prefixes using COUNT(DISTINCT prefix) instead of COUNT(*)
- Add /prefixlength/<length> route showing random sample of 500 prefixes
- Make prefix counts on status page clickable links to prefix length pages
- Add GetRandomPrefixesByLength database method
- Create prefix_length.html template with sortable table
- Show prefix age and origin AS with descriptions
2025-07-28 18:42:38 +02:00
1dcde74a90 Update AS path display to show handles with clickable links
- Change AS path from descriptions to handles (short names)
- Make each AS in the path a clickable link to /as/<asn>
- Add font-weight to AS links in path for better visibility
- Prevent word wrapping on all table columns except AS path
- Remove unused maxASDescriptionLength constant
2025-07-28 18:31:35 +02:00
81267431f7 Increase batch sizes to improve write throughput
- Increase prefix batch size from 5K to 20K
- Increase ASN batch size from 10K to 30K
- Add comments warning not to reduce batch timeouts
- Add comments warning not to increase queue sizes above 100K
- Maintains existing batch timeouts for efficiency
2025-07-28 18:27:42 +02:00
dc3ceb8d94 Show AS descriptions in AS path on prefix detail page
- Display AS descriptions alongside AS numbers in format: Description (ASN)
- Truncate descriptions longer than 20 characters with ellipsis
- Increase container max width to 1600px for better display
- Enable word wrapping for AS path cells to handle long paths
- Update mobile responsive styles for AS path display
2025-07-28 18:25:26 +02:00
17 changed files with 1760 additions and 271007 deletions

View File

@@ -21,7 +21,7 @@ clean:
 	rm -rf bin/
 run: build
-	./bin/routewatch
+	DEBUG=routewatch ./bin/routewatch 2>&1 | tee log.txt
 asupdate:
 	@echo "Updating AS info data..."

View File

@@ -2,6 +2,7 @@
 package database

 import (
+	"context"
 	"database/sql"
 	_ "embed"
 	"encoding/json"

@@ -747,41 +748,48 @@ func (d *Database) UpdatePeer(peerIP string, peerASN int, messageType string, ti
 // GetStats returns database statistics
 func (d *Database) GetStats() (Stats, error) {
+	return d.GetStatsContext(context.Background())
+}
+
+// GetStatsContext returns database statistics with context support
+func (d *Database) GetStatsContext(ctx context.Context) (Stats, error) {
 	var stats Stats

 	// Count ASNs
-	err := d.queryRow("SELECT COUNT(*) FROM asns").Scan(&stats.ASNs)
+	err := d.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM asns").Scan(&stats.ASNs)
 	if err != nil {
 		return stats, err
 	}

 	// Count prefixes
-	err = d.queryRow("SELECT COUNT(*) FROM prefixes").Scan(&stats.Prefixes)
+	err = d.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM prefixes").Scan(&stats.Prefixes)
 	if err != nil {
 		return stats, err
 	}

 	// Count IPv4 and IPv6 prefixes
 	const ipVersionV4 = 4
-	err = d.queryRow("SELECT COUNT(*) FROM prefixes WHERE ip_version = ?", ipVersionV4).Scan(&stats.IPv4Prefixes)
+	err = d.db.QueryRowContext(ctx,
+		"SELECT COUNT(*) FROM prefixes WHERE ip_version = ?", ipVersionV4).Scan(&stats.IPv4Prefixes)
 	if err != nil {
 		return stats, err
 	}

 	const ipVersionV6 = 6
-	err = d.queryRow("SELECT COUNT(*) FROM prefixes WHERE ip_version = ?", ipVersionV6).Scan(&stats.IPv6Prefixes)
+	err = d.db.QueryRowContext(ctx,
+		"SELECT COUNT(*) FROM prefixes WHERE ip_version = ?", ipVersionV6).Scan(&stats.IPv6Prefixes)
 	if err != nil {
 		return stats, err
 	}

 	// Count peerings
-	err = d.queryRow("SELECT COUNT(*) FROM peerings").Scan(&stats.Peerings)
+	err = d.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM peerings").Scan(&stats.Peerings)
 	if err != nil {
 		return stats, err
 	}

 	// Count peers
-	err = d.queryRow("SELECT COUNT(*) FROM bgp_peers").Scan(&stats.Peers)
+	err = d.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM bgp_peers").Scan(&stats.Peers)
 	if err != nil {
 		return stats, err
 	}

@@ -796,13 +804,13 @@ func (d *Database) GetStats() (Stats, error) {
 	}

 	// Get live routes count
-	err = d.db.QueryRow("SELECT COUNT(*) FROM live_routes").Scan(&stats.LiveRoutes)
+	err = d.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM live_routes").Scan(&stats.LiveRoutes)
 	if err != nil {
 		return stats, fmt.Errorf("failed to count live routes: %w", err)
 	}

 	// Get prefix distribution
-	stats.IPv4PrefixDistribution, stats.IPv6PrefixDistribution, err = d.GetPrefixDistribution()
+	stats.IPv4PrefixDistribution, stats.IPv6PrefixDistribution, err = d.GetPrefixDistributionContext(ctx)
 	if err != nil {
 		// Log but don't fail
 		d.logger.Warn("Failed to get prefix distribution", "error", err)

@@ -884,47 +892,61 @@ func (d *Database) DeleteLiveRoute(prefix string, originASN int, peerIP string)
 	return err
 }

-// GetPrefixDistribution returns the distribution of prefixes by mask length
+// GetPrefixDistribution returns the distribution of unique prefixes by mask length
 func (d *Database) GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error) {
-	// IPv4 distribution
+	return d.GetPrefixDistributionContext(context.Background())
+}
+
+// GetPrefixDistributionContext returns the distribution of unique prefixes by mask length with context support
+func (d *Database) GetPrefixDistributionContext(ctx context.Context) (
+	ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error) {
+	// IPv4 distribution - count unique prefixes, not routes
 	query := `
-		SELECT mask_length, COUNT(*) as count
+		SELECT mask_length, COUNT(DISTINCT prefix) as count
 		FROM live_routes
 		WHERE ip_version = 4
 		GROUP BY mask_length
 		ORDER BY mask_length
 	`
-	rows, err := d.db.Query(query)
+	rows4, err := d.db.QueryContext(ctx, query)
 	if err != nil {
 		return nil, nil, fmt.Errorf("failed to query IPv4 distribution: %w", err)
 	}
-	defer func() { _ = rows.Close() }()
+	defer func() {
+		if rows4 != nil {
+			_ = rows4.Close()
+		}
+	}()

-	for rows.Next() {
+	for rows4.Next() {
 		var dist PrefixDistribution
-		if err := rows.Scan(&dist.MaskLength, &dist.Count); err != nil {
+		if err := rows4.Scan(&dist.MaskLength, &dist.Count); err != nil {
 			return nil, nil, fmt.Errorf("failed to scan IPv4 distribution: %w", err)
 		}
 		ipv4 = append(ipv4, dist)
 	}

-	// IPv6 distribution
+	// IPv6 distribution - count unique prefixes, not routes
 	query = `
-		SELECT mask_length, COUNT(*) as count
+		SELECT mask_length, COUNT(DISTINCT prefix) as count
 		FROM live_routes
 		WHERE ip_version = 6
 		GROUP BY mask_length
 		ORDER BY mask_length
 	`
-	rows, err = d.db.Query(query)
+	rows6, err := d.db.QueryContext(ctx, query)
 	if err != nil {
 		return nil, nil, fmt.Errorf("failed to query IPv6 distribution: %w", err)
 	}
-	defer func() { _ = rows.Close() }()
+	defer func() {
+		if rows6 != nil {
+			_ = rows6.Close()
		}
+	}()

-	for rows.Next() {
+	for rows6.Next() {
 		var dist PrefixDistribution
-		if err := rows.Scan(&dist.MaskLength, &dist.Count); err != nil {
+		if err := rows6.Scan(&dist.MaskLength, &dist.Count); err != nil {
 			return nil, nil, fmt.Errorf("failed to scan IPv6 distribution: %w", err)
 		}
 		ipv6 = append(ipv6, dist)

@@ -935,14 +957,19 @@ func (d *Database) GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []Pr

 // GetLiveRouteCounts returns the count of IPv4 and IPv6 routes
 func (d *Database) GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error) {
+	return d.GetLiveRouteCountsContext(context.Background())
+}
+
+// GetLiveRouteCountsContext returns the count of IPv4 and IPv6 routes with context support
+func (d *Database) GetLiveRouteCountsContext(ctx context.Context) (ipv4Count, ipv6Count int, err error) {
 	// Get IPv4 count
-	err = d.db.QueryRow("SELECT COUNT(*) FROM live_routes WHERE ip_version = 4").Scan(&ipv4Count)
+	err = d.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM live_routes WHERE ip_version = 4").Scan(&ipv4Count)
 	if err != nil {
 		return 0, 0, fmt.Errorf("failed to count IPv4 routes: %w", err)
 	}

 	// Get IPv6 count
-	err = d.db.QueryRow("SELECT COUNT(*) FROM live_routes WHERE ip_version = 6").Scan(&ipv6Count)
+	err = d.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM live_routes WHERE ip_version = 6").Scan(&ipv6Count)
 	if err != nil {
 		return 0, 0, fmt.Errorf("failed to count IPv6 routes: %w", err)
 	}

@@ -952,6 +979,11 @@ func (d *Database) GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error) {

 // GetASInfoForIP returns AS information for the given IP address
 func (d *Database) GetASInfoForIP(ip string) (*ASInfo, error) {
+	return d.GetASInfoForIPContext(context.Background(), ip)
+}
+
+// GetASInfoForIPContext returns AS information for the given IP address with context support
+func (d *Database) GetASInfoForIPContext(ctx context.Context, ip string) (*ASInfo, error) {
 	// Parse the IP to validate it
 	parsedIP := net.ParseIP(ip)
 	if parsedIP == nil {

@@ -984,7 +1016,7 @@ func (d *Database) GetASInfoForIP(ip string) (*ASInfo, error) {
 	var lastUpdated time.Time
 	var handle, description sql.NullString

-	err := d.db.QueryRow(query, ipVersionV4, ipUint, ipUint).Scan(
+	err := d.db.QueryRowContext(ctx, query, ipVersionV4, ipUint, ipUint).Scan(
 		&prefix, &maskLength, &originASN, &lastUpdated, &handle, &description)
 	if err != nil {
 		if err == sql.ErrNoRows {

@@ -1015,7 +1047,7 @@ func (d *Database) GetASInfoForIP(ip string) (*ASInfo, error) {
 		ORDER BY lr.mask_length DESC
 	`

-	rows, err := d.db.Query(query, ipVersionV6)
+	rows, err := d.db.QueryContext(ctx, query, ipVersionV6)
 	if err != nil {
 		return nil, fmt.Errorf("failed to query routes: %w", err)
 	}

@@ -1118,11 +1150,16 @@ func CalculateIPv4Range(cidr string) (start, end uint32, err error) {

 // GetASDetails returns detailed information about an AS including prefixes
 func (d *Database) GetASDetails(asn int) (*ASN, []LiveRoute, error) {
+	return d.GetASDetailsContext(context.Background(), asn)
+}
+
+// GetASDetailsContext returns detailed information about an AS including prefixes with context support
+func (d *Database) GetASDetailsContext(ctx context.Context, asn int) (*ASN, []LiveRoute, error) {
 	// Get AS information
 	var asnInfo ASN
 	var idStr string
 	var handle, description sql.NullString
-	err := d.db.QueryRow(
+	err := d.db.QueryRowContext(ctx,
 		"SELECT id, number, handle, description, first_seen, last_seen FROM asns WHERE number = ?",
 		asn,
 	).Scan(&idStr, &asnInfo.Number, &handle, &description, &asnInfo.FirstSeen, &asnInfo.LastSeen)

@@ -1147,7 +1184,7 @@ func (d *Database) GetASDetails(asn int) (*ASN, []LiveRoute, error) {
 		GROUP BY prefix, mask_length, ip_version
 	`

-	rows, err := d.db.Query(query, asn)
+	rows, err := d.db.QueryContext(ctx, query, asn)
 	if err != nil {
 		return &asnInfo, nil, fmt.Errorf("failed to query prefixes: %w", err)
 	}

@@ -1183,6 +1220,11 @@ func (d *Database) GetASDetails(asn int) (*ASN, []LiveRoute, error) {

 // GetPrefixDetails returns detailed information about a prefix
 func (d *Database) GetPrefixDetails(prefix string) ([]LiveRoute, error) {
+	return d.GetPrefixDetailsContext(context.Background(), prefix)
+}
+
+// GetPrefixDetailsContext returns detailed information about a prefix with context support
+func (d *Database) GetPrefixDetailsContext(ctx context.Context, prefix string) ([]LiveRoute, error) {
 	query := `
 		SELECT lr.origin_asn, lr.peer_ip, lr.as_path, lr.next_hop, lr.last_updated,
 		       a.handle, a.description

@@ -1192,7 +1234,7 @@ func (d *Database) GetPrefixDetails(prefix string) ([]LiveRoute, error) {
 		ORDER BY lr.origin_asn, lr.peer_ip
 	`

-	rows, err := d.db.Query(query, prefix)
+	rows, err := d.db.QueryContext(ctx, query, prefix)
 	if err != nil {
 		return nil, fmt.Errorf("failed to query prefix details: %w", err)
 	}

@@ -1227,3 +1269,64 @@ func (d *Database) GetPrefixDetails(prefix string) ([]LiveRoute, error) {
 	return routes, nil
 }
+
+// GetRandomPrefixesByLength returns a random sample of prefixes with the specified mask length
+func (d *Database) GetRandomPrefixesByLength(maskLength, ipVersion, limit int) ([]LiveRoute, error) {
+	return d.GetRandomPrefixesByLengthContext(context.Background(), maskLength, ipVersion, limit)
+}
+
+// GetRandomPrefixesByLengthContext returns a random sample of prefixes with context support
+func (d *Database) GetRandomPrefixesByLengthContext(
+	ctx context.Context, maskLength, ipVersion, limit int) ([]LiveRoute, error) {
+	// Select unique prefixes with their most recent route information
+	query := `
+		WITH unique_prefixes AS (
+			SELECT prefix, MAX(last_updated) as max_updated
+			FROM live_routes
+			WHERE mask_length = ? AND ip_version = ?
+			GROUP BY prefix
+			ORDER BY RANDOM()
+			LIMIT ?
+		)
+		SELECT lr.prefix, lr.mask_length, lr.ip_version, lr.origin_asn, lr.as_path,
+		       lr.peer_ip, lr.last_updated
+		FROM live_routes lr
+		INNER JOIN unique_prefixes up ON lr.prefix = up.prefix AND lr.last_updated = up.max_updated
+		WHERE lr.mask_length = ? AND lr.ip_version = ?
+	`
+
+	rows, err := d.db.QueryContext(ctx, query, maskLength, ipVersion, limit, maskLength, ipVersion)
+	if err != nil {
+		return nil, fmt.Errorf("failed to query random prefixes: %w", err)
+	}
+	defer func() {
+		_ = rows.Close()
+	}()
+
+	var routes []LiveRoute
+	for rows.Next() {
+		var route LiveRoute
+		var pathJSON string
+		err := rows.Scan(
+			&route.Prefix,
+			&route.MaskLength,
+			&route.IPVersion,
+			&route.OriginASN,
+			&pathJSON,
+			&route.PeerIP,
+			&route.LastUpdated,
+		)
+		if err != nil {
+			continue
+		}
+
+		// Decode AS path
+		if err := json.Unmarshal([]byte(pathJSON), &route.ASPath); err != nil {
+			route.ASPath = []int{}
+		}
+
+		routes = append(routes, route)
+	}
+
+	return routes, nil
+}

View File

@@ -1,6 +1,7 @@
 package database

 import (
+	"context"
 	"time"
 )

@@ -35,6 +36,7 @@ type Store interface {
 	// Statistics
 	GetStats() (Stats, error)
+	GetStatsContext(ctx context.Context) (Stats, error)

 	// Peer operations
 	UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error

@@ -46,14 +48,21 @@ type Store interface {
 	DeleteLiveRoute(prefix string, originASN int, peerIP string) error
 	DeleteLiveRouteBatch(deletions []LiveRouteDeletion) error
 	GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error)
+	GetPrefixDistributionContext(ctx context.Context) (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error)
 	GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error)
+	GetLiveRouteCountsContext(ctx context.Context) (ipv4Count, ipv6Count int, err error)

 	// IP lookup operations
 	GetASInfoForIP(ip string) (*ASInfo, error)
+	GetASInfoForIPContext(ctx context.Context, ip string) (*ASInfo, error)

 	// AS and prefix detail operations
 	GetASDetails(asn int) (*ASN, []LiveRoute, error)
+	GetASDetailsContext(ctx context.Context, asn int) (*ASN, []LiveRoute, error)
 	GetPrefixDetails(prefix string) ([]LiveRoute, error)
+	GetPrefixDetailsContext(ctx context.Context, prefix string) ([]LiveRoute, error)
+	GetRandomPrefixesByLength(maskLength, ipVersion, limit int) ([]LiveRoute, error)
+	GetRandomPrefixesByLengthContext(ctx context.Context, maskLength, ipVersion, limit int) ([]LiveRoute, error)

 	// Lifecycle
 	Close() error

View File

@@ -91,4 +91,6 @@ CREATE INDEX IF NOT EXISTS idx_live_routes_mask_length ON live_routes(mask_lengt
 CREATE INDEX IF NOT EXISTS idx_live_routes_ip_version_mask ON live_routes(ip_version, mask_length);
 CREATE INDEX IF NOT EXISTS idx_live_routes_last_updated ON live_routes(last_updated);
 -- Indexes for IPv4 range queries
 CREATE INDEX IF NOT EXISTS idx_live_routes_ipv4_range ON live_routes(v4_ip_start, v4_ip_end) WHERE ip_version = 4;
+-- Index to optimize COUNT(DISTINCT prefix) queries
+CREATE INDEX IF NOT EXISTS idx_live_routes_ip_mask_prefix ON live_routes(ip_version, mask_length, prefix);

View File

@@ -19,6 +19,7 @@ func logSlowQuery(logger *logger.Logger, query string, start time.Time) {
 }

 // queryRow wraps QueryRow with slow query logging
+// nolint:unused // kept for consistency with other query wrappers
 func (d *Database) queryRow(query string, args ...interface{}) *sql.Row {
 	start := time.Now()
 	defer logSlowQuery(d.logger, query, start)

View File

@@ -163,6 +163,11 @@ func (m *mockStore) GetStats() (database.Stats, error) {
 	}, nil
 }

+// GetStatsContext returns statistics about the mock store with context support
+func (m *mockStore) GetStatsContext(ctx context.Context) (database.Stats, error) {
+	return m.GetStats()
+}
+
 // UpsertLiveRoute mock implementation
 func (m *mockStore) UpsertLiveRoute(route *database.LiveRoute) error {
 	// Simple mock - just return nil

@@ -181,12 +186,22 @@ func (m *mockStore) GetPrefixDistribution() (ipv4 []database.PrefixDistribution,
 	return nil, nil, nil
 }

+// GetPrefixDistributionContext mock implementation with context support
+func (m *mockStore) GetPrefixDistributionContext(ctx context.Context) (ipv4 []database.PrefixDistribution, ipv6 []database.PrefixDistribution, err error) {
+	return m.GetPrefixDistribution()
+}
+
 // GetLiveRouteCounts mock implementation
 func (m *mockStore) GetLiveRouteCounts() (ipv4Count, ipv6Count int, err error) {
 	// Return mock counts
 	return m.RouteCount / 2, m.RouteCount / 2, nil
 }

+// GetLiveRouteCountsContext mock implementation with context support
+func (m *mockStore) GetLiveRouteCountsContext(ctx context.Context) (ipv4Count, ipv6Count int, err error) {
+	return m.GetLiveRouteCounts()
+}
+
 // GetASInfoForIP mock implementation
 func (m *mockStore) GetASInfoForIP(ip string) (*database.ASInfo, error) {
 	// Simple mock - return a test AS

@@ -201,6 +216,11 @@ func (m *mockStore) GetASInfoForIP(ip string) (*database.ASInfo, error) {
 	}, nil
 }

+// GetASInfoForIPContext mock implementation with context support
+func (m *mockStore) GetASInfoForIPContext(ctx context.Context, ip string) (*database.ASInfo, error) {
+	return m.GetASInfoForIP(ip)
+}
+
 // GetASDetails mock implementation
 func (m *mockStore) GetASDetails(asn int) (*database.ASN, []database.LiveRoute, error) {
 	m.mu.Lock()

@@ -215,12 +235,32 @@ func (m *mockStore) GetASDetails(asn int) (*database.ASN, []database.LiveRoute,
 	return nil, nil, database.ErrNoRoute
 }

+// GetASDetailsContext mock implementation with context support
+func (m *mockStore) GetASDetailsContext(ctx context.Context, asn int) (*database.ASN, []database.LiveRoute, error) {
+	return m.GetASDetails(asn)
+}
+
 // GetPrefixDetails mock implementation
 func (m *mockStore) GetPrefixDetails(prefix string) ([]database.LiveRoute, error) {
 	// Return empty routes for now
 	return []database.LiveRoute{}, nil
 }

+// GetPrefixDetailsContext mock implementation with context support
+func (m *mockStore) GetPrefixDetailsContext(ctx context.Context, prefix string) ([]database.LiveRoute, error) {
+	return m.GetPrefixDetails(prefix)
+}
+
+func (m *mockStore) GetRandomPrefixesByLength(maskLength, ipVersion, limit int) ([]database.LiveRoute, error) {
+	// Return empty routes for now
+	return []database.LiveRoute{}, nil
+}
+
+// GetRandomPrefixesByLengthContext mock implementation with context support
+func (m *mockStore) GetRandomPrefixesByLengthContext(ctx context.Context, maskLength, ipVersion, limit int) ([]database.LiveRoute, error) {
+	return m.GetRandomPrefixesByLength(maskLength, ipVersion, limit)
+}
+
 // UpsertLiveRouteBatch mock implementation
 func (m *mockStore) UpsertLiveRouteBatch(routes []*database.LiveRoute) error {
 	m.mu.Lock()

View File

@@ -11,12 +11,14 @@ import (
 const (
 	// asHandlerQueueSize is the queue capacity for ASN operations
+	// DO NOT set this higher than 100000 without explicit instructions
 	asHandlerQueueSize = 100000

 	// asnBatchSize is the number of ASN operations to batch together
-	asnBatchSize = 10000
+	asnBatchSize = 30000

 	// asnBatchTimeout is the maximum time to wait before flushing a batch
+	// DO NOT reduce this timeout - larger batches are more efficient
 	asnBatchTimeout = 2 * time.Second
 )

View File

@@ -15,12 +15,14 @@ import (
 const (
 	// prefixHandlerQueueSize is the queue capacity for prefix tracking operations
+	// DO NOT set this higher than 100000 without explicit instructions
 	prefixHandlerQueueSize = 100000

 	// prefixBatchSize is the number of prefix updates to batch together
-	prefixBatchSize = 5000
+	prefixBatchSize = 20000

 	// prefixBatchTimeout is the maximum time to wait before flushing a batch
+	// DO NOT reduce this timeout - larger batches are more efficient
 	prefixBatchTimeout = 1 * time.Second

 	// IP version constants

View File

@@ -15,6 +15,7 @@ import (
 	"git.eeqj.de/sneak/routewatch/internal/database"
 	"git.eeqj.de/sneak/routewatch/internal/templates"
+	asinfo "git.eeqj.de/sneak/routewatch/pkg/asinfo"
 	"github.com/dustin/go-humanize"
 	"github.com/go-chi/chi/v5"
 )

@@ -90,7 +91,7 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
 		errChan := make(chan error)

 		go func() {
-			dbStats, err := s.db.GetStats()
+			dbStats, err := s.db.GetStatsContext(ctx)
 			if err != nil {
 				s.logger.Debug("Database stats query failed", "error", err)
 				errChan <- err

@@ -124,7 +125,7 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
 		const bitsPerMegabit = 1000000.0

 		// Get route counts from database
-		ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCounts()
+		ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCountsContext(ctx)
 		if err != nil {
 			s.logger.Warn("Failed to get live route counts", "error", err)
 			// Continue with zero counts

@@ -232,7 +233,7 @@ func (s *Server) handleStats() http.HandlerFunc {
 		errChan := make(chan error)

 		go func() {
-			dbStats, err := s.db.GetStats()
+			dbStats, err := s.db.GetStatsContext(ctx)
 			if err != nil {
 				s.logger.Debug("Database stats query failed", "error", err)
 				errChan <- err

@@ -246,8 +247,7 @@ func (s *Server) handleStats() http.HandlerFunc {
 		select {
 		case <-ctx.Done():
 			s.logger.Error("Database stats timeout")
-			http.Error(w, "Database timeout", http.StatusRequestTimeout)
+			// Don't write response here - timeout middleware already handles it
 			return
 		case err := <-errChan:
 			s.logger.Error("Failed to get database stats", "error", err)

@@ -266,7 +266,7 @@ func (s *Server) handleStats() http.HandlerFunc {
 		const bitsPerMegabit = 1000000.0

 		// Get route counts from database
-		ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCounts()
+		ipv4Routes, ipv6Routes, err := s.db.GetLiveRouteCountsContext(ctx)
 		if err != nil {
 			s.logger.Warn("Failed to get live route counts", "error", err)
 			// Continue with zero counts

@@ -353,7 +353,7 @@ func (s *Server) handleIPLookup() http.HandlerFunc {
 		}

 		// Look up AS information for the IP
-		asInfo, err := s.db.GetASInfoForIP(ip)
+		asInfo, err := s.db.GetASInfoForIPContext(r.Context(), ip)
 		if err != nil {
 			// Check if it's an invalid IP error
 			if errors.Is(err, database.ErrInvalidIP) {

@@ -384,7 +384,7 @@ func (s *Server) handleASDetailJSON() http.HandlerFunc {
 			return
 		}

-		asInfo, prefixes, err := s.db.GetASDetails(asn)
+		asInfo, prefixes, err := s.db.GetASDetailsContext(r.Context(), asn)
 		if err != nil {
 			if errors.Is(err, database.ErrNoRoute) {
 				writeJSONError(w, http.StatusNotFound, err.Error())

@@ -437,7 +437,7 @@ func (s *Server) handlePrefixDetailJSON() http.HandlerFunc {
 			return
 		}

-		routes, err := s.db.GetPrefixDetails(prefix)
+		routes, err := s.db.GetPrefixDetailsContext(r.Context(), prefix)
 		if err != nil {
 			if errors.Is(err, database.ErrNoRoute) {
 				writeJSONError(w, http.StatusNotFound, err.Error())

@@ -479,7 +479,7 @@ func (s *Server) handleASDetail() http.HandlerFunc {
 			return
 		}

-		asInfo, prefixes, err := s.db.GetASDetails(asn)
+		asInfo, prefixes, err := s.db.GetASDetailsContext(r.Context(), asn)
 		if err != nil {
 			if errors.Is(err, database.ErrNoRoute) {
 				http.Error(w, "AS not found", http.StatusNotFound)

@@ -556,6 +556,14 @@ func (s *Server) handleASDetail() http.HandlerFunc {
 			IPv6Count: len(ipv6Prefixes),
 		}

+		// Check if context is still valid before writing response
+		select {
+		case <-r.Context().Done():
+			// Request was cancelled, don't write response
+			return
+		default:
+		}
+
 		w.Header().Set("Content-Type", "text/html; charset=utf-8")
 		tmpl := templates.ASDetailTemplate()
 		if err := tmpl.Execute(w, data); err != nil {

@@ -583,7 +591,7 @@ func (s *Server) handlePrefixDetail() http.HandlerFunc {
 			return
 		}

-		routes, err := s.db.GetPrefixDetails(prefix)
+		routes, err := s.db.GetPrefixDetailsContext(r.Context(), prefix)
 		if err != nil {
 			if errors.Is(err, database.ErrNoRoute) {
 				http.Error(w, "Prefix not found", http.StatusNotFound)

@@ -606,7 +614,7 @@ func (s *Server) handlePrefixDetail() http.HandlerFunc {
 		for _, route := range routes {
 			if _, exists := originMap[route.OriginASN]; !exists {
 				// Get AS info from database
-				asInfo, _, _ := s.db.GetASDetails(route.OriginASN)
+				asInfo, _, _ := s.db.GetASDetailsContext(r.Context(), route.OriginASN)
 				handle := ""
 				description := ""
 				if asInfo != nil {

@@ -645,12 +653,41 @@ func (s *Server) handlePrefixDetail() http.HandlerFunc {
 			origins = append(origins, origin)
 		}

+		// Create enhanced routes with AS path handles
+		type ASPathEntry struct {
+			Number int
+			Handle string
+		}
+		type EnhancedRoute struct {
+			database.LiveRoute
+			ASPathWithHandle []ASPathEntry
+		}
+
+		enhancedRoutes := make([]EnhancedRoute, len(routes))
+		for i, route := range routes {
+			enhancedRoute := EnhancedRoute{
+				LiveRoute:        route,
+				ASPathWithHandle: make([]ASPathEntry, len(route.ASPath)),
+			}
+
+			// Look up handle for each AS in the path
+			for j, asn := range route.ASPath {
+				handle := asinfo.GetHandle(asn)
+				enhancedRoute.ASPathWithHandle[j] = ASPathEntry{
+					Number: asn,
+					Handle: handle,
+				}
+			}
+
+			enhancedRoutes[i] = enhancedRoute
+		}
+
 		// Prepare template data
 		data := struct {
 			Prefix      string
 			MaskLength  int
 			IPVersion   int
-			Routes      []database.LiveRoute
+			Routes      []EnhancedRoute
 			Origins     []*ASNInfo
 			PeerCount   int
 			OriginCount int

@@ -658,12 +695,20 @@ func (s *Server) handlePrefixDetail() http.HandlerFunc {
 			Prefix:      prefix,
 			MaskLength:  maskLength,
 			IPVersion:   ipVersion,
-			Routes:      routes,
+			Routes:      enhancedRoutes,
 			Origins:     origins,
 			PeerCount:   len(routes),
 			OriginCount: len(originMap),
 		}

+		// Check if context is still valid before writing response
+		select {
+		case <-r.Context().Done():
+			// Request was cancelled, don't write response
+			return
+		default:
+		}
+
 		w.Header().Set("Content-Type", "text/html; charset=utf-8")
 		tmpl := templates.PrefixDetailTemplate()
 		if err := tmpl.Execute(w, data); err != nil {

@@ -702,3 +747,123 @@ func (s *Server) handleIPRedirect() http.HandlerFunc {
 		http.Redirect(w, r, "/prefix/"+url.QueryEscape(asInfo.Prefix), http.StatusSeeOther)
 	}
 }
+
+// handlePrefixLength shows a random sample of prefixes with the specified mask length
+func (s *Server) handlePrefixLength() http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		lengthStr := chi.URLParam(r, "length")
+		if lengthStr == "" {
+			http.Error(w, "Length parameter is required", http.StatusBadRequest)
+			return
+		}
+
+		maskLength, err := strconv.Atoi(lengthStr)
+		if err != nil {
+			http.Error(w, "Invalid mask length", http.StatusBadRequest)
+			return
+		}
+
+		// Determine IP version based on mask length
+		const (
+			maxIPv4MaskLength = 32
+			maxIPv6MaskLength = 128
+		)
+		var ipVersion int
+		if maskLength <= maxIPv4MaskLength {
+			ipVersion = 4
+		} else if maskLength <= maxIPv6MaskLength {
+			ipVersion = 6
+		} else {
+			http.Error(w, "Invalid mask length", http.StatusBadRequest)
+			return
+		}
+
+		// Get random sample of prefixes
+		const maxPrefixes = 500
+		prefixes, err := s.db.GetRandomPrefixesByLengthContext(r.Context(), maskLength, ipVersion, maxPrefixes)
+		if err != nil {
+			s.logger.Error("Failed to get prefixes by length", "error", err)
+			http.Error(w, "Internal server error", http.StatusInternalServerError)
+			return
+		}
+
+		// Sort prefixes for display
+		sort.Slice(prefixes, func(i, j int) bool {
+			// First compare by IP version
+			if prefixes[i].IPVersion != prefixes[j].IPVersion {
+				return prefixes[i].IPVersion < prefixes[j].IPVersion
+			}
+			// Then by prefix
+			return prefixes[i].Prefix < prefixes[j].Prefix
+		})
+
+		// Create enhanced prefixes with AS descriptions
+		type EnhancedPrefix struct {
+			database.LiveRoute
+			OriginASDescription string
+			Age                 string
+		}
+
+		enhancedPrefixes := make([]EnhancedPrefix, len(prefixes))
+		for i, prefix := range prefixes {
+			enhancedPrefixes[i] = EnhancedPrefix{
+				LiveRoute: prefix,
+				Age:       formatAge(prefix.LastUpdated),
+			}
+			// Get AS description
+			if asInfo, ok := asinfo.Get(prefix.OriginASN); ok {
+				enhancedPrefixes[i].OriginASDescription = asInfo.Description
+			}
+		}
+
+		// Render template
+		data := map[string]interface{}{
+			"MaskLength": maskLength,
+			"IPVersion":  ipVersion,
+			"Prefixes":   enhancedPrefixes,
+			"Count":      len(prefixes),
+		}
+
+		// Check if context is still valid before writing response
+		select {
+		case <-r.Context().Done():
+			// Request was cancelled, don't write response
+			return
+		default:
+		}
+
+		tmpl := templates.PrefixLengthTemplate()
+		if err := tmpl.Execute(w, data); err != nil {
+			s.logger.Error("Failed to render prefix length template", "error", err)
+			http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+		}
+	}
+}
+
+// formatAge returns a human-readable age string
+func formatAge(timestamp time.Time) string {
+	age := time.Since(timestamp)
+
+	const hoursPerDay = 24
+	if age < time.Minute {
+		return "< 1m"
+	} else if age < time.Hour {
+		minutes := int(age.Minutes())
+		return strconv.Itoa(minutes) + "m"
+	} else if age < hoursPerDay*time.Hour {
+		hours := int(age.Hours())
+		return strconv.Itoa(hours) + "h"
+	}
+	days := int(age.Hours() / hoursPerDay)
+	return strconv.Itoa(days) + "d"
+}

View File

@@ -108,6 +108,7 @@ type timeoutWriter struct {
 	http.ResponseWriter
 	mu      sync.Mutex
 	written bool
+	header  http.Header // cached header to prevent concurrent access
 }

 func (tw *timeoutWriter) Write(b []byte) (int, error) {

@@ -133,6 +134,18 @@ func (tw *timeoutWriter) WriteHeader(statusCode int) {
 }

 func (tw *timeoutWriter) Header() http.Header {
+	tw.mu.Lock()
+	defer tw.mu.Unlock()
+
+	if tw.written {
+		// Return a copy to prevent modifications after timeout
+		if tw.header == nil {
+			tw.header = make(http.Header)
+		}
+		return tw.header
+	}
+
 	return tw.ResponseWriter.Header()
 }

@@ -153,6 +166,7 @@ func TimeoutMiddleware(timeout time.Duration) func(http.Handler) http.Handler {
 			tw := &timeoutWriter{
 				ResponseWriter: w,
+				header:         make(http.Header),
 			}

 			done := make(chan struct{})

@@ -178,8 +192,12 @@ func TimeoutMiddleware(timeout time.Duration) func(http.Handler) http.Handler {
 				tw.markWritten() // Prevent the handler from writing after timeout

 				execTime := time.Since(startTime)

+				// Write directly to the underlying writer since we've marked tw as written
+				// This is safe because markWritten() prevents the handler from writing
+				tw.mu.Lock()
 				w.Header().Set("Content-Type", "application/json")
 				w.WriteHeader(http.StatusRequestTimeout)
+				tw.mu.Unlock()

 				response := map[string]interface{}{
 					"status": "error",
View File

@@ -28,6 +28,7 @@ func (s *Server) setupRoutes() {
 		// AS and prefix detail pages
 		r.Get("/as/{asn}", s.handleASDetail())
 		r.Get("/prefix/{prefix}", s.handlePrefixDetail())
+		r.Get("/prefixlength/{length}", s.handlePrefixLength())
 		r.Get("/ip/{ip}", s.handleIPRedirect())

 		// API routes

View File

@@ -7,8 +7,8 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"math"
 	"net/http"
-	"os"
 	"sync"
 	"sync/atomic"
 	"time"

@@ -19,9 +19,12 @@ import (
 )

 const (
-	risLiveURL        = "https://ris-live.ripe.net/v1/stream/?format=json"
+	risLiveURL = "https://ris-live.ripe.net/v1/stream/?format=json&" +
+		"client=https%3A%2F%2Fgit.eeqj.de%2Fsneak%2Froutewatch"
 	metricsWindowSize  = 60 // seconds for rolling average
 	metricsUpdateRate  = time.Second
+	minBackoffDelay    = 5 * time.Second
+	maxBackoffDelay    = 320 * time.Second
 	metricsLogInterval = 10 * time.Second
 	bytesPerKB         = 1024
 	bytesPerMB         = 1024 * 1024

@@ -95,6 +98,9 @@ func (s *Streamer) RegisterHandler(handler MessageHandler) {
 	info := &handlerInfo{
 		handler: handler,
 		queue:   make(chan *ristypes.RISMessage, handler.QueueCapacity()),
+		metrics: handlerMetrics{
+			minTime: time.Duration(math.MaxInt64), // Initialize to max so first value sets the floor
+		},
 	}

 	s.handlers = append(s.handlers, info)

@@ -131,9 +137,7 @@ func (s *Streamer) Start() error {
 	}

 	go func() {
-		if err := s.stream(ctx); err != nil {
-			s.logger.Error("Streaming error", "error", err)
-		}
+		s.streamWithReconnect(ctx)
 		s.mu.Lock()
 		s.running = false
 		s.mu.Unlock()

@@ -170,7 +174,7 @@ func (s *Streamer) runHandlerWorker(info *handlerInfo) {
 			info.metrics.totalTime += elapsed

 			// Update min time
-			if info.metrics.minTime == 0 || elapsed < info.metrics.minTime {
+			if elapsed < info.metrics.minTime {
 				info.metrics.minTime = elapsed
 			}

@@ -320,6 +324,72 @@ func (s *Streamer) updateMetrics(messageBytes int) {
 	s.metrics.RecordMessage(int64(messageBytes))
 }

+// streamWithReconnect handles streaming with automatic reconnection and exponential backoff
+func (s *Streamer) streamWithReconnect(ctx context.Context) {
+	backoffDelay := minBackoffDelay
+	consecutiveFailures := 0
+
+	for {
+		select {
+		case <-ctx.Done():
+			s.logger.Info("Stream context cancelled, stopping reconnection attempts")
+			return
+		default:
+		}
+
+		// Attempt to stream
+		startTime := time.Now()
+		err := s.stream(ctx)
+		streamDuration := time.Since(startTime)
+
+		if err == nil {
+			// Clean exit (context cancelled)
+			return
+		}
+
+		// Log the error
+		s.logger.Error("Stream disconnected",
+			"error", err,
+			"consecutive_failures", consecutiveFailures+1,
+			"stream_duration", streamDuration)
+
+		s.metrics.SetConnected(false)
+
+		// Check if context is cancelled
+		if ctx.Err() != nil {
+			return
+		}
+
+		// If we streamed for more than 30 seconds, reset the backoff
+		// This indicates we had a successful connection that received data
+		if streamDuration > 30*time.Second {
+			s.logger.Info("Resetting backoff delay due to successful connection",
+				"stream_duration", streamDuration)
+			backoffDelay = minBackoffDelay
+			consecutiveFailures = 0
+		} else {
+			// Increment consecutive failures
+			consecutiveFailures++
+		}
+
+		// Wait with exponential backoff
+		s.logger.Info("Waiting before reconnection attempt",
+			"delay_seconds", backoffDelay.Seconds(),
+			"consecutive_failures", consecutiveFailures)
+
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(backoffDelay):
+			// Double the backoff delay for next time, up to max
+			backoffDelay *= 2
+			if backoffDelay > maxBackoffDelay {
+				backoffDelay = maxBackoffDelay
+			}
+		}
+	}
+}
+
 func (s *Streamer) stream(ctx context.Context) error {
 	req, err := http.NewRequestWithContext(ctx, "GET", risLiveURL, nil)
 	if err != nil {

@@ -390,10 +460,13 @@ func (s *Streamer) stream(ctx context.Context) error {
 		// Parse the message first
 		var wrapper ristypes.RISLiveMessage
 		if err := json.Unmarshal(line, &wrapper); err != nil {
-			// Output the raw line and panic on parse failure
-			fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err)
-			fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(line))
-			panic(fmt.Sprintf("JSON parse error: %v", err))
+			// Log the error and return to trigger reconnection
+			s.logger.Error("Failed to parse JSON",
+				"error", err,
+				"line", string(line),
+				"line_length", len(line))
+
+			return fmt.Errorf("JSON parse error: %w", err)
 		}

 		// Check if it's a ris_message wrapper

@@ -443,18 +516,11 @@ func (s *Streamer) stream(ctx context.Context) error {
 			// Peer state changes - silently ignore
 			continue
 		default:
-			fmt.Fprintf(
-				os.Stderr,
-				"UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n",
-				msg.Type,
-				string(line),
-			)
-			panic(
-				fmt.Sprintf(
-					"Unknown RIS message type: %s",
-					msg.Type,
-				),
-			)
+			s.logger.Error("Unknown message type",
+				"type", msg.Type,
+				"line", string(line),
+			)
+			panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type))
 		}

 		// Dispatch to interested handlers
View File

@@ -13,7 +13,8 @@
 			color: #333;
 		}
 		.container {
-			max-width: 1200px;
+			width: 90%;
+			max-width: 1600px;
 			margin: 0 auto;
 			background: white;
 			padding: 30px;

@@ -91,6 +92,7 @@
 		.route-table td {
 			padding: 12px;
 			border-bottom: 1px solid #e0e0e0;
+			white-space: nowrap;
 		}
 		.route-table tr:hover {
 			background: #f8f9fa;

@@ -114,9 +116,13 @@
 			font-family: monospace;
 			font-size: 13px;
 			color: #666;
-			max-width: 300px;
-			overflow-x: auto;
-			white-space: nowrap;
+			max-width: 600px;
+			word-wrap: break-word;
+			white-space: normal !important;
+			line-height: 1.5;
+		}
+		.as-path .as-link {
+			font-weight: 600;
 		}
 		.age {
 			color: #7f8c8d;

@@ -168,7 +174,7 @@
 			font-size: 14px;
 		}
 		.as-path {
-			max-width: 150px;
+			max-width: 100%;
 		}
 	}

@@ -234,7 +240,7 @@
 			<a href="/as/{{.OriginASN}}" class="as-link">AS{{.OriginASN}}</a>
 		</td>
 		<td class="peer-ip">{{.PeerIP}}</td>
-		<td class="as-path">{{range $i, $as := .ASPath}}{{if $i}} → {{end}}{{$as}}{{end}}</td>
+		<td class="as-path">{{range $i, $as := .ASPathWithHandle}}{{if $i}} → {{end}}<a href="/as/{{$as.Number}}" class="as-link">{{if $as.Handle}}{{$as.Handle}}{{else}}AS{{$as.Number}}{{end}}</a>{{end}}</td>
 		<td class="peer-ip">{{.NextHop}}</td>
 		<td>{{.LastUpdated.Format "2006-01-02 15:04:05"}}</td>
 		<td class="age">{{.LastUpdated | timeSince}}</td>

View File

@@ -0,0 +1,108 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Prefixes with /{{ .MaskLength }} - RouteWatch</title>
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
max-width: 1200px;
margin: 0 auto;
padding: 20px;
background: #f5f5f5;
}
h1 {
color: #333;
margin-bottom: 10px;
}
.subtitle {
color: #666;
margin-bottom: 30px;
}
.info-card {
background: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
margin-bottom: 20px;
}
table {
width: 100%;
border-collapse: collapse;
background: white;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
border-radius: 8px;
overflow: hidden;
}
th {
background: #f8f9fa;
padding: 12px;
text-align: left;
font-weight: 600;
color: #333;
border-bottom: 2px solid #dee2e6;
}
td {
padding: 12px;
border-bottom: 1px solid #eee;
}
tr:last-child td {
border-bottom: none;
}
tr:hover {
background: #f8f9fa;
}
a {
color: #0066cc;
text-decoration: none;
}
a:hover {
text-decoration: underline;
}
.prefix-link {
font-family: 'SF Mono', Monaco, 'Cascadia Mono', 'Roboto Mono', Consolas, 'Courier New', monospace;
}
.as-link {
white-space: nowrap;
}
.age {
color: #666;
font-size: 0.9em;
}
.back-link {
display: inline-block;
margin-bottom: 20px;
color: #0066cc;
}
</style>
</head>
<body>
<a href="/status" class="back-link">← Back to Status</a>
<h1>IPv{{ .IPVersion }} Prefixes with /{{ .MaskLength }}</h1>
<p class="subtitle">Showing {{ .Count }} randomly selected prefixes</p>
<table>
<thead>
<tr>
<th>Prefix</th>
<th>Age</th>
<th>Origin AS</th>
</tr>
</thead>
<tbody>
{{ range .Prefixes }}
<tr>
<td><a href="/prefix/{{ .Prefix | urlEncode }}" class="prefix-link">{{ .Prefix }}</a></td>
<td class="age">{{ .Age }}</td>
<td>
<a href="/as/{{ .OriginASN }}" class="as-link">
AS{{ .OriginASN }}{{ if .OriginASDescription }} ({{ .OriginASDescription }}){{ end }}
</a>
</td>
</tr>
{{ end }}
</tbody>
</table>
</body>
</html>

View File

@@ -49,6 +49,16 @@
 			font-family: 'SF Mono', Monaco, 'Cascadia Mono', 'Roboto Mono', Consolas, 'Courier New', monospace;
 			color: #333;
 		}
+		.metric-value.metric-link {
+			text-decoration: underline;
+			text-decoration-style: dashed;
+			text-underline-offset: 2px;
+			cursor: pointer;
+		}
+		.metric-value.metric-link:hover {
+			color: #0066cc;
+			text-decoration-style: solid;
+		}
 		.connected {
 			color: #22c55e;
 		}

@@ -231,7 +241,7 @@
 			metric.className = 'metric';
 			metric.innerHTML = `
 				<span class="metric-label">/${item.mask_length}</span>
-				<span class="metric-value">${formatNumber(item.count)}</span>
+				<a href="/prefixlength/${item.mask_length}" class="metric-value metric-link">${formatNumber(item.count)}</a>
 			`;
 			container.appendChild(metric);
 		});

View File

@@ -18,11 +18,15 @@ var asDetailHTML string
 //go:embed prefix_detail.html
 var prefixDetailHTML string

+//go:embed prefix_length.html
+var prefixLengthHTML string
+
 // Templates contains all parsed templates
 type Templates struct {
 	Status       *template.Template
 	ASDetail     *template.Template
 	PrefixDetail *template.Template
+	PrefixLength *template.Template
 }

 var (

@@ -99,6 +103,12 @@ func initTemplates() {
 	if err != nil {
 		panic("failed to parse prefix detail template: " + err.Error())
 	}
+
+	// Parse prefix length template
+	defaultTemplates.PrefixLength, err = template.New("prefixLength").Funcs(funcs).Parse(prefixLengthHTML)
+	if err != nil {
+		panic("failed to parse prefix length template: " + err.Error())
+	}
 }

 // Get returns the singleton Templates instance

@@ -122,3 +132,8 @@ func ASDetailTemplate() *template.Template {
 func PrefixDetailTemplate() *template.Template {
 	return Get().PrefixDetail
 }
+
+// PrefixLengthTemplate returns the parsed prefix length template
+func PrefixLengthTemplate() *template.Template {
+	return Get().PrefixLength
+}

log.txt (272071 deletions)

File diff suppressed because it is too large