Compare commits
4 Commits
d328fb0942
...
1d05372899
Author | SHA1 | Date | |
---|---|---|---|
1d05372899 | |||
76ec9f68b7 | |||
a555a1dee2 | |||
b49d3ce88c |
2
Makefile
2
Makefile
@ -15,7 +15,7 @@ lint:
|
|||||||
golangci-lint run
|
golangci-lint run
|
||||||
|
|
||||||
build:
|
build:
|
||||||
go build -o bin/routewatch cmd/routewatch/main.go
|
CGO_ENABLED=1 go build -o bin/routewatch cmd/routewatch/main.go
|
||||||
|
|
||||||
clean:
|
clean:
|
||||||
rm -rf bin/
|
rm -rf bin/
|
||||||
|
14
go.mod
14
go.mod
@ -3,24 +3,16 @@ module git.eeqj.de/sneak/routewatch
|
|||||||
go 1.24.4
|
go 1.24.4
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/go-chi/chi/v5 v5.2.2
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
|
github.com/mattn/go-sqlite3 v1.14.29
|
||||||
|
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9
|
||||||
go.uber.org/fx v1.24.0
|
go.uber.org/fx v1.24.0
|
||||||
modernc.org/sqlite v1.38.1
|
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
|
||||||
github.com/go-chi/chi/v5 v5.2.2 // indirect
|
|
||||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
|
||||||
github.com/ncruces/go-strftime v0.1.9 // indirect
|
|
||||||
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
|
|
||||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
|
||||||
go.uber.org/dig v1.19.0 // indirect
|
go.uber.org/dig v1.19.0 // indirect
|
||||||
go.uber.org/multierr v1.10.0 // indirect
|
go.uber.org/multierr v1.10.0 // indirect
|
||||||
go.uber.org/zap v1.26.0 // indirect
|
go.uber.org/zap v1.26.0 // indirect
|
||||||
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
|
|
||||||
golang.org/x/sys v0.34.0 // indirect
|
golang.org/x/sys v0.34.0 // indirect
|
||||||
modernc.org/libc v1.66.3 // indirect
|
|
||||||
modernc.org/mathutil v1.7.1 // indirect
|
|
||||||
modernc.org/memory v1.11.0 // indirect
|
|
||||||
)
|
)
|
||||||
|
47
go.sum
47
go.sum
@ -1,23 +1,15 @@
|
|||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
|
||||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
|
||||||
github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618=
|
github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618=
|
||||||
github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
|
github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
|
||||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
|
|
||||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
|
||||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
github.com/mattn/go-sqlite3 v1.14.29 h1:1O6nRLJKvsi1H2Sj0Hzdfojwt8GiGKm+LOfLaBFaouQ=
|
||||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
github.com/mattn/go-sqlite3 v1.14.29/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||||
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
|
|
||||||
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg=
|
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg=
|
||||||
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
|
|
||||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
|
||||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||||
go.uber.org/dig v1.19.0 h1:BACLhebsYdpQ7IROQ1AGPjrXcP5dF80U3gKoFzbaq/4=
|
go.uber.org/dig v1.19.0 h1:BACLhebsYdpQ7IROQ1AGPjrXcP5dF80U3gKoFzbaq/4=
|
||||||
@ -30,42 +22,7 @@ go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
|
|||||||
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
|
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
|
||||||
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
|
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
|
||||||
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
|
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
|
||||||
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
|
|
||||||
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
|
|
||||||
golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w=
|
|
||||||
golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww=
|
|
||||||
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
|
|
||||||
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
|
||||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
|
||||||
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
|
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
|
||||||
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||||
golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo=
|
|
||||||
golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg=
|
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
modernc.org/cc/v4 v4.26.2 h1:991HMkLjJzYBIfha6ECZdjrIYz2/1ayr+FL8GN+CNzM=
|
|
||||||
modernc.org/cc/v4 v4.26.2/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
|
|
||||||
modernc.org/ccgo/v4 v4.28.0 h1:rjznn6WWehKq7dG4JtLRKxb52Ecv8OUGah8+Z/SfpNU=
|
|
||||||
modernc.org/ccgo/v4 v4.28.0/go.mod h1:JygV3+9AV6SmPhDasu4JgquwU81XAKLd3OKTUDNOiKE=
|
|
||||||
modernc.org/fileutil v1.3.8 h1:qtzNm7ED75pd1C7WgAGcK4edm4fvhtBsEiI/0NQ54YM=
|
|
||||||
modernc.org/fileutil v1.3.8/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc=
|
|
||||||
modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI=
|
|
||||||
modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
|
|
||||||
modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks=
|
|
||||||
modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI=
|
|
||||||
modernc.org/libc v1.66.3 h1:cfCbjTUcdsKyyZZfEUKfoHcP3S0Wkvz3jgSzByEWVCQ=
|
|
||||||
modernc.org/libc v1.66.3/go.mod h1:XD9zO8kt59cANKvHPXpx7yS2ELPheAey0vjIuZOhOU8=
|
|
||||||
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
|
|
||||||
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
|
|
||||||
modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI=
|
|
||||||
modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
|
|
||||||
modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
|
|
||||||
modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
|
|
||||||
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
|
|
||||||
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
|
|
||||||
modernc.org/sqlite v1.38.1 h1:jNnIjleVta+DKSAr3TnkKK87EEhjPhBLzi6hvIX9Bas=
|
|
||||||
modernc.org/sqlite v1.38.1/go.mod h1:cPTJYSlgg3Sfg046yBShXENNtPrWrDX8bsbAQBzgQ5E=
|
|
||||||
modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
|
|
||||||
modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
|
|
||||||
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
|
|
||||||
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
|
|
||||||
|
@ -11,8 +11,9 @@ import (
|
|||||||
"runtime"
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.eeqj.de/sneak/routewatch/pkg/asinfo"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
_ "modernc.org/sqlite" // Pure Go SQLite driver
|
_ "github.com/mattn/go-sqlite3" // CGO SQLite driver
|
||||||
)
|
)
|
||||||
|
|
||||||
//go:embed schema.sql
|
//go:embed schema.sql
|
||||||
@ -94,10 +95,10 @@ func NewWithConfig(config Config, logger *slog.Logger) (*Database, error) {
|
|||||||
return nil, fmt.Errorf("failed to create database directory: %w", err)
|
return nil, fmt.Errorf("failed to create database directory: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add connection parameters for modernc.org/sqlite
|
// Add connection parameters for go-sqlite3
|
||||||
// Enable WAL mode and other performance optimizations
|
// Enable WAL mode and other performance optimizations
|
||||||
dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=NORMAL&_cache_size=-512000", config.Path)
|
dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=NORMAL&cache=shared", config.Path)
|
||||||
db, err := sql.Open("sqlite", dsn)
|
db, err := sql.Open("sqlite3", dsn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to open database: %w", err)
|
return nil, fmt.Errorf("failed to open database: %w", err)
|
||||||
}
|
}
|
||||||
@ -126,7 +127,7 @@ func (d *Database) Initialize() error {
|
|||||||
pragmas := []string{
|
pragmas := []string{
|
||||||
"PRAGMA journal_mode=WAL", // Already set in connection string
|
"PRAGMA journal_mode=WAL", // Already set in connection string
|
||||||
"PRAGMA synchronous=NORMAL", // Faster than FULL, still safe
|
"PRAGMA synchronous=NORMAL", // Faster than FULL, still safe
|
||||||
"PRAGMA cache_size=-524288", // 512MB cache
|
"PRAGMA cache_size=-524288", // 512MB cache (negative = KB)
|
||||||
"PRAGMA temp_store=MEMORY", // Use memory for temp tables
|
"PRAGMA temp_store=MEMORY", // Use memory for temp tables
|
||||||
"PRAGMA mmap_size=268435456", // 256MB memory-mapped I/O
|
"PRAGMA mmap_size=268435456", // 256MB memory-mapped I/O
|
||||||
"PRAGMA wal_autocheckpoint=1000", // Checkpoint every 1000 pages
|
"PRAGMA wal_autocheckpoint=1000", // Checkpoint every 1000 pages
|
||||||
@ -174,12 +175,15 @@ func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error)
|
|||||||
|
|
||||||
var asn ASN
|
var asn ASN
|
||||||
var idStr string
|
var idStr string
|
||||||
err = tx.QueryRow("SELECT id, number, first_seen, last_seen FROM asns WHERE number = ?", number).
|
var handle, description sql.NullString
|
||||||
Scan(&idStr, &asn.Number, &asn.FirstSeen, &asn.LastSeen)
|
err = tx.QueryRow("SELECT id, number, handle, description, first_seen, last_seen FROM asns WHERE number = ?", number).
|
||||||
|
Scan(&idStr, &asn.Number, &handle, &description, &asn.FirstSeen, &asn.LastSeen)
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// ASN exists, update last_seen
|
// ASN exists, update last_seen
|
||||||
asn.ID, _ = uuid.Parse(idStr)
|
asn.ID, _ = uuid.Parse(idStr)
|
||||||
|
asn.Handle = handle.String
|
||||||
|
asn.Description = description.String
|
||||||
_, err = tx.Exec("UPDATE asns SET last_seen = ? WHERE id = ?", timestamp, asn.ID.String())
|
_, err = tx.Exec("UPDATE asns SET last_seen = ? WHERE id = ?", timestamp, asn.ID.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -199,15 +203,22 @@ func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error)
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// ASN doesn't exist, create it
|
// ASN doesn't exist, create it with ASN info lookup
|
||||||
asn = ASN{
|
asn = ASN{
|
||||||
ID: generateUUID(),
|
ID: generateUUID(),
|
||||||
Number: number,
|
Number: number,
|
||||||
FirstSeen: timestamp,
|
FirstSeen: timestamp,
|
||||||
LastSeen: timestamp,
|
LastSeen: timestamp,
|
||||||
}
|
}
|
||||||
_, err = tx.Exec("INSERT INTO asns (id, number, first_seen, last_seen) VALUES (?, ?, ?, ?)",
|
|
||||||
asn.ID.String(), asn.Number, asn.FirstSeen, asn.LastSeen)
|
// Look up ASN info
|
||||||
|
if info, ok := asinfo.Get(number); ok {
|
||||||
|
asn.Handle = info.Handle
|
||||||
|
asn.Description = info.Description
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.Exec("INSERT INTO asns (id, number, handle, description, first_seen, last_seen) VALUES (?, ?, ?, ?, ?, ?)",
|
||||||
|
asn.ID.String(), asn.Number, asn.Handle, asn.Description, asn.FirstSeen, asn.LastSeen)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -340,75 +351,6 @@ func (d *Database) RecordPeering(fromASNID, toASNID string, timestamp time.Time)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateLiveRoute updates the live routing table for an announcement
|
|
||||||
func (d *Database) UpdateLiveRoute(
|
|
||||||
prefixID, originASNID uuid.UUID,
|
|
||||||
peerASN int,
|
|
||||||
nextHop string,
|
|
||||||
timestamp time.Time,
|
|
||||||
) error {
|
|
||||||
// Use SQLite's UPSERT capability to avoid the SELECT+UPDATE/INSERT pattern
|
|
||||||
// This reduces the number of queries and improves performance
|
|
||||||
// Note: We removed the WHERE clause from ON CONFLICT UPDATE because
|
|
||||||
// if we're updating, we want to update regardless of withdrawn_at status
|
|
||||||
err := d.exec(`
|
|
||||||
INSERT INTO live_routes (id, prefix_id, origin_asn_id, peer_asn, next_hop, announced_at, withdrawn_at)
|
|
||||||
VALUES (?, ?, ?, ?, ?, ?, NULL)
|
|
||||||
ON CONFLICT(prefix_id, origin_asn_id, peer_asn) DO UPDATE SET
|
|
||||||
next_hop = excluded.next_hop,
|
|
||||||
announced_at = excluded.announced_at,
|
|
||||||
withdrawn_at = NULL`,
|
|
||||||
generateUUID().String(), prefixID.String(), originASNID.String(),
|
|
||||||
peerASN, nextHop, timestamp)
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithdrawLiveRoute marks a route as withdrawn in the live routing table
|
|
||||||
func (d *Database) WithdrawLiveRoute(prefixID uuid.UUID, peerASN int, timestamp time.Time) error {
|
|
||||||
err := d.exec(`
|
|
||||||
UPDATE live_routes
|
|
||||||
SET withdrawn_at = ?
|
|
||||||
WHERE prefix_id = ? AND peer_asn = ? AND withdrawn_at IS NULL`,
|
|
||||||
timestamp, prefixID.String(), peerASN)
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetActiveLiveRoutes returns all currently active routes (not withdrawn)
|
|
||||||
func (d *Database) GetActiveLiveRoutes() ([]LiveRoute, error) {
|
|
||||||
rows, err := d.query(`
|
|
||||||
SELECT id, prefix_id, origin_asn_id, peer_asn, next_hop, announced_at
|
|
||||||
FROM live_routes
|
|
||||||
WHERE withdrawn_at IS NULL
|
|
||||||
ORDER BY announced_at DESC`)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
_ = rows.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
var routes []LiveRoute
|
|
||||||
for rows.Next() {
|
|
||||||
var route LiveRoute
|
|
||||||
var idStr, prefixIDStr, originASNIDStr string
|
|
||||||
err := rows.Scan(&idStr, &prefixIDStr, &originASNIDStr,
|
|
||||||
&route.PeerASN, &route.NextHop, &route.AnnouncedAt)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
route.ID, _ = uuid.Parse(idStr)
|
|
||||||
route.PrefixID, _ = uuid.Parse(prefixIDStr)
|
|
||||||
route.OriginASNID, _ = uuid.Parse(originASNIDStr)
|
|
||||||
|
|
||||||
routes = append(routes, route)
|
|
||||||
}
|
|
||||||
|
|
||||||
return routes, rows.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdatePeer updates or creates a BGP peer record
|
// UpdatePeer updates or creates a BGP peer record
|
||||||
func (d *Database) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error {
|
func (d *Database) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error {
|
||||||
tx, err := d.beginTx()
|
tx, err := d.beginTx()
|
||||||
@ -495,13 +437,6 @@ func (d *Database) GetStats() (Stats, error) {
|
|||||||
return stats, err
|
return stats, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Count live routes
|
|
||||||
d.logger.Info("Counting live routes")
|
|
||||||
err = d.queryRow("SELECT COUNT(*) FROM live_routes WHERE withdrawn_at IS NULL").Scan(&stats.LiveRoutes)
|
|
||||||
if err != nil {
|
|
||||||
return stats, err
|
|
||||||
}
|
|
||||||
|
|
||||||
d.logger.Info("Stats collection complete")
|
d.logger.Info("Stats collection complete")
|
||||||
|
|
||||||
return stats, nil
|
return stats, nil
|
||||||
|
@ -2,8 +2,6 @@ package database
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Stats contains database statistics
|
// Stats contains database statistics
|
||||||
@ -13,7 +11,6 @@ type Stats struct {
|
|||||||
IPv4Prefixes int
|
IPv4Prefixes int
|
||||||
IPv6Prefixes int
|
IPv6Prefixes int
|
||||||
Peerings int
|
Peerings int
|
||||||
LiveRoutes int
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store defines the interface for database operations
|
// Store defines the interface for database operations
|
||||||
@ -30,11 +27,6 @@ type Store interface {
|
|||||||
// Peering operations
|
// Peering operations
|
||||||
RecordPeering(fromASNID, toASNID string, timestamp time.Time) error
|
RecordPeering(fromASNID, toASNID string, timestamp time.Time) error
|
||||||
|
|
||||||
// Live route operations
|
|
||||||
UpdateLiveRoute(prefixID, originASNID uuid.UUID, peerASN int, nextHop string, timestamp time.Time) error
|
|
||||||
WithdrawLiveRoute(prefixID uuid.UUID, peerASN int, timestamp time.Time) error
|
|
||||||
GetActiveLiveRoutes() ([]LiveRoute, error)
|
|
||||||
|
|
||||||
// Statistics
|
// Statistics
|
||||||
GetStats() (Stats, error)
|
GetStats() (Stats, error)
|
||||||
|
|
||||||
|
@ -10,6 +10,8 @@ import (
|
|||||||
type ASN struct {
|
type ASN struct {
|
||||||
ID uuid.UUID `json:"id"`
|
ID uuid.UUID `json:"id"`
|
||||||
Number int `json:"number"`
|
Number int `json:"number"`
|
||||||
|
Handle string `json:"handle"`
|
||||||
|
Description string `json:"description"`
|
||||||
FirstSeen time.Time `json:"first_seen"`
|
FirstSeen time.Time `json:"first_seen"`
|
||||||
LastSeen time.Time `json:"last_seen"`
|
LastSeen time.Time `json:"last_seen"`
|
||||||
}
|
}
|
||||||
@ -43,15 +45,3 @@ type ASNPeering struct {
|
|||||||
FirstSeen time.Time `json:"first_seen"`
|
FirstSeen time.Time `json:"first_seen"`
|
||||||
LastSeen time.Time `json:"last_seen"`
|
LastSeen time.Time `json:"last_seen"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// LiveRoute represents the current state of a route in the live routing table
|
|
||||||
type LiveRoute struct {
|
|
||||||
ID uuid.UUID `json:"id"`
|
|
||||||
PrefixID uuid.UUID `json:"prefix_id"`
|
|
||||||
OriginASNID uuid.UUID `json:"origin_asn_id"`
|
|
||||||
PeerASN int `json:"peer_asn"`
|
|
||||||
Path string `json:"path"`
|
|
||||||
NextHop string `json:"next_hop"`
|
|
||||||
AnnouncedAt time.Time `json:"announced_at"`
|
|
||||||
WithdrawnAt *time.Time `json:"withdrawn_at"`
|
|
||||||
}
|
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
CREATE TABLE IF NOT EXISTS asns (
|
CREATE TABLE IF NOT EXISTS asns (
|
||||||
id TEXT PRIMARY KEY,
|
id TEXT PRIMARY KEY,
|
||||||
number INTEGER UNIQUE NOT NULL,
|
number INTEGER UNIQUE NOT NULL,
|
||||||
|
handle TEXT,
|
||||||
|
description TEXT,
|
||||||
first_seen DATETIME NOT NULL,
|
first_seen DATETIME NOT NULL,
|
||||||
last_seen DATETIME NOT NULL
|
last_seen DATETIME NOT NULL
|
||||||
);
|
);
|
||||||
@ -48,20 +50,6 @@ CREATE TABLE IF NOT EXISTS bgp_peers (
|
|||||||
last_message_type TEXT
|
last_message_type TEXT
|
||||||
);
|
);
|
||||||
|
|
||||||
-- Live routing table: current state of announced routes
|
|
||||||
CREATE TABLE IF NOT EXISTS live_routes (
|
|
||||||
id TEXT PRIMARY KEY,
|
|
||||||
prefix_id TEXT NOT NULL,
|
|
||||||
origin_asn_id TEXT NOT NULL,
|
|
||||||
peer_asn INTEGER NOT NULL,
|
|
||||||
next_hop TEXT,
|
|
||||||
announced_at DATETIME NOT NULL,
|
|
||||||
withdrawn_at DATETIME,
|
|
||||||
FOREIGN KEY (prefix_id) REFERENCES prefixes(id),
|
|
||||||
FOREIGN KEY (origin_asn_id) REFERENCES asns(id),
|
|
||||||
UNIQUE(prefix_id, origin_asn_id, peer_asn)
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_prefixes_ip_version ON prefixes(ip_version);
|
CREATE INDEX IF NOT EXISTS idx_prefixes_ip_version ON prefixes(ip_version);
|
||||||
CREATE INDEX IF NOT EXISTS idx_prefixes_version_prefix ON prefixes(ip_version, prefix);
|
CREATE INDEX IF NOT EXISTS idx_prefixes_version_prefix ON prefixes(ip_version, prefix);
|
||||||
CREATE INDEX IF NOT EXISTS idx_announcements_timestamp ON announcements(timestamp);
|
CREATE INDEX IF NOT EXISTS idx_announcements_timestamp ON announcements(timestamp);
|
||||||
@ -71,43 +59,6 @@ CREATE INDEX IF NOT EXISTS idx_asn_peerings_from_asn ON asn_peerings(from_asn_id
|
|||||||
CREATE INDEX IF NOT EXISTS idx_asn_peerings_to_asn ON asn_peerings(to_asn_id);
|
CREATE INDEX IF NOT EXISTS idx_asn_peerings_to_asn ON asn_peerings(to_asn_id);
|
||||||
CREATE INDEX IF NOT EXISTS idx_asn_peerings_lookup ON asn_peerings(from_asn_id, to_asn_id);
|
CREATE INDEX IF NOT EXISTS idx_asn_peerings_lookup ON asn_peerings(from_asn_id, to_asn_id);
|
||||||
|
|
||||||
-- Indexes for live routes table
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_live_routes_active
|
|
||||||
ON live_routes(prefix_id, origin_asn_id)
|
|
||||||
WHERE withdrawn_at IS NULL;
|
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_live_routes_origin
|
|
||||||
ON live_routes(origin_asn_id)
|
|
||||||
WHERE withdrawn_at IS NULL;
|
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_live_routes_prefix
|
|
||||||
ON live_routes(prefix_id)
|
|
||||||
WHERE withdrawn_at IS NULL;
|
|
||||||
|
|
||||||
-- Critical index for the most common query pattern
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_live_routes_lookup
|
|
||||||
ON live_routes(prefix_id, origin_asn_id, peer_asn)
|
|
||||||
WHERE withdrawn_at IS NULL;
|
|
||||||
|
|
||||||
-- Index for withdrawal updates by prefix and peer
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_live_routes_withdraw
|
|
||||||
ON live_routes(prefix_id, peer_asn)
|
|
||||||
WHERE withdrawn_at IS NULL;
|
|
||||||
|
|
||||||
-- Covering index for SELECT id queries (includes id in index)
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_live_routes_covering
|
|
||||||
ON live_routes(prefix_id, origin_asn_id, peer_asn, id)
|
|
||||||
WHERE withdrawn_at IS NULL;
|
|
||||||
|
|
||||||
-- Index for UPDATE by id operations
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_live_routes_id
|
|
||||||
ON live_routes(id);
|
|
||||||
|
|
||||||
-- Index for stats queries
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_live_routes_stats
|
|
||||||
ON live_routes(withdrawn_at)
|
|
||||||
WHERE withdrawn_at IS NULL;
|
|
||||||
|
|
||||||
-- Additional indexes for prefixes table
|
-- Additional indexes for prefixes table
|
||||||
CREATE INDEX IF NOT EXISTS idx_prefixes_prefix ON prefixes(prefix);
|
CREATE INDEX IF NOT EXISTS idx_prefixes_prefix ON prefixes(prefix);
|
||||||
|
|
||||||
|
@ -26,6 +26,7 @@ func (d *Database) queryRow(query string, args ...interface{}) *sql.Row {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// query wraps Query with slow query logging
|
// query wraps Query with slow query logging
|
||||||
|
// nolint:unused // kept for future use to ensure all queries go through slow query logging
|
||||||
func (d *Database) query(query string, args ...interface{}) (*sql.Rows, error) {
|
func (d *Database) query(query string, args ...interface{}) (*sql.Rows, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer logSlowQuery(d.logger, query, start)
|
defer logSlowQuery(d.logger, query, start)
|
||||||
|
@ -4,6 +4,7 @@ package routewatch
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
@ -11,6 +12,7 @@ import (
|
|||||||
|
|
||||||
"git.eeqj.de/sneak/routewatch/internal/database"
|
"git.eeqj.de/sneak/routewatch/internal/database"
|
||||||
"git.eeqj.de/sneak/routewatch/internal/metrics"
|
"git.eeqj.de/sneak/routewatch/internal/metrics"
|
||||||
|
"git.eeqj.de/sneak/routewatch/internal/routingtable"
|
||||||
"git.eeqj.de/sneak/routewatch/internal/server"
|
"git.eeqj.de/sneak/routewatch/internal/server"
|
||||||
"git.eeqj.de/sneak/routewatch/internal/streamer"
|
"git.eeqj.de/sneak/routewatch/internal/streamer"
|
||||||
|
|
||||||
@ -22,6 +24,11 @@ type Config struct {
|
|||||||
MaxRuntime time.Duration // Maximum runtime (0 = run forever)
|
MaxRuntime time.Duration // Maximum runtime (0 = run forever)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
// routingTableStatsInterval is how often we log routing table statistics
|
||||||
|
routingTableStatsInterval = 15 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
// NewConfig provides default configuration
|
// NewConfig provides default configuration
|
||||||
func NewConfig() Config {
|
func NewConfig() Config {
|
||||||
return Config{
|
return Config{
|
||||||
@ -34,6 +41,7 @@ type Dependencies struct {
|
|||||||
fx.In
|
fx.In
|
||||||
|
|
||||||
DB database.Store
|
DB database.Store
|
||||||
|
RoutingTable *routingtable.RoutingTable
|
||||||
Streamer *streamer.Streamer
|
Streamer *streamer.Streamer
|
||||||
Server *server.Server
|
Server *server.Server
|
||||||
Logger *slog.Logger
|
Logger *slog.Logger
|
||||||
@ -43,6 +51,7 @@ type Dependencies struct {
|
|||||||
// RouteWatch represents the main application instance
|
// RouteWatch represents the main application instance
|
||||||
type RouteWatch struct {
|
type RouteWatch struct {
|
||||||
db database.Store
|
db database.Store
|
||||||
|
routingTable *routingtable.RoutingTable
|
||||||
streamer *streamer.Streamer
|
streamer *streamer.Streamer
|
||||||
server *server.Server
|
server *server.Server
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
@ -53,6 +62,7 @@ type RouteWatch struct {
|
|||||||
func New(deps Dependencies) *RouteWatch {
|
func New(deps Dependencies) *RouteWatch {
|
||||||
return &RouteWatch{
|
return &RouteWatch{
|
||||||
db: deps.DB,
|
db: deps.DB,
|
||||||
|
routingTable: deps.RoutingTable,
|
||||||
streamer: deps.Streamer,
|
streamer: deps.Streamer,
|
||||||
server: deps.Server,
|
server: deps.Server,
|
||||||
logger: deps.Logger,
|
logger: deps.Logger,
|
||||||
@ -76,10 +86,17 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
|
|||||||
dbHandler := NewDatabaseHandler(rw.db, rw.logger)
|
dbHandler := NewDatabaseHandler(rw.db, rw.logger)
|
||||||
rw.streamer.RegisterHandler(dbHandler)
|
rw.streamer.RegisterHandler(dbHandler)
|
||||||
|
|
||||||
|
// Register routing table handler to maintain in-memory routing table
|
||||||
|
rtHandler := NewRoutingTableHandler(rw.routingTable, rw.logger)
|
||||||
|
rw.streamer.RegisterHandler(rtHandler)
|
||||||
|
|
||||||
// Register peer tracking handler to track all peers
|
// Register peer tracking handler to track all peers
|
||||||
peerHandler := NewPeerHandler(rw.db, rw.logger)
|
peerHandler := NewPeerHandler(rw.db, rw.logger)
|
||||||
rw.streamer.RegisterHandler(peerHandler)
|
rw.streamer.RegisterHandler(peerHandler)
|
||||||
|
|
||||||
|
// Start periodic routing table stats logging
|
||||||
|
go rw.logRoutingTableStats(ctx)
|
||||||
|
|
||||||
// Start streaming
|
// Start streaming
|
||||||
if err := rw.streamer.Start(); err != nil {
|
if err := rw.streamer.Start(); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -117,6 +134,32 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// logRoutingTableStats periodically logs routing table statistics
|
||||||
|
func (rw *RouteWatch) logRoutingTableStats(ctx context.Context) {
|
||||||
|
// Log stats periodically
|
||||||
|
ticker := time.NewTicker(routingTableStatsInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
stats := rw.routingTable.GetDetailedStats()
|
||||||
|
rw.logger.Info("Routing table statistics",
|
||||||
|
"ipv4_routes", stats.IPv4Routes,
|
||||||
|
"ipv6_routes", stats.IPv6Routes,
|
||||||
|
"ipv4_updates_per_sec", fmt.Sprintf("%.2f", stats.IPv4UpdatesRate),
|
||||||
|
"ipv6_updates_per_sec", fmt.Sprintf("%.2f", stats.IPv6UpdatesRate),
|
||||||
|
"total_routes", stats.TotalRoutes,
|
||||||
|
"unique_prefixes", stats.UniquePrefixes,
|
||||||
|
"unique_origins", stats.UniqueOrigins,
|
||||||
|
"unique_peers", stats.UniquePeers,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NewLogger creates a structured logger
|
// NewLogger creates a structured logger
|
||||||
func NewLogger() *slog.Logger {
|
func NewLogger() *slog.Logger {
|
||||||
level := slog.LevelInfo
|
level := slog.LevelInfo
|
||||||
@ -154,8 +197,12 @@ func getModule() fx.Option {
|
|||||||
},
|
},
|
||||||
fx.As(new(database.Store)),
|
fx.As(new(database.Store)),
|
||||||
),
|
),
|
||||||
|
routingtable.New,
|
||||||
streamer.New,
|
streamer.New,
|
||||||
|
fx.Annotate(
|
||||||
server.New,
|
server.New,
|
||||||
|
fx.ParamTags(``, ``, ``, ``),
|
||||||
|
),
|
||||||
New,
|
New,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
|
|
||||||
"git.eeqj.de/sneak/routewatch/internal/database"
|
"git.eeqj.de/sneak/routewatch/internal/database"
|
||||||
"git.eeqj.de/sneak/routewatch/internal/metrics"
|
"git.eeqj.de/sneak/routewatch/internal/metrics"
|
||||||
|
"git.eeqj.de/sneak/routewatch/internal/routingtable"
|
||||||
"git.eeqj.de/sneak/routewatch/internal/server"
|
"git.eeqj.de/sneak/routewatch/internal/server"
|
||||||
"git.eeqj.de/sneak/routewatch/internal/streamer"
|
"git.eeqj.de/sneak/routewatch/internal/streamer"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
@ -129,35 +130,6 @@ func (m *mockStore) RecordPeering(fromASNID, toASNID string, _ time.Time) error
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateLiveRoute mock implementation
|
|
||||||
func (m *mockStore) UpdateLiveRoute(prefixID, originASNID uuid.UUID, peerASN int, _ string, _ time.Time) error {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
|
|
||||||
key := prefixID.String() + "_" + originASNID.String() + "_" + string(rune(peerASN))
|
|
||||||
if !m.Routes[key] {
|
|
||||||
m.Routes[key] = true
|
|
||||||
m.RouteCount++
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithdrawLiveRoute mock implementation
|
|
||||||
func (m *mockStore) WithdrawLiveRoute(_ uuid.UUID, _ int, _ time.Time) error {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
|
|
||||||
m.WithdrawalCount++
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetActiveLiveRoutes mock implementation
|
|
||||||
func (m *mockStore) GetActiveLiveRoutes() ([]database.LiveRoute, error) {
|
|
||||||
return []database.LiveRoute{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdatePeer mock implementation
|
// UpdatePeer mock implementation
|
||||||
func (m *mockStore) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error {
|
func (m *mockStore) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error {
|
||||||
// Simple mock - just return nil
|
// Simple mock - just return nil
|
||||||
@ -180,7 +152,6 @@ func (m *mockStore) GetStats() (database.Stats, error) {
|
|||||||
IPv4Prefixes: m.IPv4Prefixes,
|
IPv4Prefixes: m.IPv4Prefixes,
|
||||||
IPv6Prefixes: m.IPv6Prefixes,
|
IPv6Prefixes: m.IPv6Prefixes,
|
||||||
Peerings: m.PeeringCount,
|
Peerings: m.PeeringCount,
|
||||||
LiveRoutes: m.RouteCount,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -197,12 +168,16 @@ func TestRouteWatchLiveFeed(t *testing.T) {
|
|||||||
// Create streamer
|
// Create streamer
|
||||||
s := streamer.New(logger, metricsTracker)
|
s := streamer.New(logger, metricsTracker)
|
||||||
|
|
||||||
|
// Create routing table
|
||||||
|
rt := routingtable.New()
|
||||||
|
|
||||||
// Create server
|
// Create server
|
||||||
srv := server.New(mockDB, s, logger)
|
srv := server.New(mockDB, rt, s, logger)
|
||||||
|
|
||||||
// Create RouteWatch with 5 second limit
|
// Create RouteWatch with 5 second limit
|
||||||
deps := Dependencies{
|
deps := Dependencies{
|
||||||
DB: mockDB,
|
DB: mockDB,
|
||||||
|
RoutingTable: rt,
|
||||||
Streamer: s,
|
Streamer: s,
|
||||||
Server: srv,
|
Server: srv,
|
||||||
Logger: logger,
|
Logger: logger,
|
||||||
@ -242,8 +217,4 @@ func TestRouteWatchLiveFeed(t *testing.T) {
|
|||||||
}
|
}
|
||||||
t.Logf("Recorded %d AS peering relationships in 5 seconds", stats.Peerings)
|
t.Logf("Recorded %d AS peering relationships in 5 seconds", stats.Peerings)
|
||||||
|
|
||||||
if stats.LiveRoutes == 0 {
|
|
||||||
t.Error("Expected to have some active routes")
|
|
||||||
}
|
|
||||||
t.Logf("Active routes: %d", stats.LiveRoutes)
|
|
||||||
}
|
}
|
||||||
|
@ -2,12 +2,16 @@ package routewatch
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"strconv"
|
|
||||||
|
|
||||||
"git.eeqj.de/sneak/routewatch/internal/database"
|
"git.eeqj.de/sneak/routewatch/internal/database"
|
||||||
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// databaseHandlerQueueSize is the queue capacity for database operations
|
||||||
|
databaseHandlerQueueSize = 100
|
||||||
|
)
|
||||||
|
|
||||||
// DatabaseHandler handles BGP messages and stores them in the database
|
// DatabaseHandler handles BGP messages and stores them in the database
|
||||||
type DatabaseHandler struct {
|
type DatabaseHandler struct {
|
||||||
db database.Store
|
db database.Store
|
||||||
@ -28,19 +32,17 @@ func (h *DatabaseHandler) WantsMessage(messageType string) bool {
|
|||||||
return messageType == "UPDATE"
|
return messageType == "UPDATE"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueueCapacity returns the desired queue capacity for this handler
|
||||||
|
func (h *DatabaseHandler) QueueCapacity() int {
|
||||||
|
// Database operations are slow, so use a smaller queue
|
||||||
|
return databaseHandlerQueueSize
|
||||||
|
}
|
||||||
|
|
||||||
// HandleMessage processes a RIS message and updates the database
|
// HandleMessage processes a RIS message and updates the database
|
||||||
func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
|
func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||||
// Use the pre-parsed timestamp
|
// Use the pre-parsed timestamp
|
||||||
timestamp := msg.ParsedTimestamp
|
timestamp := msg.ParsedTimestamp
|
||||||
|
|
||||||
// Parse peer ASN
|
|
||||||
peerASN, err := strconv.Atoi(msg.PeerASN)
|
|
||||||
if err != nil {
|
|
||||||
h.logger.Error("Failed to parse peer ASN", "peer_asn", msg.PeerASN, "error", err)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get origin ASN from path (last element)
|
// Get origin ASN from path (last element)
|
||||||
var originASN int
|
var originASN int
|
||||||
if len(msg.Path) > 0 {
|
if len(msg.Path) > 0 {
|
||||||
@ -51,7 +53,7 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
|
|||||||
for _, announcement := range msg.Announcements {
|
for _, announcement := range msg.Announcements {
|
||||||
for _, prefix := range announcement.Prefixes {
|
for _, prefix := range announcement.Prefixes {
|
||||||
// Get or create prefix
|
// Get or create prefix
|
||||||
p, err := h.db.GetOrCreatePrefix(prefix, timestamp)
|
_, err := h.db.GetOrCreatePrefix(prefix, timestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err)
|
h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err)
|
||||||
|
|
||||||
@ -59,30 +61,13 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get or create origin ASN
|
// Get or create origin ASN
|
||||||
asn, err := h.db.GetOrCreateASN(originASN, timestamp)
|
_, err = h.db.GetOrCreateASN(originASN, timestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logger.Error("Failed to get/create ASN", "asn", originASN, "error", err)
|
h.logger.Error("Failed to get/create ASN", "asn", originASN, "error", err)
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update live route
|
|
||||||
err = h.db.UpdateLiveRoute(
|
|
||||||
p.ID,
|
|
||||||
asn.ID,
|
|
||||||
peerASN,
|
|
||||||
announcement.NextHop,
|
|
||||||
timestamp,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
h.logger.Error("Failed to update live route",
|
|
||||||
"prefix", prefix,
|
|
||||||
"origin_asn", originASN,
|
|
||||||
"peer_asn", peerASN,
|
|
||||||
"error", err,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Record the announcement in the announcements table
|
// TODO: Record the announcement in the announcements table
|
||||||
// Process AS path to update peerings
|
// Process AS path to update peerings
|
||||||
if len(msg.Path) > 1 {
|
if len(msg.Path) > 1 {
|
||||||
@ -122,23 +107,13 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
|
|||||||
// Process withdrawals
|
// Process withdrawals
|
||||||
for _, prefix := range msg.Withdrawals {
|
for _, prefix := range msg.Withdrawals {
|
||||||
// Get prefix
|
// Get prefix
|
||||||
p, err := h.db.GetOrCreatePrefix(prefix, timestamp)
|
_, err := h.db.GetOrCreatePrefix(prefix, timestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logger.Error("Failed to get prefix for withdrawal", "prefix", prefix, "error", err)
|
h.logger.Error("Failed to get prefix for withdrawal", "prefix", prefix, "error", err)
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Withdraw the route
|
// TODO: Record the withdrawal in the announcements table as a withdrawal
|
||||||
err = h.db.WithdrawLiveRoute(p.ID, peerASN, timestamp)
|
|
||||||
if err != nil {
|
|
||||||
h.logger.Error("Failed to withdraw route",
|
|
||||||
"prefix", prefix,
|
|
||||||
"peer_asn", peerASN,
|
|
||||||
"error", err,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Record the withdrawal in the withdrawals table
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,11 @@ import (
|
|||||||
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// peerHandlerQueueSize is the queue capacity for peer tracking operations
|
||||||
|
peerHandlerQueueSize = 500
|
||||||
|
)
|
||||||
|
|
||||||
// PeerHandler tracks BGP peers from all message types
|
// PeerHandler tracks BGP peers from all message types
|
||||||
type PeerHandler struct {
|
type PeerHandler struct {
|
||||||
db database.Store
|
db database.Store
|
||||||
@ -27,6 +32,12 @@ func (h *PeerHandler) WantsMessage(_ string) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueueCapacity returns the desired queue capacity for this handler
|
||||||
|
func (h *PeerHandler) QueueCapacity() int {
|
||||||
|
// Peer tracking is lightweight but involves database ops, use moderate queue
|
||||||
|
return peerHandlerQueueSize
|
||||||
|
}
|
||||||
|
|
||||||
// HandleMessage processes a message to track peer information
|
// HandleMessage processes a message to track peer information
|
||||||
func (h *PeerHandler) HandleMessage(msg *ristypes.RISMessage) {
|
func (h *PeerHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||||
// Parse peer ASN from string
|
// Parse peer ASN from string
|
||||||
|
131
internal/routewatch/routingtablehandler.go
Normal file
131
internal/routewatch/routingtablehandler.go
Normal file
@ -0,0 +1,131 @@
|
|||||||
|
package routewatch
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log/slog"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
||||||
|
"git.eeqj.de/sneak/routewatch/internal/routingtable"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// routingTableHandlerQueueSize is the queue capacity for in-memory routing table operations
|
||||||
|
routingTableHandlerQueueSize = 10000
|
||||||
|
)
|
||||||
|
|
||||||
|
// RoutingTableHandler handles BGP messages and updates the in-memory routing table
|
||||||
|
type RoutingTableHandler struct {
|
||||||
|
rt *routingtable.RoutingTable
|
||||||
|
logger *slog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRoutingTableHandler creates a new routing table handler
|
||||||
|
func NewRoutingTableHandler(rt *routingtable.RoutingTable, logger *slog.Logger) *RoutingTableHandler {
|
||||||
|
return &RoutingTableHandler{
|
||||||
|
rt: rt,
|
||||||
|
logger: logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WantsMessage returns true if this handler wants to process messages of the given type
|
||||||
|
func (h *RoutingTableHandler) WantsMessage(messageType string) bool {
|
||||||
|
// We only care about UPDATE messages for the routing table
|
||||||
|
return messageType == "UPDATE"
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueueCapacity returns the desired queue capacity for this handler
|
||||||
|
func (h *RoutingTableHandler) QueueCapacity() int {
|
||||||
|
// In-memory operations are very fast, so use a large queue
|
||||||
|
return routingTableHandlerQueueSize
|
||||||
|
}
|
||||||
|
|
||||||
|
// HandleMessage processes a RIS message and updates the routing table
|
||||||
|
func (h *RoutingTableHandler) HandleMessage(msg *ristypes.RISMessage) {
|
||||||
|
// Use the pre-parsed timestamp
|
||||||
|
timestamp := msg.ParsedTimestamp
|
||||||
|
|
||||||
|
// Parse peer ASN
|
||||||
|
peerASN, err := strconv.Atoi(msg.PeerASN)
|
||||||
|
if err != nil {
|
||||||
|
h.logger.Error("Failed to parse peer ASN", "peer_asn", msg.PeerASN, "error", err)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get origin ASN from path (last element)
|
||||||
|
var originASN int
|
||||||
|
if len(msg.Path) > 0 {
|
||||||
|
originASN = msg.Path[len(msg.Path)-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process announcements
|
||||||
|
for _, announcement := range msg.Announcements {
|
||||||
|
for _, prefix := range announcement.Prefixes {
|
||||||
|
// Generate deterministic UUIDs based on the prefix and origin ASN
|
||||||
|
// This ensures consistency across restarts
|
||||||
|
prefixID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(prefix))
|
||||||
|
originASNID := uuid.NewSHA1(uuid.NameSpaceOID, []byte(strconv.Itoa(originASN)))
|
||||||
|
|
||||||
|
// Create route for the routing table
|
||||||
|
route := &routingtable.Route{
|
||||||
|
PrefixID: prefixID,
|
||||||
|
Prefix: prefix,
|
||||||
|
OriginASNID: originASNID,
|
||||||
|
OriginASN: originASN,
|
||||||
|
PeerASN: peerASN,
|
||||||
|
ASPath: msg.Path,
|
||||||
|
NextHop: announcement.NextHop,
|
||||||
|
AnnouncedAt: timestamp,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add route to routing table
|
||||||
|
h.rt.AddRoute(route)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process withdrawals
|
||||||
|
for _, prefix := range msg.Withdrawals {
|
||||||
|
// Generate deterministic UUID for the prefix
|
||||||
|
prefixID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(prefix))
|
||||||
|
|
||||||
|
// Withdraw all routes for this prefix from this peer
|
||||||
|
h.rt.WithdrawRoutesByPrefixAndPeer(prefixID, peerASN)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRoutingTableStats returns statistics about the routing table
|
||||||
|
func (h *RoutingTableHandler) GetRoutingTableStats() map[string]int {
|
||||||
|
return h.rt.Stats()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetActiveRouteCount returns the number of active routes
|
||||||
|
func (h *RoutingTableHandler) GetActiveRouteCount() int {
|
||||||
|
return h.rt.Size()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRoutesByPrefix returns all routes for a specific prefix
|
||||||
|
func (h *RoutingTableHandler) GetRoutesByPrefix(prefixID uuid.UUID) []*routingtable.Route {
|
||||||
|
return h.rt.GetRoutesByPrefix(prefixID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRoutesByOriginASN returns all routes originated by a specific ASN
|
||||||
|
func (h *RoutingTableHandler) GetRoutesByOriginASN(originASNID uuid.UUID) []*routingtable.Route {
|
||||||
|
return h.rt.GetRoutesByOriginASN(originASNID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRoutesByPeerASN returns all routes received from a specific peer ASN
|
||||||
|
func (h *RoutingTableHandler) GetRoutesByPeerASN(peerASN int) []*routingtable.Route {
|
||||||
|
return h.rt.GetRoutesByPeerASN(peerASN)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetAllRoutes returns all active routes
|
||||||
|
func (h *RoutingTableHandler) GetAllRoutes() []*routingtable.Route {
|
||||||
|
return h.rt.GetAllRoutes()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClearRoutingTable clears all routes from the routing table
|
||||||
|
func (h *RoutingTableHandler) ClearRoutingTable() {
|
||||||
|
h.rt.Clear()
|
||||||
|
h.logger.Info("Cleared routing table")
|
||||||
|
}
|
397
internal/routingtable/routingtable.go
Normal file
397
internal/routingtable/routingtable.go
Normal file
@ -0,0 +1,397 @@
|
|||||||
|
// Package routingtable provides a thread-safe in-memory representation of the DFZ routing table.
|
||||||
|
package routingtable
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Route represents a single route entry in the routing table
|
||||||
|
type Route struct {
|
||||||
|
PrefixID uuid.UUID `json:"prefix_id"`
|
||||||
|
Prefix string `json:"prefix"` // The actual prefix string (e.g., "10.0.0.0/8")
|
||||||
|
OriginASNID uuid.UUID `json:"origin_asn_id"`
|
||||||
|
OriginASN int `json:"origin_asn"` // The actual ASN number
|
||||||
|
PeerASN int `json:"peer_asn"`
|
||||||
|
ASPath []int `json:"as_path"` // Full AS path
|
||||||
|
NextHop string `json:"next_hop"`
|
||||||
|
AnnouncedAt time.Time `json:"announced_at"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// RouteKey uniquely identifies a route in the table
|
||||||
|
type RouteKey struct {
|
||||||
|
PrefixID uuid.UUID
|
||||||
|
OriginASNID uuid.UUID
|
||||||
|
PeerASN int
|
||||||
|
}
|
||||||
|
|
||||||
|
// RoutingTable is a thread-safe in-memory routing table
|
||||||
|
type RoutingTable struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
routes map[RouteKey]*Route
|
||||||
|
|
||||||
|
// Secondary indexes for efficient lookups
|
||||||
|
byPrefix map[uuid.UUID]map[RouteKey]*Route // Routes indexed by prefix ID
|
||||||
|
byOriginASN map[uuid.UUID]map[RouteKey]*Route // Routes indexed by origin ASN ID
|
||||||
|
byPeerASN map[int]map[RouteKey]*Route // Routes indexed by peer ASN
|
||||||
|
|
||||||
|
// Metrics tracking
|
||||||
|
ipv4Routes int
|
||||||
|
ipv6Routes int
|
||||||
|
ipv4Updates uint64 // Updates counter for rate calculation
|
||||||
|
ipv6Updates uint64 // Updates counter for rate calculation
|
||||||
|
lastMetricsReset time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a new empty routing table
|
||||||
|
func New() *RoutingTable {
|
||||||
|
return &RoutingTable{
|
||||||
|
routes: make(map[RouteKey]*Route),
|
||||||
|
byPrefix: make(map[uuid.UUID]map[RouteKey]*Route),
|
||||||
|
byOriginASN: make(map[uuid.UUID]map[RouteKey]*Route),
|
||||||
|
byPeerASN: make(map[int]map[RouteKey]*Route),
|
||||||
|
lastMetricsReset: time.Now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddRoute adds or updates a route in the routing table
|
||||||
|
func (rt *RoutingTable) AddRoute(route *Route) {
|
||||||
|
rt.mu.Lock()
|
||||||
|
defer rt.mu.Unlock()
|
||||||
|
|
||||||
|
key := RouteKey{
|
||||||
|
PrefixID: route.PrefixID,
|
||||||
|
OriginASNID: route.OriginASNID,
|
||||||
|
PeerASN: route.PeerASN,
|
||||||
|
}
|
||||||
|
|
||||||
|
// If route already exists, remove it from indexes first
|
||||||
|
if existingRoute, exists := rt.routes[key]; exists {
|
||||||
|
rt.removeFromIndexes(key, existingRoute)
|
||||||
|
// Decrement counter for existing route
|
||||||
|
if isIPv6(existingRoute.Prefix) {
|
||||||
|
rt.ipv6Routes--
|
||||||
|
} else {
|
||||||
|
rt.ipv4Routes--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add to main map
|
||||||
|
rt.routes[key] = route
|
||||||
|
|
||||||
|
// Update indexes
|
||||||
|
rt.addToIndexes(key, route)
|
||||||
|
|
||||||
|
// Update metrics
|
||||||
|
if isIPv6(route.Prefix) {
|
||||||
|
rt.ipv6Routes++
|
||||||
|
atomic.AddUint64(&rt.ipv6Updates, 1)
|
||||||
|
} else {
|
||||||
|
rt.ipv4Routes++
|
||||||
|
atomic.AddUint64(&rt.ipv4Updates, 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoveRoute removes a route from the routing table
|
||||||
|
func (rt *RoutingTable) RemoveRoute(prefixID, originASNID uuid.UUID, peerASN int) bool {
|
||||||
|
rt.mu.Lock()
|
||||||
|
defer rt.mu.Unlock()
|
||||||
|
|
||||||
|
key := RouteKey{
|
||||||
|
PrefixID: prefixID,
|
||||||
|
OriginASNID: originASNID,
|
||||||
|
PeerASN: peerASN,
|
||||||
|
}
|
||||||
|
|
||||||
|
route, exists := rt.routes[key]
|
||||||
|
if !exists {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove from indexes
|
||||||
|
rt.removeFromIndexes(key, route)
|
||||||
|
|
||||||
|
// Remove from main map
|
||||||
|
delete(rt.routes, key)
|
||||||
|
|
||||||
|
// Update metrics
|
||||||
|
if isIPv6(route.Prefix) {
|
||||||
|
rt.ipv6Routes--
|
||||||
|
atomic.AddUint64(&rt.ipv6Updates, 1)
|
||||||
|
} else {
|
||||||
|
rt.ipv4Routes--
|
||||||
|
atomic.AddUint64(&rt.ipv4Updates, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithdrawRoutesByPrefixAndPeer removes all routes for a specific prefix from a specific peer
|
||||||
|
func (rt *RoutingTable) WithdrawRoutesByPrefixAndPeer(prefixID uuid.UUID, peerASN int) int {
|
||||||
|
rt.mu.Lock()
|
||||||
|
defer rt.mu.Unlock()
|
||||||
|
|
||||||
|
prefixRoutes, exists := rt.byPrefix[prefixID]
|
||||||
|
if !exists {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect keys to delete (can't delete while iterating)
|
||||||
|
var keysToDelete []RouteKey
|
||||||
|
for key, route := range prefixRoutes {
|
||||||
|
if route.PeerASN == peerASN {
|
||||||
|
keysToDelete = append(keysToDelete, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the routes
|
||||||
|
count := 0
|
||||||
|
for _, key := range keysToDelete {
|
||||||
|
route, exists := rt.routes[key]
|
||||||
|
if !exists {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
rt.removeFromIndexes(key, route)
|
||||||
|
delete(rt.routes, key)
|
||||||
|
count++
|
||||||
|
|
||||||
|
// Update metrics
|
||||||
|
if isIPv6(route.Prefix) {
|
||||||
|
rt.ipv6Routes--
|
||||||
|
atomic.AddUint64(&rt.ipv6Updates, 1)
|
||||||
|
} else {
|
||||||
|
rt.ipv4Routes--
|
||||||
|
atomic.AddUint64(&rt.ipv4Updates, 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return count
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRoute retrieves a specific route
|
||||||
|
func (rt *RoutingTable) GetRoute(prefixID, originASNID uuid.UUID, peerASN int) (*Route, bool) {
|
||||||
|
rt.mu.RLock()
|
||||||
|
defer rt.mu.RUnlock()
|
||||||
|
|
||||||
|
key := RouteKey{
|
||||||
|
PrefixID: prefixID,
|
||||||
|
OriginASNID: originASNID,
|
||||||
|
PeerASN: peerASN,
|
||||||
|
}
|
||||||
|
|
||||||
|
route, exists := rt.routes[key]
|
||||||
|
if !exists {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return a copy to prevent external modification
|
||||||
|
routeCopy := *route
|
||||||
|
|
||||||
|
return &routeCopy, true
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRoutesByPrefix returns all routes for a specific prefix
|
||||||
|
func (rt *RoutingTable) GetRoutesByPrefix(prefixID uuid.UUID) []*Route {
|
||||||
|
rt.mu.RLock()
|
||||||
|
defer rt.mu.RUnlock()
|
||||||
|
|
||||||
|
routes := make([]*Route, 0)
|
||||||
|
if prefixRoutes, exists := rt.byPrefix[prefixID]; exists {
|
||||||
|
for _, route := range prefixRoutes {
|
||||||
|
routeCopy := *route
|
||||||
|
routes = append(routes, &routeCopy)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return routes
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRoutesByOriginASN returns all routes originated by a specific ASN
|
||||||
|
func (rt *RoutingTable) GetRoutesByOriginASN(originASNID uuid.UUID) []*Route {
|
||||||
|
rt.mu.RLock()
|
||||||
|
defer rt.mu.RUnlock()
|
||||||
|
|
||||||
|
routes := make([]*Route, 0)
|
||||||
|
if asnRoutes, exists := rt.byOriginASN[originASNID]; exists {
|
||||||
|
for _, route := range asnRoutes {
|
||||||
|
routeCopy := *route
|
||||||
|
routes = append(routes, &routeCopy)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return routes
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRoutesByPeerASN returns all routes received from a specific peer ASN
|
||||||
|
func (rt *RoutingTable) GetRoutesByPeerASN(peerASN int) []*Route {
|
||||||
|
rt.mu.RLock()
|
||||||
|
defer rt.mu.RUnlock()
|
||||||
|
|
||||||
|
routes := make([]*Route, 0)
|
||||||
|
if peerRoutes, exists := rt.byPeerASN[peerASN]; exists {
|
||||||
|
for _, route := range peerRoutes {
|
||||||
|
routeCopy := *route
|
||||||
|
routes = append(routes, &routeCopy)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return routes
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetAllRoutes returns all active routes in the routing table
|
||||||
|
func (rt *RoutingTable) GetAllRoutes() []*Route {
|
||||||
|
rt.mu.RLock()
|
||||||
|
defer rt.mu.RUnlock()
|
||||||
|
|
||||||
|
routes := make([]*Route, 0, len(rt.routes))
|
||||||
|
for _, route := range rt.routes {
|
||||||
|
routeCopy := *route
|
||||||
|
routes = append(routes, &routeCopy)
|
||||||
|
}
|
||||||
|
|
||||||
|
return routes
|
||||||
|
}
|
||||||
|
|
||||||
|
// Size returns the total number of routes in the table
|
||||||
|
func (rt *RoutingTable) Size() int {
|
||||||
|
rt.mu.RLock()
|
||||||
|
defer rt.mu.RUnlock()
|
||||||
|
|
||||||
|
return len(rt.routes)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stats returns statistics about the routing table
|
||||||
|
func (rt *RoutingTable) Stats() map[string]int {
|
||||||
|
rt.mu.RLock()
|
||||||
|
defer rt.mu.RUnlock()
|
||||||
|
|
||||||
|
stats := map[string]int{
|
||||||
|
"total_routes": len(rt.routes),
|
||||||
|
"unique_prefixes": len(rt.byPrefix),
|
||||||
|
"unique_origins": len(rt.byOriginASN),
|
||||||
|
"unique_peers": len(rt.byPeerASN),
|
||||||
|
}
|
||||||
|
|
||||||
|
return stats
|
||||||
|
}
|
||||||
|
|
||||||
|
// DetailedStats contains detailed routing table statistics
|
||||||
|
type DetailedStats struct {
|
||||||
|
IPv4Routes int
|
||||||
|
IPv6Routes int
|
||||||
|
IPv4UpdatesRate float64
|
||||||
|
IPv6UpdatesRate float64
|
||||||
|
TotalRoutes int
|
||||||
|
UniquePrefixes int
|
||||||
|
UniqueOrigins int
|
||||||
|
UniquePeers int
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetDetailedStats returns detailed statistics including IPv4/IPv6 breakdown and update rates
|
||||||
|
func (rt *RoutingTable) GetDetailedStats() DetailedStats {
|
||||||
|
rt.mu.Lock()
|
||||||
|
defer rt.mu.Unlock()
|
||||||
|
|
||||||
|
// Calculate update rates
|
||||||
|
elapsed := time.Since(rt.lastMetricsReset).Seconds()
|
||||||
|
ipv4Updates := atomic.LoadUint64(&rt.ipv4Updates)
|
||||||
|
ipv6Updates := atomic.LoadUint64(&rt.ipv6Updates)
|
||||||
|
|
||||||
|
stats := DetailedStats{
|
||||||
|
IPv4Routes: rt.ipv4Routes,
|
||||||
|
IPv6Routes: rt.ipv6Routes,
|
||||||
|
IPv4UpdatesRate: float64(ipv4Updates) / elapsed,
|
||||||
|
IPv6UpdatesRate: float64(ipv6Updates) / elapsed,
|
||||||
|
TotalRoutes: len(rt.routes),
|
||||||
|
UniquePrefixes: len(rt.byPrefix),
|
||||||
|
UniqueOrigins: len(rt.byOriginASN),
|
||||||
|
UniquePeers: len(rt.byPeerASN),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset counters for next period
|
||||||
|
atomic.StoreUint64(&rt.ipv4Updates, 0)
|
||||||
|
atomic.StoreUint64(&rt.ipv6Updates, 0)
|
||||||
|
rt.lastMetricsReset = time.Now()
|
||||||
|
|
||||||
|
return stats
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear removes all routes from the routing table
|
||||||
|
func (rt *RoutingTable) Clear() {
|
||||||
|
rt.mu.Lock()
|
||||||
|
defer rt.mu.Unlock()
|
||||||
|
|
||||||
|
rt.routes = make(map[RouteKey]*Route)
|
||||||
|
rt.byPrefix = make(map[uuid.UUID]map[RouteKey]*Route)
|
||||||
|
rt.byOriginASN = make(map[uuid.UUID]map[RouteKey]*Route)
|
||||||
|
rt.byPeerASN = make(map[int]map[RouteKey]*Route)
|
||||||
|
rt.ipv4Routes = 0
|
||||||
|
rt.ipv6Routes = 0
|
||||||
|
atomic.StoreUint64(&rt.ipv4Updates, 0)
|
||||||
|
atomic.StoreUint64(&rt.ipv6Updates, 0)
|
||||||
|
rt.lastMetricsReset = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper methods for index management
|
||||||
|
|
||||||
|
func (rt *RoutingTable) addToIndexes(key RouteKey, route *Route) {
|
||||||
|
// Add to prefix index
|
||||||
|
if rt.byPrefix[route.PrefixID] == nil {
|
||||||
|
rt.byPrefix[route.PrefixID] = make(map[RouteKey]*Route)
|
||||||
|
}
|
||||||
|
rt.byPrefix[route.PrefixID][key] = route
|
||||||
|
|
||||||
|
// Add to origin ASN index
|
||||||
|
if rt.byOriginASN[route.OriginASNID] == nil {
|
||||||
|
rt.byOriginASN[route.OriginASNID] = make(map[RouteKey]*Route)
|
||||||
|
}
|
||||||
|
rt.byOriginASN[route.OriginASNID][key] = route
|
||||||
|
|
||||||
|
// Add to peer ASN index
|
||||||
|
if rt.byPeerASN[route.PeerASN] == nil {
|
||||||
|
rt.byPeerASN[route.PeerASN] = make(map[RouteKey]*Route)
|
||||||
|
}
|
||||||
|
rt.byPeerASN[route.PeerASN][key] = route
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rt *RoutingTable) removeFromIndexes(key RouteKey, route *Route) {
|
||||||
|
// Remove from prefix index
|
||||||
|
if prefixRoutes, exists := rt.byPrefix[route.PrefixID]; exists {
|
||||||
|
delete(prefixRoutes, key)
|
||||||
|
if len(prefixRoutes) == 0 {
|
||||||
|
delete(rt.byPrefix, route.PrefixID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove from origin ASN index
|
||||||
|
if asnRoutes, exists := rt.byOriginASN[route.OriginASNID]; exists {
|
||||||
|
delete(asnRoutes, key)
|
||||||
|
if len(asnRoutes) == 0 {
|
||||||
|
delete(rt.byOriginASN, route.OriginASNID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove from peer ASN index
|
||||||
|
if peerRoutes, exists := rt.byPeerASN[route.PeerASN]; exists {
|
||||||
|
delete(peerRoutes, key)
|
||||||
|
if len(peerRoutes) == 0 {
|
||||||
|
delete(rt.byPeerASN, route.PeerASN)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// String returns a string representation of the route key
|
||||||
|
func (k RouteKey) String() string {
|
||||||
|
return fmt.Sprintf("%s/%s/%d", k.PrefixID, k.OriginASNID, k.PeerASN)
|
||||||
|
}
|
||||||
|
|
||||||
|
// isIPv6 returns true if the prefix is an IPv6 address
|
||||||
|
func isIPv6(prefix string) bool {
|
||||||
|
return strings.Contains(prefix, ":")
|
||||||
|
}
|
219
internal/routingtable/routingtable_test.go
Normal file
219
internal/routingtable/routingtable_test.go
Normal file
@ -0,0 +1,219 @@
|
|||||||
|
package routingtable
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRoutingTable(t *testing.T) {
|
||||||
|
rt := New()
|
||||||
|
|
||||||
|
// Test data
|
||||||
|
prefixID1 := uuid.New()
|
||||||
|
prefixID2 := uuid.New()
|
||||||
|
originASNID1 := uuid.New()
|
||||||
|
originASNID2 := uuid.New()
|
||||||
|
|
||||||
|
route1 := &Route{
|
||||||
|
PrefixID: prefixID1,
|
||||||
|
Prefix: "10.0.0.0/8",
|
||||||
|
OriginASNID: originASNID1,
|
||||||
|
OriginASN: 64512,
|
||||||
|
PeerASN: 64513,
|
||||||
|
ASPath: []int{64513, 64512},
|
||||||
|
NextHop: "192.168.1.1",
|
||||||
|
AnnouncedAt: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
route2 := &Route{
|
||||||
|
PrefixID: prefixID2,
|
||||||
|
Prefix: "192.168.0.0/16",
|
||||||
|
OriginASNID: originASNID2,
|
||||||
|
OriginASN: 64514,
|
||||||
|
PeerASN: 64513,
|
||||||
|
ASPath: []int{64513, 64514},
|
||||||
|
NextHop: "192.168.1.1",
|
||||||
|
AnnouncedAt: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test AddRoute
|
||||||
|
rt.AddRoute(route1)
|
||||||
|
rt.AddRoute(route2)
|
||||||
|
|
||||||
|
if rt.Size() != 2 {
|
||||||
|
t.Errorf("Expected 2 routes, got %d", rt.Size())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test GetRoute
|
||||||
|
retrievedRoute, exists := rt.GetRoute(prefixID1, originASNID1, 64513)
|
||||||
|
if !exists {
|
||||||
|
t.Error("Route 1 should exist")
|
||||||
|
}
|
||||||
|
if retrievedRoute.Prefix != "10.0.0.0/8" {
|
||||||
|
t.Errorf("Expected prefix 10.0.0.0/8, got %s", retrievedRoute.Prefix)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test GetRoutesByPrefix
|
||||||
|
prefixRoutes := rt.GetRoutesByPrefix(prefixID1)
|
||||||
|
if len(prefixRoutes) != 1 {
|
||||||
|
t.Errorf("Expected 1 route for prefix, got %d", len(prefixRoutes))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test GetRoutesByPeerASN
|
||||||
|
peerRoutes := rt.GetRoutesByPeerASN(64513)
|
||||||
|
if len(peerRoutes) != 2 {
|
||||||
|
t.Errorf("Expected 2 routes from peer 64513, got %d", len(peerRoutes))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test RemoveRoute
|
||||||
|
removed := rt.RemoveRoute(prefixID1, originASNID1, 64513)
|
||||||
|
if !removed {
|
||||||
|
t.Error("Route should have been removed")
|
||||||
|
}
|
||||||
|
if rt.Size() != 1 {
|
||||||
|
t.Errorf("Expected 1 route after removal, got %d", rt.Size())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test WithdrawRoutesByPrefixAndPeer
|
||||||
|
// Add the route back first
|
||||||
|
rt.AddRoute(route1)
|
||||||
|
|
||||||
|
// Add another route for the same prefix from the same peer
|
||||||
|
route3 := &Route{
|
||||||
|
PrefixID: prefixID1,
|
||||||
|
Prefix: "10.0.0.0/8",
|
||||||
|
OriginASNID: originASNID2, // Different origin
|
||||||
|
OriginASN: 64515,
|
||||||
|
PeerASN: 64513,
|
||||||
|
ASPath: []int{64513, 64515},
|
||||||
|
NextHop: "192.168.1.1",
|
||||||
|
AnnouncedAt: time.Now(),
|
||||||
|
}
|
||||||
|
rt.AddRoute(route3)
|
||||||
|
|
||||||
|
count := rt.WithdrawRoutesByPrefixAndPeer(prefixID1, 64513)
|
||||||
|
if count != 2 {
|
||||||
|
t.Errorf("Expected to withdraw 2 routes, withdrew %d", count)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should only have route2 left
|
||||||
|
if rt.Size() != 1 {
|
||||||
|
t.Errorf("Expected 1 route after withdrawal, got %d", rt.Size())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test Stats
|
||||||
|
stats := rt.Stats()
|
||||||
|
if stats["total_routes"] != 1 {
|
||||||
|
t.Errorf("Expected 1 total route in stats, got %d", stats["total_routes"])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test Clear
|
||||||
|
rt.Clear()
|
||||||
|
if rt.Size() != 0 {
|
||||||
|
t.Errorf("Expected 0 routes after clear, got %d", rt.Size())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRoutingTableConcurrency(t *testing.T) {
|
||||||
|
rt := New()
|
||||||
|
|
||||||
|
// Test concurrent access
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
numGoroutines := 10
|
||||||
|
numOperations := 100
|
||||||
|
|
||||||
|
// Start multiple goroutines that add/remove routes
|
||||||
|
for i := 0; i < numGoroutines; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(id int) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
for j := 0; j < numOperations; j++ {
|
||||||
|
prefixID := uuid.New()
|
||||||
|
originASNID := uuid.New()
|
||||||
|
|
||||||
|
route := &Route{
|
||||||
|
PrefixID: prefixID,
|
||||||
|
Prefix: "10.0.0.0/8",
|
||||||
|
OriginASNID: originASNID,
|
||||||
|
OriginASN: 64512 + id,
|
||||||
|
PeerASN: 64500,
|
||||||
|
ASPath: []int{64500, 64512 + id},
|
||||||
|
NextHop: "192.168.1.1",
|
||||||
|
AnnouncedAt: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add route
|
||||||
|
rt.AddRoute(route)
|
||||||
|
|
||||||
|
// Try to get it
|
||||||
|
_, _ = rt.GetRoute(prefixID, originASNID, 64500)
|
||||||
|
|
||||||
|
// Get stats
|
||||||
|
_ = rt.Stats()
|
||||||
|
|
||||||
|
// Remove it
|
||||||
|
rt.RemoveRoute(prefixID, originASNID, 64500)
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Table should be empty after all operations
|
||||||
|
if rt.Size() != 0 {
|
||||||
|
t.Errorf("Expected empty table after concurrent operations, got %d routes", rt.Size())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRouteUpdate(t *testing.T) {
|
||||||
|
rt := New()
|
||||||
|
|
||||||
|
prefixID := uuid.New()
|
||||||
|
originASNID := uuid.New()
|
||||||
|
|
||||||
|
route1 := &Route{
|
||||||
|
PrefixID: prefixID,
|
||||||
|
Prefix: "10.0.0.0/8",
|
||||||
|
OriginASNID: originASNID,
|
||||||
|
OriginASN: 64512,
|
||||||
|
PeerASN: 64513,
|
||||||
|
ASPath: []int{64513, 64512},
|
||||||
|
NextHop: "192.168.1.1",
|
||||||
|
AnnouncedAt: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add initial route
|
||||||
|
rt.AddRoute(route1)
|
||||||
|
|
||||||
|
// Update the same route with new next hop
|
||||||
|
route2 := &Route{
|
||||||
|
PrefixID: prefixID,
|
||||||
|
Prefix: "10.0.0.0/8",
|
||||||
|
OriginASNID: originASNID,
|
||||||
|
OriginASN: 64512,
|
||||||
|
PeerASN: 64513,
|
||||||
|
ASPath: []int{64513, 64512},
|
||||||
|
NextHop: "192.168.1.2", // Changed
|
||||||
|
AnnouncedAt: time.Now().Add(1 * time.Minute),
|
||||||
|
}
|
||||||
|
|
||||||
|
rt.AddRoute(route2)
|
||||||
|
|
||||||
|
// Should still have only 1 route
|
||||||
|
if rt.Size() != 1 {
|
||||||
|
t.Errorf("Expected 1 route after update, got %d", rt.Size())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the route was updated
|
||||||
|
retrievedRoute, exists := rt.GetRoute(prefixID, originASNID, 64513)
|
||||||
|
if !exists {
|
||||||
|
t.Error("Route should exist after update")
|
||||||
|
}
|
||||||
|
if retrievedRoute.NextHop != "192.168.1.2" {
|
||||||
|
t.Errorf("Expected updated next hop 192.168.1.2, got %s", retrievedRoute.NextHop)
|
||||||
|
}
|
||||||
|
}
|
@ -10,6 +10,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.eeqj.de/sneak/routewatch/internal/database"
|
"git.eeqj.de/sneak/routewatch/internal/database"
|
||||||
|
"git.eeqj.de/sneak/routewatch/internal/routingtable"
|
||||||
"git.eeqj.de/sneak/routewatch/internal/streamer"
|
"git.eeqj.de/sneak/routewatch/internal/streamer"
|
||||||
"git.eeqj.de/sneak/routewatch/internal/templates"
|
"git.eeqj.de/sneak/routewatch/internal/templates"
|
||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
@ -20,15 +21,17 @@ import (
|
|||||||
type Server struct {
|
type Server struct {
|
||||||
router *chi.Mux
|
router *chi.Mux
|
||||||
db database.Store
|
db database.Store
|
||||||
|
routingTable *routingtable.RoutingTable
|
||||||
streamer *streamer.Streamer
|
streamer *streamer.Streamer
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
srv *http.Server
|
srv *http.Server
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new HTTP server
|
// New creates a new HTTP server
|
||||||
func New(db database.Store, streamer *streamer.Streamer, logger *slog.Logger) *Server {
|
func New(db database.Store, rt *routingtable.RoutingTable, streamer *streamer.Streamer, logger *slog.Logger) *Server {
|
||||||
s := &Server{
|
s := &Server{
|
||||||
db: db,
|
db: db,
|
||||||
|
routingTable: rt,
|
||||||
streamer: streamer,
|
streamer: streamer,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
}
|
}
|
||||||
@ -200,7 +203,7 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
|
|||||||
IPv4Prefixes: dbStats.IPv4Prefixes,
|
IPv4Prefixes: dbStats.IPv4Prefixes,
|
||||||
IPv6Prefixes: dbStats.IPv6Prefixes,
|
IPv6Prefixes: dbStats.IPv6Prefixes,
|
||||||
Peerings: dbStats.Peerings,
|
Peerings: dbStats.Peerings,
|
||||||
LiveRoutes: dbStats.LiveRoutes,
|
LiveRoutes: s.routingTable.Size(),
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
@ -300,7 +303,7 @@ func (s *Server) handleStats() http.HandlerFunc {
|
|||||||
IPv4Prefixes: dbStats.IPv4Prefixes,
|
IPv4Prefixes: dbStats.IPv4Prefixes,
|
||||||
IPv6Prefixes: dbStats.IPv6Prefixes,
|
IPv6Prefixes: dbStats.IPv6Prefixes,
|
||||||
Peerings: dbStats.Peerings,
|
Peerings: dbStats.Peerings,
|
||||||
LiveRoutes: dbStats.LiveRoutes,
|
LiveRoutes: s.routingTable.Size(),
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
@ -25,7 +25,7 @@ const (
|
|||||||
metricsLogInterval = 10 * time.Second
|
metricsLogInterval = 10 * time.Second
|
||||||
bytesPerKB = 1024
|
bytesPerKB = 1024
|
||||||
bytesPerMB = 1024 * 1024
|
bytesPerMB = 1024 * 1024
|
||||||
maxConcurrentHandlers = 100 // Maximum number of concurrent message handlers
|
maxConcurrentHandlers = 800 // Maximum number of concurrent message handlers
|
||||||
)
|
)
|
||||||
|
|
||||||
// MessageHandler is an interface for handling RIS messages
|
// MessageHandler is an interface for handling RIS messages
|
||||||
@ -35,23 +35,43 @@ type MessageHandler interface {
|
|||||||
|
|
||||||
// HandleMessage processes a RIS message
|
// HandleMessage processes a RIS message
|
||||||
HandleMessage(msg *ristypes.RISMessage)
|
HandleMessage(msg *ristypes.RISMessage)
|
||||||
|
|
||||||
|
// QueueCapacity returns the desired queue capacity for this handler
|
||||||
|
// Handlers that process quickly can have larger queues
|
||||||
|
QueueCapacity() int
|
||||||
}
|
}
|
||||||
|
|
||||||
// RawMessageHandler is a callback for handling raw JSON lines from the stream
|
// RawMessageHandler is a callback for handling raw JSON lines from the stream
|
||||||
type RawMessageHandler func(line string)
|
type RawMessageHandler func(line string)
|
||||||
|
|
||||||
|
// handlerMetrics tracks performance metrics for a handler
|
||||||
|
type handlerMetrics struct {
|
||||||
|
processedCount uint64 // Total messages processed
|
||||||
|
droppedCount uint64 // Total messages dropped
|
||||||
|
totalTime time.Duration // Total processing time (for average calculation)
|
||||||
|
minTime time.Duration // Minimum processing time
|
||||||
|
maxTime time.Duration // Maximum processing time
|
||||||
|
mu sync.Mutex // Protects the metrics
|
||||||
|
}
|
||||||
|
|
||||||
|
// handlerInfo wraps a handler with its queue and metrics
|
||||||
|
type handlerInfo struct {
|
||||||
|
handler MessageHandler
|
||||||
|
queue chan *ristypes.RISMessage
|
||||||
|
metrics handlerMetrics
|
||||||
|
}
|
||||||
|
|
||||||
// Streamer handles streaming BGP updates from RIS Live
|
// Streamer handles streaming BGP updates from RIS Live
|
||||||
type Streamer struct {
|
type Streamer struct {
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
client *http.Client
|
client *http.Client
|
||||||
handlers []MessageHandler
|
handlers []*handlerInfo
|
||||||
rawHandler RawMessageHandler
|
rawHandler RawMessageHandler
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
running bool
|
running bool
|
||||||
metrics *metrics.Tracker
|
metrics *metrics.Tracker
|
||||||
semaphore chan struct{} // Limits concurrent message processing
|
totalDropped uint64 // Total dropped messages across all handlers
|
||||||
droppedMessages uint64 // Atomic counter for dropped messages
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new RIS streamer
|
// New creates a new RIS streamer
|
||||||
@ -61,9 +81,8 @@ func New(logger *slog.Logger, metrics *metrics.Tracker) *Streamer {
|
|||||||
client: &http.Client{
|
client: &http.Client{
|
||||||
Timeout: 0, // No timeout for streaming
|
Timeout: 0, // No timeout for streaming
|
||||||
},
|
},
|
||||||
handlers: make([]MessageHandler, 0),
|
handlers: make([]*handlerInfo, 0),
|
||||||
metrics: metrics,
|
metrics: metrics,
|
||||||
semaphore: make(chan struct{}, maxConcurrentHandlers),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -71,7 +90,19 @@ func New(logger *slog.Logger, metrics *metrics.Tracker) *Streamer {
|
|||||||
func (s *Streamer) RegisterHandler(handler MessageHandler) {
|
func (s *Streamer) RegisterHandler(handler MessageHandler) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
s.handlers = append(s.handlers, handler)
|
|
||||||
|
// Create handler info with its own queue based on capacity
|
||||||
|
info := &handlerInfo{
|
||||||
|
handler: handler,
|
||||||
|
queue: make(chan *ristypes.RISMessage, handler.QueueCapacity()),
|
||||||
|
}
|
||||||
|
|
||||||
|
s.handlers = append(s.handlers, info)
|
||||||
|
|
||||||
|
// If we're already running, start a worker for this handler
|
||||||
|
if s.running {
|
||||||
|
go s.runHandlerWorker(info)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterRawHandler sets a callback for raw message lines
|
// RegisterRawHandler sets a callback for raw message lines
|
||||||
@ -94,6 +125,11 @@ func (s *Streamer) Start() error {
|
|||||||
s.cancel = cancel
|
s.cancel = cancel
|
||||||
s.running = true
|
s.running = true
|
||||||
|
|
||||||
|
// Start workers for each handler
|
||||||
|
for _, info := range s.handlers {
|
||||||
|
go s.runHandlerWorker(info)
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
if err := s.stream(ctx); err != nil {
|
if err := s.stream(ctx); err != nil {
|
||||||
s.logger.Error("Streaming error", "error", err)
|
s.logger.Error("Streaming error", "error", err)
|
||||||
@ -112,10 +148,40 @@ func (s *Streamer) Stop() {
|
|||||||
if s.cancel != nil {
|
if s.cancel != nil {
|
||||||
s.cancel()
|
s.cancel()
|
||||||
}
|
}
|
||||||
|
// Close all handler queues to signal workers to stop
|
||||||
|
for _, info := range s.handlers {
|
||||||
|
close(info.queue)
|
||||||
|
}
|
||||||
|
s.running = false
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
s.metrics.SetConnected(false)
|
s.metrics.SetConnected(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// runHandlerWorker processes messages for a specific handler
|
||||||
|
func (s *Streamer) runHandlerWorker(info *handlerInfo) {
|
||||||
|
for msg := range info.queue {
|
||||||
|
start := time.Now()
|
||||||
|
info.handler.HandleMessage(msg)
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
|
||||||
|
// Update metrics
|
||||||
|
info.metrics.mu.Lock()
|
||||||
|
info.metrics.processedCount++
|
||||||
|
info.metrics.totalTime += elapsed
|
||||||
|
|
||||||
|
// Update min time
|
||||||
|
if info.metrics.minTime == 0 || elapsed < info.metrics.minTime {
|
||||||
|
info.metrics.minTime = elapsed
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update max time
|
||||||
|
if elapsed > info.metrics.maxTime {
|
||||||
|
info.metrics.maxTime = elapsed
|
||||||
|
}
|
||||||
|
info.metrics.mu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// IsRunning returns whether the streamer is currently active
|
// IsRunning returns whether the streamer is currently active
|
||||||
func (s *Streamer) IsRunning() bool {
|
func (s *Streamer) IsRunning() bool {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
@ -131,7 +197,7 @@ func (s *Streamer) GetMetrics() metrics.StreamMetrics {
|
|||||||
|
|
||||||
// GetDroppedMessages returns the total number of dropped messages
|
// GetDroppedMessages returns the total number of dropped messages
|
||||||
func (s *Streamer) GetDroppedMessages() uint64 {
|
func (s *Streamer) GetDroppedMessages() uint64 {
|
||||||
return atomic.LoadUint64(&s.droppedMessages)
|
return atomic.LoadUint64(&s.totalDropped)
|
||||||
}
|
}
|
||||||
|
|
||||||
// logMetrics logs the current streaming statistics
|
// logMetrics logs the current streaming statistics
|
||||||
@ -140,18 +206,57 @@ func (s *Streamer) logMetrics() {
|
|||||||
uptime := time.Since(metrics.ConnectedSince)
|
uptime := time.Since(metrics.ConnectedSince)
|
||||||
|
|
||||||
const bitsPerMegabit = 1000000
|
const bitsPerMegabit = 1000000
|
||||||
droppedMessages := atomic.LoadUint64(&s.droppedMessages)
|
totalDropped := atomic.LoadUint64(&s.totalDropped)
|
||||||
s.logger.Info("Stream statistics",
|
|
||||||
"uptime", uptime,
|
s.logger.Info(
|
||||||
"total_messages", metrics.TotalMessages,
|
"Stream statistics",
|
||||||
"total_bytes", metrics.TotalBytes,
|
"uptime",
|
||||||
"total_mb", fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB),
|
uptime,
|
||||||
"messages_per_sec", fmt.Sprintf("%.2f", metrics.MessagesPerSec),
|
"total_messages",
|
||||||
"bits_per_sec", fmt.Sprintf("%.0f", metrics.BitsPerSec),
|
metrics.TotalMessages,
|
||||||
"mbps", fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit),
|
"total_bytes",
|
||||||
"dropped_messages", droppedMessages,
|
metrics.TotalBytes,
|
||||||
"active_handlers", len(s.semaphore),
|
"total_mb",
|
||||||
|
fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB),
|
||||||
|
"messages_per_sec",
|
||||||
|
fmt.Sprintf("%.2f", metrics.MessagesPerSec),
|
||||||
|
"bits_per_sec",
|
||||||
|
fmt.Sprintf("%.0f", metrics.BitsPerSec),
|
||||||
|
"mbps",
|
||||||
|
fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit),
|
||||||
|
"total_dropped",
|
||||||
|
totalDropped,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Log per-handler statistics
|
||||||
|
s.mu.RLock()
|
||||||
|
for i, info := range s.handlers {
|
||||||
|
info.metrics.mu.Lock()
|
||||||
|
if info.metrics.processedCount > 0 {
|
||||||
|
// Safe conversion: processedCount is bounded by maxInt64
|
||||||
|
processedCount := info.metrics.processedCount
|
||||||
|
const maxInt64 = 1<<63 - 1
|
||||||
|
if processedCount > maxInt64 {
|
||||||
|
processedCount = maxInt64
|
||||||
|
}
|
||||||
|
//nolint:gosec // processedCount is explicitly bounded above
|
||||||
|
avgTime := info.metrics.totalTime / time.Duration(processedCount)
|
||||||
|
s.logger.Info(
|
||||||
|
"Handler statistics",
|
||||||
|
"handler", fmt.Sprintf("%T", info.handler),
|
||||||
|
"index", i,
|
||||||
|
"queue_len", len(info.queue),
|
||||||
|
"queue_cap", cap(info.queue),
|
||||||
|
"processed", info.metrics.processedCount,
|
||||||
|
"dropped", info.metrics.droppedCount,
|
||||||
|
"avg_time", avgTime,
|
||||||
|
"min_time", info.metrics.minTime,
|
||||||
|
"max_time", info.metrics.maxTime,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
info.metrics.mu.Unlock()
|
||||||
|
}
|
||||||
|
s.mu.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateMetrics updates the metrics counters and rates
|
// updateMetrics updates the metrics counters and rates
|
||||||
@ -226,25 +331,12 @@ func (s *Streamer) stream(ctx context.Context) error {
|
|||||||
rawHandler(string(line))
|
rawHandler(string(line))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get current handlers
|
// Parse the message first
|
||||||
s.mu.RLock()
|
|
||||||
handlers := make([]MessageHandler, len(s.handlers))
|
|
||||||
copy(handlers, s.handlers)
|
|
||||||
s.mu.RUnlock()
|
|
||||||
|
|
||||||
// Try to acquire semaphore, drop message if at capacity
|
|
||||||
select {
|
|
||||||
case s.semaphore <- struct{}{}:
|
|
||||||
// Successfully acquired semaphore, process message
|
|
||||||
go func(rawLine []byte, messageHandlers []MessageHandler) {
|
|
||||||
defer func() { <-s.semaphore }() // Release semaphore when done
|
|
||||||
|
|
||||||
// Parse the outer wrapper first
|
|
||||||
var wrapper ristypes.RISLiveMessage
|
var wrapper ristypes.RISLiveMessage
|
||||||
if err := json.Unmarshal(rawLine, &wrapper); err != nil {
|
if err := json.Unmarshal(line, &wrapper); err != nil {
|
||||||
// Output the raw line and panic on parse failure
|
// Output the raw line and panic on parse failure
|
||||||
fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err)
|
fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err)
|
||||||
fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(rawLine))
|
fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(line))
|
||||||
panic(fmt.Sprintf("JSON parse error: %v", err))
|
panic(fmt.Sprintf("JSON parse error: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -252,10 +344,10 @@ func (s *Streamer) stream(ctx context.Context) error {
|
|||||||
if wrapper.Type != "ris_message" {
|
if wrapper.Type != "ris_message" {
|
||||||
s.logger.Error("Unexpected wrapper type",
|
s.logger.Error("Unexpected wrapper type",
|
||||||
"type", wrapper.Type,
|
"type", wrapper.Type,
|
||||||
"line", string(rawLine),
|
"line", string(line),
|
||||||
)
|
)
|
||||||
|
|
||||||
return
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the actual message
|
// Get the actual message
|
||||||
@ -268,51 +360,63 @@ func (s *Streamer) stream(ctx context.Context) error {
|
|||||||
switch msg.Type {
|
switch msg.Type {
|
||||||
case "UPDATE":
|
case "UPDATE":
|
||||||
// Process BGP UPDATE messages
|
// Process BGP UPDATE messages
|
||||||
// Will be handled by registered handlers
|
// Will be dispatched to handlers
|
||||||
case "RIS_PEER_STATE":
|
case "RIS_PEER_STATE":
|
||||||
// RIS peer state messages - silently ignore
|
// RIS peer state messages - silently ignore
|
||||||
|
continue
|
||||||
case "KEEPALIVE":
|
case "KEEPALIVE":
|
||||||
// BGP keepalive messages - silently process
|
// BGP keepalive messages - silently process
|
||||||
|
continue
|
||||||
case "OPEN":
|
case "OPEN":
|
||||||
// BGP open messages
|
// BGP open messages
|
||||||
s.logger.Info("BGP session opened",
|
s.logger.Info("BGP session opened",
|
||||||
"peer", msg.Peer,
|
"peer", msg.Peer,
|
||||||
"peer_asn", msg.PeerASN,
|
"peer_asn", msg.PeerASN,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
continue
|
||||||
case "NOTIFICATION":
|
case "NOTIFICATION":
|
||||||
// BGP notification messages (errors)
|
// BGP notification messages (errors)
|
||||||
s.logger.Warn("BGP notification",
|
s.logger.Warn("BGP notification",
|
||||||
"peer", msg.Peer,
|
"peer", msg.Peer,
|
||||||
"peer_asn", msg.PeerASN,
|
"peer_asn", msg.PeerASN,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
continue
|
||||||
case "STATE":
|
case "STATE":
|
||||||
// Peer state changes - silently ignore
|
// Peer state changes - silently ignore
|
||||||
|
continue
|
||||||
default:
|
default:
|
||||||
fmt.Fprintf(
|
fmt.Fprintf(
|
||||||
os.Stderr,
|
os.Stderr,
|
||||||
"UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n",
|
"UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n",
|
||||||
msg.Type,
|
msg.Type,
|
||||||
string(rawLine),
|
string(line),
|
||||||
|
)
|
||||||
|
panic(
|
||||||
|
fmt.Sprintf(
|
||||||
|
"Unknown RIS message type: %s",
|
||||||
|
msg.Type,
|
||||||
|
),
|
||||||
)
|
)
|
||||||
panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call handlers synchronously within this goroutine
|
// Dispatch to interested handlers
|
||||||
// This prevents unbounded goroutine growth at the handler level
|
s.mu.RLock()
|
||||||
for _, handler := range messageHandlers {
|
for _, info := range s.handlers {
|
||||||
if handler.WantsMessage(msg.Type) {
|
if info.handler.WantsMessage(msg.Type) {
|
||||||
handler.HandleMessage(&msg)
|
select {
|
||||||
}
|
case info.queue <- &msg:
|
||||||
}
|
// Message queued successfully
|
||||||
}(append([]byte(nil), line...), handlers) // Copy the line to avoid data races
|
|
||||||
default:
|
default:
|
||||||
// Semaphore is full, drop the message
|
// Queue is full, drop the message
|
||||||
dropped := atomic.AddUint64(&s.droppedMessages, 1)
|
atomic.AddUint64(&info.metrics.droppedCount, 1)
|
||||||
if dropped%1000 == 0 { // Log every 1000 dropped messages
|
atomic.AddUint64(&s.totalDropped, 1)
|
||||||
s.logger.Warn("Dropping messages due to overload", "total_dropped", dropped, "max_handlers", maxConcurrentHandlers)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
s.mu.RUnlock()
|
||||||
|
}
|
||||||
|
|
||||||
if err := scanner.Err(); err != nil {
|
if err := scanner.Err(); err != nil {
|
||||||
return fmt.Errorf("scanner error: %w", err)
|
return fmt.Errorf("scanner error: %w", err)
|
||||||
|
Loading…
Reference in New Issue
Block a user