Compare commits

..

4 Commits

Author SHA1 Message Date
1d05372899 Fix linting errors for magic numbers in handler queue sizes
- Define constants for all handler queue capacities
- Fix integer overflow warning in metrics calculation
- Add missing blank lines before continue statements
2025-07-27 23:38:38 +02:00
76ec9f68b7 Add ASN info lookup and periodic routing table statistics
- Add handle and description columns to asns table
- Look up ASN info using asinfo package when creating new ASNs
- Remove noisy debug logging for individual route updates
- Add IPv4/IPv6 route counters and update rate tracking
- Log routing table statistics every 15 seconds with IPv4/IPv6 breakdown
- Track updates per second for both IPv4 and IPv6 routes separately
2025-07-27 23:25:23 +02:00
a555a1dee2 Replace live_routes database table with in-memory routing table
- Remove live_routes table from SQL schema and all related indexes
- Create new internal/routingtable package with thread-safe RoutingTable
- Implement RouteKey-based indexing with secondary indexes for efficient lookups
- Add RoutingTableHandler to manage in-memory routes separately from database
- Update DatabaseHandler to only handle persistent database operations
- Wire up RoutingTable through fx dependency injection
- Update server to get live route count from routing table instead of database
- Remove LiveRoutes field from database.Stats struct
- Update tests to work with new architecture
2025-07-27 23:16:19 +02:00
b49d3ce88c Switch back to CGO SQLite driver
- Replace modernc.org/sqlite with github.com/mattn/go-sqlite3
- Update connection string for go-sqlite3 syntax
- Keep all performance optimizations and pragmas

The CGO driver may provide better performance for write-heavy
workloads compared to the pure Go implementation.
2025-07-27 22:57:53 +02:00
17 changed files with 1103 additions and 427 deletions

View File

@ -15,7 +15,7 @@ lint:
golangci-lint run golangci-lint run
build: build:
go build -o bin/routewatch cmd/routewatch/main.go CGO_ENABLED=1 go build -o bin/routewatch cmd/routewatch/main.go
clean: clean:
rm -rf bin/ rm -rf bin/

14
go.mod
View File

@ -3,24 +3,16 @@ module git.eeqj.de/sneak/routewatch
go 1.24.4 go 1.24.4
require ( require (
github.com/go-chi/chi/v5 v5.2.2
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/mattn/go-sqlite3 v1.14.29
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9
go.uber.org/fx v1.24.0 go.uber.org/fx v1.24.0
modernc.org/sqlite v1.38.1
) )
require ( require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/go-chi/chi/v5 v5.2.2 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
go.uber.org/dig v1.19.0 // indirect go.uber.org/dig v1.19.0 // indirect
go.uber.org/multierr v1.10.0 // indirect go.uber.org/multierr v1.10.0 // indirect
go.uber.org/zap v1.26.0 // indirect go.uber.org/zap v1.26.0 // indirect
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
golang.org/x/sys v0.34.0 // indirect golang.org/x/sys v0.34.0 // indirect
modernc.org/libc v1.66.3 // indirect
modernc.org/mathutil v1.7.1 // indirect
modernc.org/memory v1.11.0 // indirect
) )

47
go.sum
View File

@ -1,23 +1,15 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618= github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618=
github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-sqlite3 v1.14.29 h1:1O6nRLJKvsi1H2Sj0Hzdfojwt8GiGKm+LOfLaBFaouQ=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-sqlite3 v1.14.29/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg= github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg=
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.uber.org/dig v1.19.0 h1:BACLhebsYdpQ7IROQ1AGPjrXcP5dF80U3gKoFzbaq/4= go.uber.org/dig v1.19.0 h1:BACLhebsYdpQ7IROQ1AGPjrXcP5dF80U3gKoFzbaq/4=
@ -30,42 +22,7 @@ go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w=
golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww=
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo=
golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
modernc.org/cc/v4 v4.26.2 h1:991HMkLjJzYBIfha6ECZdjrIYz2/1ayr+FL8GN+CNzM=
modernc.org/cc/v4 v4.26.2/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
modernc.org/ccgo/v4 v4.28.0 h1:rjznn6WWehKq7dG4JtLRKxb52Ecv8OUGah8+Z/SfpNU=
modernc.org/ccgo/v4 v4.28.0/go.mod h1:JygV3+9AV6SmPhDasu4JgquwU81XAKLd3OKTUDNOiKE=
modernc.org/fileutil v1.3.8 h1:qtzNm7ED75pd1C7WgAGcK4edm4fvhtBsEiI/0NQ54YM=
modernc.org/fileutil v1.3.8/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc=
modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI=
modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks=
modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI=
modernc.org/libc v1.66.3 h1:cfCbjTUcdsKyyZZfEUKfoHcP3S0Wkvz3jgSzByEWVCQ=
modernc.org/libc v1.66.3/go.mod h1:XD9zO8kt59cANKvHPXpx7yS2ELPheAey0vjIuZOhOU8=
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI=
modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
modernc.org/sqlite v1.38.1 h1:jNnIjleVta+DKSAr3TnkKK87EEhjPhBLzi6hvIX9Bas=
modernc.org/sqlite v1.38.1/go.mod h1:cPTJYSlgg3Sfg046yBShXENNtPrWrDX8bsbAQBzgQ5E=
modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=

View File

