diff --git a/go.mod b/go.mod index aa6bc3c..f1fb0b0 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( ) require ( + github.com/dustin/go-humanize v1.0.1 // indirect go.uber.org/dig v1.19.0 // indirect go.uber.org/multierr v1.10.0 // indirect go.uber.org/zap v1.26.0 // indirect diff --git a/go.sum b/go.sum index 0bffb02..4e183bc 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618= github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= diff --git a/internal/database/database.go b/internal/database/database.go index 5558ff2..bf6f4c3 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -396,6 +396,12 @@ func (d *Database) GetStats() (Stats, error) { return stats, err } + // Count peers + err = d.queryRow("SELECT COUNT(*) FROM bgp_peers").Scan(&stats.Peers) + if err != nil { + return stats, err + } + // Get database file size fileInfo, err := os.Stat(d.path) if err != nil { diff --git a/internal/database/interface.go b/internal/database/interface.go index 3e5b2f4..42bf5d7 100644 --- a/internal/database/interface.go +++ b/internal/database/interface.go @@ -11,6 +11,7 @@ type Stats struct { IPv4Prefixes int IPv6Prefixes int Peerings int + Peers int FileSizeBytes int64 LiveRoutes int IPv4PrefixDistribution []PrefixDistribution diff --git a/internal/routewatch/app_integration_test.go b/internal/routewatch/app_integration_test.go index bc962cf..69c6893 100644 --- a/internal/routewatch/app_integration_test.go +++ b/internal/routewatch/app_integration_test.go @@ -159,6 +159,7 @@ func (m *mockStore) GetStats() (database.Stats, error) { IPv4Prefixes: m.IPv4Prefixes, IPv6Prefixes: m.IPv6Prefixes, Peerings: m.PeeringCount, + Peers: 10, // Mock peer count }, nil } diff --git a/internal/routingtable/routingtable.go b/internal/routingtable/routingtable.go deleted file mode 100644 index cf69a9d..0000000 --- a/internal/routingtable/routingtable.go +++ /dev/null @@ -1,604 +0,0 @@ -// Package routingtable provides a thread-safe in-memory representation of the DFZ routing table. -package routingtable - -import ( - "compress/gzip" - "encoding/json" - "fmt" - "os" - "path/filepath" - "strings" - "sync" - "sync/atomic" - "time" - - "git.eeqj.de/sneak/routewatch/internal/config" - "git.eeqj.de/sneak/routewatch/internal/logger" - "github.com/google/uuid" -) - -const ( - // routeStalenessThreshold is how old a route can be before we consider it stale - // Using 30 minutes as a conservative value for snapshot loading - routeStalenessThreshold = 30 * time.Minute - - // snapshotFilename is the name of the snapshot file - snapshotFilename = "routingtable.json.gz" -) - -// Route represents a single route entry in the routing table -type Route struct { - PrefixID uuid.UUID `json:"prefix_id"` - Prefix string `json:"prefix"` // The actual prefix string (e.g., "10.0.0.0/8") - OriginASNID uuid.UUID `json:"origin_asn_id"` - OriginASN int `json:"origin_asn"` // The actual ASN number - PeerASN int `json:"peer_asn"` - ASPath []int `json:"as_path"` // Full AS path - NextHop string `json:"next_hop"` - AnnouncedAt time.Time `json:"announced_at"` - AddedAt time.Time `json:"added_at"` // When we added this route to our table -} - -// RouteKey uniquely identifies a route in the table -type RouteKey struct { - PrefixID uuid.UUID - OriginASNID uuid.UUID - PeerASN int -} - -// RoutingTable is a thread-safe in-memory routing table -type RoutingTable struct { - mu sync.RWMutex - routes map[RouteKey]*Route - - // Secondary indexes for efficient lookups - byPrefix map[uuid.UUID]map[RouteKey]*Route // Routes indexed by prefix ID - byOriginASN map[uuid.UUID]map[RouteKey]*Route // Routes indexed by origin ASN ID - byPeerASN map[int]map[RouteKey]*Route // Routes indexed by peer ASN - - // Metrics tracking - ipv4Routes int - ipv6Routes int - ipv4Updates uint64 // Updates counter for rate calculation - ipv6Updates uint64 // Updates counter for rate calculation - lastMetricsReset time.Time - - // Configuration - snapshotDir string - routeExpirationTimeout time.Duration - logger *logger.Logger - - // Expiration management - stopExpiration chan struct{} -} - -// New creates a new routing table, loading from snapshot if available -func New(cfg *config.Config, logger *logger.Logger) *RoutingTable { - rt := &RoutingTable{ - routes: make(map[RouteKey]*Route), - byPrefix: make(map[uuid.UUID]map[RouteKey]*Route), - byOriginASN: make(map[uuid.UUID]map[RouteKey]*Route), - byPeerASN: make(map[int]map[RouteKey]*Route), - lastMetricsReset: time.Now(), - snapshotDir: cfg.GetStateDir(), - routeExpirationTimeout: cfg.RouteExpirationTimeout, - logger: logger, - stopExpiration: make(chan struct{}), - } - - // Try to load from snapshot - if err := rt.loadFromSnapshot(logger); err != nil { - logger.Warn("Failed to load routing table from snapshot", "error", err) - } - - // Start expiration goroutine - go rt.expireRoutesLoop() - - return rt -} - -// AddRoute adds or updates a route in the routing table -func (rt *RoutingTable) AddRoute(route *Route) { - rt.mu.Lock() - defer rt.mu.Unlock() - - key := RouteKey{ - PrefixID: route.PrefixID, - OriginASNID: route.OriginASNID, - PeerASN: route.PeerASN, - } - - // If route already exists, remove it from indexes first - if existingRoute, exists := rt.routes[key]; exists { - rt.removeFromIndexes(key, existingRoute) - // Decrement counter for existing route - if isIPv6(existingRoute.Prefix) { - rt.ipv6Routes-- - } else { - rt.ipv4Routes-- - } - } - - // Set AddedAt if not already set - if route.AddedAt.IsZero() { - route.AddedAt = time.Now().UTC() - } - - // Add to main map - rt.routes[key] = route - - // Update indexes - rt.addToIndexes(key, route) - - // Update metrics - if isIPv6(route.Prefix) { - rt.ipv6Routes++ - atomic.AddUint64(&rt.ipv6Updates, 1) - } else { - rt.ipv4Routes++ - atomic.AddUint64(&rt.ipv4Updates, 1) - } -} - -// RemoveRoute removes a route from the routing table -func (rt *RoutingTable) RemoveRoute(prefixID, originASNID uuid.UUID, peerASN int) bool { - rt.mu.Lock() - defer rt.mu.Unlock() - - key := RouteKey{ - PrefixID: prefixID, - OriginASNID: originASNID, - PeerASN: peerASN, - } - - route, exists := rt.routes[key] - if !exists { - return false - } - - // Remove from indexes - rt.removeFromIndexes(key, route) - - // Remove from main map - delete(rt.routes, key) - - // Update metrics - if isIPv6(route.Prefix) { - rt.ipv6Routes-- - atomic.AddUint64(&rt.ipv6Updates, 1) - } else { - rt.ipv4Routes-- - atomic.AddUint64(&rt.ipv4Updates, 1) - } - - return true -} - -// WithdrawRoutesByPrefixAndPeer removes all routes for a specific prefix from a specific peer -func (rt *RoutingTable) WithdrawRoutesByPrefixAndPeer(prefixID uuid.UUID, peerASN int) int { - rt.mu.Lock() - defer rt.mu.Unlock() - - prefixRoutes, exists := rt.byPrefix[prefixID] - if !exists { - return 0 - } - - // Collect keys to delete (can't delete while iterating) - var keysToDelete []RouteKey - for key, route := range prefixRoutes { - if route.PeerASN == peerASN { - keysToDelete = append(keysToDelete, key) - } - } - - // Delete the routes - count := 0 - for _, key := range keysToDelete { - route, exists := rt.routes[key] - if !exists { - continue - } - - rt.removeFromIndexes(key, route) - delete(rt.routes, key) - count++ - - // Update metrics - if isIPv6(route.Prefix) { - rt.ipv6Routes-- - atomic.AddUint64(&rt.ipv6Updates, 1) - } else { - rt.ipv4Routes-- - atomic.AddUint64(&rt.ipv4Updates, 1) - } - } - - return count -} - -// GetRoute retrieves a specific route -func (rt *RoutingTable) GetRoute(prefixID, originASNID uuid.UUID, peerASN int) (*Route, bool) { - rt.mu.RLock() - defer rt.mu.RUnlock() - - key := RouteKey{ - PrefixID: prefixID, - OriginASNID: originASNID, - PeerASN: peerASN, - } - - route, exists := rt.routes[key] - if !exists { - return nil, false - } - - // Return a copy to prevent external modification - routeCopy := *route - - return &routeCopy, true -} - -// GetRoutesByPrefix returns all routes for a specific prefix -func (rt *RoutingTable) GetRoutesByPrefix(prefixID uuid.UUID) []*Route { - rt.mu.RLock() - defer rt.mu.RUnlock() - - routes := make([]*Route, 0) - if prefixRoutes, exists := rt.byPrefix[prefixID]; exists { - for _, route := range prefixRoutes { - routeCopy := *route - routes = append(routes, &routeCopy) - } - } - - return routes -} - -// GetRoutesByOriginASN returns all routes originated by a specific ASN -func (rt *RoutingTable) GetRoutesByOriginASN(originASNID uuid.UUID) []*Route { - rt.mu.RLock() - defer rt.mu.RUnlock() - - routes := make([]*Route, 0) - if asnRoutes, exists := rt.byOriginASN[originASNID]; exists { - for _, route := range asnRoutes { - routeCopy := *route - routes = append(routes, &routeCopy) - } - } - - return routes -} - -// GetRoutesByPeerASN returns all routes received from a specific peer ASN -func (rt *RoutingTable) GetRoutesByPeerASN(peerASN int) []*Route { - rt.mu.RLock() - defer rt.mu.RUnlock() - - routes := make([]*Route, 0) - if peerRoutes, exists := rt.byPeerASN[peerASN]; exists { - for _, route := range peerRoutes { - routeCopy := *route - routes = append(routes, &routeCopy) - } - } - - return routes -} - -// GetAllRoutes returns all active routes in the routing table -func (rt *RoutingTable) GetAllRoutes() []*Route { - rt.mu.RLock() - defer rt.mu.RUnlock() - - routes := make([]*Route, 0, len(rt.routes)) - for _, route := range rt.routes { - routeCopy := *route - routes = append(routes, &routeCopy) - } - - return routes -} - -// Size returns the total number of routes in the table -func (rt *RoutingTable) Size() int { - rt.mu.RLock() - defer rt.mu.RUnlock() - - return len(rt.routes) -} - -// Stats returns statistics about the routing table -func (rt *RoutingTable) Stats() map[string]int { - rt.mu.RLock() - defer rt.mu.RUnlock() - - stats := map[string]int{ - "total_routes": len(rt.routes), - "unique_prefixes": len(rt.byPrefix), - "unique_origins": len(rt.byOriginASN), - "unique_peers": len(rt.byPeerASN), - } - - return stats -} - -// DetailedStats contains detailed routing table statistics -type DetailedStats struct { - IPv4Routes int - IPv6Routes int - IPv4UpdatesRate float64 - IPv6UpdatesRate float64 - TotalRoutes int - UniquePrefixes int - UniqueOrigins int - UniquePeers int -} - -// GetDetailedStats returns detailed statistics including IPv4/IPv6 breakdown and update rates -func (rt *RoutingTable) GetDetailedStats() DetailedStats { - rt.mu.Lock() - defer rt.mu.Unlock() - - // Calculate update rates - elapsed := time.Since(rt.lastMetricsReset).Seconds() - ipv4Updates := atomic.LoadUint64(&rt.ipv4Updates) - ipv6Updates := atomic.LoadUint64(&rt.ipv6Updates) - - stats := DetailedStats{ - IPv4Routes: rt.ipv4Routes, - IPv6Routes: rt.ipv6Routes, - IPv4UpdatesRate: float64(ipv4Updates) / elapsed, - IPv6UpdatesRate: float64(ipv6Updates) / elapsed, - TotalRoutes: len(rt.routes), - UniquePrefixes: len(rt.byPrefix), - UniqueOrigins: len(rt.byOriginASN), - UniquePeers: len(rt.byPeerASN), - } - - // Reset counters for next period - atomic.StoreUint64(&rt.ipv4Updates, 0) - atomic.StoreUint64(&rt.ipv6Updates, 0) - rt.lastMetricsReset = time.Now() - - return stats -} - -// Clear removes all routes from the routing table -func (rt *RoutingTable) Clear() { - rt.mu.Lock() - defer rt.mu.Unlock() - - rt.routes = make(map[RouteKey]*Route) - rt.byPrefix = make(map[uuid.UUID]map[RouteKey]*Route) - rt.byOriginASN = make(map[uuid.UUID]map[RouteKey]*Route) - rt.byPeerASN = make(map[int]map[RouteKey]*Route) - rt.ipv4Routes = 0 - rt.ipv6Routes = 0 - atomic.StoreUint64(&rt.ipv4Updates, 0) - atomic.StoreUint64(&rt.ipv6Updates, 0) - rt.lastMetricsReset = time.Now() -} - -// RLock acquires a read lock on the routing table -// This is exposed for the snapshotter to use -func (rt *RoutingTable) RLock() { - rt.mu.RLock() -} - -// RUnlock releases a read lock on the routing table -// This is exposed for the snapshotter to use -func (rt *RoutingTable) RUnlock() { - rt.mu.RUnlock() -} - -// GetAllRoutesUnsafe returns all routes without copying -// IMPORTANT: Caller must hold RLock before calling this method -func (rt *RoutingTable) GetAllRoutesUnsafe() []*Route { - routes := make([]*Route, 0, len(rt.routes)) - for _, route := range rt.routes { - routes = append(routes, route) - } - - return routes -} - -// Helper methods for index management - -func (rt *RoutingTable) addToIndexes(key RouteKey, route *Route) { - // Add to prefix index - if rt.byPrefix[route.PrefixID] == nil { - rt.byPrefix[route.PrefixID] = make(map[RouteKey]*Route) - } - rt.byPrefix[route.PrefixID][key] = route - - // Add to origin ASN index - if rt.byOriginASN[route.OriginASNID] == nil { - rt.byOriginASN[route.OriginASNID] = make(map[RouteKey]*Route) - } - rt.byOriginASN[route.OriginASNID][key] = route - - // Add to peer ASN index - if rt.byPeerASN[route.PeerASN] == nil { - rt.byPeerASN[route.PeerASN] = make(map[RouteKey]*Route) - } - rt.byPeerASN[route.PeerASN][key] = route -} - -func (rt *RoutingTable) removeFromIndexes(key RouteKey, route *Route) { - // Remove from prefix index - if prefixRoutes, exists := rt.byPrefix[route.PrefixID]; exists { - delete(prefixRoutes, key) - if len(prefixRoutes) == 0 { - delete(rt.byPrefix, route.PrefixID) - } - } - - // Remove from origin ASN index - if asnRoutes, exists := rt.byOriginASN[route.OriginASNID]; exists { - delete(asnRoutes, key) - if len(asnRoutes) == 0 { - delete(rt.byOriginASN, route.OriginASNID) - } - } - - // Remove from peer ASN index - if peerRoutes, exists := rt.byPeerASN[route.PeerASN]; exists { - delete(peerRoutes, key) - if len(peerRoutes) == 0 { - delete(rt.byPeerASN, route.PeerASN) - } - } -} - -// String returns a string representation of the route key -func (k RouteKey) String() string { - return fmt.Sprintf("%s/%s/%d", k.PrefixID, k.OriginASNID, k.PeerASN) -} - -// isIPv6 returns true if the prefix is an IPv6 address -func isIPv6(prefix string) bool { - return strings.Contains(prefix, ":") -} - -// loadFromSnapshot attempts to load the routing table from a snapshot file -func (rt *RoutingTable) loadFromSnapshot(logger *logger.Logger) error { - // If no snapshot directory specified, nothing to load - if rt.snapshotDir == "" { - return nil - } - - snapshotPath := filepath.Join(rt.snapshotDir, snapshotFilename) - - // Check if snapshot file exists - if _, err := os.Stat(snapshotPath); os.IsNotExist(err) { - // No snapshot file exists, this is normal - start with empty routing table - return nil - } - - // Open the snapshot file - file, err := os.Open(filepath.Clean(snapshotPath)) - if err != nil { - return fmt.Errorf("failed to open snapshot file: %w", err) - } - defer func() { _ = file.Close() }() - - // Create gzip reader - gzReader, err := gzip.NewReader(file) - if err != nil { - return fmt.Errorf("failed to create gzip reader: %w", err) - } - defer func() { _ = gzReader.Close() }() - - // Decode the snapshot - var snapshot struct { - Timestamp time.Time `json:"timestamp"` - Stats DetailedStats `json:"stats"` - Routes []*Route `json:"routes"` - } - - decoder := json.NewDecoder(gzReader) - if err := decoder.Decode(&snapshot); err != nil { - return fmt.Errorf("failed to decode snapshot: %w", err) - } - - // Calculate staleness cutoff time - now := time.Now().UTC() - cutoffTime := now.Add(-routeStalenessThreshold) - - // Load non-stale routes - loadedCount := 0 - staleCount := 0 - - for _, route := range snapshot.Routes { - // Check if route is stale based on AddedAt time - if route.AddedAt.Before(cutoffTime) { - staleCount++ - - continue - } - - // Add the route (this will update counters and indexes) - rt.AddRoute(route) - loadedCount++ - } - - logger.Info("Loaded routing table from snapshot", - "snapshot_time", snapshot.Timestamp, - "loaded_routes", loadedCount, - "stale_routes", staleCount, - "total_routes_in_snapshot", len(snapshot.Routes), - ) - - return nil -} - -// expireRoutesLoop periodically removes expired routes -func (rt *RoutingTable) expireRoutesLoop() { - // Run every minute to check for expired routes - ticker := time.NewTicker(1 * time.Minute) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - rt.expireStaleRoutes() - case <-rt.stopExpiration: - return - } - } -} - -// expireStaleRoutes removes routes that haven't been updated recently -func (rt *RoutingTable) expireStaleRoutes() { - rt.mu.Lock() - defer rt.mu.Unlock() - - now := time.Now().UTC() - cutoffTime := now.Add(-rt.routeExpirationTimeout) - expiredCount := 0 - - // Collect keys to delete (can't delete while iterating) - var keysToDelete []RouteKey - for key, route := range rt.routes { - // Use AnnouncedAt as the last update time - if route.AnnouncedAt.Before(cutoffTime) { - keysToDelete = append(keysToDelete, key) - } - } - - // Delete expired routes - for _, key := range keysToDelete { - route, exists := rt.routes[key] - if !exists { - continue - } - - rt.removeFromIndexes(key, route) - delete(rt.routes, key) - expiredCount++ - - // Update metrics - if isIPv6(route.Prefix) { - rt.ipv6Routes-- - } else { - rt.ipv4Routes-- - } - } - - if expiredCount > 0 { - rt.logger.Info("Expired stale routes", - "count", expiredCount, - "timeout", rt.routeExpirationTimeout, - "remaining_routes", len(rt.routes), - ) - } -} - -// Stop gracefully stops the routing table background tasks -func (rt *RoutingTable) Stop() { - if rt.stopExpiration != nil { - close(rt.stopExpiration) - } -} diff --git a/internal/routingtable/routingtable_test.go b/internal/routingtable/routingtable_test.go deleted file mode 100644 index d20c3d6..0000000 --- a/internal/routingtable/routingtable_test.go +++ /dev/null @@ -1,245 +0,0 @@ -package routingtable - -import ( - "sync" - "testing" - "time" - - "git.eeqj.de/sneak/routewatch/internal/config" - "git.eeqj.de/sneak/routewatch/internal/logger" - "github.com/google/uuid" -) - -func TestRoutingTable(t *testing.T) { - // Create a test logger - logger := logger.New() - - // Create test config with empty state dir (no snapshot loading) - cfg := &config.Config{ - StateDir: "", - } - - rt := New(cfg, logger) - - // Test data - prefixID1 := uuid.New() - prefixID2 := uuid.New() - originASNID1 := uuid.New() - originASNID2 := uuid.New() - - route1 := &Route{ - PrefixID: prefixID1, - Prefix: "10.0.0.0/8", - OriginASNID: originASNID1, - OriginASN: 64512, - PeerASN: 64513, - ASPath: []int{64513, 64512}, - NextHop: "192.168.1.1", - AnnouncedAt: time.Now(), - } - - route2 := &Route{ - PrefixID: prefixID2, - Prefix: "192.168.0.0/16", - OriginASNID: originASNID2, - OriginASN: 64514, - PeerASN: 64513, - ASPath: []int{64513, 64514}, - NextHop: "192.168.1.1", - AnnouncedAt: time.Now(), - } - - // Test AddRoute - rt.AddRoute(route1) - rt.AddRoute(route2) - - if rt.Size() != 2 { - t.Errorf("Expected 2 routes, got %d", rt.Size()) - } - - // Test GetRoute - retrievedRoute, exists := rt.GetRoute(prefixID1, originASNID1, 64513) - if !exists { - t.Error("Route 1 should exist") - } - if retrievedRoute.Prefix != "10.0.0.0/8" { - t.Errorf("Expected prefix 10.0.0.0/8, got %s", retrievedRoute.Prefix) - } - - // Test GetRoutesByPrefix - prefixRoutes := rt.GetRoutesByPrefix(prefixID1) - if len(prefixRoutes) != 1 { - t.Errorf("Expected 1 route for prefix, got %d", len(prefixRoutes)) - } - - // Test GetRoutesByPeerASN - peerRoutes := rt.GetRoutesByPeerASN(64513) - if len(peerRoutes) != 2 { - t.Errorf("Expected 2 routes from peer 64513, got %d", len(peerRoutes)) - } - - // Test RemoveRoute - removed := rt.RemoveRoute(prefixID1, originASNID1, 64513) - if !removed { - t.Error("Route should have been removed") - } - if rt.Size() != 1 { - t.Errorf("Expected 1 route after removal, got %d", rt.Size()) - } - - // Test WithdrawRoutesByPrefixAndPeer - // Add the route back first - rt.AddRoute(route1) - - // Add another route for the same prefix from the same peer - route3 := &Route{ - PrefixID: prefixID1, - Prefix: "10.0.0.0/8", - OriginASNID: originASNID2, // Different origin - OriginASN: 64515, - PeerASN: 64513, - ASPath: []int{64513, 64515}, - NextHop: "192.168.1.1", - AnnouncedAt: time.Now(), - } - rt.AddRoute(route3) - - count := rt.WithdrawRoutesByPrefixAndPeer(prefixID1, 64513) - if count != 2 { - t.Errorf("Expected to withdraw 2 routes, withdrew %d", count) - } - - // Should only have route2 left - if rt.Size() != 1 { - t.Errorf("Expected 1 route after withdrawal, got %d", rt.Size()) - } - - // Test Stats - stats := rt.Stats() - if stats["total_routes"] != 1 { - t.Errorf("Expected 1 total route in stats, got %d", stats["total_routes"]) - } - - // Test Clear - rt.Clear() - if rt.Size() != 0 { - t.Errorf("Expected 0 routes after clear, got %d", rt.Size()) - } -} - -func TestRoutingTableConcurrency(t *testing.T) { - // Create a test logger - logger := logger.New() - - // Create test config with empty state dir (no snapshot loading) - cfg := &config.Config{ - StateDir: "", - } - - rt := New(cfg, logger) - - // Test concurrent access - var wg sync.WaitGroup - numGoroutines := 10 - numOperations := 100 - - // Start multiple goroutines that add/remove routes - for i := 0; i < numGoroutines; i++ { - wg.Add(1) - go func(id int) { - defer wg.Done() - - for j := 0; j < numOperations; j++ { - prefixID := uuid.New() - originASNID := uuid.New() - - route := &Route{ - PrefixID: prefixID, - Prefix: "10.0.0.0/8", - OriginASNID: originASNID, - OriginASN: 64512 + id, - PeerASN: 64500, - ASPath: []int{64500, 64512 + id}, - NextHop: "192.168.1.1", - AnnouncedAt: time.Now(), - } - - // Add route - rt.AddRoute(route) - - // Try to get it - _, _ = rt.GetRoute(prefixID, originASNID, 64500) - - // Get stats - _ = rt.Stats() - - // Remove it - rt.RemoveRoute(prefixID, originASNID, 64500) - } - }(i) - } - - wg.Wait() - - // Table should be empty after all operations - if rt.Size() != 0 { - t.Errorf("Expected empty table after concurrent operations, got %d routes", rt.Size()) - } -} - -func TestRouteUpdate(t *testing.T) { - // Create a test logger - logger := logger.New() - - // Create test config with empty state dir (no snapshot loading) - cfg := &config.Config{ - StateDir: "", - } - - rt := New(cfg, logger) - - prefixID := uuid.New() - originASNID := uuid.New() - - route1 := &Route{ - PrefixID: prefixID, - Prefix: "10.0.0.0/8", - OriginASNID: originASNID, - OriginASN: 64512, - PeerASN: 64513, - ASPath: []int{64513, 64512}, - NextHop: "192.168.1.1", - AnnouncedAt: time.Now(), - } - - // Add initial route - rt.AddRoute(route1) - - // Update the same route with new next hop - route2 := &Route{ - PrefixID: prefixID, - Prefix: "10.0.0.0/8", - OriginASNID: originASNID, - OriginASN: 64512, - PeerASN: 64513, - ASPath: []int{64513, 64512}, - NextHop: "192.168.1.2", // Changed - AnnouncedAt: time.Now().Add(1 * time.Minute), - } - - rt.AddRoute(route2) - - // Should still have only 1 route - if rt.Size() != 1 { - t.Errorf("Expected 1 route after update, got %d", rt.Size()) - } - - // Check that the route was updated - retrievedRoute, exists := rt.GetRoute(prefixID, originASNID, 64513) - if !exists { - t.Error("Route should exist after update") - } - if retrievedRoute.NextHop != "192.168.1.2" { - t.Errorf("Expected updated next hop 192.168.1.2, got %s", retrievedRoute.NextHop) - } -} diff --git a/internal/server/server.go b/internal/server/server.go index 0e04a91..e82c037 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -6,12 +6,14 @@ import ( "encoding/json" "net/http" "os" + "runtime" "time" "git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/logger" "git.eeqj.de/sneak/routewatch/internal/streamer" "git.eeqj.de/sneak/routewatch/internal/templates" + "github.com/dustin/go-humanize" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" ) @@ -117,11 +119,15 @@ func (s *Server) handleStatusJSON() http.HandlerFunc { MessagesPerSec float64 `json:"messages_per_sec"` MbitsPerSec float64 `json:"mbits_per_sec"` Connected bool `json:"connected"` + GoVersion string `json:"go_version"` + Goroutines int `json:"goroutines"` + MemoryUsage string `json:"memory_usage"` ASNs int `json:"asns"` Prefixes int `json:"prefixes"` IPv4Prefixes int `json:"ipv4_prefixes"` IPv6Prefixes int `json:"ipv6_prefixes"` Peerings int `json:"peerings"` + Peers int `json:"peers"` DatabaseSizeBytes int64 `json:"database_size_bytes"` LiveRoutes int `json:"live_routes"` IPv4Routes int `json:"ipv4_routes"` @@ -203,6 +209,10 @@ func (s *Server) handleStatusJSON() http.HandlerFunc { // Get route update metrics routeMetrics := s.streamer.GetMetricsTracker().GetRouteMetrics() + // Get memory stats + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + stats := Stats{ Uptime: uptime, TotalMessages: metrics.TotalMessages, @@ -210,11 +220,15 @@ func (s *Server) handleStatusJSON() http.HandlerFunc { MessagesPerSec: metrics.MessagesPerSec, MbitsPerSec: metrics.BitsPerSec / bitsPerMegabit, Connected: metrics.Connected, + GoVersion: runtime.Version(), + Goroutines: runtime.NumGoroutine(), + MemoryUsage: humanize.Bytes(memStats.Alloc), ASNs: dbStats.ASNs, Prefixes: dbStats.Prefixes, IPv4Prefixes: dbStats.IPv4Prefixes, IPv6Prefixes: dbStats.IPv6Prefixes, Peerings: dbStats.Peerings, + Peers: dbStats.Peers, DatabaseSizeBytes: dbStats.FileSizeBytes, LiveRoutes: dbStats.LiveRoutes, IPv4Routes: ipv4Routes, @@ -258,11 +272,15 @@ func (s *Server) handleStats() http.HandlerFunc { MessagesPerSec float64 `json:"messages_per_sec"` MbitsPerSec float64 `json:"mbits_per_sec"` Connected bool `json:"connected"` + GoVersion string `json:"go_version"` + Goroutines int `json:"goroutines"` + MemoryUsage string `json:"memory_usage"` ASNs int `json:"asns"` Prefixes int `json:"prefixes"` IPv4Prefixes int `json:"ipv4_prefixes"` IPv6Prefixes int `json:"ipv6_prefixes"` Peerings int `json:"peerings"` + Peers int `json:"peers"` DatabaseSizeBytes int64 `json:"database_size_bytes"` LiveRoutes int `json:"live_routes"` IPv4Routes int `json:"ipv4_routes"` @@ -355,6 +373,10 @@ func (s *Server) handleStats() http.HandlerFunc { }) } + // Get memory stats + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + stats := StatsResponse{ Uptime: uptime, TotalMessages: metrics.TotalMessages, @@ -362,11 +384,15 @@ func (s *Server) handleStats() http.HandlerFunc { MessagesPerSec: metrics.MessagesPerSec, MbitsPerSec: metrics.BitsPerSec / bitsPerMegabit, Connected: metrics.Connected, + GoVersion: runtime.Version(), + Goroutines: runtime.NumGoroutine(), + MemoryUsage: humanize.Bytes(memStats.Alloc), ASNs: dbStats.ASNs, Prefixes: dbStats.Prefixes, IPv4Prefixes: dbStats.IPv4Prefixes, IPv6Prefixes: dbStats.IPv6Prefixes, Peerings: dbStats.Peerings, + Peers: dbStats.Peers, DatabaseSizeBytes: dbStats.FileSizeBytes, LiveRoutes: dbStats.LiveRoutes, IPv4Routes: ipv4Routes, diff --git a/internal/snapshotter/snapshotter.go b/internal/snapshotter/snapshotter.go deleted file mode 100644 index 5b4f88c..0000000 --- a/internal/snapshotter/snapshotter.go +++ /dev/null @@ -1,242 +0,0 @@ -// Package snapshotter provides functionality for creating periodic and on-demand -// snapshots of the routing table state. -package snapshotter - -import ( - "compress/gzip" - "context" - "encoding/json" - "fmt" - "git.eeqj.de/sneak/routewatch/internal/logger" - "os" - "path/filepath" - "sync" - "time" - - "git.eeqj.de/sneak/routewatch/internal/config" - "git.eeqj.de/sneak/routewatch/internal/routingtable" -) - -const ( - snapshotInterval = 10 * time.Minute - snapshotFilename = "routingtable.json.gz" - tempFileSuffix = ".tmp" -) - -// Snapshotter handles periodic and on-demand snapshots of the routing table -type Snapshotter struct { - rt *routingtable.RoutingTable - stateDir string - logger *logger.Logger - ctx context.Context - cancel context.CancelFunc - mu sync.Mutex // Ensures only one snapshot runs at a time - wg sync.WaitGroup - lastSnapshot time.Time -} - -// New creates a new Snapshotter instance -func New(rt *routingtable.RoutingTable, cfg *config.Config, logger *logger.Logger) (*Snapshotter, error) { - stateDir := cfg.GetStateDir() - - // If state directory is specified, ensure it exists - if stateDir != "" { - const stateDirPerms = 0750 - if err := os.MkdirAll(stateDir, stateDirPerms); err != nil { - return nil, fmt.Errorf("failed to create snapshot directory: %w", err) - } - } - - s := &Snapshotter{ - rt: rt, - stateDir: stateDir, - logger: logger, - } - - return s, nil -} - -// Start begins the periodic snapshot process -func (s *Snapshotter) Start(ctx context.Context) { - s.mu.Lock() - defer s.mu.Unlock() - - if s.ctx != nil { - // Already started - return - } - - ctx, cancel := context.WithCancel(ctx) - s.ctx = ctx - s.cancel = cancel - - // Start periodic snapshot goroutine - s.wg.Add(1) - go s.periodicSnapshot() -} - -// periodicSnapshot runs periodic snapshots -func (s *Snapshotter) periodicSnapshot() { - defer s.wg.Done() - - ticker := time.NewTicker(snapshotInterval) - defer ticker.Stop() - - // Wait for the first interval before taking any snapshots - for { - select { - case <-s.ctx.Done(): - return - case <-ticker.C: - if err := s.TakeSnapshot(); err != nil { - s.logger.Error("Failed to take periodic snapshot", "error", err) - } - } - } -} - -// TakeSnapshot creates a snapshot of the current routing table state -func (s *Snapshotter) TakeSnapshot() error { - // Can't take snapshot without a state directory - if s.stateDir == "" { - return nil - } - - // Ensure only one snapshot runs at a time - s.mu.Lock() - defer s.mu.Unlock() - - start := time.Now() - s.logger.Info("Starting routing table snapshot") - - // Get a copy of all routes while holding read lock - copyStart := time.Now() - s.rt.RLock() - routes := s.rt.GetAllRoutesUnsafe() // We'll need to add this method - s.rt.RUnlock() - - // Get stats separately to avoid deadlock - stats := s.rt.GetDetailedStats() - - s.logger.Info("Copied routes from routing table", - "duration", time.Since(copyStart), - "route_count", len(routes)) - - // Create snapshot data structure - snapshot := struct { - Timestamp time.Time `json:"timestamp"` - Stats routingtable.DetailedStats `json:"stats"` - Routes []*routingtable.Route `json:"routes"` - }{ - Timestamp: time.Now().UTC(), - Stats: stats, - Routes: routes, - } - - // Serialize to JSON - marshalStart := time.Now() - jsonData, err := json.Marshal(snapshot) - if err != nil { - return fmt.Errorf("failed to marshal snapshot: %w", err) - } - s.logger.Info("Marshaled snapshot to JSON", - "duration", time.Since(marshalStart), - "size_bytes", len(jsonData)) - - // Write compressed data to temporary file - tempPath := filepath.Join(s.stateDir, snapshotFilename+tempFileSuffix) - finalPath := filepath.Join(s.stateDir, snapshotFilename) - - // Clean the paths to avoid any path traversal issues - tempPath = filepath.Clean(tempPath) - finalPath = filepath.Clean(finalPath) - - tempFile, err := os.Create(tempPath) - if err != nil { - return fmt.Errorf("failed to create temporary file: %w", err) - } - defer func() { - _ = tempFile.Close() - // Clean up temp file if it still exists - _ = os.Remove(tempPath) - }() - - // Create gzip writer - gzipWriter := gzip.NewWriter(tempFile) - gzipWriter.Comment = fmt.Sprintf("RouteWatch snapshot taken at %s", snapshot.Timestamp.Format(time.RFC3339)) - - // Write compressed data - writeStart := time.Now() - if _, err := gzipWriter.Write(jsonData); err != nil { - return fmt.Errorf("failed to write compressed data: %w", err) - } - - // Close gzip writer to flush all data - if err := gzipWriter.Close(); err != nil { - return fmt.Errorf("failed to close gzip writer: %w", err) - } - - // Sync to disk - if err := tempFile.Sync(); err != nil { - return fmt.Errorf("failed to sync temporary file: %w", err) - } - - // Close temp file before rename - if err := tempFile.Close(); err != nil { - return fmt.Errorf("failed to close temporary file: %w", err) - } - - // Atomically rename temp file to final location - if err := os.Rename(tempPath, finalPath); err != nil { - return fmt.Errorf("failed to rename temporary file: %w", err) - } - s.logger.Info("Wrote compressed snapshot to disk", - "duration", time.Since(writeStart)) - - duration := time.Since(start) - s.lastSnapshot = time.Now() - - s.logger.Info("Routing table snapshot completed", - "duration", duration, - "routes", len(routes), - "ipv4_routes", stats.IPv4Routes, - "ipv6_routes", stats.IPv6Routes, - "size_bytes", len(jsonData), - "path", finalPath, - ) - - return nil -} - -// Shutdown performs a final snapshot and cleans up resources -func (s *Snapshotter) Shutdown() error { - s.logger.Info("Shutting down snapshotter") - - // Cancel context to stop periodic snapshots - if s.cancel != nil { - s.cancel() - } - - // Wait for periodic snapshot goroutine to finish - s.wg.Wait() - - // Take final snapshot - if err := s.TakeSnapshot(); err != nil { - return fmt.Errorf("failed to take final snapshot: %w", err) - } - - return nil -} - -// GetLastSnapshotTime returns the time of the last successful snapshot -func (s *Snapshotter) GetLastSnapshotTime() time.Time { - s.mu.Lock() - defer s.mu.Unlock() - - return s.lastSnapshot -} - -// GetSnapshotPath returns the path to the snapshot file -func (s *Snapshotter) GetSnapshotPath() string { - return filepath.Join(s.stateDir, snapshotFilename) -} diff --git a/internal/templates/status.html b/internal/templates/status.html index a8c4296..974b7d5 100644 --- a/internal/templates/status.html +++ b/internal/templates/status.html @@ -69,7 +69,7 @@