Compare commits
	
		
			4 Commits
		
	
	
		
			d328fb0942
			...
			1d05372899
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 1d05372899 | |||
| 76ec9f68b7 | |||
| a555a1dee2 | |||
| b49d3ce88c | 
							
								
								
									
										2
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								Makefile
									
									
									
									
									
								
							@ -15,7 +15,7 @@ lint:
 | 
			
		||||
	golangci-lint run
 | 
			
		||||
 | 
			
		||||
build:
 | 
			
		||||
	go build -o bin/routewatch cmd/routewatch/main.go
 | 
			
		||||
	CGO_ENABLED=1 go build -o bin/routewatch cmd/routewatch/main.go
 | 
			
		||||
 | 
			
		||||
clean:
 | 
			
		||||
	rm -rf bin/
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										14
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										14
									
								
								go.mod
									
									
									
									
									
								
							@ -3,24 +3,16 @@ module git.eeqj.de/sneak/routewatch
 | 
			
		||||
go 1.24.4
 | 
			
		||||
 | 
			
		||||
require (
 | 
			
		||||
	github.com/go-chi/chi/v5 v5.2.2
 | 
			
		||||
	github.com/google/uuid v1.6.0
 | 
			
		||||
	github.com/mattn/go-sqlite3 v1.14.29
 | 
			
		||||
	github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9
 | 
			
		||||
	go.uber.org/fx v1.24.0
 | 
			
		||||
	modernc.org/sqlite v1.38.1
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
require (
 | 
			
		||||
	github.com/dustin/go-humanize v1.0.1 // indirect
 | 
			
		||||
	github.com/go-chi/chi/v5 v5.2.2 // indirect
 | 
			
		||||
	github.com/mattn/go-isatty v0.0.20 // indirect
 | 
			
		||||
	github.com/ncruces/go-strftime v0.1.9 // indirect
 | 
			
		||||
	github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
 | 
			
		||||
	github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
 | 
			
		||||
	go.uber.org/dig v1.19.0 // indirect
 | 
			
		||||
	go.uber.org/multierr v1.10.0 // indirect
 | 
			
		||||
	go.uber.org/zap v1.26.0 // indirect
 | 
			
		||||
	golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
 | 
			
		||||
	golang.org/x/sys v0.34.0 // indirect
 | 
			
		||||
	modernc.org/libc v1.66.3 // indirect
 | 
			
		||||
	modernc.org/mathutil v1.7.1 // indirect
 | 
			
		||||
	modernc.org/memory v1.11.0 // indirect
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										47
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										47
									
								
								go.sum
									
									
									
									
									
								
							@ -1,23 +1,15 @@
 | 
			
		||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 | 
			
		||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 | 
			
		||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
 | 
			
		||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
 | 
			
		||||
github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618=
 | 
			
		||||
github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
 | 
			
		||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
 | 
			
		||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
 | 
			
		||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 | 
			
		||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 | 
			
		||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
 | 
			
		||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
 | 
			
		||||
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
 | 
			
		||||
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
 | 
			
		||||
github.com/mattn/go-sqlite3 v1.14.29 h1:1O6nRLJKvsi1H2Sj0Hzdfojwt8GiGKm+LOfLaBFaouQ=
 | 
			
		||||
github.com/mattn/go-sqlite3 v1.14.29/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
 | 
			
		||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 | 
			
		||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 | 
			
		||||
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg=
 | 
			
		||||
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 | 
			
		||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
 | 
			
		||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
 | 
			
		||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
 | 
			
		||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
 | 
			
		||||
go.uber.org/dig v1.19.0 h1:BACLhebsYdpQ7IROQ1AGPjrXcP5dF80U3gKoFzbaq/4=
 | 
			
		||||
@ -30,42 +22,7 @@ go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
 | 
			
		||||
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
 | 
			
		||||
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
 | 
			
		||||
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
 | 
			
		||||
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o=
 | 
			
		||||
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8=
 | 
			
		||||
golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w=
 | 
			
		||||
golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww=
 | 
			
		||||
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
 | 
			
		||||
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
 | 
			
		||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 | 
			
		||||
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
 | 
			
		||||
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
 | 
			
		||||
golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo=
 | 
			
		||||
golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg=
 | 
			
		||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 | 
			
		||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 | 
			
		||||
modernc.org/cc/v4 v4.26.2 h1:991HMkLjJzYBIfha6ECZdjrIYz2/1ayr+FL8GN+CNzM=
 | 
			
		||||
modernc.org/cc/v4 v4.26.2/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
 | 
			
		||||
modernc.org/ccgo/v4 v4.28.0 h1:rjznn6WWehKq7dG4JtLRKxb52Ecv8OUGah8+Z/SfpNU=
 | 
			
		||||
modernc.org/ccgo/v4 v4.28.0/go.mod h1:JygV3+9AV6SmPhDasu4JgquwU81XAKLd3OKTUDNOiKE=
 | 
			
		||||
modernc.org/fileutil v1.3.8 h1:qtzNm7ED75pd1C7WgAGcK4edm4fvhtBsEiI/0NQ54YM=
 | 
			
		||||
modernc.org/fileutil v1.3.8/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc=
 | 
			
		||||
modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI=
 | 
			
		||||
modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
 | 
			
		||||
modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks=
 | 
			
		||||
modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI=
 | 
			
		||||
modernc.org/libc v1.66.3 h1:cfCbjTUcdsKyyZZfEUKfoHcP3S0Wkvz3jgSzByEWVCQ=
 | 
			
		||||
modernc.org/libc v1.66.3/go.mod h1:XD9zO8kt59cANKvHPXpx7yS2ELPheAey0vjIuZOhOU8=
 | 
			
		||||
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
 | 
			
		||||
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
 | 
			
		||||
modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI=
 | 
			
		||||
modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
 | 
			
		||||
modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
 | 
			
		||||
modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
 | 
			
		||||
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
 | 
			
		||||
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
 | 
			
		||||
modernc.org/sqlite v1.38.1 h1:jNnIjleVta+DKSAr3TnkKK87EEhjPhBLzi6hvIX9Bas=
 | 
			
		||||
modernc.org/sqlite v1.38.1/go.mod h1:cPTJYSlgg3Sfg046yBShXENNtPrWrDX8bsbAQBzgQ5E=
 | 
			
		||||
modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
 | 
			
		||||
modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
 | 
			
		||||
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
 | 
			
		||||
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
 | 
			
		||||
 | 
			
		||||
@ -11,8 +11,9 @@ import (
 | 
			
		||||
	"runtime"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/pkg/asinfo"
 | 
			
		||||
	"github.com/google/uuid"
 | 
			
		||||
	_ "modernc.org/sqlite" // Pure Go SQLite driver
 | 
			
		||||
	_ "github.com/mattn/go-sqlite3" // CGO SQLite driver
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
//go:embed schema.sql
 | 
			
		||||
@ -94,10 +95,10 @@ func NewWithConfig(config Config, logger *slog.Logger) (*Database, error) {
 | 
			
		||||
		return nil, fmt.Errorf("failed to create database directory: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Add connection parameters for modernc.org/sqlite
 | 
			
		||||
	// Add connection parameters for go-sqlite3
 | 
			
		||||
	// Enable WAL mode and other performance optimizations
 | 
			
		||||
	dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=NORMAL&_cache_size=-512000", config.Path)
 | 
			
		||||
	db, err := sql.Open("sqlite", dsn)
 | 
			
		||||
	dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=NORMAL&cache=shared", config.Path)
 | 
			
		||||
	db, err := sql.Open("sqlite3", dsn)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("failed to open database: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
@ -126,7 +127,7 @@ func (d *Database) Initialize() error {
 | 
			
		||||
	pragmas := []string{
 | 
			
		||||
		"PRAGMA journal_mode=WAL",        // Already set in connection string
 | 
			
		||||
		"PRAGMA synchronous=NORMAL",      // Faster than FULL, still safe
 | 
			
		||||
		"PRAGMA cache_size=-524288",      // 512MB cache
 | 
			
		||||
		"PRAGMA cache_size=-524288",      // 512MB cache (negative = KB)
 | 
			
		||||
		"PRAGMA temp_store=MEMORY",       // Use memory for temp tables
 | 
			
		||||
		"PRAGMA mmap_size=268435456",     // 256MB memory-mapped I/O
 | 
			
		||||
		"PRAGMA wal_autocheckpoint=1000", // Checkpoint every 1000 pages
 | 
			
		||||
@ -174,12 +175,15 @@ func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error)
 | 
			
		||||
 | 
			
		||||
	var asn ASN
 | 
			
		||||
	var idStr string
 | 
			
		||||
	err = tx.QueryRow("SELECT id, number, first_seen, last_seen FROM asns WHERE number = ?", number).
 | 
			
		||||
		Scan(&idStr, &asn.Number, &asn.FirstSeen, &asn.LastSeen)
 | 
			
		||||
	var handle, description sql.NullString
 | 
			
		||||
	err = tx.QueryRow("SELECT id, number, handle, description, first_seen, last_seen FROM asns WHERE number = ?", number).
 | 
			
		||||
		Scan(&idStr, &asn.Number, &handle, &description, &asn.FirstSeen, &asn.LastSeen)
 | 
			
		||||
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		// ASN exists, update last_seen
 | 
			
		||||
		asn.ID, _ = uuid.Parse(idStr)
 | 
			
		||||
		asn.Handle = handle.String
 | 
			
		||||
		asn.Description = description.String
 | 
			
		||||
		_, err = tx.Exec("UPDATE asns SET last_seen = ? WHERE id = ?", timestamp, asn.ID.String())
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
@ -199,15 +203,22 @@ func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error)
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// ASN doesn't exist, create it
 | 
			
		||||
	// ASN doesn't exist, create it with ASN info lookup
 | 
			
		||||
	asn = ASN{
 | 
			
		||||
		ID:        generateUUID(),
 | 
			
		||||
		Number:    number,
 | 
			
		||||
		FirstSeen: timestamp,
 | 
			
		||||
		LastSeen:  timestamp,
 | 
			
		||||
	}
 | 
			
		||||
	_, err = tx.Exec("INSERT INTO asns (id, number, first_seen, last_seen) VALUES (?, ?, ?, ?)",
 | 
			
		||||
		asn.ID.String(), asn.Number, asn.FirstSeen, asn.LastSeen)
 | 
			
		||||
 | 
			
		||||
	// Look up ASN info
 | 
			
		||||
	if info, ok := asinfo.Get(number); ok {
 | 
			
		||||
		asn.Handle = info.Handle
 | 
			
		||||
		asn.Description = info.Description
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err = tx.Exec("INSERT INTO asns (id, number, handle, description, first_seen, last_seen) VALUES (?, ?, ?, ?, ?, ?)",
 | 
			
		||||
		asn.ID.String(), asn.Number, asn.Handle, asn.Description, asn.FirstSeen, asn.LastSeen)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
@ -340,75 +351,6 @@ func (d *Database) RecordPeering(fromASNID, toASNID string, timestamp time.Time)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UpdateLiveRoute updates the live routing table for an announcement
 | 
			
		||||
func (d *Database) UpdateLiveRoute(
 | 
			
		||||
	prefixID, originASNID uuid.UUID,
 | 
			
		||||
	peerASN int,
 | 
			
		||||
	nextHop string,
 | 
			
		||||
	timestamp time.Time,
 | 
			
		||||
) error {
 | 
			
		||||
	// Use SQLite's UPSERT capability to avoid the SELECT+UPDATE/INSERT pattern
 | 
			
		||||
	// This reduces the number of queries and improves performance
 | 
			
		||||
	// Note: We removed the WHERE clause from ON CONFLICT UPDATE because
 | 
			
		||||
	// if we're updating, we want to update regardless of withdrawn_at status
 | 
			
		||||
	err := d.exec(`
 | 
			
		||||
		INSERT INTO live_routes (id, prefix_id, origin_asn_id, peer_asn, next_hop, announced_at, withdrawn_at)
 | 
			
		||||
		VALUES (?, ?, ?, ?, ?, ?, NULL)
 | 
			
		||||
		ON CONFLICT(prefix_id, origin_asn_id, peer_asn) DO UPDATE SET
 | 
			
		||||
			next_hop = excluded.next_hop,
 | 
			
		||||
			announced_at = excluded.announced_at,
 | 
			
		||||
			withdrawn_at = NULL`,
 | 
			
		||||
		generateUUID().String(), prefixID.String(), originASNID.String(),
 | 
			
		||||
		peerASN, nextHop, timestamp)
 | 
			
		||||
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WithdrawLiveRoute marks a route as withdrawn in the live routing table
 | 
			
		||||
func (d *Database) WithdrawLiveRoute(prefixID uuid.UUID, peerASN int, timestamp time.Time) error {
 | 
			
		||||
	err := d.exec(`
 | 
			
		||||
		UPDATE live_routes 
 | 
			
		||||
		SET withdrawn_at = ?
 | 
			
		||||
		WHERE prefix_id = ? AND peer_asn = ? AND withdrawn_at IS NULL`,
 | 
			
		||||
		timestamp, prefixID.String(), peerASN)
 | 
			
		||||
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetActiveLiveRoutes returns all currently active routes (not withdrawn)
 | 
			
		||||
func (d *Database) GetActiveLiveRoutes() ([]LiveRoute, error) {
 | 
			
		||||
	rows, err := d.query(`
 | 
			
		||||
		SELECT id, prefix_id, origin_asn_id, peer_asn, next_hop, announced_at
 | 
			
		||||
		FROM live_routes 
 | 
			
		||||
		WHERE withdrawn_at IS NULL
 | 
			
		||||
		ORDER BY announced_at DESC`)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	defer func() {
 | 
			
		||||
		_ = rows.Close()
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	var routes []LiveRoute
 | 
			
		||||
	for rows.Next() {
 | 
			
		||||
		var route LiveRoute
 | 
			
		||||
		var idStr, prefixIDStr, originASNIDStr string
 | 
			
		||||
		err := rows.Scan(&idStr, &prefixIDStr, &originASNIDStr,
 | 
			
		||||
			&route.PeerASN, &route.NextHop, &route.AnnouncedAt)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		route.ID, _ = uuid.Parse(idStr)
 | 
			
		||||
		route.PrefixID, _ = uuid.Parse(prefixIDStr)
 | 
			
		||||
		route.OriginASNID, _ = uuid.Parse(originASNIDStr)
 | 
			
		||||
 | 
			
		||||
		routes = append(routes, route)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return routes, rows.Err()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UpdatePeer updates or creates a BGP peer record
 | 
			
		||||
func (d *Database) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error {
 | 
			
		||||
	tx, err := d.beginTx()
 | 
			
		||||
@ -495,13 +437,6 @@ func (d *Database) GetStats() (Stats, error) {
 | 
			
		||||
		return stats, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Count live routes
 | 
			
		||||
	d.logger.Info("Counting live routes")
 | 
			
		||||
	err = d.queryRow("SELECT COUNT(*) FROM live_routes WHERE withdrawn_at IS NULL").Scan(&stats.LiveRoutes)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return stats, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	d.logger.Info("Stats collection complete")
 | 
			
		||||
 | 
			
		||||
	return stats, nil
 | 
			
		||||
 | 
			
		||||
@ -2,8 +2,6 @@ package database
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/google/uuid"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Stats contains database statistics
 | 
			
		||||
@ -13,7 +11,6 @@ type Stats struct {
 | 
			
		||||
	IPv4Prefixes int
 | 
			
		||||
	IPv6Prefixes int
 | 
			
		||||
	Peerings     int
 | 
			
		||||
	LiveRoutes   int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Store defines the interface for database operations
 | 
			
		||||
@ -30,11 +27,6 @@ type Store interface {
 | 
			
		||||
	// Peering operations
 | 
			
		||||
	RecordPeering(fromASNID, toASNID string, timestamp time.Time) error
 | 
			
		||||
 | 
			
		||||
	// Live route operations
 | 
			
		||||
	UpdateLiveRoute(prefixID, originASNID uuid.UUID, peerASN int, nextHop string, timestamp time.Time) error
 | 
			
		||||
	WithdrawLiveRoute(prefixID uuid.UUID, peerASN int, timestamp time.Time) error
 | 
			
		||||
	GetActiveLiveRoutes() ([]LiveRoute, error)
 | 
			
		||||
 | 
			
		||||
	// Statistics
 | 
			
		||||
	GetStats() (Stats, error)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -10,6 +10,8 @@ import (
 | 
			
		||||
type ASN struct {
 | 
			
		||||
	ID          uuid.UUID `json:"id"`
 | 
			
		||||
	Number      int       `json:"number"`
 | 
			
		||||
	Handle      string    `json:"handle"`
 | 
			
		||||
	Description string    `json:"description"`
 | 
			
		||||
	FirstSeen   time.Time `json:"first_seen"`
 | 
			
		||||
	LastSeen    time.Time `json:"last_seen"`
 | 
			
		||||
}
 | 
			
		||||
@ -43,15 +45,3 @@ type ASNPeering struct {
 | 
			
		||||
	FirstSeen time.Time `json:"first_seen"`
 | 
			
		||||
	LastSeen  time.Time `json:"last_seen"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// LiveRoute represents the current state of a route in the live routing table
 | 
			
		||||
type LiveRoute struct {
 | 
			
		||||
	ID          uuid.UUID  `json:"id"`
 | 
			
		||||
	PrefixID    uuid.UUID  `json:"prefix_id"`
 | 
			
		||||
	OriginASNID uuid.UUID  `json:"origin_asn_id"`
 | 
			
		||||
	PeerASN     int        `json:"peer_asn"`
 | 
			
		||||
	Path        string     `json:"path"`
 | 
			
		||||
	NextHop     string     `json:"next_hop"`
 | 
			
		||||
	AnnouncedAt time.Time  `json:"announced_at"`
 | 
			
		||||
	WithdrawnAt *time.Time `json:"withdrawn_at"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1,6 +1,8 @@
 | 
			
		||||
CREATE TABLE IF NOT EXISTS asns (
 | 
			
		||||
	id TEXT PRIMARY KEY,
 | 
			
		||||
	number INTEGER UNIQUE NOT NULL,
 | 
			
		||||
	handle TEXT,
 | 
			
		||||
	description TEXT,
 | 
			
		||||
	first_seen DATETIME NOT NULL,
 | 
			
		||||
	last_seen DATETIME NOT NULL
 | 
			
		||||
);
 | 
			
		||||
@ -48,20 +50,6 @@ CREATE TABLE IF NOT EXISTS bgp_peers (
 | 
			
		||||
	last_message_type TEXT
 | 
			
		||||
);
 | 
			
		||||
 | 
			
		||||
-- Live routing table: current state of announced routes
 | 
			
		||||
CREATE TABLE IF NOT EXISTS live_routes (
 | 
			
		||||
	id TEXT PRIMARY KEY,
 | 
			
		||||
	prefix_id TEXT NOT NULL,
 | 
			
		||||
	origin_asn_id TEXT NOT NULL,
 | 
			
		||||
	peer_asn INTEGER NOT NULL,
 | 
			
		||||
	next_hop TEXT,
 | 
			
		||||
	announced_at DATETIME NOT NULL,
 | 
			
		||||
	withdrawn_at DATETIME,
 | 
			
		||||
	FOREIGN KEY (prefix_id) REFERENCES prefixes(id),
 | 
			
		||||
	FOREIGN KEY (origin_asn_id) REFERENCES asns(id),
 | 
			
		||||
	UNIQUE(prefix_id, origin_asn_id, peer_asn)
 | 
			
		||||
);
 | 
			
		||||
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_prefixes_ip_version ON prefixes(ip_version);
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_prefixes_version_prefix ON prefixes(ip_version, prefix);
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_announcements_timestamp ON announcements(timestamp);
 | 
			
		||||
@ -71,43 +59,6 @@ CREATE INDEX IF NOT EXISTS idx_asn_peerings_from_asn ON asn_peerings(from_asn_id
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_asn_peerings_to_asn ON asn_peerings(to_asn_id);
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_asn_peerings_lookup ON asn_peerings(from_asn_id, to_asn_id);
 | 
			
		||||
 | 
			
		||||
-- Indexes for live routes table
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_live_routes_active 
 | 
			
		||||
	ON live_routes(prefix_id, origin_asn_id) 
 | 
			
		||||
	WHERE withdrawn_at IS NULL;
 | 
			
		||||
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_live_routes_origin 
 | 
			
		||||
	ON live_routes(origin_asn_id) 
 | 
			
		||||
	WHERE withdrawn_at IS NULL;
 | 
			
		||||
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_live_routes_prefix 
 | 
			
		||||
	ON live_routes(prefix_id) 
 | 
			
		||||
	WHERE withdrawn_at IS NULL;
 | 
			
		||||
 | 
			
		||||
-- Critical index for the most common query pattern
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_live_routes_lookup
 | 
			
		||||
	ON live_routes(prefix_id, origin_asn_id, peer_asn)
 | 
			
		||||
	WHERE withdrawn_at IS NULL;
 | 
			
		||||
 | 
			
		||||
-- Index for withdrawal updates by prefix and peer
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_live_routes_withdraw
 | 
			
		||||
	ON live_routes(prefix_id, peer_asn)
 | 
			
		||||
	WHERE withdrawn_at IS NULL;
 | 
			
		||||
 | 
			
		||||
-- Covering index for SELECT id queries (includes id in index)
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_live_routes_covering
 | 
			
		||||
	ON live_routes(prefix_id, origin_asn_id, peer_asn, id)
 | 
			
		||||
	WHERE withdrawn_at IS NULL;
 | 
			
		||||
 | 
			
		||||
-- Index for UPDATE by id operations
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_live_routes_id
 | 
			
		||||
	ON live_routes(id);
 | 
			
		||||
 | 
			
		||||
-- Index for stats queries
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_live_routes_stats
 | 
			
		||||
	ON live_routes(withdrawn_at)
 | 
			
		||||
	WHERE withdrawn_at IS NULL;
 | 
			
		||||
 | 
			
		||||
-- Additional indexes for prefixes table
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_prefixes_prefix ON prefixes(prefix);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -26,6 +26,7 @@ func (d *Database) queryRow(query string, args ...interface{}) *sql.Row {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// query wraps Query with slow query logging
 | 
			
		||||
// nolint:unused // kept for future use to ensure all queries go through slow query logging
 | 
			
		||||
func (d *Database) query(query string, args ...interface{}) (*sql.Rows, error) {
 | 
			
		||||
	start := time.Now()
 | 
			
		||||
	defer logSlowQuery(d.logger, query, start)
 | 
			
		||||
 | 
			
		||||
@ -4,6 +4,7 @@ package routewatch
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"log/slog"
 | 
			
		||||
	"os"
 | 
			
		||||
	"strings"
 | 
			
		||||
@ -11,6 +12,7 @@ import (
 | 
			
		||||
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/database"
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/metrics"
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/routingtable"
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/server"
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/streamer"
 | 
			
		||||
 | 
			
		||||
@ -22,6 +24,11 @@ type Config struct {
 | 
			
		||||
	MaxRuntime time.Duration // Maximum runtime (0 = run forever)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	// routingTableStatsInterval is how often we log routing table statistics
 | 
			
		||||
	routingTableStatsInterval = 15 * time.Second
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// NewConfig provides default configuration
 | 
			
		||||
func NewConfig() Config {
 | 
			
		||||
	return Config{
 | 
			
		||||
@ -34,6 +41,7 @@ type Dependencies struct {
 | 
			
		||||
	fx.In
 | 
			
		||||
 | 
			
		||||
	DB           database.Store
 | 
			
		||||
	RoutingTable *routingtable.RoutingTable
 | 
			
		||||
	Streamer     *streamer.Streamer
 | 
			
		||||
	Server       *server.Server
 | 
			
		||||
	Logger       *slog.Logger
 | 
			
		||||
@ -43,6 +51,7 @@ type Dependencies struct {
 | 
			
		||||
// RouteWatch represents the main application instance
 | 
			
		||||
type RouteWatch struct {
 | 
			
		||||
	db           database.Store
 | 
			
		||||
	routingTable *routingtable.RoutingTable
 | 
			
		||||
	streamer     *streamer.Streamer
 | 
			
		||||
	server       *server.Server
 | 
			
		||||
	logger       *slog.Logger
 | 
			
		||||
@ -53,6 +62,7 @@ type RouteWatch struct {
 | 
			
		||||
func New(deps Dependencies) *RouteWatch {
 | 
			
		||||
	return &RouteWatch{
 | 
			
		||||
		db:           deps.DB,
 | 
			
		||||
		routingTable: deps.RoutingTable,
 | 
			
		||||
		streamer:     deps.Streamer,
 | 
			
		||||
		server:       deps.Server,
 | 
			
		||||
		logger:       deps.Logger,
 | 
			
		||||
@ -76,10 +86,17 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
 | 
			
		||||
	dbHandler := NewDatabaseHandler(rw.db, rw.logger)
 | 
			
		||||
	rw.streamer.RegisterHandler(dbHandler)
 | 
			
		||||
 | 
			
		||||
	// Register routing table handler to maintain in-memory routing table
 | 
			
		||||
	rtHandler := NewRoutingTableHandler(rw.routingTable, rw.logger)
 | 
			
		||||
	rw.streamer.RegisterHandler(rtHandler)
 | 
			
		||||
 | 
			
		||||
	// Register peer tracking handler to track all peers
 | 
			
		||||
	peerHandler := NewPeerHandler(rw.db, rw.logger)
 | 
			
		||||
	rw.streamer.RegisterHandler(peerHandler)
 | 
			
		||||
 | 
			
		||||
	// Start periodic routing table stats logging
 | 
			
		||||
	go rw.logRoutingTableStats(ctx)
 | 
			
		||||
 | 
			
		||||
	// Start streaming
 | 
			
		||||
	if err := rw.streamer.Start(); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
@ -117,6 +134,32 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// logRoutingTableStats periodically logs routing table statistics
 | 
			
		||||
func (rw *RouteWatch) logRoutingTableStats(ctx context.Context) {
 | 
			
		||||
	// Log stats periodically
 | 
			
		||||
	ticker := time.NewTicker(routingTableStatsInterval)
 | 
			
		||||
	defer ticker.Stop()
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-ctx.Done():
 | 
			
		||||
			return
 | 
			
		||||
		case <-ticker.C:
 | 
			
		||||
			stats := rw.routingTable.GetDetailedStats()
 | 
			
		||||
			rw.logger.Info("Routing table statistics",
 | 
			
		||||
				"ipv4_routes", stats.IPv4Routes,
 | 
			
		||||
				"ipv6_routes", stats.IPv6Routes,
 | 
			
		||||
				"ipv4_updates_per_sec", fmt.Sprintf("%.2f", stats.IPv4UpdatesRate),
 | 
			
		||||
				"ipv6_updates_per_sec", fmt.Sprintf("%.2f", stats.IPv6UpdatesRate),
 | 
			
		||||
				"total_routes", stats.TotalRoutes,
 | 
			
		||||
				"unique_prefixes", stats.UniquePrefixes,
 | 
			
		||||
				"unique_origins", stats.UniqueOrigins,
 | 
			
		||||
				"unique_peers", stats.UniquePeers,
 | 
			
		||||
			)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewLogger creates a structured logger
 | 
			
		||||
func NewLogger() *slog.Logger {
 | 
			
		||||
	level := slog.LevelInfo
 | 
			
		||||
@ -154,8 +197,12 @@ func getModule() fx.Option {
 | 
			
		||||
				},
 | 
			
		||||
				fx.As(new(database.Store)),
 | 
			
		||||
			),
 | 
			
		||||
			routingtable.New,
 | 
			
		||||
			streamer.New,
 | 
			
		||||
			fx.Annotate(
 | 
			
		||||
				server.New,
 | 
			
		||||
				fx.ParamTags(``, ``, ``, ``),
 | 
			
		||||
			),
 | 
			
		||||
			New,
 | 
			
		||||
		),
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
@ -9,6 +9,7 @@ import (
 | 
			
		||||
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/database"
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/metrics"
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/routingtable"
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/server"
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/streamer"
 | 
			
		||||
	"github.com/google/uuid"
 | 
			
		||||
@ -129,35 +130,6 @@ func (m *mockStore) RecordPeering(fromASNID, toASNID string, _ time.Time) error
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UpdateLiveRoute mock implementation
 | 
			
		||||
func (m *mockStore) UpdateLiveRoute(prefixID, originASNID uuid.UUID, peerASN int, _ string, _ time.Time) error {
 | 
			
		||||
	m.mu.Lock()
 | 
			
		||||
	defer m.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	key := prefixID.String() + "_" + originASNID.String() + "_" + string(rune(peerASN))
 | 
			
		||||
	if !m.Routes[key] {
 | 
			
		||||
		m.Routes[key] = true
 | 
			
		||||
		m.RouteCount++
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WithdrawLiveRoute mock implementation
 | 
			
		||||
func (m *mockStore) WithdrawLiveRoute(_ uuid.UUID, _ int, _ time.Time) error {
 | 
			
		||||
	m.mu.Lock()
 | 
			
		||||
	defer m.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	m.WithdrawalCount++
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetActiveLiveRoutes mock implementation
 | 
			
		||||
func (m *mockStore) GetActiveLiveRoutes() ([]database.LiveRoute, error) {
 | 
			
		||||
	return []database.LiveRoute{}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UpdatePeer mock implementation
 | 
			
		||||
func (m *mockStore) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error {
 | 
			
		||||
	// Simple mock - just return nil
 | 
			
		||||
@ -180,7 +152,6 @@ func (m *mockStore) GetStats() (database.Stats, error) {
 | 
			
		||||
		IPv4Prefixes: m.IPv4Prefixes,
 | 
			
		||||
		IPv6Prefixes: m.IPv6Prefixes,
 | 
			
		||||
		Peerings:     m.PeeringCount,
 | 
			
		||||
		LiveRoutes:   m.RouteCount,
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -197,12 +168,16 @@ func TestRouteWatchLiveFeed(t *testing.T) {
 | 
			
		||||
	// Create streamer
 | 
			
		||||
	s := streamer.New(logger, metricsTracker)
 | 
			
		||||
 | 
			
		||||
	// Create routing table
 | 
			
		||||
	rt := routingtable.New()
 | 
			
		||||
 | 
			
		||||
	// Create server
 | 
			
		||||
	srv := server.New(mockDB, s, logger)
 | 
			
		||||
	srv := server.New(mockDB, rt, s, logger)
 | 
			
		||||
 | 
			
		||||
	// Create RouteWatch with 5 second limit
 | 
			
		||||
	deps := Dependencies{
 | 
			
		||||
		DB:           mockDB,
 | 
			
		||||
		RoutingTable: rt,
 | 
			
		||||
		Streamer:     s,
 | 
			
		||||
		Server:       srv,
 | 
			
		||||
		Logger:       logger,
 | 
			
		||||
@ -242,8 +217,4 @@ func TestRouteWatchLiveFeed(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
	t.Logf("Recorded %d AS peering relationships in 5 seconds", stats.Peerings)
 | 
			
		||||
 | 
			
		||||
	if stats.LiveRoutes == 0 {
 | 
			
		||||
		t.Error("Expected to have some active routes")
 | 
			
		||||
	}
 | 
			
		||||
	t.Logf("Active routes: %d", stats.LiveRoutes)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -2,12 +2,16 @@ package routewatch
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"log/slog"
 | 
			
		||||
	"strconv"
 | 
			
		||||
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/database"
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/ristypes"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	// databaseHandlerQueueSize is the queue capacity for database operations
 | 
			
		||||
	databaseHandlerQueueSize = 100
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// DatabaseHandler handles BGP messages and stores them in the database
 | 
			
		||||
type DatabaseHandler struct {
 | 
			
		||||
	db     database.Store
 | 
			
		||||
@ -28,19 +32,17 @@ func (h *DatabaseHandler) WantsMessage(messageType string) bool {
 | 
			
		||||
	return messageType == "UPDATE"
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// QueueCapacity returns the desired queue capacity for this handler
 | 
			
		||||
func (h *DatabaseHandler) QueueCapacity() int {
 | 
			
		||||
	// Database operations are slow, so use a smaller queue
 | 
			
		||||
	return databaseHandlerQueueSize
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// HandleMessage processes a RIS message and updates the database
 | 
			
		||||
func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
 | 
			
		||||
	// Use the pre-parsed timestamp
 | 
			
		||||
	timestamp := msg.ParsedTimestamp
 | 
			
		||||
 | 
			
		||||
	// Parse peer ASN
 | 
			
		||||
	peerASN, err := strconv.Atoi(msg.PeerASN)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		h.logger.Error("Failed to parse peer ASN", "peer_asn", msg.PeerASN, "error", err)
 | 
			
		||||
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Get origin ASN from path (last element)
 | 
			
		||||
	var originASN int
 | 
			
		||||
	if len(msg.Path) > 0 {
 | 
			
		||||
@ -51,7 +53,7 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
 | 
			
		||||
	for _, announcement := range msg.Announcements {
 | 
			
		||||
		for _, prefix := range announcement.Prefixes {
 | 
			
		||||
			// Get or create prefix
 | 
			
		||||
			p, err := h.db.GetOrCreatePrefix(prefix, timestamp)
 | 
			
		||||
			_, err := h.db.GetOrCreatePrefix(prefix, timestamp)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err)
 | 
			
		||||
 | 
			
		||||
@ -59,30 +61,13 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Get or create origin ASN
 | 
			
		||||
			asn, err := h.db.GetOrCreateASN(originASN, timestamp)
 | 
			
		||||
			_, err = h.db.GetOrCreateASN(originASN, timestamp)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				h.logger.Error("Failed to get/create ASN", "asn", originASN, "error", err)
 | 
			
		||||
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Update live route
 | 
			
		||||
			err = h.db.UpdateLiveRoute(
 | 
			
		||||
				p.ID,
 | 
			
		||||
				asn.ID,
 | 
			
		||||
				peerASN,
 | 
			
		||||
				announcement.NextHop,
 | 
			
		||||
				timestamp,
 | 
			
		||||
			)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				h.logger.Error("Failed to update live route",
 | 
			
		||||
					"prefix", prefix,
 | 
			
		||||
					"origin_asn", originASN,
 | 
			
		||||
					"peer_asn", peerASN,
 | 
			
		||||
					"error", err,
 | 
			
		||||
				)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// TODO: Record the announcement in the announcements table
 | 
			
		||||
			// Process AS path to update peerings
 | 
			
		||||
			if len(msg.Path) > 1 {
 | 
			
		||||
@ -122,23 +107,13 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
 | 
			
		||||
	// Process withdrawals
 | 
			
		||||
	for _, prefix := range msg.Withdrawals {
 | 
			
		||||
		// Get prefix
 | 
			
		||||
		p, err := h.db.GetOrCreatePrefix(prefix, timestamp)
 | 
			
		||||
		_, err := h.db.GetOrCreatePrefix(prefix, timestamp)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			h.logger.Error("Failed to get prefix for withdrawal", "prefix", prefix, "error", err)
 | 
			
		||||
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Withdraw the route
 | 
			
		||||
		err = h.db.WithdrawLiveRoute(p.ID, peerASN, timestamp)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			h.logger.Error("Failed to withdraw route",
 | 
			
		||||
				"prefix", prefix,
 | 
			
		||||
				"peer_asn", peerASN,
 | 
			
		||||
				"error", err,
 | 
			
		||||
			)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// TODO: Record the withdrawal in the withdrawals table
 | 
			
		||||
		// TODO: Record the withdrawal in the announcements table as a withdrawal
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -8,6 +8,11 @@ import (
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/ristypes"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	// peerHandlerQueueSize is the queue capacity for peer tracking operations
 | 
			
		||||
	peerHandlerQueueSize = 500
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// PeerHandler tracks BGP peers from all message types
 | 
			
		||||
type PeerHandler struct {
 | 
			
		||||
	db     database.Store
 | 
			
		||||
@ -27,6 +32,12 @@ func (h *PeerHandler) WantsMessage(_ string) bool {
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// QueueCapacity returns the desired queue capacity for this handler
 | 
			
		||||
func (h *PeerHandler) QueueCapacity() int {
 | 
			
		||||
	// Peer tracking is lightweight but involves database ops, use moderate queue
 | 
			
		||||
	return peerHandlerQueueSize
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// HandleMessage processes a message to track peer information
 | 
			
		||||
func (h *PeerHandler) HandleMessage(msg *ristypes.RISMessage) {
 | 
			
		||||
	// Parse peer ASN from string
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										131
									
								
								internal/routewatch/routingtablehandler.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										131
									
								
								internal/routewatch/routingtablehandler.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,131 @@
 | 
			
		||||
package routewatch
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"log/slog"
 | 
			
		||||
	"strconv"
 | 
			
		||||
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/ristypes"
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/routingtable"
 | 
			
		||||
	"github.com/google/uuid"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	// routingTableHandlerQueueSize is the queue capacity for in-memory routing table operations
 | 
			
		||||
	routingTableHandlerQueueSize = 10000
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// RoutingTableHandler handles BGP messages and updates the in-memory routing table
 | 
			
		||||
type RoutingTableHandler struct {
 | 
			
		||||
	rt     *routingtable.RoutingTable
 | 
			
		||||
	logger *slog.Logger
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewRoutingTableHandler creates a new routing table handler
 | 
			
		||||
func NewRoutingTableHandler(rt *routingtable.RoutingTable, logger *slog.Logger) *RoutingTableHandler {
 | 
			
		||||
	return &RoutingTableHandler{
 | 
			
		||||
		rt:     rt,
 | 
			
		||||
		logger: logger,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WantsMessage returns true if this handler wants to process messages of the given type
 | 
			
		||||
func (h *RoutingTableHandler) WantsMessage(messageType string) bool {
 | 
			
		||||
	// We only care about UPDATE messages for the routing table
 | 
			
		||||
	return messageType == "UPDATE"
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// QueueCapacity returns the desired queue capacity for this handler
 | 
			
		||||
func (h *RoutingTableHandler) QueueCapacity() int {
 | 
			
		||||
	// In-memory operations are very fast, so use a large queue
 | 
			
		||||
	return routingTableHandlerQueueSize
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// HandleMessage processes a RIS message and updates the routing table
 | 
			
		||||
func (h *RoutingTableHandler) HandleMessage(msg *ristypes.RISMessage) {
 | 
			
		||||
	// Use the pre-parsed timestamp
 | 
			
		||||
	timestamp := msg.ParsedTimestamp
 | 
			
		||||
 | 
			
		||||
	// Parse peer ASN
 | 
			
		||||
	peerASN, err := strconv.Atoi(msg.PeerASN)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		h.logger.Error("Failed to parse peer ASN", "peer_asn", msg.PeerASN, "error", err)
 | 
			
		||||
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Get origin ASN from path (last element)
 | 
			
		||||
	var originASN int
 | 
			
		||||
	if len(msg.Path) > 0 {
 | 
			
		||||
		originASN = msg.Path[len(msg.Path)-1]
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Process announcements
 | 
			
		||||
	for _, announcement := range msg.Announcements {
 | 
			
		||||
		for _, prefix := range announcement.Prefixes {
 | 
			
		||||
			// Generate deterministic UUIDs based on the prefix and origin ASN
 | 
			
		||||
			// This ensures consistency across restarts
 | 
			
		||||
			prefixID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(prefix))
 | 
			
		||||
			originASNID := uuid.NewSHA1(uuid.NameSpaceOID, []byte(strconv.Itoa(originASN)))
 | 
			
		||||
 | 
			
		||||
			// Create route for the routing table
 | 
			
		||||
			route := &routingtable.Route{
 | 
			
		||||
				PrefixID:    prefixID,
 | 
			
		||||
				Prefix:      prefix,
 | 
			
		||||
				OriginASNID: originASNID,
 | 
			
		||||
				OriginASN:   originASN,
 | 
			
		||||
				PeerASN:     peerASN,
 | 
			
		||||
				ASPath:      msg.Path,
 | 
			
		||||
				NextHop:     announcement.NextHop,
 | 
			
		||||
				AnnouncedAt: timestamp,
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// Add route to routing table
 | 
			
		||||
			h.rt.AddRoute(route)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Process withdrawals
 | 
			
		||||
	for _, prefix := range msg.Withdrawals {
 | 
			
		||||
		// Generate deterministic UUID for the prefix
 | 
			
		||||
		prefixID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(prefix))
 | 
			
		||||
 | 
			
		||||
		// Withdraw all routes for this prefix from this peer
 | 
			
		||||
		h.rt.WithdrawRoutesByPrefixAndPeer(prefixID, peerASN)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetRoutingTableStats returns statistics about the routing table
 | 
			
		||||
func (h *RoutingTableHandler) GetRoutingTableStats() map[string]int {
 | 
			
		||||
	return h.rt.Stats()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetActiveRouteCount returns the number of active routes
 | 
			
		||||
func (h *RoutingTableHandler) GetActiveRouteCount() int {
 | 
			
		||||
	return h.rt.Size()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetRoutesByPrefix returns all routes for a specific prefix
 | 
			
		||||
func (h *RoutingTableHandler) GetRoutesByPrefix(prefixID uuid.UUID) []*routingtable.Route {
 | 
			
		||||
	return h.rt.GetRoutesByPrefix(prefixID)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetRoutesByOriginASN returns all routes originated by a specific ASN
 | 
			
		||||
func (h *RoutingTableHandler) GetRoutesByOriginASN(originASNID uuid.UUID) []*routingtable.Route {
 | 
			
		||||
	return h.rt.GetRoutesByOriginASN(originASNID)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetRoutesByPeerASN returns all routes received from a specific peer ASN
 | 
			
		||||
func (h *RoutingTableHandler) GetRoutesByPeerASN(peerASN int) []*routingtable.Route {
 | 
			
		||||
	return h.rt.GetRoutesByPeerASN(peerASN)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetAllRoutes returns all active routes
 | 
			
		||||
func (h *RoutingTableHandler) GetAllRoutes() []*routingtable.Route {
 | 
			
		||||
	return h.rt.GetAllRoutes()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ClearRoutingTable clears all routes from the routing table
 | 
			
		||||
func (h *RoutingTableHandler) ClearRoutingTable() {
 | 
			
		||||
	h.rt.Clear()
 | 
			
		||||
	h.logger.Info("Cleared routing table")
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										397
									
								
								internal/routingtable/routingtable.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										397
									
								
								internal/routingtable/routingtable.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,397 @@
 | 
			
		||||
// Package routingtable provides a thread-safe in-memory representation of the DFZ routing table.
 | 
			
		||||
package routingtable
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/google/uuid"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Route represents a single route entry in the routing table
 | 
			
		||||
type Route struct {
 | 
			
		||||
	PrefixID    uuid.UUID `json:"prefix_id"`
 | 
			
		||||
	Prefix      string    `json:"prefix"` // The actual prefix string (e.g., "10.0.0.0/8")
 | 
			
		||||
	OriginASNID uuid.UUID `json:"origin_asn_id"`
 | 
			
		||||
	OriginASN   int       `json:"origin_asn"` // The actual ASN number
 | 
			
		||||
	PeerASN     int       `json:"peer_asn"`
 | 
			
		||||
	ASPath      []int     `json:"as_path"` // Full AS path
 | 
			
		||||
	NextHop     string    `json:"next_hop"`
 | 
			
		||||
	AnnouncedAt time.Time `json:"announced_at"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RouteKey uniquely identifies a route in the table
 | 
			
		||||
type RouteKey struct {
 | 
			
		||||
	PrefixID    uuid.UUID
 | 
			
		||||
	OriginASNID uuid.UUID
 | 
			
		||||
	PeerASN     int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RoutingTable is a thread-safe in-memory routing table
 | 
			
		||||
type RoutingTable struct {
 | 
			
		||||
	mu     sync.RWMutex
 | 
			
		||||
	routes map[RouteKey]*Route
 | 
			
		||||
 | 
			
		||||
	// Secondary indexes for efficient lookups
 | 
			
		||||
	byPrefix    map[uuid.UUID]map[RouteKey]*Route // Routes indexed by prefix ID
 | 
			
		||||
	byOriginASN map[uuid.UUID]map[RouteKey]*Route // Routes indexed by origin ASN ID
 | 
			
		||||
	byPeerASN   map[int]map[RouteKey]*Route       // Routes indexed by peer ASN
 | 
			
		||||
 | 
			
		||||
	// Metrics tracking
 | 
			
		||||
	ipv4Routes       int
 | 
			
		||||
	ipv6Routes       int
 | 
			
		||||
	ipv4Updates      uint64 // Updates counter for rate calculation
 | 
			
		||||
	ipv6Updates      uint64 // Updates counter for rate calculation
 | 
			
		||||
	lastMetricsReset time.Time
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// New creates a new empty routing table
 | 
			
		||||
func New() *RoutingTable {
 | 
			
		||||
	return &RoutingTable{
 | 
			
		||||
		routes:           make(map[RouteKey]*Route),
 | 
			
		||||
		byPrefix:         make(map[uuid.UUID]map[RouteKey]*Route),
 | 
			
		||||
		byOriginASN:      make(map[uuid.UUID]map[RouteKey]*Route),
 | 
			
		||||
		byPeerASN:        make(map[int]map[RouteKey]*Route),
 | 
			
		||||
		lastMetricsReset: time.Now(),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddRoute adds or updates a route in the routing table
 | 
			
		||||
func (rt *RoutingTable) AddRoute(route *Route) {
 | 
			
		||||
	rt.mu.Lock()
 | 
			
		||||
	defer rt.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	key := RouteKey{
 | 
			
		||||
		PrefixID:    route.PrefixID,
 | 
			
		||||
		OriginASNID: route.OriginASNID,
 | 
			
		||||
		PeerASN:     route.PeerASN,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// If route already exists, remove it from indexes first
 | 
			
		||||
	if existingRoute, exists := rt.routes[key]; exists {
 | 
			
		||||
		rt.removeFromIndexes(key, existingRoute)
 | 
			
		||||
		// Decrement counter for existing route
 | 
			
		||||
		if isIPv6(existingRoute.Prefix) {
 | 
			
		||||
			rt.ipv6Routes--
 | 
			
		||||
		} else {
 | 
			
		||||
			rt.ipv4Routes--
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Add to main map
 | 
			
		||||
	rt.routes[key] = route
 | 
			
		||||
 | 
			
		||||
	// Update indexes
 | 
			
		||||
	rt.addToIndexes(key, route)
 | 
			
		||||
 | 
			
		||||
	// Update metrics
 | 
			
		||||
	if isIPv6(route.Prefix) {
 | 
			
		||||
		rt.ipv6Routes++
 | 
			
		||||
		atomic.AddUint64(&rt.ipv6Updates, 1)
 | 
			
		||||
	} else {
 | 
			
		||||
		rt.ipv4Routes++
 | 
			
		||||
		atomic.AddUint64(&rt.ipv4Updates, 1)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RemoveRoute removes a route from the routing table
 | 
			
		||||
func (rt *RoutingTable) RemoveRoute(prefixID, originASNID uuid.UUID, peerASN int) bool {
 | 
			
		||||
	rt.mu.Lock()
 | 
			
		||||
	defer rt.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	key := RouteKey{
 | 
			
		||||
		PrefixID:    prefixID,
 | 
			
		||||
		OriginASNID: originASNID,
 | 
			
		||||
		PeerASN:     peerASN,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	route, exists := rt.routes[key]
 | 
			
		||||
	if !exists {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Remove from indexes
 | 
			
		||||
	rt.removeFromIndexes(key, route)
 | 
			
		||||
 | 
			
		||||
	// Remove from main map
 | 
			
		||||
	delete(rt.routes, key)
 | 
			
		||||
 | 
			
		||||
	// Update metrics
 | 
			
		||||
	if isIPv6(route.Prefix) {
 | 
			
		||||
		rt.ipv6Routes--
 | 
			
		||||
		atomic.AddUint64(&rt.ipv6Updates, 1)
 | 
			
		||||
	} else {
 | 
			
		||||
		rt.ipv4Routes--
 | 
			
		||||
		atomic.AddUint64(&rt.ipv4Updates, 1)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WithdrawRoutesByPrefixAndPeer removes all routes for a specific prefix from a specific peer
 | 
			
		||||
func (rt *RoutingTable) WithdrawRoutesByPrefixAndPeer(prefixID uuid.UUID, peerASN int) int {
 | 
			
		||||
	rt.mu.Lock()
 | 
			
		||||
	defer rt.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	prefixRoutes, exists := rt.byPrefix[prefixID]
 | 
			
		||||
	if !exists {
 | 
			
		||||
		return 0
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Collect keys to delete (can't delete while iterating)
 | 
			
		||||
	var keysToDelete []RouteKey
 | 
			
		||||
	for key, route := range prefixRoutes {
 | 
			
		||||
		if route.PeerASN == peerASN {
 | 
			
		||||
			keysToDelete = append(keysToDelete, key)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Delete the routes
 | 
			
		||||
	count := 0
 | 
			
		||||
	for _, key := range keysToDelete {
 | 
			
		||||
		route, exists := rt.routes[key]
 | 
			
		||||
		if !exists {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		rt.removeFromIndexes(key, route)
 | 
			
		||||
		delete(rt.routes, key)
 | 
			
		||||
		count++
 | 
			
		||||
 | 
			
		||||
		// Update metrics
 | 
			
		||||
		if isIPv6(route.Prefix) {
 | 
			
		||||
			rt.ipv6Routes--
 | 
			
		||||
			atomic.AddUint64(&rt.ipv6Updates, 1)
 | 
			
		||||
		} else {
 | 
			
		||||
			rt.ipv4Routes--
 | 
			
		||||
			atomic.AddUint64(&rt.ipv4Updates, 1)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return count
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetRoute retrieves a specific route
 | 
			
		||||
func (rt *RoutingTable) GetRoute(prefixID, originASNID uuid.UUID, peerASN int) (*Route, bool) {
 | 
			
		||||
	rt.mu.RLock()
 | 
			
		||||
	defer rt.mu.RUnlock()
 | 
			
		||||
 | 
			
		||||
	key := RouteKey{
 | 
			
		||||
		PrefixID:    prefixID,
 | 
			
		||||
		OriginASNID: originASNID,
 | 
			
		||||
		PeerASN:     peerASN,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	route, exists := rt.routes[key]
 | 
			
		||||
	if !exists {
 | 
			
		||||
		return nil, false
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Return a copy to prevent external modification
 | 
			
		||||
	routeCopy := *route
 | 
			
		||||
 | 
			
		||||
	return &routeCopy, true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetRoutesByPrefix returns all routes for a specific prefix
 | 
			
		||||
func (rt *RoutingTable) GetRoutesByPrefix(prefixID uuid.UUID) []*Route {
 | 
			
		||||
	rt.mu.RLock()
 | 
			
		||||
	defer rt.mu.RUnlock()
 | 
			
		||||
 | 
			
		||||
	routes := make([]*Route, 0)
 | 
			
		||||
	if prefixRoutes, exists := rt.byPrefix[prefixID]; exists {
 | 
			
		||||
		for _, route := range prefixRoutes {
 | 
			
		||||
			routeCopy := *route
 | 
			
		||||
			routes = append(routes, &routeCopy)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return routes
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetRoutesByOriginASN returns all routes originated by a specific ASN
 | 
			
		||||
func (rt *RoutingTable) GetRoutesByOriginASN(originASNID uuid.UUID) []*Route {
 | 
			
		||||
	rt.mu.RLock()
 | 
			
		||||
	defer rt.mu.RUnlock()
 | 
			
		||||
 | 
			
		||||
	routes := make([]*Route, 0)
 | 
			
		||||
	if asnRoutes, exists := rt.byOriginASN[originASNID]; exists {
 | 
			
		||||
		for _, route := range asnRoutes {
 | 
			
		||||
			routeCopy := *route
 | 
			
		||||
			routes = append(routes, &routeCopy)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return routes
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetRoutesByPeerASN returns all routes received from a specific peer ASN
 | 
			
		||||
func (rt *RoutingTable) GetRoutesByPeerASN(peerASN int) []*Route {
 | 
			
		||||
	rt.mu.RLock()
 | 
			
		||||
	defer rt.mu.RUnlock()
 | 
			
		||||
 | 
			
		||||
	routes := make([]*Route, 0)
 | 
			
		||||
	if peerRoutes, exists := rt.byPeerASN[peerASN]; exists {
 | 
			
		||||
		for _, route := range peerRoutes {
 | 
			
		||||
			routeCopy := *route
 | 
			
		||||
			routes = append(routes, &routeCopy)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return routes
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetAllRoutes returns all active routes in the routing table
 | 
			
		||||
func (rt *RoutingTable) GetAllRoutes() []*Route {
 | 
			
		||||
	rt.mu.RLock()
 | 
			
		||||
	defer rt.mu.RUnlock()
 | 
			
		||||
 | 
			
		||||
	routes := make([]*Route, 0, len(rt.routes))
 | 
			
		||||
	for _, route := range rt.routes {
 | 
			
		||||
		routeCopy := *route
 | 
			
		||||
		routes = append(routes, &routeCopy)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return routes
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Size returns the total number of routes in the table
 | 
			
		||||
func (rt *RoutingTable) Size() int {
 | 
			
		||||
	rt.mu.RLock()
 | 
			
		||||
	defer rt.mu.RUnlock()
 | 
			
		||||
 | 
			
		||||
	return len(rt.routes)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Stats returns statistics about the routing table
 | 
			
		||||
func (rt *RoutingTable) Stats() map[string]int {
 | 
			
		||||
	rt.mu.RLock()
 | 
			
		||||
	defer rt.mu.RUnlock()
 | 
			
		||||
 | 
			
		||||
	stats := map[string]int{
 | 
			
		||||
		"total_routes":    len(rt.routes),
 | 
			
		||||
		"unique_prefixes": len(rt.byPrefix),
 | 
			
		||||
		"unique_origins":  len(rt.byOriginASN),
 | 
			
		||||
		"unique_peers":    len(rt.byPeerASN),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return stats
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// DetailedStats contains detailed routing table statistics
 | 
			
		||||
type DetailedStats struct {
 | 
			
		||||
	IPv4Routes      int
 | 
			
		||||
	IPv6Routes      int
 | 
			
		||||
	IPv4UpdatesRate float64
 | 
			
		||||
	IPv6UpdatesRate float64
 | 
			
		||||
	TotalRoutes     int
 | 
			
		||||
	UniquePrefixes  int
 | 
			
		||||
	UniqueOrigins   int
 | 
			
		||||
	UniquePeers     int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetDetailedStats returns detailed statistics including IPv4/IPv6 breakdown and update rates
 | 
			
		||||
func (rt *RoutingTable) GetDetailedStats() DetailedStats {
 | 
			
		||||
	rt.mu.Lock()
 | 
			
		||||
	defer rt.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	// Calculate update rates
 | 
			
		||||
	elapsed := time.Since(rt.lastMetricsReset).Seconds()
 | 
			
		||||
	ipv4Updates := atomic.LoadUint64(&rt.ipv4Updates)
 | 
			
		||||
	ipv6Updates := atomic.LoadUint64(&rt.ipv6Updates)
 | 
			
		||||
 | 
			
		||||
	stats := DetailedStats{
 | 
			
		||||
		IPv4Routes:      rt.ipv4Routes,
 | 
			
		||||
		IPv6Routes:      rt.ipv6Routes,
 | 
			
		||||
		IPv4UpdatesRate: float64(ipv4Updates) / elapsed,
 | 
			
		||||
		IPv6UpdatesRate: float64(ipv6Updates) / elapsed,
 | 
			
		||||
		TotalRoutes:     len(rt.routes),
 | 
			
		||||
		UniquePrefixes:  len(rt.byPrefix),
 | 
			
		||||
		UniqueOrigins:   len(rt.byOriginASN),
 | 
			
		||||
		UniquePeers:     len(rt.byPeerASN),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Reset counters for next period
 | 
			
		||||
	atomic.StoreUint64(&rt.ipv4Updates, 0)
 | 
			
		||||
	atomic.StoreUint64(&rt.ipv6Updates, 0)
 | 
			
		||||
	rt.lastMetricsReset = time.Now()
 | 
			
		||||
 | 
			
		||||
	return stats
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Clear removes all routes from the routing table
 | 
			
		||||
func (rt *RoutingTable) Clear() {
 | 
			
		||||
	rt.mu.Lock()
 | 
			
		||||
	defer rt.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	rt.routes = make(map[RouteKey]*Route)
 | 
			
		||||
	rt.byPrefix = make(map[uuid.UUID]map[RouteKey]*Route)
 | 
			
		||||
	rt.byOriginASN = make(map[uuid.UUID]map[RouteKey]*Route)
 | 
			
		||||
	rt.byPeerASN = make(map[int]map[RouteKey]*Route)
 | 
			
		||||
	rt.ipv4Routes = 0
 | 
			
		||||
	rt.ipv6Routes = 0
 | 
			
		||||
	atomic.StoreUint64(&rt.ipv4Updates, 0)
 | 
			
		||||
	atomic.StoreUint64(&rt.ipv6Updates, 0)
 | 
			
		||||
	rt.lastMetricsReset = time.Now()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Helper methods for index management
 | 
			
		||||
 | 
			
		||||
func (rt *RoutingTable) addToIndexes(key RouteKey, route *Route) {
 | 
			
		||||
	// Add to prefix index
 | 
			
		||||
	if rt.byPrefix[route.PrefixID] == nil {
 | 
			
		||||
		rt.byPrefix[route.PrefixID] = make(map[RouteKey]*Route)
 | 
			
		||||
	}
 | 
			
		||||
	rt.byPrefix[route.PrefixID][key] = route
 | 
			
		||||
 | 
			
		||||
	// Add to origin ASN index
 | 
			
		||||
	if rt.byOriginASN[route.OriginASNID] == nil {
 | 
			
		||||
		rt.byOriginASN[route.OriginASNID] = make(map[RouteKey]*Route)
 | 
			
		||||
	}
 | 
			
		||||
	rt.byOriginASN[route.OriginASNID][key] = route
 | 
			
		||||
 | 
			
		||||
	// Add to peer ASN index
 | 
			
		||||
	if rt.byPeerASN[route.PeerASN] == nil {
 | 
			
		||||
		rt.byPeerASN[route.PeerASN] = make(map[RouteKey]*Route)
 | 
			
		||||
	}
 | 
			
		||||
	rt.byPeerASN[route.PeerASN][key] = route
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (rt *RoutingTable) removeFromIndexes(key RouteKey, route *Route) {
 | 
			
		||||
	// Remove from prefix index
 | 
			
		||||
	if prefixRoutes, exists := rt.byPrefix[route.PrefixID]; exists {
 | 
			
		||||
		delete(prefixRoutes, key)
 | 
			
		||||
		if len(prefixRoutes) == 0 {
 | 
			
		||||
			delete(rt.byPrefix, route.PrefixID)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Remove from origin ASN index
 | 
			
		||||
	if asnRoutes, exists := rt.byOriginASN[route.OriginASNID]; exists {
 | 
			
		||||
		delete(asnRoutes, key)
 | 
			
		||||
		if len(asnRoutes) == 0 {
 | 
			
		||||
			delete(rt.byOriginASN, route.OriginASNID)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Remove from peer ASN index
 | 
			
		||||
	if peerRoutes, exists := rt.byPeerASN[route.PeerASN]; exists {
 | 
			
		||||
		delete(peerRoutes, key)
 | 
			
		||||
		if len(peerRoutes) == 0 {
 | 
			
		||||
			delete(rt.byPeerASN, route.PeerASN)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// String returns a string representation of the route key
 | 
			
		||||
func (k RouteKey) String() string {
 | 
			
		||||
	return fmt.Sprintf("%s/%s/%d", k.PrefixID, k.OriginASNID, k.PeerASN)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// isIPv6 returns true if the prefix is an IPv6 address
 | 
			
		||||
func isIPv6(prefix string) bool {
 | 
			
		||||
	return strings.Contains(prefix, ":")
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										219
									
								
								internal/routingtable/routingtable_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										219
									
								
								internal/routingtable/routingtable_test.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,219 @@
 | 
			
		||||
package routingtable
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"sync"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/google/uuid"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestRoutingTable(t *testing.T) {
 | 
			
		||||
	rt := New()
 | 
			
		||||
 | 
			
		||||
	// Test data
 | 
			
		||||
	prefixID1 := uuid.New()
 | 
			
		||||
	prefixID2 := uuid.New()
 | 
			
		||||
	originASNID1 := uuid.New()
 | 
			
		||||
	originASNID2 := uuid.New()
 | 
			
		||||
 | 
			
		||||
	route1 := &Route{
 | 
			
		||||
		PrefixID:    prefixID1,
 | 
			
		||||
		Prefix:      "10.0.0.0/8",
 | 
			
		||||
		OriginASNID: originASNID1,
 | 
			
		||||
		OriginASN:   64512,
 | 
			
		||||
		PeerASN:     64513,
 | 
			
		||||
		ASPath:      []int{64513, 64512},
 | 
			
		||||
		NextHop:     "192.168.1.1",
 | 
			
		||||
		AnnouncedAt: time.Now(),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	route2 := &Route{
 | 
			
		||||
		PrefixID:    prefixID2,
 | 
			
		||||
		Prefix:      "192.168.0.0/16",
 | 
			
		||||
		OriginASNID: originASNID2,
 | 
			
		||||
		OriginASN:   64514,
 | 
			
		||||
		PeerASN:     64513,
 | 
			
		||||
		ASPath:      []int{64513, 64514},
 | 
			
		||||
		NextHop:     "192.168.1.1",
 | 
			
		||||
		AnnouncedAt: time.Now(),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Test AddRoute
 | 
			
		||||
	rt.AddRoute(route1)
 | 
			
		||||
	rt.AddRoute(route2)
 | 
			
		||||
 | 
			
		||||
	if rt.Size() != 2 {
 | 
			
		||||
		t.Errorf("Expected 2 routes, got %d", rt.Size())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Test GetRoute
 | 
			
		||||
	retrievedRoute, exists := rt.GetRoute(prefixID1, originASNID1, 64513)
 | 
			
		||||
	if !exists {
 | 
			
		||||
		t.Error("Route 1 should exist")
 | 
			
		||||
	}
 | 
			
		||||
	if retrievedRoute.Prefix != "10.0.0.0/8" {
 | 
			
		||||
		t.Errorf("Expected prefix 10.0.0.0/8, got %s", retrievedRoute.Prefix)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Test GetRoutesByPrefix
 | 
			
		||||
	prefixRoutes := rt.GetRoutesByPrefix(prefixID1)
 | 
			
		||||
	if len(prefixRoutes) != 1 {
 | 
			
		||||
		t.Errorf("Expected 1 route for prefix, got %d", len(prefixRoutes))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Test GetRoutesByPeerASN
 | 
			
		||||
	peerRoutes := rt.GetRoutesByPeerASN(64513)
 | 
			
		||||
	if len(peerRoutes) != 2 {
 | 
			
		||||
		t.Errorf("Expected 2 routes from peer 64513, got %d", len(peerRoutes))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Test RemoveRoute
 | 
			
		||||
	removed := rt.RemoveRoute(prefixID1, originASNID1, 64513)
 | 
			
		||||
	if !removed {
 | 
			
		||||
		t.Error("Route should have been removed")
 | 
			
		||||
	}
 | 
			
		||||
	if rt.Size() != 1 {
 | 
			
		||||
		t.Errorf("Expected 1 route after removal, got %d", rt.Size())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Test WithdrawRoutesByPrefixAndPeer
 | 
			
		||||
	// Add the route back first
 | 
			
		||||
	rt.AddRoute(route1)
 | 
			
		||||
 | 
			
		||||
	// Add another route for the same prefix from the same peer
 | 
			
		||||
	route3 := &Route{
 | 
			
		||||
		PrefixID:    prefixID1,
 | 
			
		||||
		Prefix:      "10.0.0.0/8",
 | 
			
		||||
		OriginASNID: originASNID2, // Different origin
 | 
			
		||||
		OriginASN:   64515,
 | 
			
		||||
		PeerASN:     64513,
 | 
			
		||||
		ASPath:      []int{64513, 64515},
 | 
			
		||||
		NextHop:     "192.168.1.1",
 | 
			
		||||
		AnnouncedAt: time.Now(),
 | 
			
		||||
	}
 | 
			
		||||
	rt.AddRoute(route3)
 | 
			
		||||
 | 
			
		||||
	count := rt.WithdrawRoutesByPrefixAndPeer(prefixID1, 64513)
 | 
			
		||||
	if count != 2 {
 | 
			
		||||
		t.Errorf("Expected to withdraw 2 routes, withdrew %d", count)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Should only have route2 left
 | 
			
		||||
	if rt.Size() != 1 {
 | 
			
		||||
		t.Errorf("Expected 1 route after withdrawal, got %d", rt.Size())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Test Stats
 | 
			
		||||
	stats := rt.Stats()
 | 
			
		||||
	if stats["total_routes"] != 1 {
 | 
			
		||||
		t.Errorf("Expected 1 total route in stats, got %d", stats["total_routes"])
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Test Clear
 | 
			
		||||
	rt.Clear()
 | 
			
		||||
	if rt.Size() != 0 {
 | 
			
		||||
		t.Errorf("Expected 0 routes after clear, got %d", rt.Size())
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestRoutingTableConcurrency(t *testing.T) {
 | 
			
		||||
	rt := New()
 | 
			
		||||
 | 
			
		||||
	// Test concurrent access
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
	numGoroutines := 10
 | 
			
		||||
	numOperations := 100
 | 
			
		||||
 | 
			
		||||
	// Start multiple goroutines that add/remove routes
 | 
			
		||||
	for i := 0; i < numGoroutines; i++ {
 | 
			
		||||
		wg.Add(1)
 | 
			
		||||
		go func(id int) {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
 | 
			
		||||
			for j := 0; j < numOperations; j++ {
 | 
			
		||||
				prefixID := uuid.New()
 | 
			
		||||
				originASNID := uuid.New()
 | 
			
		||||
 | 
			
		||||
				route := &Route{
 | 
			
		||||
					PrefixID:    prefixID,
 | 
			
		||||
					Prefix:      "10.0.0.0/8",
 | 
			
		||||
					OriginASNID: originASNID,
 | 
			
		||||
					OriginASN:   64512 + id,
 | 
			
		||||
					PeerASN:     64500,
 | 
			
		||||
					ASPath:      []int{64500, 64512 + id},
 | 
			
		||||
					NextHop:     "192.168.1.1",
 | 
			
		||||
					AnnouncedAt: time.Now(),
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				// Add route
 | 
			
		||||
				rt.AddRoute(route)
 | 
			
		||||
 | 
			
		||||
				// Try to get it
 | 
			
		||||
				_, _ = rt.GetRoute(prefixID, originASNID, 64500)
 | 
			
		||||
 | 
			
		||||
				// Get stats
 | 
			
		||||
				_ = rt.Stats()
 | 
			
		||||
 | 
			
		||||
				// Remove it
 | 
			
		||||
				rt.RemoveRoute(prefixID, originASNID, 64500)
 | 
			
		||||
			}
 | 
			
		||||
		}(i)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
 | 
			
		||||
	// Table should be empty after all operations
 | 
			
		||||
	if rt.Size() != 0 {
 | 
			
		||||
		t.Errorf("Expected empty table after concurrent operations, got %d routes", rt.Size())
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestRouteUpdate(t *testing.T) {
 | 
			
		||||
	rt := New()
 | 
			
		||||
 | 
			
		||||
	prefixID := uuid.New()
 | 
			
		||||
	originASNID := uuid.New()
 | 
			
		||||
 | 
			
		||||
	route1 := &Route{
 | 
			
		||||
		PrefixID:    prefixID,
 | 
			
		||||
		Prefix:      "10.0.0.0/8",
 | 
			
		||||
		OriginASNID: originASNID,
 | 
			
		||||
		OriginASN:   64512,
 | 
			
		||||
		PeerASN:     64513,
 | 
			
		||||
		ASPath:      []int{64513, 64512},
 | 
			
		||||
		NextHop:     "192.168.1.1",
 | 
			
		||||
		AnnouncedAt: time.Now(),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Add initial route
 | 
			
		||||
	rt.AddRoute(route1)
 | 
			
		||||
 | 
			
		||||
	// Update the same route with new next hop
 | 
			
		||||
	route2 := &Route{
 | 
			
		||||
		PrefixID:    prefixID,
 | 
			
		||||
		Prefix:      "10.0.0.0/8",
 | 
			
		||||
		OriginASNID: originASNID,
 | 
			
		||||
		OriginASN:   64512,
 | 
			
		||||
		PeerASN:     64513,
 | 
			
		||||
		ASPath:      []int{64513, 64512},
 | 
			
		||||
		NextHop:     "192.168.1.2", // Changed
 | 
			
		||||
		AnnouncedAt: time.Now().Add(1 * time.Minute),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	rt.AddRoute(route2)
 | 
			
		||||
 | 
			
		||||
	// Should still have only 1 route
 | 
			
		||||
	if rt.Size() != 1 {
 | 
			
		||||
		t.Errorf("Expected 1 route after update, got %d", rt.Size())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Check that the route was updated
 | 
			
		||||
	retrievedRoute, exists := rt.GetRoute(prefixID, originASNID, 64513)
 | 
			
		||||
	if !exists {
 | 
			
		||||
		t.Error("Route should exist after update")
 | 
			
		||||
	}
 | 
			
		||||
	if retrievedRoute.NextHop != "192.168.1.2" {
 | 
			
		||||
		t.Errorf("Expected updated next hop 192.168.1.2, got %s", retrievedRoute.NextHop)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@ -10,6 +10,7 @@ import (
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/database"
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/routingtable"
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/streamer"
 | 
			
		||||
	"git.eeqj.de/sneak/routewatch/internal/templates"
 | 
			
		||||
	"github.com/go-chi/chi/v5"
 | 
			
		||||
@ -20,15 +21,17 @@ import (
 | 
			
		||||
type Server struct {
 | 
			
		||||
	router       *chi.Mux
 | 
			
		||||
	db           database.Store
 | 
			
		||||
	routingTable *routingtable.RoutingTable
 | 
			
		||||
	streamer     *streamer.Streamer
 | 
			
		||||
	logger       *slog.Logger
 | 
			
		||||
	srv          *http.Server
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// New creates a new HTTP server
 | 
			
		||||
func New(db database.Store, streamer *streamer.Streamer, logger *slog.Logger) *Server {
 | 
			
		||||
func New(db database.Store, rt *routingtable.RoutingTable, streamer *streamer.Streamer, logger *slog.Logger) *Server {
 | 
			
		||||
	s := &Server{
 | 
			
		||||
		db:           db,
 | 
			
		||||
		routingTable: rt,
 | 
			
		||||
		streamer:     streamer,
 | 
			
		||||
		logger:       logger,
 | 
			
		||||
	}
 | 
			
		||||
@ -200,7 +203,7 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
 | 
			
		||||
			IPv4Prefixes:   dbStats.IPv4Prefixes,
 | 
			
		||||
			IPv6Prefixes:   dbStats.IPv6Prefixes,
 | 
			
		||||
			Peerings:       dbStats.Peerings,
 | 
			
		||||
			LiveRoutes:     dbStats.LiveRoutes,
 | 
			
		||||
			LiveRoutes:     s.routingTable.Size(),
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		w.Header().Set("Content-Type", "application/json")
 | 
			
		||||
@ -300,7 +303,7 @@ func (s *Server) handleStats() http.HandlerFunc {
 | 
			
		||||
			IPv4Prefixes:   dbStats.IPv4Prefixes,
 | 
			
		||||
			IPv6Prefixes:   dbStats.IPv6Prefixes,
 | 
			
		||||
			Peerings:       dbStats.Peerings,
 | 
			
		||||
			LiveRoutes:     dbStats.LiveRoutes,
 | 
			
		||||
			LiveRoutes:     s.routingTable.Size(),
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		w.Header().Set("Content-Type", "application/json")
 | 
			
		||||
 | 
			
		||||
@ -25,7 +25,7 @@ const (
 | 
			
		||||
	metricsLogInterval    = 10 * time.Second
 | 
			
		||||
	bytesPerKB            = 1024
 | 
			
		||||
	bytesPerMB            = 1024 * 1024
 | 
			
		||||
	maxConcurrentHandlers = 100 // Maximum number of concurrent message handlers
 | 
			
		||||
	maxConcurrentHandlers = 800 // Maximum number of concurrent message handlers
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// MessageHandler is an interface for handling RIS messages
 | 
			
		||||
@ -35,23 +35,43 @@ type MessageHandler interface {
 | 
			
		||||
 | 
			
		||||
	// HandleMessage processes a RIS message
 | 
			
		||||
	HandleMessage(msg *ristypes.RISMessage)
 | 
			
		||||
 | 
			
		||||
	// QueueCapacity returns the desired queue capacity for this handler
 | 
			
		||||
	// Handlers that process quickly can have larger queues
 | 
			
		||||
	QueueCapacity() int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RawMessageHandler is a callback for handling raw JSON lines from the stream
 | 
			
		||||
type RawMessageHandler func(line string)
 | 
			
		||||
 | 
			
		||||
// handlerMetrics tracks performance metrics for a handler
 | 
			
		||||
type handlerMetrics struct {
 | 
			
		||||
	processedCount uint64        // Total messages processed
 | 
			
		||||
	droppedCount   uint64        // Total messages dropped
 | 
			
		||||
	totalTime      time.Duration // Total processing time (for average calculation)
 | 
			
		||||
	minTime        time.Duration // Minimum processing time
 | 
			
		||||
	maxTime        time.Duration // Maximum processing time
 | 
			
		||||
	mu             sync.Mutex    // Protects the metrics
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// handlerInfo wraps a handler with its queue and metrics
 | 
			
		||||
type handlerInfo struct {
 | 
			
		||||
	handler MessageHandler
 | 
			
		||||
	queue   chan *ristypes.RISMessage
 | 
			
		||||
	metrics handlerMetrics
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Streamer handles streaming BGP updates from RIS Live
 | 
			
		||||
type Streamer struct {
 | 
			
		||||
	logger       *slog.Logger
 | 
			
		||||
	client       *http.Client
 | 
			
		||||
	handlers        []MessageHandler
 | 
			
		||||
	handlers     []*handlerInfo
 | 
			
		||||
	rawHandler   RawMessageHandler
 | 
			
		||||
	mu           sync.RWMutex
 | 
			
		||||
	cancel       context.CancelFunc
 | 
			
		||||
	running      bool
 | 
			
		||||
	metrics      *metrics.Tracker
 | 
			
		||||
	semaphore       chan struct{} // Limits concurrent message processing
 | 
			
		||||
	droppedMessages uint64        // Atomic counter for dropped messages
 | 
			
		||||
	totalDropped uint64 // Total dropped messages across all handlers
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// New creates a new RIS streamer
 | 
			
		||||
@ -61,9 +81,8 @@ func New(logger *slog.Logger, metrics *metrics.Tracker) *Streamer {
 | 
			
		||||
		client: &http.Client{
 | 
			
		||||
			Timeout: 0, // No timeout for streaming
 | 
			
		||||
		},
 | 
			
		||||
		handlers:  make([]MessageHandler, 0),
 | 
			
		||||
		handlers: make([]*handlerInfo, 0),
 | 
			
		||||
		metrics:  metrics,
 | 
			
		||||
		semaphore: make(chan struct{}, maxConcurrentHandlers),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -71,7 +90,19 @@ func New(logger *slog.Logger, metrics *metrics.Tracker) *Streamer {
 | 
			
		||||
func (s *Streamer) RegisterHandler(handler MessageHandler) {
 | 
			
		||||
	s.mu.Lock()
 | 
			
		||||
	defer s.mu.Unlock()
 | 
			
		||||
	s.handlers = append(s.handlers, handler)
 | 
			
		||||
 | 
			
		||||
	// Create handler info with its own queue based on capacity
 | 
			
		||||
	info := &handlerInfo{
 | 
			
		||||
		handler: handler,
 | 
			
		||||
		queue:   make(chan *ristypes.RISMessage, handler.QueueCapacity()),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	s.handlers = append(s.handlers, info)
 | 
			
		||||
 | 
			
		||||
	// If we're already running, start a worker for this handler
 | 
			
		||||
	if s.running {
 | 
			
		||||
		go s.runHandlerWorker(info)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RegisterRawHandler sets a callback for raw message lines
 | 
			
		||||
@ -94,6 +125,11 @@ func (s *Streamer) Start() error {
 | 
			
		||||
	s.cancel = cancel
 | 
			
		||||
	s.running = true
 | 
			
		||||
 | 
			
		||||
	// Start workers for each handler
 | 
			
		||||
	for _, info := range s.handlers {
 | 
			
		||||
		go s.runHandlerWorker(info)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		if err := s.stream(ctx); err != nil {
 | 
			
		||||
			s.logger.Error("Streaming error", "error", err)
 | 
			
		||||
@ -112,10 +148,40 @@ func (s *Streamer) Stop() {
 | 
			
		||||
	if s.cancel != nil {
 | 
			
		||||
		s.cancel()
 | 
			
		||||
	}
 | 
			
		||||
	// Close all handler queues to signal workers to stop
 | 
			
		||||
	for _, info := range s.handlers {
 | 
			
		||||
		close(info.queue)
 | 
			
		||||
	}
 | 
			
		||||
	s.running = false
 | 
			
		||||
	s.mu.Unlock()
 | 
			
		||||
	s.metrics.SetConnected(false)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// runHandlerWorker processes messages for a specific handler
 | 
			
		||||
func (s *Streamer) runHandlerWorker(info *handlerInfo) {
 | 
			
		||||
	for msg := range info.queue {
 | 
			
		||||
		start := time.Now()
 | 
			
		||||
		info.handler.HandleMessage(msg)
 | 
			
		||||
		elapsed := time.Since(start)
 | 
			
		||||
 | 
			
		||||
		// Update metrics
 | 
			
		||||
		info.metrics.mu.Lock()
 | 
			
		||||
		info.metrics.processedCount++
 | 
			
		||||
		info.metrics.totalTime += elapsed
 | 
			
		||||
 | 
			
		||||
		// Update min time
 | 
			
		||||
		if info.metrics.minTime == 0 || elapsed < info.metrics.minTime {
 | 
			
		||||
			info.metrics.minTime = elapsed
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Update max time
 | 
			
		||||
		if elapsed > info.metrics.maxTime {
 | 
			
		||||
			info.metrics.maxTime = elapsed
 | 
			
		||||
		}
 | 
			
		||||
		info.metrics.mu.Unlock()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// IsRunning returns whether the streamer is currently active
 | 
			
		||||
func (s *Streamer) IsRunning() bool {
 | 
			
		||||
	s.mu.RLock()
 | 
			
		||||
@ -131,7 +197,7 @@ func (s *Streamer) GetMetrics() metrics.StreamMetrics {
 | 
			
		||||
 | 
			
		||||
// GetDroppedMessages returns the total number of dropped messages
 | 
			
		||||
func (s *Streamer) GetDroppedMessages() uint64 {
 | 
			
		||||
	return atomic.LoadUint64(&s.droppedMessages)
 | 
			
		||||
	return atomic.LoadUint64(&s.totalDropped)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// logMetrics logs the current streaming statistics
 | 
			
		||||
@ -140,18 +206,57 @@ func (s *Streamer) logMetrics() {
 | 
			
		||||
	uptime := time.Since(metrics.ConnectedSince)
 | 
			
		||||
 | 
			
		||||
	const bitsPerMegabit = 1000000
 | 
			
		||||
	droppedMessages := atomic.LoadUint64(&s.droppedMessages)
 | 
			
		||||
	s.logger.Info("Stream statistics",
 | 
			
		||||
		"uptime", uptime,
 | 
			
		||||
		"total_messages", metrics.TotalMessages,
 | 
			
		||||
		"total_bytes", metrics.TotalBytes,
 | 
			
		||||
		"total_mb", fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB),
 | 
			
		||||
		"messages_per_sec", fmt.Sprintf("%.2f", metrics.MessagesPerSec),
 | 
			
		||||
		"bits_per_sec", fmt.Sprintf("%.0f", metrics.BitsPerSec),
 | 
			
		||||
		"mbps", fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit),
 | 
			
		||||
		"dropped_messages", droppedMessages,
 | 
			
		||||
		"active_handlers", len(s.semaphore),
 | 
			
		||||
	totalDropped := atomic.LoadUint64(&s.totalDropped)
 | 
			
		||||
 | 
			
		||||
	s.logger.Info(
 | 
			
		||||
		"Stream statistics",
 | 
			
		||||
		"uptime",
 | 
			
		||||
		uptime,
 | 
			
		||||
		"total_messages",
 | 
			
		||||
		metrics.TotalMessages,
 | 
			
		||||
		"total_bytes",
 | 
			
		||||
		metrics.TotalBytes,
 | 
			
		||||
		"total_mb",
 | 
			
		||||
		fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB),
 | 
			
		||||
		"messages_per_sec",
 | 
			
		||||
		fmt.Sprintf("%.2f", metrics.MessagesPerSec),
 | 
			
		||||
		"bits_per_sec",
 | 
			
		||||
		fmt.Sprintf("%.0f", metrics.BitsPerSec),
 | 
			
		||||
		"mbps",
 | 
			
		||||
		fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit),
 | 
			
		||||
		"total_dropped",
 | 
			
		||||
		totalDropped,
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	// Log per-handler statistics
 | 
			
		||||
	s.mu.RLock()
 | 
			
		||||
	for i, info := range s.handlers {
 | 
			
		||||
		info.metrics.mu.Lock()
 | 
			
		||||
		if info.metrics.processedCount > 0 {
 | 
			
		||||
			// Safe conversion: processedCount is bounded by maxInt64
 | 
			
		||||
			processedCount := info.metrics.processedCount
 | 
			
		||||
			const maxInt64 = 1<<63 - 1
 | 
			
		||||
			if processedCount > maxInt64 {
 | 
			
		||||
				processedCount = maxInt64
 | 
			
		||||
			}
 | 
			
		||||
			//nolint:gosec // processedCount is explicitly bounded above
 | 
			
		||||
			avgTime := info.metrics.totalTime / time.Duration(processedCount)
 | 
			
		||||
			s.logger.Info(
 | 
			
		||||
				"Handler statistics",
 | 
			
		||||
				"handler", fmt.Sprintf("%T", info.handler),
 | 
			
		||||
				"index", i,
 | 
			
		||||
				"queue_len", len(info.queue),
 | 
			
		||||
				"queue_cap", cap(info.queue),
 | 
			
		||||
				"processed", info.metrics.processedCount,
 | 
			
		||||
				"dropped", info.metrics.droppedCount,
 | 
			
		||||
				"avg_time", avgTime,
 | 
			
		||||
				"min_time", info.metrics.minTime,
 | 
			
		||||
				"max_time", info.metrics.maxTime,
 | 
			
		||||
			)
 | 
			
		||||
		}
 | 
			
		||||
		info.metrics.mu.Unlock()
 | 
			
		||||
	}
 | 
			
		||||
	s.mu.RUnlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// updateMetrics updates the metrics counters and rates
 | 
			
		||||
@ -226,25 +331,12 @@ func (s *Streamer) stream(ctx context.Context) error {
 | 
			
		||||
			rawHandler(string(line))
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Get current handlers
 | 
			
		||||
		s.mu.RLock()
 | 
			
		||||
		handlers := make([]MessageHandler, len(s.handlers))
 | 
			
		||||
		copy(handlers, s.handlers)
 | 
			
		||||
		s.mu.RUnlock()
 | 
			
		||||
 | 
			
		||||
		// Try to acquire semaphore, drop message if at capacity
 | 
			
		||||
		select {
 | 
			
		||||
		case s.semaphore <- struct{}{}:
 | 
			
		||||
			// Successfully acquired semaphore, process message
 | 
			
		||||
			go func(rawLine []byte, messageHandlers []MessageHandler) {
 | 
			
		||||
				defer func() { <-s.semaphore }() // Release semaphore when done
 | 
			
		||||
 | 
			
		||||
				// Parse the outer wrapper first
 | 
			
		||||
		// Parse the message first
 | 
			
		||||
		var wrapper ristypes.RISLiveMessage
 | 
			
		||||
				if err := json.Unmarshal(rawLine, &wrapper); err != nil {
 | 
			
		||||
		if err := json.Unmarshal(line, &wrapper); err != nil {
 | 
			
		||||
			// Output the raw line and panic on parse failure
 | 
			
		||||
			fmt.Fprintf(os.Stderr, "Failed to parse JSON: %v\n", err)
 | 
			
		||||
					fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(rawLine))
 | 
			
		||||
			fmt.Fprintf(os.Stderr, "Raw line: %s\n", string(line))
 | 
			
		||||
			panic(fmt.Sprintf("JSON parse error: %v", err))
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
@ -252,10 +344,10 @@ func (s *Streamer) stream(ctx context.Context) error {
 | 
			
		||||
		if wrapper.Type != "ris_message" {
 | 
			
		||||
			s.logger.Error("Unexpected wrapper type",
 | 
			
		||||
				"type", wrapper.Type,
 | 
			
		||||
						"line", string(rawLine),
 | 
			
		||||
				"line", string(line),
 | 
			
		||||
			)
 | 
			
		||||
 | 
			
		||||
					return
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Get the actual message
 | 
			
		||||
@ -268,51 +360,63 @@ func (s *Streamer) stream(ctx context.Context) error {
 | 
			
		||||
		switch msg.Type {
 | 
			
		||||
		case "UPDATE":
 | 
			
		||||
			// Process BGP UPDATE messages
 | 
			
		||||
					// Will be handled by registered handlers
 | 
			
		||||
			// Will be dispatched to handlers
 | 
			
		||||
		case "RIS_PEER_STATE":
 | 
			
		||||
			// RIS peer state messages - silently ignore
 | 
			
		||||
			continue
 | 
			
		||||
		case "KEEPALIVE":
 | 
			
		||||
			// BGP keepalive messages - silently process
 | 
			
		||||
			continue
 | 
			
		||||
		case "OPEN":
 | 
			
		||||
			// BGP open messages
 | 
			
		||||
			s.logger.Info("BGP session opened",
 | 
			
		||||
				"peer", msg.Peer,
 | 
			
		||||
				"peer_asn", msg.PeerASN,
 | 
			
		||||
			)
 | 
			
		||||
 | 
			
		||||
			continue
 | 
			
		||||
		case "NOTIFICATION":
 | 
			
		||||
			// BGP notification messages (errors)
 | 
			
		||||
			s.logger.Warn("BGP notification",
 | 
			
		||||
				"peer", msg.Peer,
 | 
			
		||||
				"peer_asn", msg.PeerASN,
 | 
			
		||||
			)
 | 
			
		||||
 | 
			
		||||
			continue
 | 
			
		||||
		case "STATE":
 | 
			
		||||
			// Peer state changes - silently ignore
 | 
			
		||||
			continue
 | 
			
		||||
		default:
 | 
			
		||||
			fmt.Fprintf(
 | 
			
		||||
				os.Stderr,
 | 
			
		||||
				"UNKNOWN MESSAGE TYPE: %s\nRAW MESSAGE: %s\n",
 | 
			
		||||
				msg.Type,
 | 
			
		||||
						string(rawLine),
 | 
			
		||||
				string(line),
 | 
			
		||||
			)
 | 
			
		||||
			panic(
 | 
			
		||||
				fmt.Sprintf(
 | 
			
		||||
					"Unknown RIS message type: %s",
 | 
			
		||||
					msg.Type,
 | 
			
		||||
				),
 | 
			
		||||
			)
 | 
			
		||||
					panic(fmt.Sprintf("Unknown RIS message type: %s", msg.Type))
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
				// Call handlers synchronously within this goroutine
 | 
			
		||||
				// This prevents unbounded goroutine growth at the handler level
 | 
			
		||||
				for _, handler := range messageHandlers {
 | 
			
		||||
					if handler.WantsMessage(msg.Type) {
 | 
			
		||||
						handler.HandleMessage(&msg)
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}(append([]byte(nil), line...), handlers) // Copy the line to avoid data races
 | 
			
		||||
		// Dispatch to interested handlers
 | 
			
		||||
		s.mu.RLock()
 | 
			
		||||
		for _, info := range s.handlers {
 | 
			
		||||
			if info.handler.WantsMessage(msg.Type) {
 | 
			
		||||
				select {
 | 
			
		||||
				case info.queue <- &msg:
 | 
			
		||||
					// Message queued successfully
 | 
			
		||||
				default:
 | 
			
		||||
			// Semaphore is full, drop the message
 | 
			
		||||
			dropped := atomic.AddUint64(&s.droppedMessages, 1)
 | 
			
		||||
			if dropped%1000 == 0 { // Log every 1000 dropped messages
 | 
			
		||||
				s.logger.Warn("Dropping messages due to overload", "total_dropped", dropped, "max_handlers", maxConcurrentHandlers)
 | 
			
		||||
					// Queue is full, drop the message
 | 
			
		||||
					atomic.AddUint64(&info.metrics.droppedCount, 1)
 | 
			
		||||
					atomic.AddUint64(&s.totalDropped, 1)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		s.mu.RUnlock()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := scanner.Err(); err != nil {
 | 
			
		||||
		return fmt.Errorf("scanner error: %w", err)
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user