@ -11,8 +11,9 @@ import (
"runtime" "runtime"
"time" "time"
"git.eeqj.de/sneak/routewatch/pkg/asinfo"
"github.com/google/uuid" "github.com/google/uuid"
_ "modernc.org/sqlite" // Pure Go SQLite driver _ "github.com/mattn/go-sqlite3" // CGO SQLite driver
) )
//go:embed schema.sql //go:embed schema.sql
@ -94,10 +95,10 @@ func NewWithConfig(config Config, logger *slog.Logger) (*Database, error) {
return nil, fmt.Errorf("failed to create database directory: %w", err) return nil, fmt.Errorf("failed to create database directory: %w", err)
} }
// Add connection parameters for modernc.org/sqlite // Add connection parameters for go-sqlite3
// Enable WAL mode and other performance optimizations // Enable WAL mode and other performance optimizations
dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=NORMAL&_cache_size=-512000", config.Path) dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=NORMAL&cache=shared", config.Path)
db, err := sql.Open("sqlite", dsn) db, err := sql.Open("sqlite3", dsn)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to open database: %w", err) return nil, fmt.Errorf("failed to open database: %w", err)
} }
@ -126,7 +127,7 @@ func (d *Database) Initialize() error {
pragmas := []string{ pragmas := []string{
"PRAGMA journal_mode=WAL", // Already set in connection string "PRAGMA journal_mode=WAL", // Already set in connection string
"PRAGMA synchronous=NORMAL", // Faster than FULL, still safe "PRAGMA synchronous=NORMAL", // Faster than FULL, still safe
"PRAGMA cache_size=-524288", // 512MB cache "PRAGMA cache_size=-524288", // 512MB cache (negative = KB)
"PRAGMA temp_store=MEMORY", // Use memory for temp tables "PRAGMA temp_store=MEMORY", // Use memory for temp tables
"PRAGMA mmap_size=268435456", // 256MB memory-mapped I/O "PRAGMA mmap_size=268435456", // 256MB memory-mapped I/O
"PRAGMA wal_autocheckpoint=1000", // Checkpoint every 1000 pages "PRAGMA wal_autocheckpoint=1000", // Checkpoint every 1000 pages
@ -174,12 +175,15 @@ func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error)
var asn ASN var asn ASN
var idStr string var idStr string
err = tx.QueryRow("SELECT id, number, first_seen, last_seen FROM asns WHERE number = ?", number). var handle, description sql.NullString
Scan(&idStr, &asn.Number, &asn.FirstSeen, &asn.LastSeen) err = tx.QueryRow("SELECT id, number, handle, description, first_seen, last_seen FROM asns WHERE number = ?", number).
Scan(&idStr, &asn.Number, &handle, &description, &asn.FirstSeen, &asn.LastSeen)
if err == nil { if err == nil {
// ASN exists, update last_seen // ASN exists, update last_seen
asn.ID, _ = uuid.Parse(idStr) asn.ID, _ = uuid.Parse(idStr)
asn.Handle = handle.String
asn.Description = description.String
_, err = tx.Exec("UPDATE asns SET last_seen = ? WHERE id = ?", timestamp, asn.ID.String()) _, err = tx.Exec("UPDATE asns SET last_seen = ? WHERE id = ?", timestamp, asn.ID.String())
if err != nil { if err != nil {
return nil, err return nil, err
@ -199,15 +203,22 @@ func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error)
return nil, err return nil, err
} }
// ASN doesn't exist, create it // ASN doesn't exist, create it with ASN info lookup
asn = ASN{ asn = ASN{
ID: generateUUID(), ID: generateUUID(),
Number: number, Number: number,
FirstSeen: timestamp, FirstSeen: timestamp,
LastSeen: timestamp, LastSeen: timestamp,
} }
_, err = tx.Exec("INSERT INTO asns (id, number, first_seen, last_seen) VALUES (?, ?, ?, ?)",
asn.ID.String(), asn.Number, asn.FirstSeen, asn.LastSeen) // Look up ASN info
if info, ok := asinfo.Get(number); ok {
asn.Handle = info.Handle
asn.Description = info.Description
}
_, err = tx.Exec("INSERT INTO asns (id, number, handle, description, first_seen, last_seen) VALUES (?, ?, ?, ?, ?, ?)",
asn.ID.String(), asn.Number, asn.Handle, asn.Description, asn.FirstSeen, asn.LastSeen)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -340,75 +351,6 @@ func (d *Database) RecordPeering(fromASNID, toASNID string, timestamp time.Time)
return nil return nil
} }
// UpdateLiveRoute updates the live routing table for an announcement
func (d *Database) UpdateLiveRoute(
prefixID, originASNID uuid.UUID,
peerASN int,
nextHop string,
timestamp time.Time,
) error {
// Use SQLite's UPSERT capability to avoid the SELECT+UPDATE/INSERT pattern
// This reduces the number of queries and improves performance
// Note: We removed the WHERE clause from ON CONFLICT UPDATE because
// if we're updating, we want to update regardless of withdrawn_at status
err := d.exec(`
INSERT INTO live_routes (id, prefix_id, origin_asn_id, peer_asn, next_hop, announced_at, withdrawn_at)
VALUES (?, ?, ?, ?, ?, ?, NULL)
ON CONFLICT(prefix_id, origin_asn_id, peer_asn) DO UPDATE SET
next_hop = excluded.next_hop,
announced_at = excluded.announced_at,
withdrawn_at = NULL`,
generateUUID().String(), prefixID.String(), originASNID.String(),
peerASN, nextHop, timestamp)
return err
}
// WithdrawLiveRoute marks a route as withdrawn in the live routing table
func (d *Database) WithdrawLiveRoute(prefixID uuid.UUID, peerASN int, timestamp time.Time) error {
err := d.exec(`
UPDATE live_routes
SET withdrawn_at = ?
WHERE prefix_id = ? AND peer_asn = ? AND withdrawn_at IS NULL`,
timestamp, prefixID.String(), peerASN)
return err
}
// GetActiveLiveRoutes returns all currently active routes (not withdrawn)
func (d *Database) GetActiveLiveRoutes() ([]LiveRoute, error) {
rows, err := d.query(`
SELECT id, prefix_id, origin_asn_id, peer_asn, next_hop, announced_at
FROM live_routes
WHERE withdrawn_at IS NULL
ORDER BY announced_at DESC`)
if err != nil {
return nil, err
}
defer func() {
_ = rows.Close()
}()
var routes []LiveRoute
for rows.Next() {
var route LiveRoute
var idStr, prefixIDStr, originASNIDStr string
err := rows.Scan(&idStr, &prefixIDStr, &originASNIDStr,
&route.PeerASN, &route.NextHop, &route.AnnouncedAt)
if err != nil {
return nil, err
}
route.ID, _ = uuid.Parse(idStr)
route.PrefixID, _ = uuid.Parse(prefixIDStr)
route.OriginASNID, _ = uuid.Parse(originASNIDStr)
routes = append(routes, route)
}
return routes, rows.Err()
}
// UpdatePeer updates or creates a BGP peer record // UpdatePeer updates or creates a BGP peer record
func (d *Database) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error { func (d *Database) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error {
tx, err := d.beginTx() tx, err := d.beginTx()
@ -495,13 +437,6 @@ func (d *Database) GetStats() (Stats, error) {
return stats, err return stats, err
} }
// Count live routes
d.logger.Info("Counting live routes")
err = d.queryRow("SELECT COUNT(*) FROM live_routes WHERE withdrawn_at IS NULL").Scan(&stats.LiveRoutes)
if err != nil {
return stats, err
}
d.logger.Info("Stats collection complete") d.logger.Info("Stats collection complete")
return stats, nil return stats, nil

View File

@ -2,8 +2,6 @@ package database
import ( import (
"time" "time"
"github.com/google/uuid"
) )
// Stats contains database statistics // Stats contains database statistics
@ -13,7 +11,6 @@ type Stats struct {
IPv4Prefixes int IPv4Prefixes int
IPv6Prefixes int IPv6Prefixes int
Peerings int Peerings int
LiveRoutes int
} }
// Store defines the interface for database operations // Store defines the interface for database operations
@ -30,11 +27,6 @@ type Store interface {
// Peering operations // Peering operations
RecordPeering(fromASNID, toASNID string, timestamp time.Time) error RecordPeering(fromASNID, toASNID string, timestamp time.Time) error
// Live route operations
UpdateLiveRoute(prefixID, originASNID uuid.UUID, peerASN int, nextHop string, timestamp time.Time) error
WithdrawLiveRoute(prefixID uuid.UUID, peerASN int, timestamp time.Time) error
GetActiveLiveRoutes() ([]LiveRoute, error)
// Statistics // Statistics
GetStats() (Stats, error) GetStats() (Stats, error)

View File

@ -10,6 +10,8 @@ import (
type ASN struct { type ASN struct {
ID uuid.UUID `json:"id"` ID uuid.UUID `json:"id"`
Number int `json:"number"` Number int `json:"number"`
Handle string `json:"handle"`
Description string `json:"description"`
FirstSeen time.Time `json:"first_seen"` FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"` LastSeen time.Time `json:"last_seen"`
} }
@ -43,15 +45,3 @@ type ASNPeering struct {
FirstSeen time.Time `json:"first_seen"` FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"` LastSeen time.Time `json:"last_seen"`
} }
// LiveRoute represents the current state of a route in the live routing table
type LiveRoute struct {
ID uuid.UUID `json:"id"`
PrefixID uuid.UUID `json:"prefix_id"`
OriginASNID uuid.UUID `json:"origin_asn_id"`
PeerASN int `json:"peer_asn"`
Path string `json:"path"`
NextHop string `json:"next_hop"`
AnnouncedAt time.Time `json:"announced_at"`
WithdrawnAt *time.Time `json:"withdrawn_at"`
}

View File

@ -1,6 +1,8 @@
CREATE TABLE IF NOT EXISTS asns ( CREATE TABLE IF NOT EXISTS asns (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
number INTEGER UNIQUE NOT NULL, number INTEGER UNIQUE NOT NULL,
handle TEXT,
description TEXT,
first_seen DATETIME NOT NULL, first_seen DATETIME NOT NULL,
last_seen DATETIME NOT NULL last_seen DATETIME NOT NULL
); );
@ -48,20 +50,6 @@ CREATE TABLE IF NOT EXISTS bgp_peers (
last_message_type TEXT last_message_type TEXT
); );
-- Live routing table: current state of announced routes
CREATE TABLE IF NOT EXISTS live_routes (
id TEXT PRIMARY KEY,
prefix_id TEXT NOT NULL,
origin_asn_id TEXT NOT NULL,
peer_asn INTEGER NOT NULL,
next_hop TEXT,
announced_at DATETIME NOT NULL,
withdrawn_at DATETIME,
FOREIGN KEY (prefix_id) REFERENCES prefixes(id),
FOREIGN KEY (origin_asn_id) REFERENCES asns(id),
UNIQUE(prefix_id, origin_asn_id, peer_asn)
);
CREATE INDEX IF NOT EXISTS idx_prefixes_ip_version ON prefixes(ip_version); CREATE INDEX IF NOT EXISTS idx_prefixes_ip_version ON prefixes(ip_version);
CREATE INDEX IF NOT EXISTS idx_prefixes_version_prefix ON prefixes(ip_version, prefix); CREATE INDEX IF NOT EXISTS idx_prefixes_version_prefix ON prefixes(ip_version, prefix);
CREATE INDEX IF NOT EXISTS idx_announcements_timestamp ON announcements(timestamp); CREATE INDEX IF NOT EXISTS idx_announcements_timestamp ON announcements(timestamp);
@ -71,43 +59,6 @@ CREATE INDEX IF NOT EXISTS idx_asn_peerings_from_asn ON asn_peerings(from_asn_id
CREATE INDEX IF NOT EXISTS idx_asn_peerings_to_asn ON asn_peerings(to_asn_id); CREATE INDEX IF NOT EXISTS idx_asn_peerings_to_asn ON asn_peerings(to_asn_id);
CREATE INDEX IF NOT EXISTS idx_asn_peerings_lookup ON asn_peerings(from_asn_id, to_asn_id); CREATE INDEX IF NOT EXISTS idx_asn_peerings_lookup ON asn_peerings(from_asn_id, to_asn_id);
-- Indexes for live routes table
CREATE INDEX IF NOT EXISTS idx_live_routes_active
ON live_routes(prefix_id, origin_asn_id)
WHERE withdrawn_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_live_routes_origin
ON live_routes(origin_asn_id)
WHERE withdrawn_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_live_routes_prefix
ON live_routes(prefix_id)
WHERE withdrawn_at IS NULL;
-- Critical index for the most common query pattern
CREATE INDEX IF NOT EXISTS idx_live_routes_lookup
ON live_routes(prefix_id, origin_asn_id, peer_asn)
WHERE withdrawn_at IS NULL;
-- Index for withdrawal updates by prefix and peer
CREATE INDEX IF NOT EXISTS idx_live_routes_withdraw
ON live_routes(prefix_id, peer_asn)
WHERE withdrawn_at IS NULL;
-- Covering index for SELECT id queries (includes id in index)
CREATE INDEX IF NOT EXISTS idx_live_routes_covering
ON live_routes(prefix_id, origin_asn_id, peer_asn, id)
WHERE withdrawn_at IS NULL;
-- Index for UPDATE by id operations
CREATE INDEX IF NOT EXISTS idx_live_routes_id
ON live_routes(id);
-- Index for stats queries
CREATE INDEX IF NOT EXISTS idx_live_routes_stats
ON live_routes(withdrawn_at)
WHERE withdrawn_at IS NULL;
-- Additional indexes for prefixes table -- Additional indexes for prefixes table
CREATE INDEX IF NOT EXISTS idx_prefixes_prefix ON prefixes(prefix); CREATE INDEX IF NOT EXISTS idx_prefixes_prefix ON prefixes(prefix);

View File

@ -26,6 +26,7 @@ func (d *Database) queryRow(query string, args ...interface{}) *sql.Row {
} }
// query wraps Query with slow query logging // query wraps Query with slow query logging
// nolint:unused // kept for future use to ensure all queries go through slow query logging
func (d *Database) query(query string, args ...interface{}) (*sql.Rows, error) { func (d *Database) query(query string, args ...interface{}) (*sql.Rows, error) {
start := time.Now() start := time.Now()
defer logSlowQuery(d.logger, query, start) defer logSlowQuery(d.logger, query, start)

View File

@ -4,6 +4,7 @@ package routewatch
import ( import (
"context" "context"
"fmt"
"log/slog" "log/slog"
"os" "os"
"strings" "strings"
@ -11,6 +12,7 @@ import (
"git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/metrics" "git.eeqj.de/sneak/routewatch/internal/metrics"
"git.eeqj.de/sneak/routewatch/internal/routingtable"
"git.eeqj.de/sneak/routewatch/internal/server" "git.eeqj.de/sneak/routewatch/internal/server"
"git.eeqj.de/sneak/routewatch/internal/streamer" "git.eeqj.de/sneak/routewatch/internal/streamer"
@ -22,6 +24,11 @@ type Config struct {
MaxRuntime time.Duration // Maximum runtime (0 = run forever) MaxRuntime time.Duration // Maximum runtime (0 = run forever)
} }
const (
// routingTableStatsInterval is how often we log routing table statistics
routingTableStatsInterval = 15 * time.Second
)
// NewConfig provides default configuration // NewConfig provides default configuration
func NewConfig() Config { func NewConfig() Config {
return Config{ return Config{
@ -34,6 +41,7 @@ type Dependencies struct {
fx.In fx.In
DB database.Store DB database.Store
RoutingTable *routingtable.RoutingTable
Streamer *streamer.Streamer Streamer *streamer.Streamer
Server *server.Server Server *server.Server
Logger *slog.Logger Logger *slog.Logger
@ -43,6 +51,7 @@ type Dependencies struct {
// RouteWatch represents the main application instance // RouteWatch represents the main application instance
type RouteWatch struct { type RouteWatch struct {
db database.Store db database.Store
routingTable *routingtable.RoutingTable
streamer *streamer.Streamer streamer *streamer.Streamer
server *server.Server server *server.Server
logger *slog.Logger logger *slog.Logger
@ -53,6 +62,7 @@ type RouteWatch struct {
func New(deps Dependencies) *RouteWatch { func New(deps Dependencies) *RouteWatch {
return &RouteWatch{ return &RouteWatch{
db: deps.DB, db: deps.DB,
routingTable: deps.RoutingTable,
streamer: deps.Streamer, streamer: deps.Streamer,
server: deps.Server, server: deps.Server,
logger: deps.Logger, logger: deps.Logger,
@ -76,10 +86,17 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
dbHandler := NewDatabaseHandler(rw.db, rw.logger) dbHandler := NewDatabaseHandler(rw.db, rw.logger)
rw.streamer.RegisterHandler(dbHandler) rw.streamer.RegisterHandler(dbHandler)
// Register routing table handler to maintain in-memory routing table
rtHandler := NewRoutingTableHandler(rw.routingTable, rw.logger)
rw.streamer.RegisterHandler(rtHandler)
// Register peer tracking handler to track all peers // Register peer tracking handler to track all peers
peerHandler := NewPeerHandler(rw.db, rw.logger) peerHandler := NewPeerHandler(rw.db, rw.logger)
rw.streamer.RegisterHandler(peerHandler) rw.streamer.RegisterHandler(peerHandler)
// Start periodic routing table stats logging
go rw.logRoutingTableStats(ctx)
// Start streaming // Start streaming
if err := rw.streamer.Start(); err != nil { if err := rw.streamer.Start(); err != nil {
return err return err
@ -117,6 +134,32 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
return nil return nil
} }
// logRoutingTableStats periodically logs routing table statistics
func (rw *RouteWatch) logRoutingTableStats(ctx context.Context) {
// Log stats periodically
ticker := time.NewTicker(routingTableStatsInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
stats := rw.routingTable.GetDetailedStats()
rw.logger.Info("Routing table statistics",
"ipv4_routes", stats.IPv4Routes,
"ipv6_routes", stats.IPv6Routes,
"ipv4_updates_per_sec", fmt.Sprintf("%.2f", stats.IPv4UpdatesRate),
"ipv6_updates_per_sec", fmt.Sprintf("%.2f", stats.IPv6UpdatesRate),
"total_routes", stats.TotalRoutes,
"unique_prefixes", stats.UniquePrefixes,
"unique_origins", stats.UniqueOrigins,
"unique_peers", stats.UniquePeers,
)
}
}
}
// NewLogger creates a structured logger // NewLogger creates a structured logger
func NewLogger() *slog.Logger { func NewLogger() *slog.Logger {
level := slog.LevelInfo level := slog.LevelInfo
@ -154,8 +197,12 @@ func getModule() fx.Option {
}, },
fx.As(new(database.Store)), fx.As(new(database.Store)),
), ),
routingtable.New,
streamer.New, streamer.New,
fx.Annotate(
server.New, server.New,
fx.ParamTags(``, ``, ``, ``),
),
New, New,
), ),
) )

View File

@ -9,6 +9,7 @@ import (
"git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/metrics" "git.eeqj.de/sneak/routewatch/internal/metrics"
"git.eeqj.de/sneak/routewatch/internal/routingtable"
"git.eeqj.de/sneak/routewatch/internal/server" "git.eeqj.de/sneak/routewatch/internal/server"
"git.eeqj.de/sneak/routewatch/internal/streamer" "git.eeqj.de/sneak/routewatch/internal/streamer"
"github.com/google/uuid" "github.com/google/uuid"
@ -129,35 +130,6 @@ func (m *mockStore) RecordPeering(fromASNID, toASNID string, _ time.Time) error
return nil return nil
} }
// UpdateLiveRoute mock implementation
func (m *mockStore) UpdateLiveRoute(prefixID, originASNID uuid.UUID, peerASN int, _ string, _ time.Time) error {
m.mu.Lock()
defer m.mu.Unlock()
key := prefixID.String() + "_" + originASNID.String() + "_" + string(rune(peerASN))
if !m.Routes[key] {
m.Routes[key] = true
m.RouteCount++
}
return nil
}
// WithdrawLiveRoute mock implementation
func (m *mockStore) WithdrawLiveRoute(_ uuid.UUID, _ int, _ time.Time) error {
m.mu.Lock()
defer m.mu.Unlock()
m.WithdrawalCount++
return nil
}
// GetActiveLiveRoutes mock implementation
func (m *mockStore) GetActiveLiveRoutes() ([]database.LiveRoute, error) {
return []database.LiveRoute{}, nil
}
// UpdatePeer mock implementation // UpdatePeer mock implementation
func (m *mockStore) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error { func (m *mockStore) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error {
// Simple mock - just return nil // Simple mock - just return nil
@ -180,7 +152,6 @@ func (m *mockStore) GetStats() (database.Stats, error) {
IPv4Prefixes: m.IPv4Prefixes, IPv4Prefixes: m.IPv4Prefixes,
IPv6Prefixes: m.IPv6Prefixes, IPv6Prefixes: m.IPv6Prefixes,
Peerings: m.PeeringCount, Peerings: m.PeeringCount,
LiveRoutes: m.RouteCount,
}, nil }, nil
} }
@ -197,12 +168,16 @@ func TestRouteWatchLiveFeed(t *testing.T) {
// Create streamer // Create streamer
s := streamer.New(logger, metricsTracker) s := streamer.New(logger, metricsTracker)
// Create routing table
rt := routingtable.New()
// Create server // Create server
srv := server.New(mockDB, s, logger) srv := server.New(mockDB, rt, s, logger)
// Create RouteWatch with 5 second limit // Create RouteWatch with 5 second limit
deps := Dependencies{ deps := Dependencies{
DB: mockDB, DB: mockDB,
RoutingTable: rt,
Streamer: s, Streamer: s,
Server: srv, Server: srv,
Logger: logger, Logger: logger,
@ -242,8 +217,4 @@ func TestRouteWatchLiveFeed(t *testing.T) {
} }
t.Logf("Recorded %d AS peering relationships in 5 seconds", stats.Peerings) t.Logf("Recorded %d AS peering relationships in 5 seconds", stats.Peerings)
if stats.LiveRoutes == 0 {
t.Error("Expected to have some active routes")
}
t.Logf("Active routes: %d", stats.LiveRoutes)
} }

View File

@ -2,12 +2,16 @@ package routewatch
import ( import (
"log/slog" "log/slog"
"strconv"
"git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/ristypes" "git.eeqj.de/sneak/routewatch/internal/ristypes"
) )
const (
// databaseHandlerQueueSize is the queue capacity for database operations
databaseHandlerQueueSize = 100
)
// DatabaseHandler handles BGP messages and stores them in the database // DatabaseHandler handles BGP messages and stores them in the database
type DatabaseHandler struct { type DatabaseHandler struct {
db database.Store db database.Store
@ -28,19 +32,17 @@ func (h *DatabaseHandler) WantsMessage(messageType string) bool {
return messageType == "UPDATE" return messageType == "UPDATE"
} }
// QueueCapacity returns the desired queue capacity for this handler
func (h *DatabaseHandler) QueueCapacity() int {
// Database operations are slow, so use a smaller queue
return databaseHandlerQueueSize
}
// HandleMessage processes a RIS message and updates the database // HandleMessage processes a RIS message and updates the database
func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) { func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
// Use the pre-parsed timestamp // Use the pre-parsed timestamp
timestamp := msg.ParsedTimestamp timestamp := msg.ParsedTimestamp
// Parse peer ASN
peerASN, err := strconv.Atoi(msg.PeerASN)
if err != nil {
h.logger.Error("Failed to parse peer ASN", "peer_asn", msg.PeerASN, "error", err)
return
}
// Get origin ASN from path (last element) // Get origin ASN from path (last element)
var originASN int var originASN int
if len(msg.Path) > 0 { if len(msg.Path) > 0 {
@ -51,7 +53,7 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
for _, announcement := range msg.Announcements { for _, announcement := range msg.Announcements {
for _, prefix := range announcement.Prefixes { for _, prefix := range announcement.Prefixes {
// Get or create prefix // Get or create prefix
p, err := h.db.GetOrCreatePrefix(prefix, timestamp) _, err := h.db.GetOrCreatePrefix(prefix, timestamp)
if err != nil { if err != nil {
h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err) h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err)
@ -59,30 +61,13 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
} }
// Get or create origin ASN // Get or create origin ASN
asn, err := h.db.GetOrCreateASN(originASN, timestamp) _, err = h.db.GetOrCreateASN(originASN, timestamp)
if err != nil { if err != nil {
h.logger.Error("Failed to get/create ASN", "asn", originASN, "error", err) h.logger.Error("Failed to get/create ASN", "asn", originASN, "error", err)
continue continue
} }
// Update live route
err = h.db.UpdateLiveRoute(
p.ID,
asn.ID,
peerASN,
announcement.NextHop,
timestamp,
)
if err != nil {
h.logger.Error("Failed to update live route",
"prefix", prefix,
"origin_asn", originASN,
"peer_asn", peerASN,
"error", err,
)
}
// TODO: Record the announcement in the announcements table // TODO: Record the announcement in the announcements table
// Process AS path to update peerings // Process AS path to update peerings
if len(msg.Path) > 1 { if len(msg.Path) > 1 {
@ -122,23 +107,13 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
// Process withdrawals // Process withdrawals
for _, prefix := range msg.Withdrawals { for _, prefix := range msg.Withdrawals {
// Get prefix // Get prefix
p, err := h.db.GetOrCreatePrefix(prefix, timestamp) _, err := h.db.GetOrCreatePrefix(prefix, timestamp)
if err != nil { if err != nil {
h.logger.Error("Failed to get prefix for withdrawal", "prefix", prefix, "error", err) h.logger.Error("Failed to get prefix for withdrawal", "prefix", prefix, "error", err)
continue continue
} }
// Withdraw the route // TODO: Record the withdrawal in the announcements table as a withdrawal
err = h.db.WithdrawLiveRoute(p.ID, peerASN, timestamp)
if err != nil {
h.logger.Error("Failed to withdraw route",
"prefix", prefix,
"peer_asn", peerASN,
"error", err,
)
}
// TODO: Record the withdrawal in the withdrawals table
} }
} }

View File

@ -8,6 +8,11 @@ import (
"git.eeqj.de/sneak/routewatch/internal/ristypes" "git.eeqj.de/sneak/routewatch/internal/ristypes"
) )
const (
// peerHandlerQueueSize is the queue capacity for peer tracking operations
peerHandlerQueueSize = 500
)
// PeerHandler tracks BGP peers from all message types // PeerHandler tracks BGP peers from all message types
type PeerHandler struct { type PeerHandler struct {
db database.Store db database.Store
@ -27,6 +32,12 @@ func (h *PeerHandler) WantsMessage(_ string) bool {
return true return true
} }
// QueueCapacity returns the desired queue capacity for this handler
func (h *PeerHandler) QueueCapacity() int {
// Peer tracking is lightweight but involves database ops, use moderate queue
return peerHandlerQueueSize
}
// HandleMessage processes a message to track peer information // HandleMessage processes a message to track peer information
func (h *PeerHandler) HandleMessage(msg *ristypes.RISMessage) { func (h *PeerHandler) HandleMessage(msg *ristypes.RISMessage) {
// Parse peer ASN from string // Parse peer ASN from string

View File

@ -0,0 +1,131 @@
package routewatch
import (
"log/slog"
"strconv"
"git.eeqj.de/sneak/routewatch/internal/ristypes"
"git.eeqj.de/sneak/routewatch/internal/routingtable"
"github.com/google/uuid"
)
const (
	// routingTableHandlerQueueSize is the queue capacity for in-memory routing
	// table operations. It is deliberately large: handling is pure in-memory
	// map work, so the worker drains this queue far faster than database-backed
	// handlers drain theirs.
	routingTableHandlerQueueSize = 10000
)

// RoutingTableHandler handles BGP messages and updates the in-memory routing table.
type RoutingTableHandler struct {
	rt     *routingtable.RoutingTable // shared in-memory table updated from UPDATE messages
	logger *slog.Logger               // structured logger for parse/handling errors
}
// NewRoutingTableHandler creates a new routing table handler that applies
// BGP UPDATE messages to the given in-memory routing table.
func NewRoutingTableHandler(rt *routingtable.RoutingTable, logger *slog.Logger) *RoutingTableHandler {
	h := &RoutingTableHandler{
		rt:     rt,
		logger: logger,
	}

	return h
}

// WantsMessage reports whether this handler processes messages of the given
// type. Only BGP UPDATE messages change the routing table.
func (h *RoutingTableHandler) WantsMessage(messageType string) bool {
	switch messageType {
	case "UPDATE":
		return true
	default:
		return false
	}
}

// QueueCapacity returns the desired queue capacity for this handler.
// In-memory operations are very fast, so a large queue is used.
func (h *RoutingTableHandler) QueueCapacity() int {
	return routingTableHandlerQueueSize
}
// HandleMessage processes a RIS UPDATE message, applying its announcements
// and withdrawals to the in-memory routing table. Messages with an
// unparseable peer ASN are logged and dropped.
func (h *RoutingTableHandler) HandleMessage(msg *ristypes.RISMessage) {
	timestamp := msg.ParsedTimestamp

	peerASN, err := strconv.Atoi(msg.PeerASN)
	if err != nil {
		h.logger.Error("Failed to parse peer ASN", "peer_asn", msg.PeerASN, "error", err)

		return
	}

	// The origin is the last AS in the path; zero if the path is empty.
	var originASN int
	if n := len(msg.Path); n > 0 {
		originASN = msg.Path[n-1]
	}

	// Deterministic UUIDs keep identifiers stable across restarts. The
	// origin ID is path-wide, so compute it once outside the prefix loops.
	originASNID := uuid.NewSHA1(uuid.NameSpaceOID, []byte(strconv.Itoa(originASN)))

	for _, announcement := range msg.Announcements {
		for _, prefix := range announcement.Prefixes {
			prefixID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(prefix))

			h.rt.AddRoute(&routingtable.Route{
				PrefixID:    prefixID,
				Prefix:      prefix,
				OriginASNID: originASNID,
				OriginASN:   originASN,
				PeerASN:     peerASN,
				ASPath:      msg.Path,
				NextHop:     announcement.NextHop,
				AnnouncedAt: timestamp,
			})
		}
	}

	// Withdrawals remove every route for the prefix learned from this peer.
	for _, prefix := range msg.Withdrawals {
		prefixID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(prefix))
		h.rt.WithdrawRoutesByPrefixAndPeer(prefixID, peerASN)
	}
}
// GetRoutingTableStats returns summary statistics about the routing table
// (total routes, unique prefixes/origins/peers), delegating to RoutingTable.Stats.
func (h *RoutingTableHandler) GetRoutingTableStats() map[string]int {
	return h.rt.Stats()
}

// GetActiveRouteCount returns the number of active routes currently held
// in the in-memory table.
func (h *RoutingTableHandler) GetActiveRouteCount() int {
	return h.rt.Size()
}

// GetRoutesByPrefix returns all routes for a specific prefix, identified by
// its deterministic prefix UUID.
func (h *RoutingTableHandler) GetRoutesByPrefix(prefixID uuid.UUID) []*routingtable.Route {
	return h.rt.GetRoutesByPrefix(prefixID)
}

// GetRoutesByOriginASN returns all routes originated by a specific ASN,
// identified by its deterministic ASN UUID.
func (h *RoutingTableHandler) GetRoutesByOriginASN(originASNID uuid.UUID) []*routingtable.Route {
	return h.rt.GetRoutesByOriginASN(originASNID)
}

// GetRoutesByPeerASN returns all routes received from a specific peer ASN.
func (h *RoutingTableHandler) GetRoutesByPeerASN(peerASN int) []*routingtable.Route {
	return h.rt.GetRoutesByPeerASN(peerASN)
}

// GetAllRoutes returns all active routes in the table.
func (h *RoutingTableHandler) GetAllRoutes() []*routingtable.Route {
	return h.rt.GetAllRoutes()
}

// ClearRoutingTable clears all routes from the routing table and logs the event.
func (h *RoutingTableHandler) ClearRoutingTable() {
	h.rt.Clear()
	h.logger.Info("Cleared routing table")
}

View File

@ -0,0 +1,397 @@
// Package routingtable provides a thread-safe in-memory representation of the DFZ routing table.
package routingtable
import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
)
// Route represents a single route entry in the routing table.
type Route struct {
	PrefixID    uuid.UUID `json:"prefix_id"`     // deterministic UUID derived from the prefix string
	Prefix      string    `json:"prefix"`        // The actual prefix string (e.g., "10.0.0.0/8")
	OriginASNID uuid.UUID `json:"origin_asn_id"` // deterministic UUID derived from the origin ASN
	OriginASN   int       `json:"origin_asn"`    // The actual ASN number
	PeerASN     int       `json:"peer_asn"`      // ASN of the RIS peer the route was learned from
	ASPath      []int     `json:"as_path"`       // Full AS path
	NextHop     string    `json:"next_hop"`
	AnnouncedAt time.Time `json:"announced_at"`
}

// RouteKey uniquely identifies a route in the table: one entry exists per
// (prefix, origin, peer) triple.
type RouteKey struct {
	PrefixID    uuid.UUID
	OriginASNID uuid.UUID
	PeerASN     int
}

// RoutingTable is a thread-safe in-memory routing table. All maps and
// counters are guarded by mu; the update counters are additionally written
// with sync/atomic by the mutating methods.
type RoutingTable struct {
	mu     sync.RWMutex
	routes map[RouteKey]*Route // primary store, keyed by the full route key

	// Secondary indexes for efficient lookups
	byPrefix    map[uuid.UUID]map[RouteKey]*Route // Routes indexed by prefix ID
	byOriginASN map[uuid.UUID]map[RouteKey]*Route // Routes indexed by origin ASN ID
	byPeerASN   map[int]map[RouteKey]*Route       // Routes indexed by peer ASN

	// Metrics tracking
	ipv4Routes       int    // current count of IPv4 routes
	ipv6Routes       int    // current count of IPv6 routes
	ipv4Updates      uint64 // Updates counter for rate calculation
	ipv6Updates      uint64 // Updates counter for rate calculation
	lastMetricsReset time.Time
}
// New creates a new, empty routing table with all indexes initialized and
// the metrics window starting now.
func New() *RoutingTable {
	rt := &RoutingTable{
		routes:           make(map[RouteKey]*Route),
		byPrefix:         make(map[uuid.UUID]map[RouteKey]*Route),
		byOriginASN:      make(map[uuid.UUID]map[RouteKey]*Route),
		byPeerASN:        make(map[int]map[RouteKey]*Route),
		lastMetricsReset: time.Now(),
	}

	return rt
}
// AddRoute inserts a route, replacing any existing route with the same
// (prefix, origin, peer) key, and records one update for rate tracking.
func (rt *RoutingTable) AddRoute(route *Route) {
	rt.mu.Lock()
	defer rt.mu.Unlock()

	key := RouteKey{
		PrefixID:    route.PrefixID,
		OriginASNID: route.OriginASNID,
		PeerASN:     route.PeerASN,
	}

	// When replacing an existing entry, unhook it from the indexes and
	// counters first so totals stay balanced after the re-add below.
	if prev, ok := rt.routes[key]; ok {
		rt.removeFromIndexes(key, prev)
		if isIPv6(prev.Prefix) {
			rt.ipv6Routes--
		} else {
			rt.ipv4Routes--
		}
	}

	rt.routes[key] = route
	rt.addToIndexes(key, route)

	// Per-family route count plus one update tick for the current window.
	if isIPv6(route.Prefix) {
		rt.ipv6Routes++
		atomic.AddUint64(&rt.ipv6Updates, 1)
	} else {
		rt.ipv4Routes++
		atomic.AddUint64(&rt.ipv4Updates, 1)
	}
}
// RemoveRoute deletes the route identified by (prefixID, originASNID, peerASN)
// and reports whether a route was actually removed.
func (rt *RoutingTable) RemoveRoute(prefixID, originASNID uuid.UUID, peerASN int) bool {
	rt.mu.Lock()
	defer rt.mu.Unlock()

	key := RouteKey{PrefixID: prefixID, OriginASNID: originASNID, PeerASN: peerASN}

	route, ok := rt.routes[key]
	if !ok {
		return false
	}

	rt.removeFromIndexes(key, route)
	delete(rt.routes, key)

	// Keep the per-family counters in sync and record one update tick.
	if isIPv6(route.Prefix) {
		rt.ipv6Routes--
		atomic.AddUint64(&rt.ipv6Updates, 1)
	} else {
		rt.ipv4Routes--
		atomic.AddUint64(&rt.ipv4Updates, 1)
	}

	return true
}
// WithdrawRoutesByPrefixAndPeer removes every route for the given prefix that
// was learned from the given peer, returning how many routes were removed.
func (rt *RoutingTable) WithdrawRoutesByPrefixAndPeer(prefixID uuid.UUID, peerASN int) int {
	rt.mu.Lock()
	defer rt.mu.Unlock()

	prefixRoutes, ok := rt.byPrefix[prefixID]
	if !ok {
		return 0
	}

	// Gather matching keys first: removeFromIndexes mutates this index map,
	// so we must not delete while ranging over it.
	matching := make([]RouteKey, 0)
	for key, route := range prefixRoutes {
		if route.PeerASN == peerASN {
			matching = append(matching, key)
		}
	}

	removed := 0
	for _, key := range matching {
		route, present := rt.routes[key]
		if !present {
			continue
		}

		rt.removeFromIndexes(key, route)
		delete(rt.routes, key)
		removed++

		// Counter bookkeeping plus one update tick per withdrawn route.
		if isIPv6(route.Prefix) {
			rt.ipv6Routes--
			atomic.AddUint64(&rt.ipv6Updates, 1)
		} else {
			rt.ipv4Routes--
			atomic.AddUint64(&rt.ipv4Updates, 1)
		}
	}

	return removed
}
// GetRoute retrieves a copy of a specific route, reporting whether it exists.
func (rt *RoutingTable) GetRoute(prefixID, originASNID uuid.UUID, peerASN int) (*Route, bool) {
	rt.mu.RLock()
	defer rt.mu.RUnlock()

	key := RouteKey{PrefixID: prefixID, OriginASNID: originASNID, PeerASN: peerASN}

	route, ok := rt.routes[key]
	if !ok {
		return nil, false
	}

	// Hand back a copy so callers cannot mutate table state.
	c := *route

	return &c, true
}

// copyRoutes clones every route in idx into a fresh slice of copies so the
// caller cannot mutate table state. A nil map yields an empty, non-nil slice.
// The caller must hold at least a read lock.
func copyRoutes(idx map[RouteKey]*Route) []*Route {
	out := make([]*Route, 0, len(idx))
	for _, route := range idx {
		c := *route
		out = append(out, &c)
	}

	return out
}

// GetRoutesByPrefix returns copies of all routes for a specific prefix.
func (rt *RoutingTable) GetRoutesByPrefix(prefixID uuid.UUID) []*Route {
	rt.mu.RLock()
	defer rt.mu.RUnlock()

	return copyRoutes(rt.byPrefix[prefixID])
}

// GetRoutesByOriginASN returns copies of all routes originated by a specific ASN.
func (rt *RoutingTable) GetRoutesByOriginASN(originASNID uuid.UUID) []*Route {
	rt.mu.RLock()
	defer rt.mu.RUnlock()

	return copyRoutes(rt.byOriginASN[originASNID])
}

// GetRoutesByPeerASN returns copies of all routes received from a specific peer ASN.
func (rt *RoutingTable) GetRoutesByPeerASN(peerASN int) []*Route {
	rt.mu.RLock()
	defer rt.mu.RUnlock()

	return copyRoutes(rt.byPeerASN[peerASN])
}
// GetAllRoutes returns copies of every active route in the routing table,
// so callers cannot mutate table state through the result.
func (rt *RoutingTable) GetAllRoutes() []*Route {
	rt.mu.RLock()
	defer rt.mu.RUnlock()

	all := make([]*Route, 0, len(rt.routes))
	for _, route := range rt.routes {
		c := *route
		all = append(all, &c)
	}

	return all
}

// Size returns the total number of routes currently in the table.
func (rt *RoutingTable) Size() int {
	rt.mu.RLock()
	defer rt.mu.RUnlock()

	return len(rt.routes)
}
// Stats returns summary statistics about the routing table: total route
// count plus the number of unique prefixes, origin ASNs, and peer ASNs.
func (rt *RoutingTable) Stats() map[string]int {
	rt.mu.RLock()
	defer rt.mu.RUnlock()

	return map[string]int{
		"total_routes":    len(rt.routes),
		"unique_prefixes": len(rt.byPrefix),
		"unique_origins":  len(rt.byOriginASN),
		"unique_peers":    len(rt.byPeerASN),
	}
}
// DetailedStats contains detailed routing table statistics, including an
// IPv4/IPv6 breakdown and update rates covering the window since the
// previous GetDetailedStats call.
type DetailedStats struct {
	IPv4Routes      int     // current number of IPv4 routes
	IPv6Routes      int     // current number of IPv6 routes
	IPv4UpdatesRate float64 // IPv4 updates per second over the last window
	IPv6UpdatesRate float64 // IPv6 updates per second over the last window
	TotalRoutes     int
	UniquePrefixes  int
	UniqueOrigins   int
	UniquePeers     int
}

// GetDetailedStats returns detailed statistics including IPv4/IPv6 breakdown
// and update rates. Calling it resets the update counters, so each call
// reports rates for the window since the previous call (or table creation).
func (rt *RoutingTable) GetDetailedStats() DetailedStats {
	rt.mu.Lock()
	defer rt.mu.Unlock()

	// Snapshot the counters for the window that just ended.
	elapsed := time.Since(rt.lastMetricsReset).Seconds()
	ipv4Updates := atomic.LoadUint64(&rt.ipv4Updates)
	ipv6Updates := atomic.LoadUint64(&rt.ipv6Updates)

	// Guard against a zero-length window (e.g. two back-to-back calls):
	// dividing by zero would report +Inf/NaN rates to callers.
	var ipv4Rate, ipv6Rate float64
	if elapsed > 0 {
		ipv4Rate = float64(ipv4Updates) / elapsed
		ipv6Rate = float64(ipv6Updates) / elapsed
	}

	stats := DetailedStats{
		IPv4Routes:      rt.ipv4Routes,
		IPv6Routes:      rt.ipv6Routes,
		IPv4UpdatesRate: ipv4Rate,
		IPv6UpdatesRate: ipv6Rate,
		TotalRoutes:     len(rt.routes),
		UniquePrefixes:  len(rt.byPrefix),
		UniqueOrigins:   len(rt.byOriginASN),
		UniquePeers:     len(rt.byPeerASN),
	}

	// Reset the counters and window start for the next period.
	atomic.StoreUint64(&rt.ipv4Updates, 0)
	atomic.StoreUint64(&rt.ipv6Updates, 0)
	rt.lastMetricsReset = time.Now()

	return stats
}
// Clear removes all routes from the routing table, resetting the primary
// map, every secondary index, the per-family route counters, the update
// counters, and the metrics window start.
func (rt *RoutingTable) Clear() {
	rt.mu.Lock()
	defer rt.mu.Unlock()

	rt.routes = make(map[RouteKey]*Route)
	rt.byPrefix = make(map[uuid.UUID]map[RouteKey]*Route)
	rt.byOriginASN = make(map[uuid.UUID]map[RouteKey]*Route)
	rt.byPeerASN = make(map[int]map[RouteKey]*Route)
	rt.ipv4Routes = 0
	rt.ipv6Routes = 0
	// Atomic stores match the atomic writers elsewhere, even though the
	// write lock already excludes them here.
	atomic.StoreUint64(&rt.ipv4Updates, 0)
	atomic.StoreUint64(&rt.ipv6Updates, 0)
	rt.lastMetricsReset = time.Now()
}
// addToIndexes registers route under key in each secondary index, creating
// the per-bucket maps on first use. The caller must hold the write lock.
func (rt *RoutingTable) addToIndexes(key RouteKey, route *Route) {
	prefixIdx, ok := rt.byPrefix[route.PrefixID]
	if !ok {
		prefixIdx = make(map[RouteKey]*Route)
		rt.byPrefix[route.PrefixID] = prefixIdx
	}
	prefixIdx[key] = route

	originIdx, ok := rt.byOriginASN[route.OriginASNID]
	if !ok {
		originIdx = make(map[RouteKey]*Route)
		rt.byOriginASN[route.OriginASNID] = originIdx
	}
	originIdx[key] = route

	peerIdx, ok := rt.byPeerASN[route.PeerASN]
	if !ok {
		peerIdx = make(map[RouteKey]*Route)
		rt.byPeerASN[route.PeerASN] = peerIdx
	}
	peerIdx[key] = route
}

// removeFromIndexes drops key from each secondary index, deleting a
// per-bucket map once it becomes empty so the index size reflects the
// number of live buckets. The caller must hold the write lock.
func (rt *RoutingTable) removeFromIndexes(key RouteKey, route *Route) {
	if bucket, ok := rt.byPrefix[route.PrefixID]; ok {
		delete(bucket, key)
		if len(bucket) == 0 {
			delete(rt.byPrefix, route.PrefixID)
		}
	}

	if bucket, ok := rt.byOriginASN[route.OriginASNID]; ok {
		delete(bucket, key)
		if len(bucket) == 0 {
			delete(rt.byOriginASN, route.OriginASNID)
		}
	}

	if bucket, ok := rt.byPeerASN[route.PeerASN]; ok {
		delete(bucket, key)
		if len(bucket) == 0 {
			delete(rt.byPeerASN, route.PeerASN)
		}
	}
}
// String returns a human-readable representation of the route key in the
// form "<prefixID>/<originASNID>/<peerASN>".
func (k RouteKey) String() string {
	return fmt.Sprintf("%s/%s/%d", k.PrefixID, k.OriginASNID, k.PeerASN)
}
// isIPv6 reports whether the prefix string is an IPv6 prefix. IPv6 textual
// form always contains at least one colon, while dotted-quad IPv4 never does.
func isIPv6(prefix string) bool {
	return strings.ContainsRune(prefix, ':')
}

View File

@ -0,0 +1,219 @@
package routingtable
import (
"sync"
"testing"
"time"
"github.com/google/uuid"
)
// TestRoutingTable exercises the core lifecycle of the routing table —
// add, lookup, index queries, remove, withdraw-by-peer, stats, and clear —
// using two prefixes, two origins, and one shared peer ASN.
func TestRoutingTable(t *testing.T) {
	rt := New()

	// Test data: deterministic relationships, random IDs.
	prefixID1 := uuid.New()
	prefixID2 := uuid.New()
	originASNID1 := uuid.New()
	originASNID2 := uuid.New()

	route1 := &Route{
		PrefixID:    prefixID1,
		Prefix:      "10.0.0.0/8",
		OriginASNID: originASNID1,
		OriginASN:   64512,
		PeerASN:     64513,
		ASPath:      []int{64513, 64512},
		NextHop:     "192.168.1.1",
		AnnouncedAt: time.Now(),
	}

	route2 := &Route{
		PrefixID:    prefixID2,
		Prefix:      "192.168.0.0/16",
		OriginASNID: originASNID2,
		OriginASN:   64514,
		PeerASN:     64513,
		ASPath:      []int{64513, 64514},
		NextHop:     "192.168.1.1",
		AnnouncedAt: time.Now(),
	}

	// Test AddRoute: two distinct keys must yield two entries.
	rt.AddRoute(route1)
	rt.AddRoute(route2)

	if rt.Size() != 2 {
		t.Errorf("Expected 2 routes, got %d", rt.Size())
	}

	// Test GetRoute: exact-key lookup returns the stored prefix.
	retrievedRoute, exists := rt.GetRoute(prefixID1, originASNID1, 64513)
	if !exists {
		t.Error("Route 1 should exist")
	}
	if retrievedRoute.Prefix != "10.0.0.0/8" {
		t.Errorf("Expected prefix 10.0.0.0/8, got %s", retrievedRoute.Prefix)
	}

	// Test GetRoutesByPrefix: only route1 carries prefixID1.
	prefixRoutes := rt.GetRoutesByPrefix(prefixID1)
	if len(prefixRoutes) != 1 {
		t.Errorf("Expected 1 route for prefix, got %d", len(prefixRoutes))
	}

	// Test GetRoutesByPeerASN: both routes were learned from peer 64513.
	peerRoutes := rt.GetRoutesByPeerASN(64513)
	if len(peerRoutes) != 2 {
		t.Errorf("Expected 2 routes from peer 64513, got %d", len(peerRoutes))
	}

	// Test RemoveRoute: removing an existing key reports true and shrinks the table.
	removed := rt.RemoveRoute(prefixID1, originASNID1, 64513)
	if !removed {
		t.Error("Route should have been removed")
	}
	if rt.Size() != 1 {
		t.Errorf("Expected 1 route after removal, got %d", rt.Size())
	}

	// Test WithdrawRoutesByPrefixAndPeer
	// Add the route back first
	rt.AddRoute(route1)

	// Add another route for the same prefix from the same peer so the
	// withdrawal has two matching entries to remove.
	route3 := &Route{
		PrefixID:    prefixID1,
		Prefix:      "10.0.0.0/8",
		OriginASNID: originASNID2, // Different origin
		OriginASN:   64515,
		PeerASN:     64513,
		ASPath:      []int{64513, 64515},
		NextHop:     "192.168.1.1",
		AnnouncedAt: time.Now(),
	}
	rt.AddRoute(route3)

	count := rt.WithdrawRoutesByPrefixAndPeer(prefixID1, 64513)
	if count != 2 {
		t.Errorf("Expected to withdraw 2 routes, withdrew %d", count)
	}

	// Should only have route2 left
	if rt.Size() != 1 {
		t.Errorf("Expected 1 route after withdrawal, got %d", rt.Size())
	}

	// Test Stats: the summary map must agree with Size().
	stats := rt.Stats()
	if stats["total_routes"] != 1 {
		t.Errorf("Expected 1 total route in stats, got %d", stats["total_routes"])
	}

	// Test Clear: the table must be empty afterwards.
	rt.Clear()
	if rt.Size() != 0 {
		t.Errorf("Expected 0 routes after clear, got %d", rt.Size())
	}
}
// TestRoutingTableConcurrency hammers the table from multiple goroutines to
// surface data races (meaningful under `go test -race`). Each goroutine adds,
// reads, and removes only its own freshly-generated routes, so the table must
// end up empty.
func TestRoutingTableConcurrency(t *testing.T) {
	rt := New()

	// Test concurrent access
	var wg sync.WaitGroup
	numGoroutines := 10
	numOperations := 100

	// Start multiple goroutines that add/remove routes
	for i := 0; i < numGoroutines; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := 0; j < numOperations; j++ {
				// Fresh IDs per iteration keep goroutines from
				// colliding on keys.
				prefixID := uuid.New()
				originASNID := uuid.New()

				route := &Route{
					PrefixID:    prefixID,
					Prefix:      "10.0.0.0/8",
					OriginASNID: originASNID,
					OriginASN:   64512 + id,
					PeerASN:     64500,
					ASPath:      []int{64500, 64512 + id},
					NextHop:     "192.168.1.1",
					AnnouncedAt: time.Now(),
				}

				// Add route
				rt.AddRoute(route)

				// Try to get it
				_, _ = rt.GetRoute(prefixID, originASNID, 64500)

				// Get stats
				_ = rt.Stats()

				// Remove it
				rt.RemoveRoute(prefixID, originASNID, 64500)
			}
		}(i)
	}

	wg.Wait()

	// Table should be empty after all operations
	if rt.Size() != 0 {
		t.Errorf("Expected empty table after concurrent operations, got %d routes", rt.Size())
	}
}
// TestRouteUpdate verifies that re-announcing a route with the same
// (prefix, origin, peer) key replaces the existing entry in place rather
// than adding a duplicate, and that the replacement's fields are visible.
func TestRouteUpdate(t *testing.T) {
	rt := New()

	prefixID := uuid.New()
	originASNID := uuid.New()

	route1 := &Route{
		PrefixID:    prefixID,
		Prefix:      "10.0.0.0/8",
		OriginASNID: originASNID,
		OriginASN:   64512,
		PeerASN:     64513,
		ASPath:      []int{64513, 64512},
		NextHop:     "192.168.1.1",
		AnnouncedAt: time.Now(),
	}

	// Add initial route
	rt.AddRoute(route1)

	// Update the same route with new next hop; the key fields are identical.
	route2 := &Route{
		PrefixID:    prefixID,
		Prefix:      "10.0.0.0/8",
		OriginASNID: originASNID,
		OriginASN:   64512,
		PeerASN:     64513,
		ASPath:      []int{64513, 64512},
		NextHop:     "192.168.1.2", // Changed
		AnnouncedAt: time.Now().Add(1 * time.Minute),
	}
	rt.AddRoute(route2)

	// Should still have only 1 route
	if rt.Size() != 1 {
		t.Errorf("Expected 1 route after update, got %d", rt.Size())
	}

	// Check that the route was updated
	retrievedRoute, exists := rt.GetRoute(prefixID, originASNID, 64513)
	if !exists {
		t.Error("Route should exist after update")
	}
	if retrievedRoute.NextHop != "192.168.1.2" {
		t.Errorf("Expected updated next hop 192.168.1.2, got %s", retrievedRoute.NextHop)
	}
}

View File

@ -10,6 +10,7 @@ import (
"time" "time"
"git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/routingtable"
"git.eeqj.de/sneak/routewatch/internal/streamer" "git.eeqj.de/sneak/routewatch/internal/streamer"
"git.eeqj.de/sneak/routewatch/internal/templates" "git.eeqj.de/sneak/routewatch/internal/templates"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
@ -20,15 +21,17 @@ import (
type Server struct { type Server struct {
router *chi.Mux router *chi.Mux
db database.Store db database.Store
routingTable *routingtable.RoutingTable
streamer *streamer.Streamer streamer *streamer.Streamer
logger *slog.Logger logger *slog.Logger
srv *http.Server srv *http.Server
} }
// New creates a new HTTP server // New creates a new HTTP server
func New(db database.Store, streamer *streamer.Streamer, logger *slog.Logger) *Server { func New(db database.Store, rt *routingtable.RoutingTable, streamer *streamer.Streamer, logger *slog.Logger) *Server {
s := &Server{ s := &Server{
db: db, db: db,
routingTable: rt,
streamer: streamer, streamer: streamer,
logger: logger, logger: logger,
} }
@ -200,7 +203,7 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
IPv4Prefixes: dbStats.IPv4Prefixes, IPv4Prefixes: dbStats.IPv4Prefixes,
IPv6Prefixes: dbStats.IPv6Prefixes, IPv6Prefixes: dbStats.IPv6Prefixes,
Peerings: dbStats.Peerings, Peerings: dbStats.Peerings,
LiveRoutes: dbStats.LiveRoutes, LiveRoutes: s.routingTable.Size(),
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
@ -300,7 +303,7 @@ func (s *Server) handleStats() http.HandlerFunc {
IPv4Prefixes: dbStats.IPv4Prefixes, IPv4Prefixes: dbStats.IPv4Prefixes,
IPv6Prefixes: dbStats.IPv6Prefixes, IPv6Prefixes: dbStats.IPv6Prefixes,
Peerings: dbStats.Peerings, Peerings: dbStats.Peerings,
LiveRoutes: dbStats.LiveRoutes, LiveRoutes: s.routingTable.Size(),
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")

View File

@ -25,7 +25,7 @@ const (
metricsLogInterval = 10 * time.Second metricsLogInterval = 10 * time.Second
bytesPerKB = 1024 bytesPerKB = 1024
bytesPerMB = 1024 * 1024 bytesPerMB = 1024 * 1024
maxConcurrentHandlers = 100 // Maximum number of concurrent message handlers maxConcurrentHandlers = 800 // Maximum number of concurrent message handlers
) )
// MessageHandler is an interface for handling RIS messages // MessageHandler is an interface for handling RIS messages
@ -35,23 +35,43 @@ type MessageHandler interface {
// HandleMessage processes a RIS message // HandleMessage processes a RIS message
HandleMessage(msg *ristypes.RISMessage) HandleMessage(msg *ristypes.RISMessage)
// QueueCapacity returns the desired queue capacity for this handler
// Handlers that process quickly can have larger queues
QueueCapacity() int
} }
// RawMessageHandler is a callback for handling raw JSON lines from the stream // RawMessageHandler is a callback for handling raw JSON lines from the stream
type RawMessageHandler func(line string) type RawMessageHandler func(line string)
// handlerMetrics tracks performance metrics for a handler
type handlerMetrics struct {
processedCount uint64 // Total messages processed
droppedCount uint64 // Total messages dropped
totalTime time.Duration // Total processing time (for average calculation)
minTime time.Duration // Minimum processing time
maxTime time.Duration // Maximum processing time
mu sync.Mutex // Protects the metrics
}
// handlerInfo wraps a handler with its queue and metrics
type handlerInfo struct {
handler MessageHandler
queue chan *ristypes.RISMessage
metrics handlerMetrics
}
// Streamer handles streaming BGP updates from RIS Live // Streamer handles streaming BGP updates from RIS Live
type Streamer struct { type Streamer struct {
logger *slog.Logger logger *slog.Logger
client *http.Client client *http.Client
handlers []MessageHandler handlers []*handlerInfo
rawHandler RawMessageHandler rawHandler RawMessageHandler
mu sync.RWMutex mu sync.RWMutex
cancel context.CancelFunc cancel context.CancelFunc
running bool running bool
metrics *metrics.Tracker metrics *metrics.Tracker
semaphore chan struct{} // Limits concurrent message processing totalDropped uint64 // Total dropped messages across all handlers
droppedMessages uint64 // Atomic counter for dropped messages
} }
// New creates a new RIS streamer // New creates a new RIS streamer
@ -61,9 +81,8 @@ func New(logger *slog.Logger, metrics *metrics.Tracker) *Streamer {
client: &http.Client{ client: &http.Client{
Timeout: 0, // No timeout for streaming Timeout: 0, // No timeout for streaming
}, },
handlers: make([]MessageHandler, 0), handlers: make([]*handlerInfo, 0),
metrics: metrics, metrics: metrics,
semaphore: make(chan struct{}, maxConcurrentHandlers),
} }
} }
@ -71,7 +90,19 @@ func New(logger *slog.Logger, metrics *metrics.Tracker) *Streamer {
func (s *Streamer) RegisterHandler(handler MessageHandler) { func (s *Streamer) RegisterHandler(handler MessageHandler) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
s.handlers = append(s.handlers, handler)
// Create handler info with its own queue based on capacity
info := &handlerInfo{
handler: handler,
queue: make(chan *ristypes.RISMessage, handler.QueueCapacity()),
}
s.handlers = append(s.handlers, info)
// If we're already running, start a worker for this handler
if s.running {
go s.runHandlerWorker(info)
}
} }
// RegisterRawHandler sets a callback for raw message lines // RegisterRawHandler sets a callback for raw message lines
@ -94,6 +125,11 @@ func (s *Streamer) Start() error {
s.cancel = cancel s.cancel = cancel
s.running = true s.running = true
// Start workers for each handler
for _, info := range s.handlers {
go s.runHandlerWorker(info)
}
go func() { go func() {
if err := s.stream(ctx); err != nil { if err := s.stream(ctx); err != nil {
s.logger.Error("Streaming error", "error", err) s.logger.Error("Streaming error", "error", err)
@ -112,10 +148,40 @@ func (s *Streamer) Stop() {
if s.cancel != nil { if s.cancel != nil {
s.cancel() s.cancel()
} }
// Close all handler queues to signal workers to stop
for _, info := range s.handlers {
close(info.queue)
}
s.running = false
s.mu.Unlock() s.mu.Unlock()
s.metrics.SetConnected(false) s.metrics.SetConnected(false)
} }
// runHandlerWorker processes messages for a specific handler
func (s *Streamer) runHandlerWorker(info *handlerInfo) {
for msg := range info.queue {
start := time.Now()
info.handler.HandleMessage(msg)
elapsed := time.Since(start)
// Update metrics
info.metrics.mu.Lock()
info.metrics.processedCount++
info.metrics.totalTime += elapsed
// Update min time
if info.metrics.minTime == 0 || elapsed < info.metrics.minTime {
info.metrics.minTime = elapsed
}
// Update max time
if elapsed > info.metrics.maxTime {
info.metrics.maxTime = elapsed
}
info.metrics.mu.Unlock()
}
}
// IsRunning returns whether the streamer is currently active // IsRunning returns whether the streamer is currently active
func (s *Streamer) IsRunning() bool { func (s *Streamer) IsRunning() bool {
s.mu.RLock() s.mu.RLock()
@ -131,7 +197,7 @@ func (s *Streamer) GetMetrics() metrics.StreamMetrics {
// GetDroppedMessages returns the total number of dropped messages // GetDroppedMessages returns the total number of dropped messages
func (s *Streamer) GetDroppedMessages() uint64 { func (s *Streamer) GetDroppedMessages() uint64 {
return atomic.LoadUint64(&s.droppedMessages) return atomic.LoadUint64(&s.totalDropped)
} }
// logMetrics logs the current streaming statistics // logMetrics logs the current streaming statistics
@ -140,18 +206,57 @@ func (s *Streamer) logMetrics() {
uptime := time.Since(metrics.ConnectedSince) uptime := time.Since(metrics.ConnectedSince)
const bitsPerMegabit = 1000000 const bitsPerMegabit = 1000000
droppedMessages := atomic.LoadUint64(&s.droppedMessages) totalDropped := atomic.LoadUint64(&s.totalDropped)
s.logger.Info("Stream statistics",
"uptime", uptime, s.logger.Info(
"total_messages", metrics.TotalMessages, "Stream statistics",
"total_bytes", metrics.TotalBytes, "uptime",
"total_mb", fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB), uptime,
"messages_per_sec", fmt.Sprintf("%.2f", metrics.MessagesPerSec), "total_messages",
"bits_per_sec", fmt.Sprintf("%.0f", metrics.BitsPerSec), metrics.TotalMessages,
"mbps", fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit), "total_bytes",
"dropped_messages", droppedMessages, metrics.TotalBytes,
"active_handlers", len(s.semaphore), "total_mb",
fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB),
"messages_per_sec",
fmt.Sprintf("%.2f", metrics.MessagesPerSec),
"bits_per_sec",
fmt.Sprintf("%.0f", metrics.BitsPerSec),
"mbps",
fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit),
"total_dropped",
totalDropped,
) )
// Log per-handler statistics
s.mu.RLock()
for i, info := range s.handlers {
info.metrics.mu.Lock()
if info.metrics.processedCount > 0 {
// Safe conversion: processedCount is bounded by maxInt64
processedCount := info.metrics.processedCount
const maxInt64 = 1<<63 - 1
if processedCount > maxInt64 {
processedCount = maxInt64
}
//nolint:gosec // processedCount is explicitly bounded above
avgTime := info.metrics.totalTime / time.Duration(processedCount)
s.logger.Info(
"Handler statistics",
"handler", fmt.Sprintf("%T", info.handler),
"index", i,
"queue_len", len(info.queue),
"queue_cap", cap(info.queue),
"processed", info.metrics.processedCount,
"dropped", info.metrics.droppedCount,
"avg_time", avgTime,
"min_time", info.metrics.minTime,
"max_time", info.metrics.maxTime,
)
}
info.metrics.mu.Unlock()
}
s.mu.RUnlock()
} }
// updateMetrics updates the metrics counters and rates // updateMetrics updates the metrics counters and rates
@ -226,25 +331,12 @@ func (s *Streamer) stream(ctx context.Context) error {
rawHandler(string(line)) rawHandler(string(line))
} }
// Get current handlers // Parse the message first
s.mu.RLock()
handlers := make([]MessageHandler, len(s.handlers))
copy(handlers, s.handlers)
s.mu.RUnlock()
// Try to acquire semaphore, drop message if at capacity
select {
case s.semaphore <- struct{}{}:
// Successfully acquired semaphore, process message
go func(rawLine []byte, messageHandlers []MessageHandler) {
defer func() { <-s.semaphore }() // Release semaphore when done
// Parse the outer wrapper first
var wrapper ristypes.RISLiveMessage var wrapper ristypes.RISLiveMessage
if err := json.Unmarshal(rawLine, &wrapper); err != nil { if err := json.Unmarshal(line, &wrapper); err != nil {
// Output the raw line and panic on parse failure // Output the raw line and panic on parse failure
fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err) fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err)
fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(rawLine)) fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(line))
panic(fmt.Sprintf("JSON parse error: %v", err)) panic(fmt.Sprintf("JSON parse error: %v", err))
} }
@ -252,10 +344,10 @@ func (s *Streamer) stream(ctx context.Context) error {
if wrapper.Type != "ris_message" { if wrapper.Type != "ris_message" {
s.logger.Error("Unexpected wrapper type", s.logger.Error("Unexpected wrapper type",
"type", wrapper.Type, "type", wrapper.Type,
"line", string(rawLine), "line", string(line),
) )
return continue
} }
// Get the actual message // Get the actual message
@ -268,51 +360,63 @@ func (s *Streamer) stream(ctx context.Context) error {
switch msg.Type { switch msg.Type {
case "UPDATE": case "UPDATE":
// Process BGP UPDATE messages // Process BGP UPDATE messages
// Will be handled by registered handlers // Will be dispatched to handlers
case "RIS_PEER_STATE": case "RIS_PEER_STATE":
// RIS peer state messages - silently ignore // RIS peer state messages - silently ignore
continue
case "KEEPALIVE": case "KEEPALIVE":
// BGP keepalive messages - silently process // BGP keepalive messages - silently process
continue
case "OPEN": case "OPEN":
// BGP open messages // BGP open messages
s.logger.Info("BGP session opened", s.logger.Info("BGP session opened",
"peer", msg.Peer, "peer", msg.Peer,
"peer_asn", msg.PeerASN, "peer_asn", msg.PeerASN,
) )
continue
case "NOTIFICATION": case "NOTIFICATION":
// BGP notification messages (errors) // BGP notification messages (errors)
s.logger.Warn("BGP notification", s.logger.Warn("BGP notification",
"peer", msg.Peer, "peer", msg.Peer,
"peer_asn", msg.PeerASN, "peer_asn", msg.PeerASN,
) )
continue
case "STATE": case "STATE":
// Peer state changes - silently ignore // Peer state changes - silently ignore
continue
default: default:
fmt.Fprintf( fmt.Fprintf(
os.Stderr, os.Stderr,
"UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n", "UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n",
msg.Type, msg.Type,
string(rawLine), string(line),
)
panic(
fmt.Sprintf(
"Unknown RIS message type: %s",
msg.Type,
),
) )
panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type))
} }
// Call handlers synchronously within this goroutine // Dispatch to interested handlers
// This prevents unbounded goroutine growth at the handler level s.mu.RLock()
for _, handler := range messageHandlers { for _, info := range s.handlers {
if handler.WantsMessage(msg.Type) { if info.handler.WantsMessage(msg.Type) {
handler.HandleMessage(&msg) select {
} case info.queue <- &msg:
} // Message queued successfully
}(append([]byte(nil), line...), handlers) // Copy the line to avoid data races
default: default:
// Semaphore is full, drop the message // Queue is full, drop the message
dropped := atomic.AddUint64(&s.droppedMessages, 1) atomic.AddUint64(&info.metrics.droppedCount, 1)
if dropped%1000 == 0 { // Log every 1000 dropped messages atomic.AddUint64(&s.totalDropped, 1)
s.logger.Warn("Dropping messages due to overload", "total_dropped", dropped, "max_handlers", maxConcurrentHandlers)
} }
} }
} }
s.mu.RUnlock()
}
if err := scanner.Err(); err != nil { if err := scanner.Err(); err != nil {
return fmt.Errorf("scanner error: %w", err) return fmt.Errorf("scanner error: %w", err)