Compare commits
No commits in common. "1d05372899370842dd462cc5e5fc0b1b5da88c8c" and "d328fb0942f2dec91e8fe77f22b37fdb53ffa617" have entirely different histories.
1d05372899
...
d328fb0942
2
Makefile
2
Makefile
@ -15,7 +15,7 @@ lint:
|
||||
golangci-lint run
|
||||
|
||||
build:
|
||||
CGO_ENABLED=1 go build -o bin/routewatch cmd/routewatch/main.go
|
||||
go build -o bin/routewatch cmd/routewatch/main.go
|
||||
|
||||
clean:
|
||||
rm -rf bin/
|
||||
|
14
go.mod
14
go.mod
@ -3,16 +3,24 @@ module git.eeqj.de/sneak/routewatch
|
||||
go 1.24.4
|
||||
|
||||
require (
|
||||
github.com/go-chi/chi/v5 v5.2.2
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/mattn/go-sqlite3 v1.14.29
|
||||
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9
|
||||
go.uber.org/fx v1.24.0
|
||||
modernc.org/sqlite v1.38.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
github.com/go-chi/chi/v5 v5.2.2 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/ncruces/go-strftime v0.1.9 // indirect
|
||||
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
go.uber.org/dig v1.19.0 // indirect
|
||||
go.uber.org/multierr v1.10.0 // indirect
|
||||
go.uber.org/zap v1.26.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
|
||||
golang.org/x/sys v0.34.0 // indirect
|
||||
modernc.org/libc v1.66.3 // indirect
|
||||
modernc.org/mathutil v1.7.1 // indirect
|
||||
modernc.org/memory v1.11.0 // indirect
|
||||
)
|
||||
|
47
go.sum
47
go.sum
@ -1,15 +1,23 @@
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||
github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618=
|
||||
github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
|
||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
|
||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/mattn/go-sqlite3 v1.14.29 h1:1O6nRLJKvsi1H2Sj0Hzdfojwt8GiGKm+LOfLaBFaouQ=
|
||||
github.com/mattn/go-sqlite3 v1.14.29/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
|
||||
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
go.uber.org/dig v1.19.0 h1:BACLhebsYdpQ7IROQ1AGPjrXcP5dF80U3gKoFzbaq/4=
|
||||
@ -22,7 +30,42 @@ go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
|
||||
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
|
||||
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
|
||||
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
|
||||
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
|
||||
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
|
||||
golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w=
|
||||
golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww=
|
||||
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
|
||||
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
|
||||
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo=
|
||||
golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
modernc.org/cc/v4 v4.26.2 h1:991HMkLjJzYBIfha6ECZdjrIYz2/1ayr+FL8GN+CNzM=
|
||||
modernc.org/cc/v4 v4.26.2/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
|
||||
modernc.org/ccgo/v4 v4.28.0 h1:rjznn6WWehKq7dG4JtLRKxb52Ecv8OUGah8+Z/SfpNU=
|
||||
modernc.org/ccgo/v4 v4.28.0/go.mod h1:JygV3+9AV6SmPhDasu4JgquwU81XAKLd3OKTUDNOiKE=
|
||||
modernc.org/fileutil v1.3.8 h1:qtzNm7ED75pd1C7WgAGcK4edm4fvhtBsEiI/0NQ54YM=
|
||||
modernc.org/fileutil v1.3.8/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc=
|
||||
modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI=
|
||||
modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
|
||||
modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks=
|
||||
modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI=
|
||||
modernc.org/libc v1.66.3 h1:cfCbjTUcdsKyyZZfEUKfoHcP3S0Wkvz3jgSzByEWVCQ=
|
||||
modernc.org/libc v1.66.3/go.mod h1:XD9zO8kt59cANKvHPXpx7yS2ELPheAey0vjIuZOhOU8=
|
||||
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
|
||||
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
|
||||
modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI=
|
||||
modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
|
||||
modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
|
||||
modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
|
||||
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
|
||||
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
|
||||
modernc.org/sqlite v1.38.1 h1:jNnIjleVta+DKSAr3TnkKK87EEhjPhBLzi6hvIX9Bas=
|
||||
modernc.org/sqlite v1.38.1/go.mod h1:cPTJYSlgg3Sfg046yBShXENNtPrWrDX8bsbAQBzgQ5E=
|
||||
modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
|
||||
modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
|
||||
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
|
||||
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
|
||||
|
@ -11,9 +11,8 @@ import (
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"git.eeqj.de/sneak/routewatch/pkg/asinfo"
|
||||
"github.com/google/uuid"
|
||||
_ "github.com/mattn/go-sqlite3" // CGO SQLite driver
|
||||
_ "modernc.org/sqlite" // Pure Go SQLite driver
|
||||
)
|
||||
|
||||
//go:embed schema.sql
|
||||
@ -95,10 +94,10 @@ func NewWithConfig(config Config, logger *slog.Logger) (*Database, error) {
|
||||
return nil, fmt.Errorf("failed to create database directory: %w", err)
|
||||
}
|
||||
|
||||
// Add connection parameters for go-sqlite3
|
||||
// Add connection parameters for modernc.org/sqlite
|
||||
// Enable WAL mode and other performance optimizations
|
||||
dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=NORMAL&cache=shared", config.Path)
|
||||
db, err := sql.Open("sqlite3", dsn)
|
||||
dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=NORMAL&_cache_size=-512000", config.Path)
|
||||
db, err := sql.Open("sqlite", dsn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open database: %w", err)
|
||||
}
|
||||
@ -127,7 +126,7 @@ func (d *Database) Initialize() error {
|
||||
pragmas := []string{
|
||||
"PRAGMA journal_mode=WAL", // Already set in connection string
|
||||
"PRAGMA synchronous=NORMAL", // Faster than FULL, still safe
|
||||
"PRAGMA cache_size=-524288", // 512MB cache (negative = KB)
|
||||
"PRAGMA cache_size=-524288", // 512MB cache
|
||||
"PRAGMA temp_store=MEMORY", // Use memory for temp tables
|
||||
"PRAGMA mmap_size=268435456", // 256MB memory-mapped I/O
|
||||
"PRAGMA wal_autocheckpoint=1000", // Checkpoint every 1000 pages
|
||||
@ -175,15 +174,12 @@ func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error)
|
||||
|
||||
var asn ASN
|
||||
var idStr string
|
||||
var handle, description sql.NullString
|
||||
err = tx.QueryRow("SELECT id, number, handle, description, first_seen, last_seen FROM asns WHERE number = ?", number).
|
||||
Scan(&idStr, &asn.Number, &handle, &description, &asn.FirstSeen, &asn.LastSeen)
|
||||
err = tx.QueryRow("SELECT id, number, first_seen, last_seen FROM asns WHERE number = ?", number).
|
||||
Scan(&idStr, &asn.Number, &asn.FirstSeen, &asn.LastSeen)
|
||||
|
||||
if err == nil {
|
||||
// ASN exists, update last_seen
|
||||
asn.ID, _ = uuid.Parse(idStr)
|
||||
asn.Handle = handle.String
|
||||
asn.Description = description.String
|
||||
_, err = tx.Exec("UPDATE asns SET last_seen = ? WHERE id = ?", timestamp, asn.ID.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -203,22 +199,15 @@ func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// ASN doesn't exist, create it with ASN info lookup
|
||||
// ASN doesn't exist, create it
|
||||
asn = ASN{
|
||||
ID: generateUUID(),
|
||||
Number: number,
|
||||
FirstSeen: timestamp,
|
||||
LastSeen: timestamp,
|
||||
}
|
||||
|
||||
// Look up ASN info
|
||||
if info, ok := asinfo.Get(number); ok {
|
||||
asn.Handle = info.Handle
|
||||
asn.Description = info.Description
|
||||
}
|
||||
|
||||
_, err = tx.Exec("INSERT INTO asns (id, number, handle, description, first_seen, last_seen) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
asn.ID.String(), asn.Number, asn.Handle, asn.Description, asn.FirstSeen, asn.LastSeen)
|
||||
_, err = tx.Exec("INSERT INTO asns (id, number, first_seen, last_seen) VALUES (?, ?, ?, ?)",
|
||||
asn.ID.String(), asn.Number, asn.FirstSeen, asn.LastSeen)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -351,6 +340,75 @@ func (d *Database) RecordPeering(fromASNID, toASNID string, timestamp time.Time)
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateLiveRoute updates the live routing table for an announcement
|
||||
func (d *Database) UpdateLiveRoute(
|
||||
prefixID, originASNID uuid.UUID,
|
||||
peerASN int,
|
||||
nextHop string,
|
||||
timestamp time.Time,
|
||||
) error {
|
||||
// Use SQLite's UPSERT capability to avoid the SELECT+UPDATE/INSERT pattern
|
||||
// This reduces the number of queries and improves performance
|
||||
// Note: We removed the WHERE clause from ON CONFLICT UPDATE because
|
||||
// if we're updating, we want to update regardless of withdrawn_at status
|
||||
err := d.exec(`
|
||||
INSERT INTO live_routes (id, prefix_id, origin_asn_id, peer_asn, next_hop, announced_at, withdrawn_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, NULL)
|
||||
ON CONFLICT(prefix_id, origin_asn_id, peer_asn) DO UPDATE SET
|
||||
next_hop = excluded.next_hop,
|
||||
announced_at = excluded.announced_at,
|
||||
withdrawn_at = NULL`,
|
||||
generateUUID().String(), prefixID.String(), originASNID.String(),
|
||||
peerASN, nextHop, timestamp)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// WithdrawLiveRoute marks a route as withdrawn in the live routing table
|
||||
func (d *Database) WithdrawLiveRoute(prefixID uuid.UUID, peerASN int, timestamp time.Time) error {
|
||||
err := d.exec(`
|
||||
UPDATE live_routes
|
||||
SET withdrawn_at = ?
|
||||
WHERE prefix_id = ? AND peer_asn = ? AND withdrawn_at IS NULL`,
|
||||
timestamp, prefixID.String(), peerASN)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// GetActiveLiveRoutes returns all currently active routes (not withdrawn)
|
||||
func (d *Database) GetActiveLiveRoutes() ([]LiveRoute, error) {
|
||||
rows, err := d.query(`
|
||||
SELECT id, prefix_id, origin_asn_id, peer_asn, next_hop, announced_at
|
||||
FROM live_routes
|
||||
WHERE withdrawn_at IS NULL
|
||||
ORDER BY announced_at DESC`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
_ = rows.Close()
|
||||
}()
|
||||
|
||||
var routes []LiveRoute
|
||||
for rows.Next() {
|
||||
var route LiveRoute
|
||||
var idStr, prefixIDStr, originASNIDStr string
|
||||
err := rows.Scan(&idStr, &prefixIDStr, &originASNIDStr,
|
||||
&route.PeerASN, &route.NextHop, &route.AnnouncedAt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
route.ID, _ = uuid.Parse(idStr)
|
||||
route.PrefixID, _ = uuid.Parse(prefixIDStr)
|
||||
route.OriginASNID, _ = uuid.Parse(originASNIDStr)
|
||||
|
||||
routes = append(routes, route)
|
||||
}
|
||||
|
||||
return routes, rows.Err()
|
||||
}
|
||||
|
||||
// UpdatePeer updates or creates a BGP peer record
|
||||
func (d *Database) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error {
|
||||
tx, err := d.beginTx()
|
||||
@ -437,6 +495,13 @@ func (d *Database) GetStats() (Stats, error) {
|
||||
return stats, err
|
||||
}
|
||||
|
||||
// Count live routes
|
||||
d.logger.Info("Counting live routes")
|
||||
err = d.queryRow("SELECT COUNT(*) FROM live_routes WHERE withdrawn_at IS NULL").Scan(&stats.LiveRoutes)
|
||||
if err != nil {
|
||||
return stats, err
|
||||
}
|
||||
|
||||
d.logger.Info("Stats collection complete")
|
||||
|
||||
return stats, nil
|
||||
|
@ -2,6 +2,8 @@ package database
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// Stats contains database statistics
|
||||
@ -11,6 +13,7 @@ type Stats struct {
|
||||
IPv4Prefixes int
|
||||
IPv6Prefixes int
|
||||
Peerings int
|
||||
LiveRoutes int
|
||||
}
|
||||
|
||||
// Store defines the interface for database operations
|
||||
@ -27,6 +30,11 @@ type Store interface {
|
||||
// Peering operations
|
||||
RecordPeering(fromASNID, toASNID string, timestamp time.Time) error
|
||||
|
||||
// Live route operations
|
||||
UpdateLiveRoute(prefixID, originASNID uuid.UUID, peerASN int, nextHop string, timestamp time.Time) error
|
||||
WithdrawLiveRoute(prefixID uuid.UUID, peerASN int, timestamp time.Time) error
|
||||
GetActiveLiveRoutes() ([]LiveRoute, error)
|
||||
|
||||
// Statistics
|
||||
GetStats() (Stats, error)
|
||||
|
||||
|
@ -8,12 +8,10 @@ import (
|
||||
|
||||
// ASN represents an Autonomous System Number
|
||||
type ASN struct {
|
||||
ID uuid.UUID `json:"id"`
|
||||
Number int `json:"number"`
|
||||
Handle string `json:"handle"`
|
||||
Description string `json:"description"`
|
||||
FirstSeen time.Time `json:"first_seen"`
|
||||
LastSeen time.Time `json:"last_seen"`
|
||||
ID uuid.UUID `json:"id"`
|
||||
Number int `json:"number"`
|
||||
FirstSeen time.Time `json:"first_seen"`
|
||||
LastSeen time.Time `json:"last_seen"`
|
||||
}
|
||||
|
||||
// Prefix represents an IP prefix (CIDR block)
|
||||
@ -45,3 +43,15 @@ type ASNPeering struct {
|
||||
FirstSeen time.Time `json:"first_seen"`
|
||||
LastSeen time.Time `json:"last_seen"`
|
||||
}
|
||||
|
||||
// LiveRoute represents the current state of a route in the live routing table
|
||||
type LiveRoute struct {
|
||||
ID uuid.UUID `json:"id"`
|
||||
PrefixID uuid.UUID `json:"prefix_id"`
|
||||
OriginASNID uuid.UUID `json:"origin_asn_id"`
|
||||
PeerASN int `json:"peer_asn"`
|
||||
Path string `json:"path"`
|
||||
NextHop string `json:"next_hop"`
|
||||
AnnouncedAt time.Time `json:"announced_at"`
|
||||
WithdrawnAt *time.Time `json:"withdrawn_at"`
|
||||
}
|
||||
|
@ -1,8 +1,6 @@
|
||||
CREATE TABLE IF NOT EXISTS asns (
|
||||
id TEXT PRIMARY KEY,
|
||||
number INTEGER UNIQUE NOT NULL,
|
||||
handle TEXT,
|
||||
description TEXT,
|
||||
first_seen DATETIME NOT NULL,
|
||||
last_seen DATETIME NOT NULL
|
||||
);
|
||||
@ -50,6 +48,20 @@ CREATE TABLE IF NOT EXISTS bgp_peers (
|
||||
last_message_type TEXT
|
||||
);
|
||||
|
||||
-- Live routing table: current state of announced routes
|
||||
CREATE TABLE IF NOT EXISTS live_routes (
|
||||
id TEXT PRIMARY KEY,
|
||||
prefix_id TEXT NOT NULL,
|
||||
origin_asn_id TEXT NOT NULL,
|
||||
peer_asn INTEGER NOT NULL,
|
||||
next_hop TEXT,
|
||||
announced_at DATETIME NOT NULL,
|
||||
withdrawn_at DATETIME,
|
||||
FOREIGN KEY (prefix_id) REFERENCES prefixes(id),
|
||||
FOREIGN KEY (origin_asn_id) REFERENCES asns(id),
|
||||
UNIQUE(prefix_id, origin_asn_id, peer_asn)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_prefixes_ip_version ON prefixes(ip_version);
|
||||
CREATE INDEX IF NOT EXISTS idx_prefixes_version_prefix ON prefixes(ip_version, prefix);
|
||||
CREATE INDEX IF NOT EXISTS idx_announcements_timestamp ON announcements(timestamp);
|
||||
@ -59,6 +71,43 @@ CREATE INDEX IF NOT EXISTS idx_asn_peerings_from_asn ON asn_peerings(from_asn_id
|
||||
CREATE INDEX IF NOT EXISTS idx_asn_peerings_to_asn ON asn_peerings(to_asn_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_asn_peerings_lookup ON asn_peerings(from_asn_id, to_asn_id);
|
||||
|
||||
-- Indexes for live routes table
|
||||
CREATE INDEX IF NOT EXISTS idx_live_routes_active
|
||||
ON live_routes(prefix_id, origin_asn_id)
|
||||
WHERE withdrawn_at IS NULL;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_live_routes_origin
|
||||
ON live_routes(origin_asn_id)
|
||||
WHERE withdrawn_at IS NULL;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_live_routes_prefix
|
||||
ON live_routes(prefix_id)
|
||||
WHERE withdrawn_at IS NULL;
|
||||
|
||||
-- Critical index for the most common query pattern
|
||||
CREATE INDEX IF NOT EXISTS idx_live_routes_lookup
|
||||
ON live_routes(prefix_id, origin_asn_id, peer_asn)
|
||||
WHERE withdrawn_at IS NULL;
|
||||
|
||||
-- Index for withdrawal updates by prefix and peer
|
||||
CREATE INDEX IF NOT EXISTS idx_live_routes_withdraw
|
||||
ON live_routes(prefix_id, peer_asn)
|
||||
WHERE withdrawn_at IS NULL;
|
||||
|
||||
-- Covering index for SELECT id queries (includes id in index)
|
||||
CREATE INDEX IF NOT EXISTS idx_live_routes_covering
|
||||
ON live_routes(prefix_id, origin_asn_id, peer_asn, id)
|
||||
WHERE withdrawn_at IS NULL;
|
||||
|
||||
-- Index for UPDATE by id operations
|
||||
CREATE INDEX IF NOT EXISTS idx_live_routes_id
|
||||
ON live_routes(id);
|
||||
|
||||
-- Index for stats queries
|
||||
CREATE INDEX IF NOT EXISTS idx_live_routes_stats
|
||||
ON live_routes(withdrawn_at)
|
||||
WHERE withdrawn_at IS NULL;
|
||||
|
||||
-- Additional indexes for prefixes table
|
||||
CREATE INDEX IF NOT EXISTS idx_prefixes_prefix ON prefixes(prefix);
|
||||
|
||||
|
@ -26,7 +26,6 @@ func (d *Database) queryRow(query string, args ...interface{}) *sql.Row {
|
||||
}
|
||||
|
||||
// query wraps Query with slow query logging
|
||||
// nolint:unused // kept for future use to ensure all queries go through slow query logging
|
||||
func (d *Database) query(query string, args ...interface{}) (*sql.Rows, error) {
|
||||
start := time.Now()
|
||||
defer logSlowQuery(d.logger, query, start)
|
||||
|
@ -4,7 +4,6 @@ package routewatch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"strings"
|
||||
@ -12,7 +11,6 @@ import (
|
||||
|
||||
"git.eeqj.de/sneak/routewatch/internal/database"
|
||||
"git.eeqj.de/sneak/routewatch/internal/metrics"
|
||||
"git.eeqj.de/sneak/routewatch/internal/routingtable"
|
||||
"git.eeqj.de/sneak/routewatch/internal/server"
|
||||
"git.eeqj.de/sneak/routewatch/internal/streamer"
|
||||
|
||||
@ -24,11 +22,6 @@ type Config struct {
|
||||
MaxRuntime time.Duration // Maximum runtime (0 = run forever)
|
||||
}
|
||||
|
||||
const (
|
||||
// routingTableStatsInterval is how often we log routing table statistics
|
||||
routingTableStatsInterval = 15 * time.Second
|
||||
)
|
||||
|
||||
// NewConfig provides default configuration
|
||||
func NewConfig() Config {
|
||||
return Config{
|
||||
@ -40,33 +33,30 @@ func NewConfig() Config {
|
||||
type Dependencies struct {
|
||||
fx.In
|
||||
|
||||
DB database.Store
|
||||
RoutingTable *routingtable.RoutingTable
|
||||
Streamer *streamer.Streamer
|
||||
Server *server.Server
|
||||
Logger *slog.Logger
|
||||
Config Config `optional:"true"`
|
||||
DB database.Store
|
||||
Streamer *streamer.Streamer
|
||||
Server *server.Server
|
||||
Logger *slog.Logger
|
||||
Config Config `optional:"true"`
|
||||
}
|
||||
|
||||
// RouteWatch represents the main application instance
|
||||
type RouteWatch struct {
|
||||
db database.Store
|
||||
routingTable *routingtable.RoutingTable
|
||||
streamer *streamer.Streamer
|
||||
server *server.Server
|
||||
logger *slog.Logger
|
||||
maxRuntime time.Duration
|
||||
db database.Store
|
||||
streamer *streamer.Streamer
|
||||
server *server.Server
|
||||
logger *slog.Logger
|
||||
maxRuntime time.Duration
|
||||
}
|
||||
|
||||
// New creates a new RouteWatch instance
|
||||
func New(deps Dependencies) *RouteWatch {
|
||||
return &RouteWatch{
|
||||
db: deps.DB,
|
||||
routingTable: deps.RoutingTable,
|
||||
streamer: deps.Streamer,
|
||||
server: deps.Server,
|
||||
logger: deps.Logger,
|
||||
maxRuntime: deps.Config.MaxRuntime,
|
||||
db: deps.DB,
|
||||
streamer: deps.Streamer,
|
||||
server: deps.Server,
|
||||
logger: deps.Logger,
|
||||
maxRuntime: deps.Config.MaxRuntime,
|
||||
}
|
||||
}
|
||||
|
||||
@ -86,17 +76,10 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
|
||||
dbHandler := NewDatabaseHandler(rw.db, rw.logger)
|
||||
rw.streamer.RegisterHandler(dbHandler)
|
||||
|
||||
// Register routing table handler to maintain in-memory routing table
|
||||
rtHandler := NewRoutingTableHandler(rw.routingTable, rw.logger)
|
||||
rw.streamer.RegisterHandler(rtHandler)
|
||||
|
||||
// Register peer tracking handler to track all peers
|
||||
peerHandler := NewPeerHandler(rw.db, rw.logger)
|
||||
rw.streamer.RegisterHandler(peerHandler)
|
||||
|
||||
// Start periodic routing table stats logging
|
||||
go rw.logRoutingTableStats(ctx)
|
||||
|
||||
// Start streaming
|
||||
if err := rw.streamer.Start(); err != nil {
|
||||
return err
|
||||
@ -134,32 +117,6 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// logRoutingTableStats periodically logs routing table statistics
|
||||
func (rw *RouteWatch) logRoutingTableStats(ctx context.Context) {
|
||||
// Log stats periodically
|
||||
ticker := time.NewTicker(routingTableStatsInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
stats := rw.routingTable.GetDetailedStats()
|
||||
rw.logger.Info("Routing table statistics",
|
||||
"ipv4_routes", stats.IPv4Routes,
|
||||
"ipv6_routes", stats.IPv6Routes,
|
||||
"ipv4_updates_per_sec", fmt.Sprintf("%.2f", stats.IPv4UpdatesRate),
|
||||
"ipv6_updates_per_sec", fmt.Sprintf("%.2f", stats.IPv6UpdatesRate),
|
||||
"total_routes", stats.TotalRoutes,
|
||||
"unique_prefixes", stats.UniquePrefixes,
|
||||
"unique_origins", stats.UniqueOrigins,
|
||||
"unique_peers", stats.UniquePeers,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewLogger creates a structured logger
|
||||
func NewLogger() *slog.Logger {
|
||||
level := slog.LevelInfo
|
||||
@ -197,12 +154,8 @@ func getModule() fx.Option {
|
||||
},
|
||||
fx.As(new(database.Store)),
|
||||
),
|
||||
routingtable.New,
|
||||
streamer.New,
|
||||
fx.Annotate(
|
||||
server.New,
|
||||
fx.ParamTags(``, ``, ``, ``),
|
||||
),
|
||||
server.New,
|
||||
New,
|
||||
),
|
||||
)
|
||||
|
@ -9,7 +9,6 @@ import (
|
||||
|
||||
"git.eeqj.de/sneak/routewatch/internal/database"
|
||||
"git.eeqj.de/sneak/routewatch/internal/metrics"
|
||||
"git.eeqj.de/sneak/routewatch/internal/routingtable"
|
||||
"git.eeqj.de/sneak/routewatch/internal/server"
|
||||
"git.eeqj.de/sneak/routewatch/internal/streamer"
|
||||
"github.com/google/uuid"
|
||||
@ -130,6 +129,35 @@ func (m *mockStore) RecordPeering(fromASNID, toASNID string, _ time.Time) error
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateLiveRoute mock implementation
|
||||
func (m *mockStore) UpdateLiveRoute(prefixID, originASNID uuid.UUID, peerASN int, _ string, _ time.Time) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
key := prefixID.String() + "_" + originASNID.String() + "_" + string(rune(peerASN))
|
||||
if !m.Routes[key] {
|
||||
m.Routes[key] = true
|
||||
m.RouteCount++
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WithdrawLiveRoute mock implementation
|
||||
func (m *mockStore) WithdrawLiveRoute(_ uuid.UUID, _ int, _ time.Time) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
m.WithdrawalCount++
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetActiveLiveRoutes mock implementation
|
||||
func (m *mockStore) GetActiveLiveRoutes() ([]database.LiveRoute, error) {
|
||||
return []database.LiveRoute{}, nil
|
||||
}
|
||||
|
||||
// UpdatePeer mock implementation
|
||||
func (m *mockStore) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error {
|
||||
// Simple mock - just return nil
|
||||
@ -152,6 +180,7 @@ func (m *mockStore) GetStats() (database.Stats, error) {
|
||||
IPv4Prefixes: m.IPv4Prefixes,
|
||||
IPv6Prefixes: m.IPv6Prefixes,
|
||||
Peerings: m.PeeringCount,
|
||||
LiveRoutes: m.RouteCount,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -168,19 +197,15 @@ func TestRouteWatchLiveFeed(t *testing.T) {
|
||||
// Create streamer
|
||||
s := streamer.New(logger, metricsTracker)
|
||||
|
||||
// Create routing table
|
||||
rt := routingtable.New()
|
||||
|
||||
// Create server
|
||||
srv := server.New(mockDB, rt, s, logger)
|
||||
srv := server.New(mockDB, s, logger)
|
||||
|
||||
// Create RouteWatch with 5 second limit
|
||||
deps := Dependencies{
|
||||
DB: mockDB,
|
||||
RoutingTable: rt,
|
||||
Streamer: s,
|
||||
Server: srv,
|
||||
Logger: logger,
|
||||
DB: mockDB,
|
||||
Streamer: s,
|
||||
Server: srv,
|
||||
Logger: logger,
|
||||
Config: Config{
|
||||
MaxRuntime: 5 * time.Second,
|
||||
},
|
||||
@ -217,4 +242,8 @@ func TestRouteWatchLiveFeed(t *testing.T) {
|
||||
}
|
||||
t.Logf("Recorded %d AS peering relationships in 5 seconds", stats.Peerings)
|
||||
|
||||
if stats.LiveRoutes == 0 {
|
||||
t.Error("Expected to have some active routes")
|
||||
}
|
||||
t.Logf("Active routes: %d", stats.LiveRoutes)
|
||||
}
|
||||
|
@ -2,16 +2,12 @@ package routewatch
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"strconv"
|
||||
|
||||
"git.eeqj.de/sneak/routewatch/internal/database"
|
||||
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
||||
)
|
||||
|
||||
const (
|
||||
// databaseHandlerQueueSize is the queue capacity for database operations
|
||||
databaseHandlerQueueSize = 100
|
||||
)
|
||||
|
||||
// DatabaseHandler handles BGP messages and stores them in the database
|
||||
type DatabaseHandler struct {
|
||||
db database.Store
|
||||
@ -32,17 +28,19 @@ func (h *DatabaseHandler) WantsMessage(messageType string) bool {
|
||||
return messageType == "UPDATE"
|
||||
}
|
||||
|
||||
// QueueCapacity returns the desired queue capacity for this handler
|
||||
func (h *DatabaseHandler) QueueCapacity() int {
|
||||
// Database operations are slow, so use a smaller queue
|
||||
return databaseHandlerQueueSize
|
||||
}
|
||||
|
||||
// HandleMessage processes a RIS message and updates the database
|
||||
func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||
// Use the pre-parsed timestamp
|
||||
timestamp := msg.ParsedTimestamp
|
||||
|
||||
// Parse peer ASN
|
||||
peerASN, err := strconv.Atoi(msg.PeerASN)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to parse peer ASN", "peer_asn", msg.PeerASN, "error", err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Get origin ASN from path (last element)
|
||||
var originASN int
|
||||
if len(msg.Path) > 0 {
|
||||
@ -53,7 +51,7 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||
for _, announcement := range msg.Announcements {
|
||||
for _, prefix := range announcement.Prefixes {
|
||||
// Get or create prefix
|
||||
_, err := h.db.GetOrCreatePrefix(prefix, timestamp)
|
||||
p, err := h.db.GetOrCreatePrefix(prefix, timestamp)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err)
|
||||
|
||||
@ -61,13 +59,30 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||
}
|
||||
|
||||
// Get or create origin ASN
|
||||
_, err = h.db.GetOrCreateASN(originASN, timestamp)
|
||||
asn, err := h.db.GetOrCreateASN(originASN, timestamp)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to get/create ASN", "asn", originASN, "error", err)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// Update live route
|
||||
err = h.db.UpdateLiveRoute(
|
||||
p.ID,
|
||||
asn.ID,
|
||||
peerASN,
|
||||
announcement.NextHop,
|
||||
timestamp,
|
||||
)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to update live route",
|
||||
"prefix", prefix,
|
||||
"origin_asn", originASN,
|
||||
"peer_asn", peerASN,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
|
||||
// TODO: Record the announcement in the announcements table
|
||||
// Process AS path to update peerings
|
||||
if len(msg.Path) > 1 {
|
||||
@ -107,13 +122,23 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||
// Process withdrawals
|
||||
for _, prefix := range msg.Withdrawals {
|
||||
// Get prefix
|
||||
_, err := h.db.GetOrCreatePrefix(prefix, timestamp)
|
||||
p, err := h.db.GetOrCreatePrefix(prefix, timestamp)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to get prefix for withdrawal", "prefix", prefix, "error", err)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: Record the withdrawal in the announcements table as a withdrawal
|
||||
// Withdraw the route
|
||||
err = h.db.WithdrawLiveRoute(p.ID, peerASN, timestamp)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to withdraw route",
|
||||
"prefix", prefix,
|
||||
"peer_asn", peerASN,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
|
||||
// TODO: Record the withdrawal in the withdrawals table
|
||||
}
|
||||
}
|
||||
|
@ -8,11 +8,6 @@ import (
|
||||
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
||||
)
|
||||
|
||||
const (
|
||||
// peerHandlerQueueSize is the queue capacity for peer tracking operations
|
||||
peerHandlerQueueSize = 500
|
||||
)
|
||||
|
||||
// PeerHandler tracks BGP peers from all message types
|
||||
type PeerHandler struct {
|
||||
db database.Store
|
||||
@ -32,12 +27,6 @@ func (h *PeerHandler) WantsMessage(_ string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// QueueCapacity returns the desired queue capacity for this handler
|
||||
func (h *PeerHandler) QueueCapacity() int {
|
||||
// Peer tracking is lightweight but involves database ops, use moderate queue
|
||||
return peerHandlerQueueSize
|
||||
}
|
||||
|
||||
// HandleMessage processes a message to track peer information
|
||||
func (h *PeerHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||
// Parse peer ASN from string
|
||||
|
@ -1,131 +0,0 @@
|
||||
package routewatch
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"strconv"
|
||||
|
||||
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
||||
"git.eeqj.de/sneak/routewatch/internal/routingtable"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
// routingTableHandlerQueueSize is the queue capacity for in-memory routing table operations
|
||||
routingTableHandlerQueueSize = 10000
|
||||
)
|
||||
|
||||
// RoutingTableHandler handles BGP messages and updates the in-memory routing table
|
||||
type RoutingTableHandler struct {
|
||||
rt *routingtable.RoutingTable
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
// NewRoutingTableHandler creates a new routing table handler
|
||||
func NewRoutingTableHandler(rt *routingtable.RoutingTable, logger *slog.Logger) *RoutingTableHandler {
|
||||
return &RoutingTableHandler{
|
||||
rt: rt,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// WantsMessage returns true if this handler wants to process messages of the given type
|
||||
func (h *RoutingTableHandler) WantsMessage(messageType string) bool {
|
||||
// We only care about UPDATE messages for the routing table
|
||||
return messageType == "UPDATE"
|
||||
}
|
||||
|
||||
// QueueCapacity returns the desired queue capacity for this handler
|
||||
func (h *RoutingTableHandler) QueueCapacity() int {
|
||||
// In-memory operations are very fast, so use a large queue
|
||||
return routingTableHandlerQueueSize
|
||||
}
|
||||
|
||||
// HandleMessage processes a RIS message and updates the routing table
|
||||
func (h *RoutingTableHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||
// Use the pre-parsed timestamp
|
||||
timestamp := msg.ParsedTimestamp
|
||||
|
||||
// Parse peer ASN
|
||||
peerASN, err := strconv.Atoi(msg.PeerASN)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to parse peer ASN", "peer_asn", msg.PeerASN, "error", err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Get origin ASN from path (last element)
|
||||
var originASN int
|
||||
if len(msg.Path) > 0 {
|
||||
originASN = msg.Path[len(msg.Path)-1]
|
||||
}
|
||||
|
||||
// Process announcements
|
||||
for _, announcement := range msg.Announcements {
|
||||
for _, prefix := range announcement.Prefixes {
|
||||
// Generate deterministic UUIDs based on the prefix and origin ASN
|
||||
// This ensures consistency across restarts
|
||||
prefixID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(prefix))
|
||||
originASNID := uuid.NewSHA1(uuid.NameSpaceOID, []byte(strconv.Itoa(originASN)))
|
||||
|
||||
// Create route for the routing table
|
||||
route := &routingtable.Route{
|
||||
PrefixID: prefixID,
|
||||
Prefix: prefix,
|
||||
OriginASNID: originASNID,
|
||||
OriginASN: originASN,
|
||||
PeerASN: peerASN,
|
||||
ASPath: msg.Path,
|
||||
NextHop: announcement.NextHop,
|
||||
AnnouncedAt: timestamp,
|
||||
}
|
||||
|
||||
// Add route to routing table
|
||||
h.rt.AddRoute(route)
|
||||
}
|
||||
}
|
||||
|
||||
// Process withdrawals
|
||||
for _, prefix := range msg.Withdrawals {
|
||||
// Generate deterministic UUID for the prefix
|
||||
prefixID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(prefix))
|
||||
|
||||
// Withdraw all routes for this prefix from this peer
|
||||
h.rt.WithdrawRoutesByPrefixAndPeer(prefixID, peerASN)
|
||||
}
|
||||
}
|
||||
|
||||
// GetRoutingTableStats returns statistics about the routing table
|
||||
func (h *RoutingTableHandler) GetRoutingTableStats() map[string]int {
|
||||
return h.rt.Stats()
|
||||
}
|
||||
|
||||
// GetActiveRouteCount returns the number of active routes
|
||||
func (h *RoutingTableHandler) GetActiveRouteCount() int {
|
||||
return h.rt.Size()
|
||||
}
|
||||
|
||||
// GetRoutesByPrefix returns all routes for a specific prefix
|
||||
func (h *RoutingTableHandler) GetRoutesByPrefix(prefixID uuid.UUID) []*routingtable.Route {
|
||||
return h.rt.GetRoutesByPrefix(prefixID)
|
||||
}
|
||||
|
||||
// GetRoutesByOriginASN returns all routes originated by a specific ASN
|
||||
func (h *RoutingTableHandler) GetRoutesByOriginASN(originASNID uuid.UUID) []*routingtable.Route {
|
||||
return h.rt.GetRoutesByOriginASN(originASNID)
|
||||
}
|
||||
|
||||
// GetRoutesByPeerASN returns all routes received from a specific peer ASN
|
||||
func (h *RoutingTableHandler) GetRoutesByPeerASN(peerASN int) []*routingtable.Route {
|
||||
return h.rt.GetRoutesByPeerASN(peerASN)
|
||||
}
|
||||
|
||||
// GetAllRoutes returns all active routes
|
||||
func (h *RoutingTableHandler) GetAllRoutes() []*routingtable.Route {
|
||||
return h.rt.GetAllRoutes()
|
||||
}
|
||||
|
||||
// ClearRoutingTable clears all routes from the routing table
|
||||
func (h *RoutingTableHandler) ClearRoutingTable() {
|
||||
h.rt.Clear()
|
||||
h.logger.Info("Cleared routing table")
|
||||
}
|
@ -1,397 +0,0 @@
|
||||
// Package routingtable provides a thread-safe in-memory representation of the DFZ routing table.
|
||||
package routingtable
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// Route represents a single route entry in the routing table
|
||||
type Route struct {
|
||||
PrefixID uuid.UUID `json:"prefix_id"`
|
||||
Prefix string `json:"prefix"` // The actual prefix string (e.g., "10.0.0.0/8")
|
||||
OriginASNID uuid.UUID `json:"origin_asn_id"`
|
||||
OriginASN int `json:"origin_asn"` // The actual ASN number
|
||||
PeerASN int `json:"peer_asn"`
|
||||
ASPath []int `json:"as_path"` // Full AS path
|
||||
NextHop string `json:"next_hop"`
|
||||
AnnouncedAt time.Time `json:"announced_at"`
|
||||
}
|
||||
|
||||
// RouteKey uniquely identifies a route in the table
|
||||
type RouteKey struct {
|
||||
PrefixID uuid.UUID
|
||||
OriginASNID uuid.UUID
|
||||
PeerASN int
|
||||
}
|
||||
|
||||
// RoutingTable is a thread-safe in-memory routing table
|
||||
type RoutingTable struct {
|
||||
mu sync.RWMutex
|
||||
routes map[RouteKey]*Route
|
||||
|
||||
// Secondary indexes for efficient lookups
|
||||
byPrefix map[uuid.UUID]map[RouteKey]*Route // Routes indexed by prefix ID
|
||||
byOriginASN map[uuid.UUID]map[RouteKey]*Route // Routes indexed by origin ASN ID
|
||||
byPeerASN map[int]map[RouteKey]*Route // Routes indexed by peer ASN
|
||||
|
||||
// Metrics tracking
|
||||
ipv4Routes int
|
||||
ipv6Routes int
|
||||
ipv4Updates uint64 // Updates counter for rate calculation
|
||||
ipv6Updates uint64 // Updates counter for rate calculation
|
||||
lastMetricsReset time.Time
|
||||
}
|
||||
|
||||
// New creates a new empty routing table
|
||||
func New() *RoutingTable {
|
||||
return &RoutingTable{
|
||||
routes: make(map[RouteKey]*Route),
|
||||
byPrefix: make(map[uuid.UUID]map[RouteKey]*Route),
|
||||
byOriginASN: make(map[uuid.UUID]map[RouteKey]*Route),
|
||||
byPeerASN: make(map[int]map[RouteKey]*Route),
|
||||
lastMetricsReset: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
// AddRoute adds or updates a route in the routing table
|
||||
func (rt *RoutingTable) AddRoute(route *Route) {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
|
||||
key := RouteKey{
|
||||
PrefixID: route.PrefixID,
|
||||
OriginASNID: route.OriginASNID,
|
||||
PeerASN: route.PeerASN,
|
||||
}
|
||||
|
||||
// If route already exists, remove it from indexes first
|
||||
if existingRoute, exists := rt.routes[key]; exists {
|
||||
rt.removeFromIndexes(key, existingRoute)
|
||||
// Decrement counter for existing route
|
||||
if isIPv6(existingRoute.Prefix) {
|
||||
rt.ipv6Routes--
|
||||
} else {
|
||||
rt.ipv4Routes--
|
||||
}
|
||||
}
|
||||
|
||||
// Add to main map
|
||||
rt.routes[key] = route
|
||||
|
||||
// Update indexes
|
||||
rt.addToIndexes(key, route)
|
||||
|
||||
// Update metrics
|
||||
if isIPv6(route.Prefix) {
|
||||
rt.ipv6Routes++
|
||||
atomic.AddUint64(&rt.ipv6Updates, 1)
|
||||
} else {
|
||||
rt.ipv4Routes++
|
||||
atomic.AddUint64(&rt.ipv4Updates, 1)
|
||||
}
|
||||
}
|
||||
|
||||
// RemoveRoute removes a route from the routing table
|
||||
func (rt *RoutingTable) RemoveRoute(prefixID, originASNID uuid.UUID, peerASN int) bool {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
|
||||
key := RouteKey{
|
||||
PrefixID: prefixID,
|
||||
OriginASNID: originASNID,
|
||||
PeerASN: peerASN,
|
||||
}
|
||||
|
||||
route, exists := rt.routes[key]
|
||||
if !exists {
|
||||
return false
|
||||
}
|
||||
|
||||
// Remove from indexes
|
||||
rt.removeFromIndexes(key, route)
|
||||
|
||||
// Remove from main map
|
||||
delete(rt.routes, key)
|
||||
|
||||
// Update metrics
|
||||
if isIPv6(route.Prefix) {
|
||||
rt.ipv6Routes--
|
||||
atomic.AddUint64(&rt.ipv6Updates, 1)
|
||||
} else {
|
||||
rt.ipv4Routes--
|
||||
atomic.AddUint64(&rt.ipv4Updates, 1)
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// WithdrawRoutesByPrefixAndPeer removes all routes for a specific prefix from a specific peer
|
||||
func (rt *RoutingTable) WithdrawRoutesByPrefixAndPeer(prefixID uuid.UUID, peerASN int) int {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
|
||||
prefixRoutes, exists := rt.byPrefix[prefixID]
|
||||
if !exists {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Collect keys to delete (can't delete while iterating)
|
||||
var keysToDelete []RouteKey
|
||||
for key, route := range prefixRoutes {
|
||||
if route.PeerASN == peerASN {
|
||||
keysToDelete = append(keysToDelete, key)
|
||||
}
|
||||
}
|
||||
|
||||
// Delete the routes
|
||||
count := 0
|
||||
for _, key := range keysToDelete {
|
||||
route, exists := rt.routes[key]
|
||||
if !exists {
|
||||
continue
|
||||
}
|
||||
|
||||
rt.removeFromIndexes(key, route)
|
||||
delete(rt.routes, key)
|
||||
count++
|
||||
|
||||
// Update metrics
|
||||
if isIPv6(route.Prefix) {
|
||||
rt.ipv6Routes--
|
||||
atomic.AddUint64(&rt.ipv6Updates, 1)
|
||||
} else {
|
||||
rt.ipv4Routes--
|
||||
atomic.AddUint64(&rt.ipv4Updates, 1)
|
||||
}
|
||||
}
|
||||
|
||||
return count
|
||||
}
|
||||
|
||||
// GetRoute retrieves a specific route
|
||||
func (rt *RoutingTable) GetRoute(prefixID, originASNID uuid.UUID, peerASN int) (*Route, bool) {
|
||||
rt.mu.RLock()
|
||||
defer rt.mu.RUnlock()
|
||||
|
||||
key := RouteKey{
|
||||
PrefixID: prefixID,
|
||||
OriginASNID: originASNID,
|
||||
PeerASN: peerASN,
|
||||
}
|
||||
|
||||
route, exists := rt.routes[key]
|
||||
if !exists {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// Return a copy to prevent external modification
|
||||
routeCopy := *route
|
||||
|
||||
return &routeCopy, true
|
||||
}
|
||||
|
||||
// GetRoutesByPrefix returns all routes for a specific prefix
|
||||
func (rt *RoutingTable) GetRoutesByPrefix(prefixID uuid.UUID) []*Route {
|
||||
rt.mu.RLock()
|
||||
defer rt.mu.RUnlock()
|
||||
|
||||
routes := make([]*Route, 0)
|
||||
if prefixRoutes, exists := rt.byPrefix[prefixID]; exists {
|
||||
for _, route := range prefixRoutes {
|
||||
routeCopy := *route
|
||||
routes = append(routes, &routeCopy)
|
||||
}
|
||||
}
|
||||
|
||||
return routes
|
||||
}
|
||||
|
||||
// GetRoutesByOriginASN returns all routes originated by a specific ASN
|
||||
func (rt *RoutingTable) GetRoutesByOriginASN(originASNID uuid.UUID) []*Route {
|
||||
rt.mu.RLock()
|
||||
defer rt.mu.RUnlock()
|
||||
|
||||
routes := make([]*Route, 0)
|
||||
if asnRoutes, exists := rt.byOriginASN[originASNID]; exists {
|
||||
for _, route := range asnRoutes {
|
||||
routeCopy := *route
|
||||
routes = append(routes, &routeCopy)
|
||||
}
|
||||
}
|
||||
|
||||
return routes
|
||||
}
|
||||
|
||||
// GetRoutesByPeerASN returns all routes received from a specific peer ASN
|
||||
func (rt *RoutingTable) GetRoutesByPeerASN(peerASN int) []*Route {
|
||||
rt.mu.RLock()
|
||||
defer rt.mu.RUnlock()
|
||||
|
||||
routes := make([]*Route, 0)
|
||||
if peerRoutes, exists := rt.byPeerASN[peerASN]; exists {
|
||||
for _, route := range peerRoutes {
|
||||
routeCopy := *route
|
||||
routes = append(routes, &routeCopy)
|
||||
}
|
||||
}
|
||||
|
||||
return routes
|
||||
}
|
||||
|
||||
// GetAllRoutes returns all active routes in the routing table
|
||||
func (rt *RoutingTable) GetAllRoutes() []*Route {
|
||||
rt.mu.RLock()
|
||||
defer rt.mu.RUnlock()
|
||||
|
||||
routes := make([]*Route, 0, len(rt.routes))
|
||||
for _, route := range rt.routes {
|
||||
routeCopy := *route
|
||||
routes = append(routes, &routeCopy)
|
||||
}
|
||||
|
||||
return routes
|
||||
}
|
||||
|
||||
// Size returns the total number of routes in the table
|
||||
func (rt *RoutingTable) Size() int {
|
||||
rt.mu.RLock()
|
||||
defer rt.mu.RUnlock()
|
||||
|
||||
return len(rt.routes)
|
||||
}
|
||||
|
||||
// Stats returns statistics about the routing table
|
||||
func (rt *RoutingTable) Stats() map[string]int {
|
||||
rt.mu.RLock()
|
||||
defer rt.mu.RUnlock()
|
||||
|
||||
stats := map[string]int{
|
||||
"total_routes": len(rt.routes),
|
||||
"unique_prefixes": len(rt.byPrefix),
|
||||
"unique_origins": len(rt.byOriginASN),
|
||||
"unique_peers": len(rt.byPeerASN),
|
||||
}
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
// DetailedStats contains detailed routing table statistics
|
||||
type DetailedStats struct {
|
||||
IPv4Routes int
|
||||
IPv6Routes int
|
||||
IPv4UpdatesRate float64
|
||||
IPv6UpdatesRate float64
|
||||
TotalRoutes int
|
||||
UniquePrefixes int
|
||||
UniqueOrigins int
|
||||
UniquePeers int
|
||||
}
|
||||
|
||||
// GetDetailedStats returns detailed statistics including IPv4/IPv6 breakdown and update rates
|
||||
func (rt *RoutingTable) GetDetailedStats() DetailedStats {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
|
||||
// Calculate update rates
|
||||
elapsed := time.Since(rt.lastMetricsReset).Seconds()
|
||||
ipv4Updates := atomic.LoadUint64(&rt.ipv4Updates)
|
||||
ipv6Updates := atomic.LoadUint64(&rt.ipv6Updates)
|
||||
|
||||
stats := DetailedStats{
|
||||
IPv4Routes: rt.ipv4Routes,
|
||||
IPv6Routes: rt.ipv6Routes,
|
||||
IPv4UpdatesRate: float64(ipv4Updates) / elapsed,
|
||||
IPv6UpdatesRate: float64(ipv6Updates) / elapsed,
|
||||
TotalRoutes: len(rt.routes),
|
||||
UniquePrefixes: len(rt.byPrefix),
|
||||
UniqueOrigins: len(rt.byOriginASN),
|
||||
UniquePeers: len(rt.byPeerASN),
|
||||
}
|
||||
|
||||
// Reset counters for next period
|
||||
atomic.StoreUint64(&rt.ipv4Updates, 0)
|
||||
atomic.StoreUint64(&rt.ipv6Updates, 0)
|
||||
rt.lastMetricsReset = time.Now()
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
// Clear removes all routes from the routing table
|
||||
func (rt *RoutingTable) Clear() {
|
||||
rt.mu.Lock()
|
||||
defer rt.mu.Unlock()
|
||||
|
||||
rt.routes = make(map[RouteKey]*Route)
|
||||
rt.byPrefix = make(map[uuid.UUID]map[RouteKey]*Route)
|
||||
rt.byOriginASN = make(map[uuid.UUID]map[RouteKey]*Route)
|
||||
rt.byPeerASN = make(map[int]map[RouteKey]*Route)
|
||||
rt.ipv4Routes = 0
|
||||
rt.ipv6Routes = 0
|
||||
atomic.StoreUint64(&rt.ipv4Updates, 0)
|
||||
atomic.StoreUint64(&rt.ipv6Updates, 0)
|
||||
rt.lastMetricsReset = time.Now()
|
||||
}
|
||||
|
||||
// Helper methods for index management
|
||||
|
||||
func (rt *RoutingTable) addToIndexes(key RouteKey, route *Route) {
|
||||
// Add to prefix index
|
||||
if rt.byPrefix[route.PrefixID] == nil {
|
||||
rt.byPrefix[route.PrefixID] = make(map[RouteKey]*Route)
|
||||
}
|
||||
rt.byPrefix[route.PrefixID][key] = route
|
||||
|
||||
// Add to origin ASN index
|
||||
if rt.byOriginASN[route.OriginASNID] == nil {
|
||||
rt.byOriginASN[route.OriginASNID] = make(map[RouteKey]*Route)
|
||||
}
|
||||
rt.byOriginASN[route.OriginASNID][key] = route
|
||||
|
||||
// Add to peer ASN index
|
||||
if rt.byPeerASN[route.PeerASN] == nil {
|
||||
rt.byPeerASN[route.PeerASN] = make(map[RouteKey]*Route)
|
||||
}
|
||||
rt.byPeerASN[route.PeerASN][key] = route
|
||||
}
|
||||
|
||||
func (rt *RoutingTable) removeFromIndexes(key RouteKey, route *Route) {
|
||||
// Remove from prefix index
|
||||
if prefixRoutes, exists := rt.byPrefix[route.PrefixID]; exists {
|
||||
delete(prefixRoutes, key)
|
||||
if len(prefixRoutes) == 0 {
|
||||
delete(rt.byPrefix, route.PrefixID)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove from origin ASN index
|
||||
if asnRoutes, exists := rt.byOriginASN[route.OriginASNID]; exists {
|
||||
delete(asnRoutes, key)
|
||||
if len(asnRoutes) == 0 {
|
||||
delete(rt.byOriginASN, route.OriginASNID)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove from peer ASN index
|
||||
if peerRoutes, exists := rt.byPeerASN[route.PeerASN]; exists {
|
||||
delete(peerRoutes, key)
|
||||
if len(peerRoutes) == 0 {
|
||||
delete(rt.byPeerASN, route.PeerASN)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// String returns a string representation of the route key
|
||||
func (k RouteKey) String() string {
|
||||
return fmt.Sprintf("%s/%s/%d", k.PrefixID, k.OriginASNID, k.PeerASN)
|
||||
}
|
||||
|
||||
// isIPv6 returns true if the prefix is an IPv6 address
|
||||
func isIPv6(prefix string) bool {
|
||||
return strings.Contains(prefix, ":")
|
||||
}
|
@ -1,219 +0,0 @@
|
||||
package routingtable
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
func TestRoutingTable(t *testing.T) {
|
||||
rt := New()
|
||||
|
||||
// Test data
|
||||
prefixID1 := uuid.New()
|
||||
prefixID2 := uuid.New()
|
||||
originASNID1 := uuid.New()
|
||||
originASNID2 := uuid.New()
|
||||
|
||||
route1 := &Route{
|
||||
PrefixID: prefixID1,
|
||||
Prefix: "10.0.0.0/8",
|
||||
OriginASNID: originASNID1,
|
||||
OriginASN: 64512,
|
||||
PeerASN: 64513,
|
||||
ASPath: []int{64513, 64512},
|
||||
NextHop: "192.168.1.1",
|
||||
AnnouncedAt: time.Now(),
|
||||
}
|
||||
|
||||
route2 := &Route{
|
||||
PrefixID: prefixID2,
|
||||
Prefix: "192.168.0.0/16",
|
||||
OriginASNID: originASNID2,
|
||||
OriginASN: 64514,
|
||||
PeerASN: 64513,
|
||||
ASPath: []int{64513, 64514},
|
||||
NextHop: "192.168.1.1",
|
||||
AnnouncedAt: time.Now(),
|
||||
}
|
||||
|
||||
// Test AddRoute
|
||||
rt.AddRoute(route1)
|
||||
rt.AddRoute(route2)
|
||||
|
||||
if rt.Size() != 2 {
|
||||
t.Errorf("Expected 2 routes, got %d", rt.Size())
|
||||
}
|
||||
|
||||
// Test GetRoute
|
||||
retrievedRoute, exists := rt.GetRoute(prefixID1, originASNID1, 64513)
|
||||
if !exists {
|
||||
t.Error("Route 1 should exist")
|
||||
}
|
||||
if retrievedRoute.Prefix != "10.0.0.0/8" {
|
||||
t.Errorf("Expected prefix 10.0.0.0/8, got %s", retrievedRoute.Prefix)
|
||||
}
|
||||
|
||||
// Test GetRoutesByPrefix
|
||||
prefixRoutes := rt.GetRoutesByPrefix(prefixID1)
|
||||
if len(prefixRoutes) != 1 {
|
||||
t.Errorf("Expected 1 route for prefix, got %d", len(prefixRoutes))
|
||||
}
|
||||
|
||||
// Test GetRoutesByPeerASN
|
||||
peerRoutes := rt.GetRoutesByPeerASN(64513)
|
||||
if len(peerRoutes) != 2 {
|
||||
t.Errorf("Expected 2 routes from peer 64513, got %d", len(peerRoutes))
|
||||
}
|
||||
|
||||
// Test RemoveRoute
|
||||
removed := rt.RemoveRoute(prefixID1, originASNID1, 64513)
|
||||
if !removed {
|
||||
t.Error("Route should have been removed")
|
||||
}
|
||||
if rt.Size() != 1 {
|
||||
t.Errorf("Expected 1 route after removal, got %d", rt.Size())
|
||||
}
|
||||
|
||||
// Test WithdrawRoutesByPrefixAndPeer
|
||||
// Add the route back first
|
||||
rt.AddRoute(route1)
|
||||
|
||||
// Add another route for the same prefix from the same peer
|
||||
route3 := &Route{
|
||||
PrefixID: prefixID1,
|
||||
Prefix: "10.0.0.0/8",
|
||||
OriginASNID: originASNID2, // Different origin
|
||||
OriginASN: 64515,
|
||||
PeerASN: 64513,
|
||||
ASPath: []int{64513, 64515},
|
||||
NextHop: "192.168.1.1",
|
||||
AnnouncedAt: time.Now(),
|
||||
}
|
||||
rt.AddRoute(route3)
|
||||
|
||||
count := rt.WithdrawRoutesByPrefixAndPeer(prefixID1, 64513)
|
||||
if count != 2 {
|
||||
t.Errorf("Expected to withdraw 2 routes, withdrew %d", count)
|
||||
}
|
||||
|
||||
// Should only have route2 left
|
||||
if rt.Size() != 1 {
|
||||
t.Errorf("Expected 1 route after withdrawal, got %d", rt.Size())
|
||||
}
|
||||
|
||||
// Test Stats
|
||||
stats := rt.Stats()
|
||||
if stats["total_routes"] != 1 {
|
||||
t.Errorf("Expected 1 total route in stats, got %d", stats["total_routes"])
|
||||
}
|
||||
|
||||
// Test Clear
|
||||
rt.Clear()
|
||||
if rt.Size() != 0 {
|
||||
t.Errorf("Expected 0 routes after clear, got %d", rt.Size())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRoutingTableConcurrency(t *testing.T) {
|
||||
rt := New()
|
||||
|
||||
// Test concurrent access
|
||||
var wg sync.WaitGroup
|
||||
numGoroutines := 10
|
||||
numOperations := 100
|
||||
|
||||
// Start multiple goroutines that add/remove routes
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
wg.Add(1)
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
|
||||
for j := 0; j < numOperations; j++ {
|
||||
prefixID := uuid.New()
|
||||
originASNID := uuid.New()
|
||||
|
||||
route := &Route{
|
||||
PrefixID: prefixID,
|
||||
Prefix: "10.0.0.0/8",
|
||||
OriginASNID: originASNID,
|
||||
OriginASN: 64512 + id,
|
||||
PeerASN: 64500,
|
||||
ASPath: []int{64500, 64512 + id},
|
||||
NextHop: "192.168.1.1",
|
||||
AnnouncedAt: time.Now(),
|
||||
}
|
||||
|
||||
// Add route
|
||||
rt.AddRoute(route)
|
||||
|
||||
// Try to get it
|
||||
_, _ = rt.GetRoute(prefixID, originASNID, 64500)
|
||||
|
||||
// Get stats
|
||||
_ = rt.Stats()
|
||||
|
||||
// Remove it
|
||||
rt.RemoveRoute(prefixID, originASNID, 64500)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Table should be empty after all operations
|
||||
if rt.Size() != 0 {
|
||||
t.Errorf("Expected empty table after concurrent operations, got %d routes", rt.Size())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouteUpdate(t *testing.T) {
|
||||
rt := New()
|
||||
|
||||
prefixID := uuid.New()
|
||||
originASNID := uuid.New()
|
||||
|
||||
route1 := &Route{
|
||||
PrefixID: prefixID,
|
||||
Prefix: "10.0.0.0/8",
|
||||
OriginASNID: originASNID,
|
||||
OriginASN: 64512,
|
||||
PeerASN: 64513,
|
||||
ASPath: []int{64513, 64512},
|
||||
NextHop: "192.168.1.1",
|
||||
AnnouncedAt: time.Now(),
|
||||
}
|
||||
|
||||
// Add initial route
|
||||
rt.AddRoute(route1)
|
||||
|
||||
// Update the same route with new next hop
|
||||
route2 := &Route{
|
||||
PrefixID: prefixID,
|
||||
Prefix: "10.0.0.0/8",
|
||||
OriginASNID: originASNID,
|
||||
OriginASN: 64512,
|
||||
PeerASN: 64513,
|
||||
ASPath: []int{64513, 64512},
|
||||
NextHop: "192.168.1.2", // Changed
|
||||
AnnouncedAt: time.Now().Add(1 * time.Minute),
|
||||
}
|
||||
|
||||
rt.AddRoute(route2)
|
||||
|
||||
// Should still have only 1 route
|
||||
if rt.Size() != 1 {
|
||||
t.Errorf("Expected 1 route after update, got %d", rt.Size())
|
||||
}
|
||||
|
||||
// Check that the route was updated
|
||||
retrievedRoute, exists := rt.GetRoute(prefixID, originASNID, 64513)
|
||||
if !exists {
|
||||
t.Error("Route should exist after update")
|
||||
}
|
||||
if retrievedRoute.NextHop != "192.168.1.2" {
|
||||
t.Errorf("Expected updated next hop 192.168.1.2, got %s", retrievedRoute.NextHop)
|
||||
}
|
||||
}
|
@ -10,7 +10,6 @@ import (
|
||||
"time"
|
||||
|
||||
"git.eeqj.de/sneak/routewatch/internal/database"
|
||||
"git.eeqj.de/sneak/routewatch/internal/routingtable"
|
||||
"git.eeqj.de/sneak/routewatch/internal/streamer"
|
||||
"git.eeqj.de/sneak/routewatch/internal/templates"
|
||||
"github.com/go-chi/chi/v5"
|
||||
@ -19,21 +18,19 @@ import (
|
||||
|
||||
// Server provides HTTP endpoints for status monitoring
|
||||
type Server struct {
|
||||
router *chi.Mux
|
||||
db database.Store
|
||||
routingTable *routingtable.RoutingTable
|
||||
streamer *streamer.Streamer
|
||||
logger *slog.Logger
|
||||
srv *http.Server
|
||||
router *chi.Mux
|
||||
db database.Store
|
||||
streamer *streamer.Streamer
|
||||
logger *slog.Logger
|
||||
srv *http.Server
|
||||
}
|
||||
|
||||
// New creates a new HTTP server
|
||||
func New(db database.Store, rt *routingtable.RoutingTable, streamer *streamer.Streamer, logger *slog.Logger) *Server {
|
||||
func New(db database.Store, streamer *streamer.Streamer, logger *slog.Logger) *Server {
|
||||
s := &Server{
|
||||
db: db,
|
||||
routingTable: rt,
|
||||
streamer: streamer,
|
||||
logger: logger,
|
||||
db: db,
|
||||
streamer: streamer,
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
s.setupRoutes()
|
||||
@ -203,7 +200,7 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
|
||||
IPv4Prefixes: dbStats.IPv4Prefixes,
|
||||
IPv6Prefixes: dbStats.IPv6Prefixes,
|
||||
Peerings: dbStats.Peerings,
|
||||
LiveRoutes: s.routingTable.Size(),
|
||||
LiveRoutes: dbStats.LiveRoutes,
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
@ -303,7 +300,7 @@ func (s *Server) handleStats() http.HandlerFunc {
|
||||
IPv4Prefixes: dbStats.IPv4Prefixes,
|
||||
IPv6Prefixes: dbStats.IPv6Prefixes,
|
||||
Peerings: dbStats.Peerings,
|
||||
LiveRoutes: s.routingTable.Size(),
|
||||
LiveRoutes: dbStats.LiveRoutes,
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
@ -25,7 +25,7 @@ const (
|
||||
metricsLogInterval = 10 * time.Second
|
||||
bytesPerKB = 1024
|
||||
bytesPerMB = 1024 * 1024
|
||||
maxConcurrentHandlers = 800 // Maximum number of concurrent message handlers
|
||||
maxConcurrentHandlers = 100 // Maximum number of concurrent message handlers
|
||||
)
|
||||
|
||||
// MessageHandler is an interface for handling RIS messages
|
||||
@ -35,43 +35,23 @@ type MessageHandler interface {
|
||||
|
||||
// HandleMessage processes a RIS message
|
||||
HandleMessage(msg *ristypes.RISMessage)
|
||||
|
||||
// QueueCapacity returns the desired queue capacity for this handler
|
||||
// Handlers that process quickly can have larger queues
|
||||
QueueCapacity() int
|
||||
}
|
||||
|
||||
// RawMessageHandler is a callback for handling raw JSON lines from the stream
|
||||
type RawMessageHandler func(line string)
|
||||
|
||||
// handlerMetrics tracks performance metrics for a handler
|
||||
type handlerMetrics struct {
|
||||
processedCount uint64 // Total messages processed
|
||||
droppedCount uint64 // Total messages dropped
|
||||
totalTime time.Duration // Total processing time (for average calculation)
|
||||
minTime time.Duration // Minimum processing time
|
||||
maxTime time.Duration // Maximum processing time
|
||||
mu sync.Mutex // Protects the metrics
|
||||
}
|
||||
|
||||
// handlerInfo wraps a handler with its queue and metrics
|
||||
type handlerInfo struct {
|
||||
handler MessageHandler
|
||||
queue chan *ristypes.RISMessage
|
||||
metrics handlerMetrics
|
||||
}
|
||||
|
||||
// Streamer handles streaming BGP updates from RIS Live
|
||||
type Streamer struct {
|
||||
logger *slog.Logger
|
||||
client *http.Client
|
||||
handlers []*handlerInfo
|
||||
rawHandler RawMessageHandler
|
||||
mu sync.RWMutex
|
||||
cancel context.CancelFunc
|
||||
running bool
|
||||
metrics *metrics.Tracker
|
||||
totalDropped uint64 // Total dropped messages across all handlers
|
||||
logger *slog.Logger
|
||||
client *http.Client
|
||||
handlers []MessageHandler
|
||||
rawHandler RawMessageHandler
|
||||
mu sync.RWMutex
|
||||
cancel context.CancelFunc
|
||||
running bool
|
||||
metrics *metrics.Tracker
|
||||
semaphore chan struct{} // Limits concurrent message processing
|
||||
droppedMessages uint64 // Atomic counter for dropped messages
|
||||
}
|
||||
|
||||
// New creates a new RIS streamer
|
||||
@ -81,8 +61,9 @@ func New(logger *slog.Logger, metrics *metrics.Tracker) *Streamer {
|
||||
client: &http.Client{
|
||||
Timeout: 0, // No timeout for streaming
|
||||
},
|
||||
handlers: make([]*handlerInfo, 0),
|
||||
metrics: metrics,
|
||||
handlers: make([]MessageHandler, 0),
|
||||
metrics: metrics,
|
||||
semaphore: make(chan struct{}, maxConcurrentHandlers),
|
||||
}
|
||||
}
|
||||
|
||||
@ -90,19 +71,7 @@ func New(logger *slog.Logger, metrics *metrics.Tracker) *Streamer {
|
||||
func (s *Streamer) RegisterHandler(handler MessageHandler) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Create handler info with its own queue based on capacity
|
||||
info := &handlerInfo{
|
||||
handler: handler,
|
||||
queue: make(chan *ristypes.RISMessage, handler.QueueCapacity()),
|
||||
}
|
||||
|
||||
s.handlers = append(s.handlers, info)
|
||||
|
||||
// If we're already running, start a worker for this handler
|
||||
if s.running {
|
||||
go s.runHandlerWorker(info)
|
||||
}
|
||||
s.handlers = append(s.handlers, handler)
|
||||
}
|
||||
|
||||
// RegisterRawHandler sets a callback for raw message lines
|
||||
@ -125,11 +94,6 @@ func (s *Streamer) Start() error {
|
||||
s.cancel = cancel
|
||||
s.running = true
|
||||
|
||||
// Start workers for each handler
|
||||
for _, info := range s.handlers {
|
||||
go s.runHandlerWorker(info)
|
||||
}
|
||||
|
||||
go func() {
|
||||
if err := s.stream(ctx); err != nil {
|
||||
s.logger.Error("Streaming error", "error", err)
|
||||
@ -148,40 +112,10 @@ func (s *Streamer) Stop() {
|
||||
if s.cancel != nil {
|
||||
s.cancel()
|
||||
}
|
||||
// Close all handler queues to signal workers to stop
|
||||
for _, info := range s.handlers {
|
||||
close(info.queue)
|
||||
}
|
||||
s.running = false
|
||||
s.mu.Unlock()
|
||||
s.metrics.SetConnected(false)
|
||||
}
|
||||
|
||||
// runHandlerWorker processes messages for a specific handler
|
||||
func (s *Streamer) runHandlerWorker(info *handlerInfo) {
|
||||
for msg := range info.queue {
|
||||
start := time.Now()
|
||||
info.handler.HandleMessage(msg)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
// Update metrics
|
||||
info.metrics.mu.Lock()
|
||||
info.metrics.processedCount++
|
||||
info.metrics.totalTime += elapsed
|
||||
|
||||
// Update min time
|
||||
if info.metrics.minTime == 0 || elapsed < info.metrics.minTime {
|
||||
info.metrics.minTime = elapsed
|
||||
}
|
||||
|
||||
// Update max time
|
||||
if elapsed > info.metrics.maxTime {
|
||||
info.metrics.maxTime = elapsed
|
||||
}
|
||||
info.metrics.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// IsRunning returns whether the streamer is currently active
|
||||
func (s *Streamer) IsRunning() bool {
|
||||
s.mu.RLock()
|
||||
@ -197,7 +131,7 @@ func (s *Streamer) GetMetrics() metrics.StreamMetrics {
|
||||
|
||||
// GetDroppedMessages returns the total number of dropped messages
|
||||
func (s *Streamer) GetDroppedMessages() uint64 {
|
||||
return atomic.LoadUint64(&s.totalDropped)
|
||||
return atomic.LoadUint64(&s.droppedMessages)
|
||||
}
|
||||
|
||||
// logMetrics logs the current streaming statistics
|
||||
@ -206,57 +140,18 @@ func (s *Streamer) logMetrics() {
|
||||
uptime := time.Since(metrics.ConnectedSince)
|
||||
|
||||
const bitsPerMegabit = 1000000
|
||||
totalDropped := atomic.LoadUint64(&s.totalDropped)
|
||||
|
||||
s.logger.Info(
|
||||
"Stream statistics",
|
||||
"uptime",
|
||||
uptime,
|
||||
"total_messages",
|
||||
metrics.TotalMessages,
|
||||
"total_bytes",
|
||||
metrics.TotalBytes,
|
||||
"total_mb",
|
||||
fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB),
|
||||
"messages_per_sec",
|
||||
fmt.Sprintf("%.2f", metrics.MessagesPerSec),
|
||||
"bits_per_sec",
|
||||
fmt.Sprintf("%.0f", metrics.BitsPerSec),
|
||||
"mbps",
|
||||
fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit),
|
||||
"total_dropped",
|
||||
totalDropped,
|
||||
droppedMessages := atomic.LoadUint64(&s.droppedMessages)
|
||||
s.logger.Info("Stream statistics",
|
||||
"uptime", uptime,
|
||||
"total_messages", metrics.TotalMessages,
|
||||
"total_bytes", metrics.TotalBytes,
|
||||
"total_mb", fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB),
|
||||
"messages_per_sec", fmt.Sprintf("%.2f", metrics.MessagesPerSec),
|
||||
"bits_per_sec", fmt.Sprintf("%.0f", metrics.BitsPerSec),
|
||||
"mbps", fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit),
|
||||
"dropped_messages", droppedMessages,
|
||||
"active_handlers", len(s.semaphore),
|
||||
)
|
||||
|
||||
// Log per-handler statistics
|
||||
s.mu.RLock()
|
||||
for i, info := range s.handlers {
|
||||
info.metrics.mu.Lock()
|
||||
if info.metrics.processedCount > 0 {
|
||||
// Safe conversion: processedCount is bounded by maxInt64
|
||||
processedCount := info.metrics.processedCount
|
||||
const maxInt64 = 1<<63 - 1
|
||||
if processedCount > maxInt64 {
|
||||
processedCount = maxInt64
|
||||
}
|
||||
//nolint:gosec // processedCount is explicitly bounded above
|
||||
avgTime := info.metrics.totalTime / time.Duration(processedCount)
|
||||
s.logger.Info(
|
||||
"Handler statistics",
|
||||
"handler", fmt.Sprintf("%T", info.handler),
|
||||
"index", i,
|
||||
"queue_len", len(info.queue),
|
||||
"queue_cap", cap(info.queue),
|
||||
"processed", info.metrics.processedCount,
|
||||
"dropped", info.metrics.droppedCount,
|
||||
"avg_time", avgTime,
|
||||
"min_time", info.metrics.minTime,
|
||||
"max_time", info.metrics.maxTime,
|
||||
)
|
||||
}
|
||||
info.metrics.mu.Unlock()
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
}
|
||||
|
||||
// updateMetrics updates the metrics counters and rates
|
||||
@ -331,91 +226,92 @@ func (s *Streamer) stream(ctx context.Context) error {
|
||||
rawHandler(string(line))
|
||||
}
|
||||
|
||||
// Parse the message first
|
||||
var wrapper ristypes.RISLiveMessage
|
||||
if err := json.Unmarshal(line, &wrapper); err != nil {
|
||||
// Output the raw line and panic on parse failure
|
||||
fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err)
|
||||
fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(line))
|
||||
panic(fmt.Sprintf("JSON parse error: %v", err))
|
||||
}
|
||||
|
||||
// Check if it's a ris_message wrapper
|
||||
if wrapper.Type != "ris_message" {
|
||||
s.logger.Error("Unexpected wrapper type",
|
||||
"type", wrapper.Type,
|
||||
"line", string(line),
|
||||
)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// Get the actual message
|
||||
msg := wrapper.Data
|
||||
|
||||
// Parse the timestamp
|
||||
msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0).UTC()
|
||||
|
||||
// Process based on message type
|
||||
switch msg.Type {
|
||||
case "UPDATE":
|
||||
// Process BGP UPDATE messages
|
||||
// Will be dispatched to handlers
|
||||
case "RIS_PEER_STATE":
|
||||
// RIS peer state messages - silently ignore
|
||||
continue
|
||||
case "KEEPALIVE":
|
||||
// BGP keepalive messages - silently process
|
||||
continue
|
||||
case "OPEN":
|
||||
// BGP open messages
|
||||
s.logger.Info("BGP session opened",
|
||||
"peer", msg.Peer,
|
||||
"peer_asn", msg.PeerASN,
|
||||
)
|
||||
|
||||
continue
|
||||
case "NOTIFICATION":
|
||||
// BGP notification messages (errors)
|
||||
s.logger.Warn("BGP notification",
|
||||
"peer", msg.Peer,
|
||||
"peer_asn", msg.PeerASN,
|
||||
)
|
||||
|
||||
continue
|
||||
case "STATE":
|
||||
// Peer state changes - silently ignore
|
||||
continue
|
||||
default:
|
||||
fmt.Fprintf(
|
||||
os.Stderr,
|
||||
"UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n",
|
||||
msg.Type,
|
||||
string(line),
|
||||
)
|
||||
panic(
|
||||
fmt.Sprintf(
|
||||
"Unknown RIS message type: %s",
|
||||
msg.Type,
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
// Dispatch to interested handlers
|
||||
// Get current handlers
|
||||
s.mu.RLock()
|
||||
for _, info := range s.handlers {
|
||||
if info.handler.WantsMessage(msg.Type) {
|
||||
select {
|
||||
case info.queue <- &msg:
|
||||
// Message queued successfully
|
||||
default:
|
||||
// Queue is full, drop the message
|
||||
atomic.AddUint64(&info.metrics.droppedCount, 1)
|
||||
atomic.AddUint64(&s.totalDropped, 1)
|
||||
handlers := make([]MessageHandler, len(s.handlers))
|
||||
copy(handlers, s.handlers)
|
||||
s.mu.RUnlock()
|
||||
|
||||
// Try to acquire semaphore, drop message if at capacity
|
||||
select {
|
||||
case s.semaphore <- struct{}{}:
|
||||
// Successfully acquired semaphore, process message
|
||||
go func(rawLine []byte, messageHandlers []MessageHandler) {
|
||||
defer func() { <-s.semaphore }() // Release semaphore when done
|
||||
|
||||
// Parse the outer wrapper first
|
||||
var wrapper ristypes.RISLiveMessage
|
||||
if err := json.Unmarshal(rawLine, &wrapper); err != nil {
|
||||
// Output the raw line and panic on parse failure
|
||||
fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err)
|
||||
fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(rawLine))
|
||||
panic(fmt.Sprintf("JSON parse error: %v", err))
|
||||
}
|
||||
|
||||
// Check if it's a ris_message wrapper
|
||||
if wrapper.Type != "ris_message" {
|
||||
s.logger.Error("Unexpected wrapper type",
|
||||
"type", wrapper.Type,
|
||||
"line", string(rawLine),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Get the actual message
|
||||
msg := wrapper.Data
|
||||
|
||||
// Parse the timestamp
|
||||
msg.ParsedTimestamp = time.Unix(int64(msg.Timestamp), 0).UTC()
|
||||
|
||||
// Process based on message type
|
||||
switch msg.Type {
|
||||
case "UPDATE":
|
||||
// Process BGP UPDATE messages
|
||||
// Will be handled by registered handlers
|
||||
case "RIS_PEER_STATE":
|
||||
// RIS peer state messages - silently ignore
|
||||
case "KEEPALIVE":
|
||||
// BGP keepalive messages - silently process
|
||||
case "OPEN":
|
||||
// BGP open messages
|
||||
s.logger.Info("BGP session opened",
|
||||
"peer", msg.Peer,
|
||||
"peer_asn", msg.PeerASN,
|
||||
)
|
||||
case "NOTIFICATION":
|
||||
// BGP notification messages (errors)
|
||||
s.logger.Warn("BGP notification",
|
||||
"peer", msg.Peer,
|
||||
"peer_asn", msg.PeerASN,
|
||||
)
|
||||
case "STATE":
|
||||
// Peer state changes - silently ignore
|
||||
default:
|
||||
fmt.Fprintf(
|
||||
os.Stderr,
|
||||
"UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n",
|
||||
msg.Type,
|
||||
string(rawLine),
|
||||
)
|
||||
panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type))
|
||||
}
|
||||
|
||||
// Call handlers synchronously within this goroutine
|
||||
// This prevents unbounded goroutine growth at the handler level
|
||||
for _, handler := range messageHandlers {
|
||||
if handler.WantsMessage(msg.Type) {
|
||||
handler.HandleMessage(&msg)
|
||||
}
|
||||
}
|
||||
}(append([]byte(nil), line...), handlers) // Copy the line to avoid data races
|
||||
default:
|
||||
// Semaphore is full, drop the message
|
||||
dropped := atomic.AddUint64(&s.droppedMessages, 1)
|
||||
if dropped%1000 == 0 { // Log every 1000 dropped messages
|
||||
s.logger.Warn("Dropping messages due to overload", "total_dropped", dropped, "max_handlers", maxConcurrentHandlers)
|
||||
}
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user