Remove routing table and snapshotter packages, update status page
- Remove routingtable package entirely as database handles all routing data - Remove snapshotter package as database contains all information - Rename 'Connection Status' box to 'RouteWatch' and add Go version, goroutines, memory usage - Move IPv4/IPv6 prefix counts from Database Statistics to Routing Table box - Add Peers count to Database Statistics box - Add go-humanize dependency for memory formatting - Update server to include new metrics in API responses
This commit is contained in:
parent
d929f24f80
commit
ae89468a1b
1
go.mod
1
go.mod
@ -11,6 +11,7 @@ require (
|
|||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||||
go.uber.org/dig v1.19.0 // indirect
|
go.uber.org/dig v1.19.0 // indirect
|
||||||
go.uber.org/multierr v1.10.0 // indirect
|
go.uber.org/multierr v1.10.0 // indirect
|
||||||
go.uber.org/zap v1.26.0 // indirect
|
go.uber.org/zap v1.26.0 // indirect
|
||||||
|
2
go.sum
2
go.sum
@ -1,5 +1,7 @@
|
|||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||||
|
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||||
github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618=
|
github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618=
|
||||||
github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
|
github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
|
||||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
|
@ -396,6 +396,12 @@ func (d *Database) GetStats() (Stats, error) {
|
|||||||
return stats, err
|
return stats, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Count peers
|
||||||
|
err = d.queryRow("SELECT COUNT(*) FROM bgp_peers").Scan(&stats.Peers)
|
||||||
|
if err != nil {
|
||||||
|
return stats, err
|
||||||
|
}
|
||||||
|
|
||||||
// Get database file size
|
// Get database file size
|
||||||
fileInfo, err := os.Stat(d.path)
|
fileInfo, err := os.Stat(d.path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -11,6 +11,7 @@ type Stats struct {
|
|||||||
IPv4Prefixes int
|
IPv4Prefixes int
|
||||||
IPv6Prefixes int
|
IPv6Prefixes int
|
||||||
Peerings int
|
Peerings int
|
||||||
|
Peers int
|
||||||
FileSizeBytes int64
|
FileSizeBytes int64
|
||||||
LiveRoutes int
|
LiveRoutes int
|
||||||
IPv4PrefixDistribution []PrefixDistribution
|
IPv4PrefixDistribution []PrefixDistribution
|
||||||
|
@ -159,6 +159,7 @@ func (m *mockStore) GetStats() (database.Stats, error) {
|
|||||||
IPv4Prefixes: m.IPv4Prefixes,
|
IPv4Prefixes: m.IPv4Prefixes,
|
||||||
IPv6Prefixes: m.IPv6Prefixes,
|
IPv6Prefixes: m.IPv6Prefixes,
|
||||||
Peerings: m.PeeringCount,
|
Peerings: m.PeeringCount,
|
||||||
|
Peers: 10, // Mock peer count
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,604 +0,0 @@
|
|||||||
// Package routingtable provides a thread-safe in-memory representation of the DFZ routing table.
|
|
||||||
package routingtable
|
|
||||||
|
|
||||||
import (
|
|
||||||
"compress/gzip"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.eeqj.de/sneak/routewatch/internal/config"
|
|
||||||
"git.eeqj.de/sneak/routewatch/internal/logger"
|
|
||||||
"github.com/google/uuid"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// routeStalenessThreshold is how old a route can be before we consider it stale
|
|
||||||
// Using 30 minutes as a conservative value for snapshot loading
|
|
||||||
routeStalenessThreshold = 30 * time.Minute
|
|
||||||
|
|
||||||
// snapshotFilename is the name of the snapshot file
|
|
||||||
snapshotFilename = "routingtable.json.gz"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Route represents a single route entry in the routing table
|
|
||||||
type Route struct {
|
|
||||||
PrefixID uuid.UUID `json:"prefix_id"`
|
|
||||||
Prefix string `json:"prefix"` // The actual prefix string (e.g., "10.0.0.0/8")
|
|
||||||
OriginASNID uuid.UUID `json:"origin_asn_id"`
|
|
||||||
OriginASN int `json:"origin_asn"` // The actual ASN number
|
|
||||||
PeerASN int `json:"peer_asn"`
|
|
||||||
ASPath []int `json:"as_path"` // Full AS path
|
|
||||||
NextHop string `json:"next_hop"`
|
|
||||||
AnnouncedAt time.Time `json:"announced_at"`
|
|
||||||
AddedAt time.Time `json:"added_at"` // When we added this route to our table
|
|
||||||
}
|
|
||||||
|
|
||||||
// RouteKey uniquely identifies a route in the table
|
|
||||||
type RouteKey struct {
|
|
||||||
PrefixID uuid.UUID
|
|
||||||
OriginASNID uuid.UUID
|
|
||||||
PeerASN int
|
|
||||||
}
|
|
||||||
|
|
||||||
// RoutingTable is a thread-safe in-memory routing table
|
|
||||||
type RoutingTable struct {
|
|
||||||
mu sync.RWMutex
|
|
||||||
routes map[RouteKey]*Route
|
|
||||||
|
|
||||||
// Secondary indexes for efficient lookups
|
|
||||||
byPrefix map[uuid.UUID]map[RouteKey]*Route // Routes indexed by prefix ID
|
|
||||||
byOriginASN map[uuid.UUID]map[RouteKey]*Route // Routes indexed by origin ASN ID
|
|
||||||
byPeerASN map[int]map[RouteKey]*Route // Routes indexed by peer ASN
|
|
||||||
|
|
||||||
// Metrics tracking
|
|
||||||
ipv4Routes int
|
|
||||||
ipv6Routes int
|
|
||||||
ipv4Updates uint64 // Updates counter for rate calculation
|
|
||||||
ipv6Updates uint64 // Updates counter for rate calculation
|
|
||||||
lastMetricsReset time.Time
|
|
||||||
|
|
||||||
// Configuration
|
|
||||||
snapshotDir string
|
|
||||||
routeExpirationTimeout time.Duration
|
|
||||||
logger *logger.Logger
|
|
||||||
|
|
||||||
// Expiration management
|
|
||||||
stopExpiration chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// New creates a new routing table, loading from snapshot if available
|
|
||||||
func New(cfg *config.Config, logger *logger.Logger) *RoutingTable {
|
|
||||||
rt := &RoutingTable{
|
|
||||||
routes: make(map[RouteKey]*Route),
|
|
||||||
byPrefix: make(map[uuid.UUID]map[RouteKey]*Route),
|
|
||||||
byOriginASN: make(map[uuid.UUID]map[RouteKey]*Route),
|
|
||||||
byPeerASN: make(map[int]map[RouteKey]*Route),
|
|
||||||
lastMetricsReset: time.Now(),
|
|
||||||
snapshotDir: cfg.GetStateDir(),
|
|
||||||
routeExpirationTimeout: cfg.RouteExpirationTimeout,
|
|
||||||
logger: logger,
|
|
||||||
stopExpiration: make(chan struct{}),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to load from snapshot
|
|
||||||
if err := rt.loadFromSnapshot(logger); err != nil {
|
|
||||||
logger.Warn("Failed to load routing table from snapshot", "error", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start expiration goroutine
|
|
||||||
go rt.expireRoutesLoop()
|
|
||||||
|
|
||||||
return rt
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddRoute adds or updates a route in the routing table
|
|
||||||
func (rt *RoutingTable) AddRoute(route *Route) {
|
|
||||||
rt.mu.Lock()
|
|
||||||
defer rt.mu.Unlock()
|
|
||||||
|
|
||||||
key := RouteKey{
|
|
||||||
PrefixID: route.PrefixID,
|
|
||||||
OriginASNID: route.OriginASNID,
|
|
||||||
PeerASN: route.PeerASN,
|
|
||||||
}
|
|
||||||
|
|
||||||
// If route already exists, remove it from indexes first
|
|
||||||
if existingRoute, exists := rt.routes[key]; exists {
|
|
||||||
rt.removeFromIndexes(key, existingRoute)
|
|
||||||
// Decrement counter for existing route
|
|
||||||
if isIPv6(existingRoute.Prefix) {
|
|
||||||
rt.ipv6Routes--
|
|
||||||
} else {
|
|
||||||
rt.ipv4Routes--
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set AddedAt if not already set
|
|
||||||
if route.AddedAt.IsZero() {
|
|
||||||
route.AddedAt = time.Now().UTC()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add to main map
|
|
||||||
rt.routes[key] = route
|
|
||||||
|
|
||||||
// Update indexes
|
|
||||||
rt.addToIndexes(key, route)
|
|
||||||
|
|
||||||
// Update metrics
|
|
||||||
if isIPv6(route.Prefix) {
|
|
||||||
rt.ipv6Routes++
|
|
||||||
atomic.AddUint64(&rt.ipv6Updates, 1)
|
|
||||||
} else {
|
|
||||||
rt.ipv4Routes++
|
|
||||||
atomic.AddUint64(&rt.ipv4Updates, 1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// RemoveRoute removes a route from the routing table
|
|
||||||
func (rt *RoutingTable) RemoveRoute(prefixID, originASNID uuid.UUID, peerASN int) bool {
|
|
||||||
rt.mu.Lock()
|
|
||||||
defer rt.mu.Unlock()
|
|
||||||
|
|
||||||
key := RouteKey{
|
|
||||||
PrefixID: prefixID,
|
|
||||||
OriginASNID: originASNID,
|
|
||||||
PeerASN: peerASN,
|
|
||||||
}
|
|
||||||
|
|
||||||
route, exists := rt.routes[key]
|
|
||||||
if !exists {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove from indexes
|
|
||||||
rt.removeFromIndexes(key, route)
|
|
||||||
|
|
||||||
// Remove from main map
|
|
||||||
delete(rt.routes, key)
|
|
||||||
|
|
||||||
// Update metrics
|
|
||||||
if isIPv6(route.Prefix) {
|
|
||||||
rt.ipv6Routes--
|
|
||||||
atomic.AddUint64(&rt.ipv6Updates, 1)
|
|
||||||
} else {
|
|
||||||
rt.ipv4Routes--
|
|
||||||
atomic.AddUint64(&rt.ipv4Updates, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithdrawRoutesByPrefixAndPeer removes all routes for a specific prefix from a specific peer
|
|
||||||
func (rt *RoutingTable) WithdrawRoutesByPrefixAndPeer(prefixID uuid.UUID, peerASN int) int {
|
|
||||||
rt.mu.Lock()
|
|
||||||
defer rt.mu.Unlock()
|
|
||||||
|
|
||||||
prefixRoutes, exists := rt.byPrefix[prefixID]
|
|
||||||
if !exists {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// Collect keys to delete (can't delete while iterating)
|
|
||||||
var keysToDelete []RouteKey
|
|
||||||
for key, route := range prefixRoutes {
|
|
||||||
if route.PeerASN == peerASN {
|
|
||||||
keysToDelete = append(keysToDelete, key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete the routes
|
|
||||||
count := 0
|
|
||||||
for _, key := range keysToDelete {
|
|
||||||
route, exists := rt.routes[key]
|
|
||||||
if !exists {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
rt.removeFromIndexes(key, route)
|
|
||||||
delete(rt.routes, key)
|
|
||||||
count++
|
|
||||||
|
|
||||||
// Update metrics
|
|
||||||
if isIPv6(route.Prefix) {
|
|
||||||
rt.ipv6Routes--
|
|
||||||
atomic.AddUint64(&rt.ipv6Updates, 1)
|
|
||||||
} else {
|
|
||||||
rt.ipv4Routes--
|
|
||||||
atomic.AddUint64(&rt.ipv4Updates, 1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return count
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetRoute retrieves a specific route
|
|
||||||
func (rt *RoutingTable) GetRoute(prefixID, originASNID uuid.UUID, peerASN int) (*Route, bool) {
|
|
||||||
rt.mu.RLock()
|
|
||||||
defer rt.mu.RUnlock()
|
|
||||||
|
|
||||||
key := RouteKey{
|
|
||||||
PrefixID: prefixID,
|
|
||||||
OriginASNID: originASNID,
|
|
||||||
PeerASN: peerASN,
|
|
||||||
}
|
|
||||||
|
|
||||||
route, exists := rt.routes[key]
|
|
||||||
if !exists {
|
|
||||||
return nil, false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return a copy to prevent external modification
|
|
||||||
routeCopy := *route
|
|
||||||
|
|
||||||
return &routeCopy, true
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetRoutesByPrefix returns all routes for a specific prefix
|
|
||||||
func (rt *RoutingTable) GetRoutesByPrefix(prefixID uuid.UUID) []*Route {
|
|
||||||
rt.mu.RLock()
|
|
||||||
defer rt.mu.RUnlock()
|
|
||||||
|
|
||||||
routes := make([]*Route, 0)
|
|
||||||
if prefixRoutes, exists := rt.byPrefix[prefixID]; exists {
|
|
||||||
for _, route := range prefixRoutes {
|
|
||||||
routeCopy := *route
|
|
||||||
routes = append(routes, &routeCopy)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return routes
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetRoutesByOriginASN returns all routes originated by a specific ASN
|
|
||||||
func (rt *RoutingTable) GetRoutesByOriginASN(originASNID uuid.UUID) []*Route {
|
|
||||||
rt.mu.RLock()
|
|
||||||
defer rt.mu.RUnlock()
|
|
||||||
|
|
||||||
routes := make([]*Route, 0)
|
|
||||||
if asnRoutes, exists := rt.byOriginASN[originASNID]; exists {
|
|
||||||
for _, route := range asnRoutes {
|
|
||||||
routeCopy := *route
|
|
||||||
routes = append(routes, &routeCopy)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return routes
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetRoutesByPeerASN returns all routes received from a specific peer ASN
|
|
||||||
func (rt *RoutingTable) GetRoutesByPeerASN(peerASN int) []*Route {
|
|
||||||
rt.mu.RLock()
|
|
||||||
defer rt.mu.RUnlock()
|
|
||||||
|
|
||||||
routes := make([]*Route, 0)
|
|
||||||
if peerRoutes, exists := rt.byPeerASN[peerASN]; exists {
|
|
||||||
for _, route := range peerRoutes {
|
|
||||||
routeCopy := *route
|
|
||||||
routes = append(routes, &routeCopy)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return routes
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetAllRoutes returns all active routes in the routing table
|
|
||||||
func (rt *RoutingTable) GetAllRoutes() []*Route {
|
|
||||||
rt.mu.RLock()
|
|
||||||
defer rt.mu.RUnlock()
|
|
||||||
|
|
||||||
routes := make([]*Route, 0, len(rt.routes))
|
|
||||||
for _, route := range rt.routes {
|
|
||||||
routeCopy := *route
|
|
||||||
routes = append(routes, &routeCopy)
|
|
||||||
}
|
|
||||||
|
|
||||||
return routes
|
|
||||||
}
|
|
||||||
|
|
||||||
// Size returns the total number of routes in the table
|
|
||||||
func (rt *RoutingTable) Size() int {
|
|
||||||
rt.mu.RLock()
|
|
||||||
defer rt.mu.RUnlock()
|
|
||||||
|
|
||||||
return len(rt.routes)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stats returns statistics about the routing table
|
|
||||||
func (rt *RoutingTable) Stats() map[string]int {
|
|
||||||
rt.mu.RLock()
|
|
||||||
defer rt.mu.RUnlock()
|
|
||||||
|
|
||||||
stats := map[string]int{
|
|
||||||
"total_routes": len(rt.routes),
|
|
||||||
"unique_prefixes": len(rt.byPrefix),
|
|
||||||
"unique_origins": len(rt.byOriginASN),
|
|
||||||
"unique_peers": len(rt.byPeerASN),
|
|
||||||
}
|
|
||||||
|
|
||||||
return stats
|
|
||||||
}
|
|
||||||
|
|
||||||
// DetailedStats contains detailed routing table statistics
|
|
||||||
type DetailedStats struct {
|
|
||||||
IPv4Routes int
|
|
||||||
IPv6Routes int
|
|
||||||
IPv4UpdatesRate float64
|
|
||||||
IPv6UpdatesRate float64
|
|
||||||
TotalRoutes int
|
|
||||||
UniquePrefixes int
|
|
||||||
UniqueOrigins int
|
|
||||||
UniquePeers int
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetDetailedStats returns detailed statistics including IPv4/IPv6 breakdown and update rates
|
|
||||||
func (rt *RoutingTable) GetDetailedStats() DetailedStats {
|
|
||||||
rt.mu.Lock()
|
|
||||||
defer rt.mu.Unlock()
|
|
||||||
|
|
||||||
// Calculate update rates
|
|
||||||
elapsed := time.Since(rt.lastMetricsReset).Seconds()
|
|
||||||
ipv4Updates := atomic.LoadUint64(&rt.ipv4Updates)
|
|
||||||
ipv6Updates := atomic.LoadUint64(&rt.ipv6Updates)
|
|
||||||
|
|
||||||
stats := DetailedStats{
|
|
||||||
IPv4Routes: rt.ipv4Routes,
|
|
||||||
IPv6Routes: rt.ipv6Routes,
|
|
||||||
IPv4UpdatesRate: float64(ipv4Updates) / elapsed,
|
|
||||||
IPv6UpdatesRate: float64(ipv6Updates) / elapsed,
|
|
||||||
TotalRoutes: len(rt.routes),
|
|
||||||
UniquePrefixes: len(rt.byPrefix),
|
|
||||||
UniqueOrigins: len(rt.byOriginASN),
|
|
||||||
UniquePeers: len(rt.byPeerASN),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reset counters for next period
|
|
||||||
atomic.StoreUint64(&rt.ipv4Updates, 0)
|
|
||||||
atomic.StoreUint64(&rt.ipv6Updates, 0)
|
|
||||||
rt.lastMetricsReset = time.Now()
|
|
||||||
|
|
||||||
return stats
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clear removes all routes from the routing table
|
|
||||||
func (rt *RoutingTable) Clear() {
|
|
||||||
rt.mu.Lock()
|
|
||||||
defer rt.mu.Unlock()
|
|
||||||
|
|
||||||
rt.routes = make(map[RouteKey]*Route)
|
|
||||||
rt.byPrefix = make(map[uuid.UUID]map[RouteKey]*Route)
|
|
||||||
rt.byOriginASN = make(map[uuid.UUID]map[RouteKey]*Route)
|
|
||||||
rt.byPeerASN = make(map[int]map[RouteKey]*Route)
|
|
||||||
rt.ipv4Routes = 0
|
|
||||||
rt.ipv6Routes = 0
|
|
||||||
atomic.StoreUint64(&rt.ipv4Updates, 0)
|
|
||||||
atomic.StoreUint64(&rt.ipv6Updates, 0)
|
|
||||||
rt.lastMetricsReset = time.Now()
|
|
||||||
}
|
|
||||||
|
|
||||||
// RLock acquires a read lock on the routing table
|
|
||||||
// This is exposed for the snapshotter to use
|
|
||||||
func (rt *RoutingTable) RLock() {
|
|
||||||
rt.mu.RLock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// RUnlock releases a read lock on the routing table
|
|
||||||
// This is exposed for the snapshotter to use
|
|
||||||
func (rt *RoutingTable) RUnlock() {
|
|
||||||
rt.mu.RUnlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetAllRoutesUnsafe returns all routes without copying
|
|
||||||
// IMPORTANT: Caller must hold RLock before calling this method
|
|
||||||
func (rt *RoutingTable) GetAllRoutesUnsafe() []*Route {
|
|
||||||
routes := make([]*Route, 0, len(rt.routes))
|
|
||||||
for _, route := range rt.routes {
|
|
||||||
routes = append(routes, route)
|
|
||||||
}
|
|
||||||
|
|
||||||
return routes
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper methods for index management
|
|
||||||
|
|
||||||
func (rt *RoutingTable) addToIndexes(key RouteKey, route *Route) {
|
|
||||||
// Add to prefix index
|
|
||||||
if rt.byPrefix[route.PrefixID] == nil {
|
|
||||||
rt.byPrefix[route.PrefixID] = make(map[RouteKey]*Route)
|
|
||||||
}
|
|
||||||
rt.byPrefix[route.PrefixID][key] = route
|
|
||||||
|
|
||||||
// Add to origin ASN index
|
|
||||||
if rt.byOriginASN[route.OriginASNID] == nil {
|
|
||||||
rt.byOriginASN[route.OriginASNID] = make(map[RouteKey]*Route)
|
|
||||||
}
|
|
||||||
rt.byOriginASN[route.OriginASNID][key] = route
|
|
||||||
|
|
||||||
// Add to peer ASN index
|
|
||||||
if rt.byPeerASN[route.PeerASN] == nil {
|
|
||||||
rt.byPeerASN[route.PeerASN] = make(map[RouteKey]*Route)
|
|
||||||
}
|
|
||||||
rt.byPeerASN[route.PeerASN][key] = route
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rt *RoutingTable) removeFromIndexes(key RouteKey, route *Route) {
|
|
||||||
// Remove from prefix index
|
|
||||||
if prefixRoutes, exists := rt.byPrefix[route.PrefixID]; exists {
|
|
||||||
delete(prefixRoutes, key)
|
|
||||||
if len(prefixRoutes) == 0 {
|
|
||||||
delete(rt.byPrefix, route.PrefixID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove from origin ASN index
|
|
||||||
if asnRoutes, exists := rt.byOriginASN[route.OriginASNID]; exists {
|
|
||||||
delete(asnRoutes, key)
|
|
||||||
if len(asnRoutes) == 0 {
|
|
||||||
delete(rt.byOriginASN, route.OriginASNID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove from peer ASN index
|
|
||||||
if peerRoutes, exists := rt.byPeerASN[route.PeerASN]; exists {
|
|
||||||
delete(peerRoutes, key)
|
|
||||||
if len(peerRoutes) == 0 {
|
|
||||||
delete(rt.byPeerASN, route.PeerASN)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// String returns a string representation of the route key
|
|
||||||
func (k RouteKey) String() string {
|
|
||||||
return fmt.Sprintf("%s/%s/%d", k.PrefixID, k.OriginASNID, k.PeerASN)
|
|
||||||
}
|
|
||||||
|
|
||||||
// isIPv6 returns true if the prefix is an IPv6 address
|
|
||||||
func isIPv6(prefix string) bool {
|
|
||||||
return strings.Contains(prefix, ":")
|
|
||||||
}
|
|
||||||
|
|
||||||
// loadFromSnapshot attempts to load the routing table from a snapshot file
|
|
||||||
func (rt *RoutingTable) loadFromSnapshot(logger *logger.Logger) error {
|
|
||||||
// If no snapshot directory specified, nothing to load
|
|
||||||
if rt.snapshotDir == "" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
snapshotPath := filepath.Join(rt.snapshotDir, snapshotFilename)
|
|
||||||
|
|
||||||
// Check if snapshot file exists
|
|
||||||
if _, err := os.Stat(snapshotPath); os.IsNotExist(err) {
|
|
||||||
// No snapshot file exists, this is normal - start with empty routing table
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Open the snapshot file
|
|
||||||
file, err := os.Open(filepath.Clean(snapshotPath))
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to open snapshot file: %w", err)
|
|
||||||
}
|
|
||||||
defer func() { _ = file.Close() }()
|
|
||||||
|
|
||||||
// Create gzip reader
|
|
||||||
gzReader, err := gzip.NewReader(file)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to create gzip reader: %w", err)
|
|
||||||
}
|
|
||||||
defer func() { _ = gzReader.Close() }()
|
|
||||||
|
|
||||||
// Decode the snapshot
|
|
||||||
var snapshot struct {
|
|
||||||
Timestamp time.Time `json:"timestamp"`
|
|
||||||
Stats DetailedStats `json:"stats"`
|
|
||||||
Routes []*Route `json:"routes"`
|
|
||||||
}
|
|
||||||
|
|
||||||
decoder := json.NewDecoder(gzReader)
|
|
||||||
if err := decoder.Decode(&snapshot); err != nil {
|
|
||||||
return fmt.Errorf("failed to decode snapshot: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Calculate staleness cutoff time
|
|
||||||
now := time.Now().UTC()
|
|
||||||
cutoffTime := now.Add(-routeStalenessThreshold)
|
|
||||||
|
|
||||||
// Load non-stale routes
|
|
||||||
loadedCount := 0
|
|
||||||
staleCount := 0
|
|
||||||
|
|
||||||
for _, route := range snapshot.Routes {
|
|
||||||
// Check if route is stale based on AddedAt time
|
|
||||||
if route.AddedAt.Before(cutoffTime) {
|
|
||||||
staleCount++
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add the route (this will update counters and indexes)
|
|
||||||
rt.AddRoute(route)
|
|
||||||
loadedCount++
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Info("Loaded routing table from snapshot",
|
|
||||||
"snapshot_time", snapshot.Timestamp,
|
|
||||||
"loaded_routes", loadedCount,
|
|
||||||
"stale_routes", staleCount,
|
|
||||||
"total_routes_in_snapshot", len(snapshot.Routes),
|
|
||||||
)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// expireRoutesLoop periodically removes expired routes
|
|
||||||
func (rt *RoutingTable) expireRoutesLoop() {
|
|
||||||
// Run every minute to check for expired routes
|
|
||||||
ticker := time.NewTicker(1 * time.Minute)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ticker.C:
|
|
||||||
rt.expireStaleRoutes()
|
|
||||||
case <-rt.stopExpiration:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// expireStaleRoutes removes routes that haven't been updated recently
|
|
||||||
func (rt *RoutingTable) expireStaleRoutes() {
|
|
||||||
rt.mu.Lock()
|
|
||||||
defer rt.mu.Unlock()
|
|
||||||
|
|
||||||
now := time.Now().UTC()
|
|
||||||
cutoffTime := now.Add(-rt.routeExpirationTimeout)
|
|
||||||
expiredCount := 0
|
|
||||||
|
|
||||||
// Collect keys to delete (can't delete while iterating)
|
|
||||||
var keysToDelete []RouteKey
|
|
||||||
for key, route := range rt.routes {
|
|
||||||
// Use AnnouncedAt as the last update time
|
|
||||||
if route.AnnouncedAt.Before(cutoffTime) {
|
|
||||||
keysToDelete = append(keysToDelete, key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete expired routes
|
|
||||||
for _, key := range keysToDelete {
|
|
||||||
route, exists := rt.routes[key]
|
|
||||||
if !exists {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
rt.removeFromIndexes(key, route)
|
|
||||||
delete(rt.routes, key)
|
|
||||||
expiredCount++
|
|
||||||
|
|
||||||
// Update metrics
|
|
||||||
if isIPv6(route.Prefix) {
|
|
||||||
rt.ipv6Routes--
|
|
||||||
} else {
|
|
||||||
rt.ipv4Routes--
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if expiredCount > 0 {
|
|
||||||
rt.logger.Info("Expired stale routes",
|
|
||||||
"count", expiredCount,
|
|
||||||
"timeout", rt.routeExpirationTimeout,
|
|
||||||
"remaining_routes", len(rt.routes),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop gracefully stops the routing table background tasks
|
|
||||||
func (rt *RoutingTable) Stop() {
|
|
||||||
if rt.stopExpiration != nil {
|
|
||||||
close(rt.stopExpiration)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,245 +0,0 @@
|
|||||||
package routingtable
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.eeqj.de/sneak/routewatch/internal/config"
|
|
||||||
"git.eeqj.de/sneak/routewatch/internal/logger"
|
|
||||||
"github.com/google/uuid"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestRoutingTable(t *testing.T) {
|
|
||||||
// Create a test logger
|
|
||||||
logger := logger.New()
|
|
||||||
|
|
||||||
// Create test config with empty state dir (no snapshot loading)
|
|
||||||
cfg := &config.Config{
|
|
||||||
StateDir: "",
|
|
||||||
}
|
|
||||||
|
|
||||||
rt := New(cfg, logger)
|
|
||||||
|
|
||||||
// Test data
|
|
||||||
prefixID1 := uuid.New()
|
|
||||||
prefixID2 := uuid.New()
|
|
||||||
originASNID1 := uuid.New()
|
|
||||||
originASNID2 := uuid.New()
|
|
||||||
|
|
||||||
route1 := &Route{
|
|
||||||
PrefixID: prefixID1,
|
|
||||||
Prefix: "10.0.0.0/8",
|
|
||||||
OriginASNID: originASNID1,
|
|
||||||
OriginASN: 64512,
|
|
||||||
PeerASN: 64513,
|
|
||||||
ASPath: []int{64513, 64512},
|
|
||||||
NextHop: "192.168.1.1",
|
|
||||||
AnnouncedAt: time.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
route2 := &Route{
|
|
||||||
PrefixID: prefixID2,
|
|
||||||
Prefix: "192.168.0.0/16",
|
|
||||||
OriginASNID: originASNID2,
|
|
||||||
OriginASN: 64514,
|
|
||||||
PeerASN: 64513,
|
|
||||||
ASPath: []int{64513, 64514},
|
|
||||||
NextHop: "192.168.1.1",
|
|
||||||
AnnouncedAt: time.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test AddRoute
|
|
||||||
rt.AddRoute(route1)
|
|
||||||
rt.AddRoute(route2)
|
|
||||||
|
|
||||||
if rt.Size() != 2 {
|
|
||||||
t.Errorf("Expected 2 routes, got %d", rt.Size())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test GetRoute
|
|
||||||
retrievedRoute, exists := rt.GetRoute(prefixID1, originASNID1, 64513)
|
|
||||||
if !exists {
|
|
||||||
t.Error("Route 1 should exist")
|
|
||||||
}
|
|
||||||
if retrievedRoute.Prefix != "10.0.0.0/8" {
|
|
||||||
t.Errorf("Expected prefix 10.0.0.0/8, got %s", retrievedRoute.Prefix)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test GetRoutesByPrefix
|
|
||||||
prefixRoutes := rt.GetRoutesByPrefix(prefixID1)
|
|
||||||
if len(prefixRoutes) != 1 {
|
|
||||||
t.Errorf("Expected 1 route for prefix, got %d", len(prefixRoutes))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test GetRoutesByPeerASN
|
|
||||||
peerRoutes := rt.GetRoutesByPeerASN(64513)
|
|
||||||
if len(peerRoutes) != 2 {
|
|
||||||
t.Errorf("Expected 2 routes from peer 64513, got %d", len(peerRoutes))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test RemoveRoute
|
|
||||||
removed := rt.RemoveRoute(prefixID1, originASNID1, 64513)
|
|
||||||
if !removed {
|
|
||||||
t.Error("Route should have been removed")
|
|
||||||
}
|
|
||||||
if rt.Size() != 1 {
|
|
||||||
t.Errorf("Expected 1 route after removal, got %d", rt.Size())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test WithdrawRoutesByPrefixAndPeer
|
|
||||||
// Add the route back first
|
|
||||||
rt.AddRoute(route1)
|
|
||||||
|
|
||||||
// Add another route for the same prefix from the same peer
|
|
||||||
route3 := &Route{
|
|
||||||
PrefixID: prefixID1,
|
|
||||||
Prefix: "10.0.0.0/8",
|
|
||||||
OriginASNID: originASNID2, // Different origin
|
|
||||||
OriginASN: 64515,
|
|
||||||
PeerASN: 64513,
|
|
||||||
ASPath: []int{64513, 64515},
|
|
||||||
NextHop: "192.168.1.1",
|
|
||||||
AnnouncedAt: time.Now(),
|
|
||||||
}
|
|
||||||
rt.AddRoute(route3)
|
|
||||||
|
|
||||||
count := rt.WithdrawRoutesByPrefixAndPeer(prefixID1, 64513)
|
|
||||||
if count != 2 {
|
|
||||||
t.Errorf("Expected to withdraw 2 routes, withdrew %d", count)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Should only have route2 left
|
|
||||||
if rt.Size() != 1 {
|
|
||||||
t.Errorf("Expected 1 route after withdrawal, got %d", rt.Size())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test Stats
|
|
||||||
stats := rt.Stats()
|
|
||||||
if stats["total_routes"] != 1 {
|
|
||||||
t.Errorf("Expected 1 total route in stats, got %d", stats["total_routes"])
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test Clear
|
|
||||||
rt.Clear()
|
|
||||||
if rt.Size() != 0 {
|
|
||||||
t.Errorf("Expected 0 routes after clear, got %d", rt.Size())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRoutingTableConcurrency(t *testing.T) {
|
|
||||||
// Create a test logger
|
|
||||||
logger := logger.New()
|
|
||||||
|
|
||||||
// Create test config with empty state dir (no snapshot loading)
|
|
||||||
cfg := &config.Config{
|
|
||||||
StateDir: "",
|
|
||||||
}
|
|
||||||
|
|
||||||
rt := New(cfg, logger)
|
|
||||||
|
|
||||||
// Test concurrent access
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
numGoroutines := 10
|
|
||||||
numOperations := 100
|
|
||||||
|
|
||||||
// Start multiple goroutines that add/remove routes
|
|
||||||
for i := 0; i < numGoroutines; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(id int) {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
for j := 0; j < numOperations; j++ {
|
|
||||||
prefixID := uuid.New()
|
|
||||||
originASNID := uuid.New()
|
|
||||||
|
|
||||||
route := &Route{
|
|
||||||
PrefixID: prefixID,
|
|
||||||
Prefix: "10.0.0.0/8",
|
|
||||||
OriginASNID: originASNID,
|
|
||||||
OriginASN: 64512 + id,
|
|
||||||
PeerASN: 64500,
|
|
||||||
ASPath: []int{64500, 64512 + id},
|
|
||||||
NextHop: "192.168.1.1",
|
|
||||||
AnnouncedAt: time.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add route
|
|
||||||
rt.AddRoute(route)
|
|
||||||
|
|
||||||
// Try to get it
|
|
||||||
_, _ = rt.GetRoute(prefixID, originASNID, 64500)
|
|
||||||
|
|
||||||
// Get stats
|
|
||||||
_ = rt.Stats()
|
|
||||||
|
|
||||||
// Remove it
|
|
||||||
rt.RemoveRoute(prefixID, originASNID, 64500)
|
|
||||||
}
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
// Table should be empty after all operations
|
|
||||||
if rt.Size() != 0 {
|
|
||||||
t.Errorf("Expected empty table after concurrent operations, got %d routes", rt.Size())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRouteUpdate(t *testing.T) {
|
|
||||||
// Create a test logger
|
|
||||||
logger := logger.New()
|
|
||||||
|
|
||||||
// Create test config with empty state dir (no snapshot loading)
|
|
||||||
cfg := &config.Config{
|
|
||||||
StateDir: "",
|
|
||||||
}
|
|
||||||
|
|
||||||
rt := New(cfg, logger)
|
|
||||||
|
|
||||||
prefixID := uuid.New()
|
|
||||||
originASNID := uuid.New()
|
|
||||||
|
|
||||||
route1 := &Route{
|
|
||||||
PrefixID: prefixID,
|
|
||||||
Prefix: "10.0.0.0/8",
|
|
||||||
OriginASNID: originASNID,
|
|
||||||
OriginASN: 64512,
|
|
||||||
PeerASN: 64513,
|
|
||||||
ASPath: []int{64513, 64512},
|
|
||||||
NextHop: "192.168.1.1",
|
|
||||||
AnnouncedAt: time.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add initial route
|
|
||||||
rt.AddRoute(route1)
|
|
||||||
|
|
||||||
// Update the same route with new next hop
|
|
||||||
route2 := &Route{
|
|
||||||
PrefixID: prefixID,
|
|
||||||
Prefix: "10.0.0.0/8",
|
|
||||||
OriginASNID: originASNID,
|
|
||||||
OriginASN: 64512,
|
|
||||||
PeerASN: 64513,
|
|
||||||
ASPath: []int{64513, 64512},
|
|
||||||
NextHop: "192.168.1.2", // Changed
|
|
||||||
AnnouncedAt: time.Now().Add(1 * time.Minute),
|
|
||||||
}
|
|
||||||
|
|
||||||
rt.AddRoute(route2)
|
|
||||||
|
|
||||||
// Should still have only 1 route
|
|
||||||
if rt.Size() != 1 {
|
|
||||||
t.Errorf("Expected 1 route after update, got %d", rt.Size())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check that the route was updated
|
|
||||||
retrievedRoute, exists := rt.GetRoute(prefixID, originASNID, 64513)
|
|
||||||
if !exists {
|
|
||||||
t.Error("Route should exist after update")
|
|
||||||
}
|
|
||||||
if retrievedRoute.NextHop != "192.168.1.2" {
|
|
||||||
t.Errorf("Expected updated next hop 192.168.1.2, got %s", retrievedRoute.NextHop)
|
|
||||||
}
|
|
||||||
}
|
|
@ -6,12 +6,14 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.eeqj.de/sneak/routewatch/internal/database"
|
"git.eeqj.de/sneak/routewatch/internal/database"
|
||||||
"git.eeqj.de/sneak/routewatch/internal/logger"
|
"git.eeqj.de/sneak/routewatch/internal/logger"
|
||||||
"git.eeqj.de/sneak/routewatch/internal/streamer"
|
"git.eeqj.de/sneak/routewatch/internal/streamer"
|
||||||
"git.eeqj.de/sneak/routewatch/internal/templates"
|
"git.eeqj.de/sneak/routewatch/internal/templates"
|
||||||
|
"github.com/dustin/go-humanize"
|
||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
"github.com/go-chi/chi/v5/middleware"
|
"github.com/go-chi/chi/v5/middleware"
|
||||||
)
|
)
|
||||||
@ -117,11 +119,15 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
|
|||||||
MessagesPerSec float64 `json:"messages_per_sec"`
|
MessagesPerSec float64 `json:"messages_per_sec"`
|
||||||
MbitsPerSec float64 `json:"mbits_per_sec"`
|
MbitsPerSec float64 `json:"mbits_per_sec"`
|
||||||
Connected bool `json:"connected"`
|
Connected bool `json:"connected"`
|
||||||
|
GoVersion string `json:"go_version"`
|
||||||
|
Goroutines int `json:"goroutines"`
|
||||||
|
MemoryUsage string `json:"memory_usage"`
|
||||||
ASNs int `json:"asns"`
|
ASNs int `json:"asns"`
|
||||||
Prefixes int `json:"prefixes"`
|
Prefixes int `json:"prefixes"`
|
||||||
IPv4Prefixes int `json:"ipv4_prefixes"`
|
IPv4Prefixes int `json:"ipv4_prefixes"`
|
||||||
IPv6Prefixes int `json:"ipv6_prefixes"`
|
IPv6Prefixes int `json:"ipv6_prefixes"`
|
||||||
Peerings int `json:"peerings"`
|
Peerings int `json:"peerings"`
|
||||||
|
Peers int `json:"peers"`
|
||||||
DatabaseSizeBytes int64 `json:"database_size_bytes"`
|
DatabaseSizeBytes int64 `json:"database_size_bytes"`
|
||||||
LiveRoutes int `json:"live_routes"`
|
LiveRoutes int `json:"live_routes"`
|
||||||
IPv4Routes int `json:"ipv4_routes"`
|
IPv4Routes int `json:"ipv4_routes"`
|
||||||
@ -203,6 +209,10 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
|
|||||||
// Get route update metrics
|
// Get route update metrics
|
||||||
routeMetrics := s.streamer.GetMetricsTracker().GetRouteMetrics()
|
routeMetrics := s.streamer.GetMetricsTracker().GetRouteMetrics()
|
||||||
|
|
||||||
|
// Get memory stats
|
||||||
|
var memStats runtime.MemStats
|
||||||
|
runtime.ReadMemStats(&memStats)
|
||||||
|
|
||||||
stats := Stats{
|
stats := Stats{
|
||||||
Uptime: uptime,
|
Uptime: uptime,
|
||||||
TotalMessages: metrics.TotalMessages,
|
TotalMessages: metrics.TotalMessages,
|
||||||
@ -210,11 +220,15 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
|
|||||||
MessagesPerSec: metrics.MessagesPerSec,
|
MessagesPerSec: metrics.MessagesPerSec,
|
||||||
MbitsPerSec: metrics.BitsPerSec / bitsPerMegabit,
|
MbitsPerSec: metrics.BitsPerSec / bitsPerMegabit,
|
||||||
Connected: metrics.Connected,
|
Connected: metrics.Connected,
|
||||||
|
GoVersion: runtime.Version(),
|
||||||
|
Goroutines: runtime.NumGoroutine(),
|
||||||
|
MemoryUsage: humanize.Bytes(memStats.Alloc),
|
||||||
ASNs: dbStats.ASNs,
|
ASNs: dbStats.ASNs,
|
||||||
Prefixes: dbStats.Prefixes,
|
Prefixes: dbStats.Prefixes,
|
||||||
IPv4Prefixes: dbStats.IPv4Prefixes,
|
IPv4Prefixes: dbStats.IPv4Prefixes,
|
||||||
IPv6Prefixes: dbStats.IPv6Prefixes,
|
IPv6Prefixes: dbStats.IPv6Prefixes,
|
||||||
Peerings: dbStats.Peerings,
|
Peerings: dbStats.Peerings,
|
||||||
|
Peers: dbStats.Peers,
|
||||||
DatabaseSizeBytes: dbStats.FileSizeBytes,
|
DatabaseSizeBytes: dbStats.FileSizeBytes,
|
||||||
LiveRoutes: dbStats.LiveRoutes,
|
LiveRoutes: dbStats.LiveRoutes,
|
||||||
IPv4Routes: ipv4Routes,
|
IPv4Routes: ipv4Routes,
|
||||||
@ -258,11 +272,15 @@ func (s *Server) handleStats() http.HandlerFunc {
|
|||||||
MessagesPerSec float64 `json:"messages_per_sec"`
|
MessagesPerSec float64 `json:"messages_per_sec"`
|
||||||
MbitsPerSec float64 `json:"mbits_per_sec"`
|
MbitsPerSec float64 `json:"mbits_per_sec"`
|
||||||
Connected bool `json:"connected"`
|
Connected bool `json:"connected"`
|
||||||
|
GoVersion string `json:"go_version"`
|
||||||
|
Goroutines int `json:"goroutines"`
|
||||||
|
MemoryUsage string `json:"memory_usage"`
|
||||||
ASNs int `json:"asns"`
|
ASNs int `json:"asns"`
|
||||||
Prefixes int `json:"prefixes"`
|
Prefixes int `json:"prefixes"`
|
||||||
IPv4Prefixes int `json:"ipv4_prefixes"`
|
IPv4Prefixes int `json:"ipv4_prefixes"`
|
||||||
IPv6Prefixes int `json:"ipv6_prefixes"`
|
IPv6Prefixes int `json:"ipv6_prefixes"`
|
||||||
Peerings int `json:"peerings"`
|
Peerings int `json:"peerings"`
|
||||||
|
Peers int `json:"peers"`
|
||||||
DatabaseSizeBytes int64 `json:"database_size_bytes"`
|
DatabaseSizeBytes int64 `json:"database_size_bytes"`
|
||||||
LiveRoutes int `json:"live_routes"`
|
LiveRoutes int `json:"live_routes"`
|
||||||
IPv4Routes int `json:"ipv4_routes"`
|
IPv4Routes int `json:"ipv4_routes"`
|
||||||
@ -355,6 +373,10 @@ func (s *Server) handleStats() http.HandlerFunc {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get memory stats
|
||||||
|
var memStats runtime.MemStats
|
||||||
|
runtime.ReadMemStats(&memStats)
|
||||||
|
|
||||||
stats := StatsResponse{
|
stats := StatsResponse{
|
||||||
Uptime: uptime,
|
Uptime: uptime,
|
||||||
TotalMessages: metrics.TotalMessages,
|
TotalMessages: metrics.TotalMessages,
|
||||||
@ -362,11 +384,15 @@ func (s *Server) handleStats() http.HandlerFunc {
|
|||||||
MessagesPerSec: metrics.MessagesPerSec,
|
MessagesPerSec: metrics.MessagesPerSec,
|
||||||
MbitsPerSec: metrics.BitsPerSec / bitsPerMegabit,
|
MbitsPerSec: metrics.BitsPerSec / bitsPerMegabit,
|
||||||
Connected: metrics.Connected,
|
Connected: metrics.Connected,
|
||||||
|
GoVersion: runtime.Version(),
|
||||||
|
Goroutines: runtime.NumGoroutine(),
|
||||||
|
MemoryUsage: humanize.Bytes(memStats.Alloc),
|
||||||
ASNs: dbStats.ASNs,
|
ASNs: dbStats.ASNs,
|
||||||
Prefixes: dbStats.Prefixes,
|
Prefixes: dbStats.Prefixes,
|
||||||
IPv4Prefixes: dbStats.IPv4Prefixes,
|
IPv4Prefixes: dbStats.IPv4Prefixes,
|
||||||
IPv6Prefixes: dbStats.IPv6Prefixes,
|
IPv6Prefixes: dbStats.IPv6Prefixes,
|
||||||
Peerings: dbStats.Peerings,
|
Peerings: dbStats.Peerings,
|
||||||
|
Peers: dbStats.Peers,
|
||||||
DatabaseSizeBytes: dbStats.FileSizeBytes,
|
DatabaseSizeBytes: dbStats.FileSizeBytes,
|
||||||
LiveRoutes: dbStats.LiveRoutes,
|
LiveRoutes: dbStats.LiveRoutes,
|
||||||
IPv4Routes: ipv4Routes,
|
IPv4Routes: ipv4Routes,
|
||||||
|
@ -1,242 +0,0 @@
|
|||||||
// Package snapshotter provides functionality for creating periodic and on-demand
|
|
||||||
// snapshots of the routing table state.
|
|
||||||
package snapshotter
|
|
||||||
|
|
||||||
import (
|
|
||||||
"compress/gzip"
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"git.eeqj.de/sneak/routewatch/internal/logger"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.eeqj.de/sneak/routewatch/internal/config"
|
|
||||||
"git.eeqj.de/sneak/routewatch/internal/routingtable"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
snapshotInterval = 10 * time.Minute
|
|
||||||
snapshotFilename = "routingtable.json.gz"
|
|
||||||
tempFileSuffix = ".tmp"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Snapshotter handles periodic and on-demand snapshots of the routing table
|
|
||||||
type Snapshotter struct {
|
|
||||||
rt *routingtable.RoutingTable
|
|
||||||
stateDir string
|
|
||||||
logger *logger.Logger
|
|
||||||
ctx context.Context
|
|
||||||
cancel context.CancelFunc
|
|
||||||
mu sync.Mutex // Ensures only one snapshot runs at a time
|
|
||||||
wg sync.WaitGroup
|
|
||||||
lastSnapshot time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
// New creates a new Snapshotter instance
|
|
||||||
func New(rt *routingtable.RoutingTable, cfg *config.Config, logger *logger.Logger) (*Snapshotter, error) {
|
|
||||||
stateDir := cfg.GetStateDir()
|
|
||||||
|
|
||||||
// If state directory is specified, ensure it exists
|
|
||||||
if stateDir != "" {
|
|
||||||
const stateDirPerms = 0750
|
|
||||||
if err := os.MkdirAll(stateDir, stateDirPerms); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create snapshot directory: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
s := &Snapshotter{
|
|
||||||
rt: rt,
|
|
||||||
stateDir: stateDir,
|
|
||||||
logger: logger,
|
|
||||||
}
|
|
||||||
|
|
||||||
return s, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start begins the periodic snapshot process
|
|
||||||
func (s *Snapshotter) Start(ctx context.Context) {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
if s.ctx != nil {
|
|
||||||
// Already started
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
|
||||||
s.ctx = ctx
|
|
||||||
s.cancel = cancel
|
|
||||||
|
|
||||||
// Start periodic snapshot goroutine
|
|
||||||
s.wg.Add(1)
|
|
||||||
go s.periodicSnapshot()
|
|
||||||
}
|
|
||||||
|
|
||||||
// periodicSnapshot runs periodic snapshots
|
|
||||||
func (s *Snapshotter) periodicSnapshot() {
|
|
||||||
defer s.wg.Done()
|
|
||||||
|
|
||||||
ticker := time.NewTicker(snapshotInterval)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
// Wait for the first interval before taking any snapshots
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-s.ctx.Done():
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
if err := s.TakeSnapshot(); err != nil {
|
|
||||||
s.logger.Error("Failed to take periodic snapshot", "error", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TakeSnapshot creates a snapshot of the current routing table state
|
|
||||||
func (s *Snapshotter) TakeSnapshot() error {
|
|
||||||
// Can't take snapshot without a state directory
|
|
||||||
if s.stateDir == "" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure only one snapshot runs at a time
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
start := time.Now()
|
|
||||||
s.logger.Info("Starting routing table snapshot")
|
|
||||||
|
|
||||||
// Get a copy of all routes while holding read lock
|
|
||||||
copyStart := time.Now()
|
|
||||||
s.rt.RLock()
|
|
||||||
routes := s.rt.GetAllRoutesUnsafe() // We'll need to add this method
|
|
||||||
s.rt.RUnlock()
|
|
||||||
|
|
||||||
// Get stats separately to avoid deadlock
|
|
||||||
stats := s.rt.GetDetailedStats()
|
|
||||||
|
|
||||||
s.logger.Info("Copied routes from routing table",
|
|
||||||
"duration", time.Since(copyStart),
|
|
||||||
"route_count", len(routes))
|
|
||||||
|
|
||||||
// Create snapshot data structure
|
|
||||||
snapshot := struct {
|
|
||||||
Timestamp time.Time `json:"timestamp"`
|
|
||||||
Stats routingtable.DetailedStats `json:"stats"`
|
|
||||||
Routes []*routingtable.Route `json:"routes"`
|
|
||||||
}{
|
|
||||||
Timestamp: time.Now().UTC(),
|
|
||||||
Stats: stats,
|
|
||||||
Routes: routes,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Serialize to JSON
|
|
||||||
marshalStart := time.Now()
|
|
||||||
jsonData, err := json.Marshal(snapshot)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to marshal snapshot: %w", err)
|
|
||||||
}
|
|
||||||
s.logger.Info("Marshaled snapshot to JSON",
|
|
||||||
"duration", time.Since(marshalStart),
|
|
||||||
"size_bytes", len(jsonData))
|
|
||||||
|
|
||||||
// Write compressed data to temporary file
|
|
||||||
tempPath := filepath.Join(s.stateDir, snapshotFilename+tempFileSuffix)
|
|
||||||
finalPath := filepath.Join(s.stateDir, snapshotFilename)
|
|
||||||
|
|
||||||
// Clean the paths to avoid any path traversal issues
|
|
||||||
tempPath = filepath.Clean(tempPath)
|
|
||||||
finalPath = filepath.Clean(finalPath)
|
|
||||||
|
|
||||||
tempFile, err := os.Create(tempPath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to create temporary file: %w", err)
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
_ = tempFile.Close()
|
|
||||||
// Clean up temp file if it still exists
|
|
||||||
_ = os.Remove(tempPath)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Create gzip writer
|
|
||||||
gzipWriter := gzip.NewWriter(tempFile)
|
|
||||||
gzipWriter.Comment = fmt.Sprintf("RouteWatch snapshot taken at %s", snapshot.Timestamp.Format(time.RFC3339))
|
|
||||||
|
|
||||||
// Write compressed data
|
|
||||||
writeStart := time.Now()
|
|
||||||
if _, err := gzipWriter.Write(jsonData); err != nil {
|
|
||||||
return fmt.Errorf("failed to write compressed data: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close gzip writer to flush all data
|
|
||||||
if err := gzipWriter.Close(); err != nil {
|
|
||||||
return fmt.Errorf("failed to close gzip writer: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sync to disk
|
|
||||||
if err := tempFile.Sync(); err != nil {
|
|
||||||
return fmt.Errorf("failed to sync temporary file: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close temp file before rename
|
|
||||||
if err := tempFile.Close(); err != nil {
|
|
||||||
return fmt.Errorf("failed to close temporary file: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Atomically rename temp file to final location
|
|
||||||
if err := os.Rename(tempPath, finalPath); err != nil {
|
|
||||||
return fmt.Errorf("failed to rename temporary file: %w", err)
|
|
||||||
}
|
|
||||||
s.logger.Info("Wrote compressed snapshot to disk",
|
|
||||||
"duration", time.Since(writeStart))
|
|
||||||
|
|
||||||
duration := time.Since(start)
|
|
||||||
s.lastSnapshot = time.Now()
|
|
||||||
|
|
||||||
s.logger.Info("Routing table snapshot completed",
|
|
||||||
"duration", duration,
|
|
||||||
"routes", len(routes),
|
|
||||||
"ipv4_routes", stats.IPv4Routes,
|
|
||||||
"ipv6_routes", stats.IPv6Routes,
|
|
||||||
"size_bytes", len(jsonData),
|
|
||||||
"path", finalPath,
|
|
||||||
)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Shutdown performs a final snapshot and cleans up resources
|
|
||||||
func (s *Snapshotter) Shutdown() error {
|
|
||||||
s.logger.Info("Shutting down snapshotter")
|
|
||||||
|
|
||||||
// Cancel context to stop periodic snapshots
|
|
||||||
if s.cancel != nil {
|
|
||||||
s.cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for periodic snapshot goroutine to finish
|
|
||||||
s.wg.Wait()
|
|
||||||
|
|
||||||
// Take final snapshot
|
|
||||||
if err := s.TakeSnapshot(); err != nil {
|
|
||||||
return fmt.Errorf("failed to take final snapshot: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetLastSnapshotTime returns the time of the last successful snapshot
|
|
||||||
func (s *Snapshotter) GetLastSnapshotTime() time.Time {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
return s.lastSnapshot
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetSnapshotPath returns the path to the snapshot file
|
|
||||||
func (s *Snapshotter) GetSnapshotPath() string {
|
|
||||||
return filepath.Join(s.stateDir, snapshotFilename)
|
|
||||||
}
|
|
@ -69,7 +69,7 @@
|
|||||||
<div id="error" class="error" style="display: none;"></div>
|
<div id="error" class="error" style="display: none;"></div>
|
||||||
<div class="status-grid">
|
<div class="status-grid">
|
||||||
<div class="status-card">
|
<div class="status-card">
|
||||||
<h2>Connection Status</h2>
|
<h2>RouteWatch</h2>
|
||||||
<div class="metric">
|
<div class="metric">
|
||||||
<span class="metric-label">Status</span>
|
<span class="metric-label">Status</span>
|
||||||
<span class="metric-value" id="connected">-</span>
|
<span class="metric-value" id="connected">-</span>
|
||||||
@ -78,6 +78,18 @@
|
|||||||
<span class="metric-label">Uptime</span>
|
<span class="metric-label">Uptime</span>
|
||||||
<span class="metric-value" id="uptime">-</span>
|
<span class="metric-value" id="uptime">-</span>
|
||||||
</div>
|
</div>
|
||||||
|
<div class="metric">
|
||||||
|
<span class="metric-label">Go Version</span>
|
||||||
|
<span class="metric-value" id="go_version">-</span>
|
||||||
|
</div>
|
||||||
|
<div class="metric">
|
||||||
|
<span class="metric-label">Goroutines</span>
|
||||||
|
<span class="metric-value" id="goroutines">-</span>
|
||||||
|
</div>
|
||||||
|
<div class="metric">
|
||||||
|
<span class="metric-label">Memory Usage</span>
|
||||||
|
<span class="metric-value" id="memory_usage">-</span>
|
||||||
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<div class="status-card">
|
<div class="status-card">
|
||||||
@ -110,18 +122,14 @@
|
|||||||
<span class="metric-label">Total Prefixes</span>
|
<span class="metric-label">Total Prefixes</span>
|
||||||
<span class="metric-value" id="prefixes">-</span>
|
<span class="metric-value" id="prefixes">-</span>
|
||||||
</div>
|
</div>
|
||||||
<div class="metric">
|
|
||||||
<span class="metric-label">IPv4 Prefixes</span>
|
|
||||||
<span class="metric-value" id="ipv4_prefixes">-</span>
|
|
||||||
</div>
|
|
||||||
<div class="metric">
|
|
||||||
<span class="metric-label">IPv6 Prefixes</span>
|
|
||||||
<span class="metric-value" id="ipv6_prefixes">-</span>
|
|
||||||
</div>
|
|
||||||
<div class="metric">
|
<div class="metric">
|
||||||
<span class="metric-label">Peerings</span>
|
<span class="metric-label">Peerings</span>
|
||||||
<span class="metric-value" id="peerings">-</span>
|
<span class="metric-value" id="peerings">-</span>
|
||||||
</div>
|
</div>
|
||||||
|
<div class="metric">
|
||||||
|
<span class="metric-label">Peers</span>
|
||||||
|
<span class="metric-value" id="peers">-</span>
|
||||||
|
</div>
|
||||||
<div class="metric">
|
<div class="metric">
|
||||||
<span class="metric-label">Database Size</span>
|
<span class="metric-label">Database Size</span>
|
||||||
<span class="metric-value" id="database_size">-</span>
|
<span class="metric-value" id="database_size">-</span>
|
||||||
@ -134,6 +142,14 @@
|
|||||||
<span class="metric-label">Live Routes</span>
|
<span class="metric-label">Live Routes</span>
|
||||||
<span class="metric-value" id="live_routes">-</span>
|
<span class="metric-value" id="live_routes">-</span>
|
||||||
</div>
|
</div>
|
||||||
|
<div class="metric">
|
||||||
|
<span class="metric-label">IPv4 Prefixes</span>
|
||||||
|
<span class="metric-value" id="ipv4_prefixes">-</span>
|
||||||
|
</div>
|
||||||
|
<div class="metric">
|
||||||
|
<span class="metric-label">IPv6 Prefixes</span>
|
||||||
|
<span class="metric-value" id="ipv6_prefixes">-</span>
|
||||||
|
</div>
|
||||||
<div class="metric">
|
<div class="metric">
|
||||||
<span class="metric-label">IPv4 Routes</span>
|
<span class="metric-label">IPv4 Routes</span>
|
||||||
<span class="metric-value" id="ipv4_routes">-</span>
|
<span class="metric-value" id="ipv4_routes">-</span>
|
||||||
@ -269,6 +285,9 @@
|
|||||||
|
|
||||||
// Update all metrics
|
// Update all metrics
|
||||||
document.getElementById('uptime').textContent = data.uptime;
|
document.getElementById('uptime').textContent = data.uptime;
|
||||||
|
document.getElementById('go_version').textContent = data.go_version;
|
||||||
|
document.getElementById('goroutines').textContent = formatNumber(data.goroutines);
|
||||||
|
document.getElementById('memory_usage').textContent = data.memory_usage;
|
||||||
document.getElementById('total_messages').textContent = formatNumber(data.total_messages);
|
document.getElementById('total_messages').textContent = formatNumber(data.total_messages);
|
||||||
document.getElementById('messages_per_sec').textContent = data.messages_per_sec.toFixed(1);
|
document.getElementById('messages_per_sec').textContent = data.messages_per_sec.toFixed(1);
|
||||||
document.getElementById('total_bytes').textContent = formatBytes(data.total_bytes);
|
document.getElementById('total_bytes').textContent = formatBytes(data.total_bytes);
|
||||||
@ -278,6 +297,7 @@
|
|||||||
document.getElementById('ipv4_prefixes').textContent = formatNumber(data.ipv4_prefixes);
|
document.getElementById('ipv4_prefixes').textContent = formatNumber(data.ipv4_prefixes);
|
||||||
document.getElementById('ipv6_prefixes').textContent = formatNumber(data.ipv6_prefixes);
|
document.getElementById('ipv6_prefixes').textContent = formatNumber(data.ipv6_prefixes);
|
||||||
document.getElementById('peerings').textContent = formatNumber(data.peerings);
|
document.getElementById('peerings').textContent = formatNumber(data.peerings);
|
||||||
|
document.getElementById('peers').textContent = formatNumber(data.peers);
|
||||||
document.getElementById('database_size').textContent = formatBytes(data.database_size_bytes);
|
document.getElementById('database_size').textContent = formatBytes(data.database_size_bytes);
|
||||||
document.getElementById('live_routes').textContent = formatNumber(data.live_routes);
|
document.getElementById('live_routes').textContent = formatNumber(data.live_routes);
|
||||||
document.getElementById('ipv4_routes').textContent = formatNumber(data.ipv4_routes);
|
document.getElementById('ipv4_routes').textContent = formatNumber(data.ipv4_routes);
|
||||||
|
Loading…
Reference in New Issue
Block a user