Compare commits

...

2 Commits

Author SHA1 Message Date
d15a5e91b9 Inject Config as dependency for database, routing table, and snapshotter
- Remove old database Config struct and related functions
- Update database.New() to accept config.Config parameter
- Update routingtable.New() to accept config.Config parameter
- Update snapshotter.New() to accept config.Config parameter
- Simplify fx module providers in app.go
- Fix truthiness check for environment variables
- Handle empty state directory gracefully in routing table and snapshotter
- Update all tests to use empty state directory for testing
2025-07-28 00:55:09 +02:00
1a0622efaa Add handler queue metrics to status page
- Add GetHandlerStats() method to streamer to expose handler metrics
- Include queue length/capacity, processed/dropped counts, timing stats
- Update API to include handler_stats in response
- Add dynamic handler stats display to status page HTML
- Shows separate status box for each handler with all metrics
2025-07-28 00:32:37 +02:00
10 changed files with 319 additions and 191 deletions

91
internal/config/config.go Normal file
View File

@ -0,0 +1,91 @@
// Package config provides centralized configuration management for RouteWatch
package config
import (
"fmt"
"os"
"path/filepath"
"runtime"
"time"
)
const (
// AppIdentifier is the reverse domain name identifier for the app
AppIdentifier = "berlin.sneak.app.routewatch"
// dirPermissions for creating directories
dirPermissions = 0750 // rwxr-x---
)
// Config holds configuration for the entire application
type Config struct {
// StateDir is the directory for all application state (database, snapshots)
StateDir string
// MaxRuntime is the maximum runtime (0 = run forever)
MaxRuntime time.Duration
}
// New creates a new Config with default paths based on the OS
func New() (*Config, error) {
stateDir, err := getStateDirectory()
if err != nil {
return nil, fmt.Errorf("failed to determine state directory: %w", err)
}
return &Config{
StateDir: stateDir,
MaxRuntime: 0, // Run forever by default
}, nil
}
// GetStateDir returns the state directory path
func (c *Config) GetStateDir() string {
return c.StateDir
}
// getStateDirectory returns the appropriate state directory based on the OS
func getStateDirectory() (string, error) {
switch runtime.GOOS {
case "darwin":
// macOS: ~/Library/Application Support/berlin.sneak.app.routewatch
home, err := os.UserHomeDir()
if err != nil {
return "", err
}
return filepath.Join(home, "Library", "Application Support", AppIdentifier), nil
case "linux", "freebsd", "openbsd", "netbsd":
// Unix-like: /var/lib/berlin.sneak.app.routewatch if root, else XDG_DATA_HOME
if os.Geteuid() == 0 {
return filepath.Join("/var/lib", AppIdentifier), nil
}
// Check XDG_DATA_HOME first
if xdgData := os.Getenv("XDG_DATA_HOME"); xdgData != "" {
return filepath.Join(xdgData, AppIdentifier), nil
}
// Fall back to ~/.local/share/berlin.sneak.app.routewatch
home, err := os.UserHomeDir()
if err != nil {
return "", err
}
return filepath.Join(home, ".local", "share", AppIdentifier), nil
default:
return "", fmt.Errorf("unsupported operating system: %s", runtime.GOOS)
}
}
// EnsureDirectories creates all necessary directories if they don't exist
func (c *Config) EnsureDirectories() error {
// Ensure state directory exists
if err := os.MkdirAll(c.StateDir, dirPermissions); err != nil {
return fmt.Errorf("failed to create state directory: %w", err)
}
return nil
}

View File

@ -8,9 +8,9 @@ import (
"log/slog" "log/slog"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"time" "time"
"git.eeqj.de/sneak/routewatch/internal/config"
"git.eeqj.de/sneak/routewatch/pkg/asinfo" "git.eeqj.de/sneak/routewatch/pkg/asinfo"
"github.com/google/uuid" "github.com/google/uuid"
_ "github.com/mattn/go-sqlite3" // CGO SQLite driver _ "github.com/mattn/go-sqlite3" // CGO SQLite driver
@ -28,77 +28,22 @@ type Database struct {
path string path string
} }
// Config holds database configuration
type Config struct {
Path string
}
// getDefaultDatabasePath returns the appropriate database path for the OS
func getDefaultDatabasePath() string {
const dbFilename = "db.sqlite"
switch runtime.GOOS {
case "darwin":
// macOS: ~/Library/Application Support/berlin.sneak.app.routewatch/db.sqlite
home, err := os.UserHomeDir()
if err != nil {
return dbFilename
}
appSupport := filepath.Join(home, "Library", "Application Support", "berlin.sneak.app.routewatch")
if err := os.MkdirAll(appSupport, dirPermissions); err != nil {
return dbFilename
}
return filepath.Join(appSupport, dbFilename)
default:
// Linux and others: /var/lib/routewatch/db.sqlite
dbDir := "/var/lib/routewatch"
if err := os.MkdirAll(dbDir, dirPermissions); err != nil {
// Fall back to user's home directory if can't create system directory
home, err := os.UserHomeDir()
if err != nil {
return dbFilename
}
userDir := filepath.Join(home, ".local", "share", "routewatch")
if err := os.MkdirAll(userDir, dirPermissions); err != nil {
return dbFilename
}
return filepath.Join(userDir, dbFilename)
}
return filepath.Join(dbDir, dbFilename)
}
}
// NewConfig provides default database configuration
func NewConfig() Config {
return Config{
Path: getDefaultDatabasePath(),
}
}
// New creates a new database connection and initializes the schema. // New creates a new database connection and initializes the schema.
func New(logger *slog.Logger) (*Database, error) { func New(cfg *config.Config, logger *slog.Logger) (*Database, error) {
config := NewConfig() dbPath := filepath.Join(cfg.GetStateDir(), "db.sqlite")
return NewWithConfig(config, logger)
}
// NewWithConfig creates a new database connection with custom configuration
func NewWithConfig(config Config, logger *slog.Logger) (*Database, error) {
// Log database path // Log database path
logger.Info("Opening database", "path", config.Path) logger.Info("Opening database", "path", dbPath)
// Ensure directory exists // Ensure directory exists
dir := filepath.Dir(config.Path) dir := filepath.Dir(dbPath)
if err := os.MkdirAll(dir, dirPermissions); err != nil { if err := os.MkdirAll(dir, dirPermissions); err != nil {
return nil, fmt.Errorf("failed to create database directory: %w", err) return nil, fmt.Errorf("failed to create database directory: %w", err)
} }
// Add connection parameters for go-sqlite3 // Add connection parameters for go-sqlite3
// Enable WAL mode and other performance optimizations // Enable WAL mode and other performance optimizations
dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=NORMAL&cache=shared", config.Path) dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=NORMAL&cache=shared", dbPath)
db, err := sql.Open("sqlite3", dsn) db, err := sql.Open("sqlite3", dsn)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to open database: %w", err) return nil, fmt.Errorf("failed to open database: %w", err)
@ -113,7 +58,7 @@ func NewWithConfig(config Config, logger *slog.Logger) (*Database, error) {
db.SetMaxIdleConns(1) db.SetMaxIdleConns(1)
db.SetConnMaxLifetime(0) db.SetConnMaxLifetime(0)
database := &Database{db: db, logger: logger, path: config.Path} database := &Database{db: db, logger: logger, path: dbPath}
if err := database.Initialize(); err != nil { if err := database.Initialize(); err != nil {
return nil, fmt.Errorf("failed to initialize database: %w", err) return nil, fmt.Errorf("failed to initialize database: %w", err)

View File

@ -11,6 +11,7 @@ import (
"sync" "sync"
"time" "time"
"git.eeqj.de/sneak/routewatch/internal/config"
"git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/metrics" "git.eeqj.de/sneak/routewatch/internal/metrics"
"git.eeqj.de/sneak/routewatch/internal/routingtable" "git.eeqj.de/sneak/routewatch/internal/routingtable"
@ -21,23 +22,11 @@ import (
"go.uber.org/fx" "go.uber.org/fx"
) )
// Config contains runtime configuration for RouteWatch
type Config struct {
MaxRuntime time.Duration // Maximum runtime (0 = run forever)
}
const ( const (
// routingTableStatsInterval is how often we log routing table statistics // routingTableStatsInterval is how often we log routing table statistics
routingTableStatsInterval = 15 * time.Second routingTableStatsInterval = 15 * time.Second
) )
// NewConfig provides default configuration
func NewConfig() Config {
return Config{
MaxRuntime: 0, // Run forever by default
}
}
// Dependencies contains all dependencies for RouteWatch // Dependencies contains all dependencies for RouteWatch
type Dependencies struct { type Dependencies struct {
fx.In fx.In
@ -47,7 +36,7 @@ type Dependencies struct {
Streamer *streamer.Streamer Streamer *streamer.Streamer
Server *server.Server Server *server.Server
Logger *slog.Logger Logger *slog.Logger
Config Config `optional:"true"` Config *config.Config
} }
// RouteWatch represents the main application instance // RouteWatch represents the main application instance
@ -63,6 +52,17 @@ type RouteWatch struct {
mu sync.Mutex mu sync.Mutex
} }
// isTruthy returns true if the value is considered truthy
// Empty string, "0", and "false" are considered falsy, everything else is truthy
func isTruthy(value string) bool {
return value != "" && value != "0" && value != "false"
}
// isSnapshotterEnabled checks if the snapshotter should be enabled based on environment variable
func isSnapshotterEnabled() bool {
return !isTruthy(os.Getenv("ROUTEWATCH_DISABLE_SNAPSHOTTER"))
}
// New creates a new RouteWatch instance // New creates a new RouteWatch instance
func New(deps Dependencies) *RouteWatch { func New(deps Dependencies) *RouteWatch {
rw := &RouteWatch{ rw := &RouteWatch{
@ -74,9 +74,9 @@ func New(deps Dependencies) *RouteWatch {
maxRuntime: deps.Config.MaxRuntime, maxRuntime: deps.Config.MaxRuntime,
} }
// Create snapshotter unless disabled (for tests) // Create snapshotter if enabled
if os.Getenv("ROUTEWATCH_DISABLE_SNAPSHOTTER") != "1" { if isSnapshotterEnabled() {
snap, err := snapshotter.New(deps.RoutingTable, deps.Logger) snap, err := snapshotter.New(deps.RoutingTable, deps.Config, deps.Logger)
if err != nil { if err != nil {
deps.Logger.Error("Failed to create snapshotter", "error", err) deps.Logger.Error("Failed to create snapshotter", "error", err)
// Continue without snapshotter // Continue without snapshotter
@ -235,13 +235,10 @@ func getModule() fx.Option {
return fx.Options( return fx.Options(
fx.Provide( fx.Provide(
NewLogger, NewLogger,
NewConfig, config.New,
metrics.New, metrics.New,
database.New,
fx.Annotate( fx.Annotate(
func(db *database.Database) database.Store { database.New,
return db
},
fx.As(new(database.Store)), fx.As(new(database.Store)),
), ),
routingtable.New, routingtable.New,

View File

@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
"git.eeqj.de/sneak/routewatch/internal/config"
"git.eeqj.de/sneak/routewatch/internal/database" "git.eeqj.de/sneak/routewatch/internal/database"
"git.eeqj.de/sneak/routewatch/internal/metrics" "git.eeqj.de/sneak/routewatch/internal/metrics"
"git.eeqj.de/sneak/routewatch/internal/routingtable" "git.eeqj.de/sneak/routewatch/internal/routingtable"
@ -171,8 +172,14 @@ func TestRouteWatchLiveFeed(t *testing.T) {
// Create streamer // Create streamer
s := streamer.New(logger, metricsTracker) s := streamer.New(logger, metricsTracker)
// Create test config with empty state dir (no snapshot loading)
cfg := &config.Config{
StateDir: "",
MaxRuntime: 5 * time.Second,
}
// Create routing table // Create routing table
rt := routingtable.New(logger) rt := routingtable.New(cfg, logger)
// Create server // Create server
srv := server.New(mockDB, rt, s, logger) srv := server.New(mockDB, rt, s, logger)
@ -184,9 +191,7 @@ func TestRouteWatchLiveFeed(t *testing.T) {
Streamer: s, Streamer: s,
Server: srv, Server: srv,
Logger: logger, Logger: logger,
Config: Config{ Config: cfg,
MaxRuntime: 5 * time.Second,
},
} }
rw := New(deps) rw := New(deps)

View File

@ -8,12 +8,12 @@ import (
"log/slog" "log/slog"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"git.eeqj.de/sneak/routewatch/internal/config"
"github.com/google/uuid" "github.com/google/uuid"
) )
@ -23,7 +23,7 @@ const (
routeStalenessThreshold = 30 * time.Minute routeStalenessThreshold = 30 * time.Minute
// snapshotFilename is the name of the snapshot file // snapshotFilename is the name of the snapshot file
snapshotFilename = "routewatch-snapshot.json.gz" snapshotFilename = "routingtable.json.gz"
) )
// Route represents a single route entry in the routing table // Route represents a single route entry in the routing table
@ -62,16 +62,20 @@ type RoutingTable struct {
ipv4Updates uint64 // Updates counter for rate calculation ipv4Updates uint64 // Updates counter for rate calculation
ipv6Updates uint64 // Updates counter for rate calculation ipv6Updates uint64 // Updates counter for rate calculation
lastMetricsReset time.Time lastMetricsReset time.Time
// Configuration
snapshotDir string
} }
// New creates a new routing table, loading from snapshot if available // New creates a new routing table, loading from snapshot if available
func New(logger *slog.Logger) *RoutingTable { func New(cfg *config.Config, logger *slog.Logger) *RoutingTable {
rt := &RoutingTable{ rt := &RoutingTable{
routes: make(map[RouteKey]*Route), routes: make(map[RouteKey]*Route),
byPrefix: make(map[uuid.UUID]map[RouteKey]*Route), byPrefix: make(map[uuid.UUID]map[RouteKey]*Route),
byOriginASN: make(map[uuid.UUID]map[RouteKey]*Route), byOriginASN: make(map[uuid.UUID]map[RouteKey]*Route),
byPeerASN: make(map[int]map[RouteKey]*Route), byPeerASN: make(map[int]map[RouteKey]*Route),
lastMetricsReset: time.Now(), lastMetricsReset: time.Now(),
snapshotDir: cfg.GetStateDir(),
} }
// Try to load from snapshot // Try to load from snapshot
@ -447,51 +451,18 @@ func isIPv6(prefix string) bool {
return strings.Contains(prefix, ":") return strings.Contains(prefix, ":")
} }
// getStateDirectory returns the appropriate state directory based on the OS
func getStateDirectory() (string, error) {
switch runtime.GOOS {
case "darwin":
// macOS: Use ~/Library/Application Support/routewatch
home, err := os.UserHomeDir()
if err != nil {
return "", err
}
return filepath.Join(home, "Library", "Application Support", "routewatch"), nil
case "linux", "freebsd", "openbsd", "netbsd":
// Unix-like: Use /var/lib/routewatch if running as root, otherwise use XDG_STATE_HOME
if os.Geteuid() == 0 {
return "/var/lib/routewatch", nil
}
// Check XDG_STATE_HOME first
if xdgState := os.Getenv("XDG_STATE_HOME"); xdgState != "" {
return filepath.Join(xdgState, "routewatch"), nil
}
// Fall back to ~/.local/state/routewatch
home, err := os.UserHomeDir()
if err != nil {
return "", err
}
return filepath.Join(home, ".local", "state", "routewatch"), nil
default:
return "", fmt.Errorf("unsupported operating system: %s", runtime.GOOS)
}
}
// loadFromSnapshot attempts to load the routing table from a snapshot file // loadFromSnapshot attempts to load the routing table from a snapshot file
func (rt *RoutingTable) loadFromSnapshot(logger *slog.Logger) error { func (rt *RoutingTable) loadFromSnapshot(logger *slog.Logger) error {
stateDir, err := getStateDirectory() // If no snapshot directory specified, nothing to load
if err != nil { if rt.snapshotDir == "" {
return fmt.Errorf("failed to determine state directory: %w", err) return nil
} }
snapshotPath := filepath.Join(stateDir, snapshotFilename) snapshotPath := filepath.Join(rt.snapshotDir, snapshotFilename)
// Check if snapshot file exists // Check if snapshot file exists
if _, err := os.Stat(snapshotPath); os.IsNotExist(err) { if _, err := os.Stat(snapshotPath); os.IsNotExist(err) {
logger.Info("No snapshot file found, starting with empty routing table") // No snapshot file exists, this is normal - start with empty routing table
return nil return nil
} }

View File

@ -6,13 +6,20 @@ import (
"testing" "testing"
"time" "time"
"git.eeqj.de/sneak/routewatch/internal/config"
"github.com/google/uuid" "github.com/google/uuid"
) )
func TestRoutingTable(t *testing.T) { func TestRoutingTable(t *testing.T) {
// Create a test logger // Create a test logger
logger := slog.Default() logger := slog.Default()
rt := New(logger)
// Create test config with empty state dir (no snapshot loading)
cfg := &config.Config{
StateDir: "",
}
rt := New(cfg, logger)
// Test data // Test data
prefixID1 := uuid.New() prefixID1 := uuid.New()
@ -123,7 +130,13 @@ func TestRoutingTable(t *testing.T) {
func TestRoutingTableConcurrency(t *testing.T) { func TestRoutingTableConcurrency(t *testing.T) {
// Create a test logger // Create a test logger
logger := slog.Default() logger := slog.Default()
rt := New(logger)
// Create test config with empty state dir (no snapshot loading)
cfg := &config.Config{
StateDir: "",
}
rt := New(cfg, logger)
// Test concurrent access // Test concurrent access
var wg sync.WaitGroup var wg sync.WaitGroup
@ -177,7 +190,13 @@ func TestRoutingTableConcurrency(t *testing.T) {
func TestRouteUpdate(t *testing.T) { func TestRouteUpdate(t *testing.T) {
// Create a test logger // Create a test logger
logger := slog.Default() logger := slog.Default()
rt := New(logger)
// Create test config with empty state dir (no snapshot loading)
cfg := &config.Config{
StateDir: "",
}
rt := New(cfg, logger)
prefixID := uuid.New() prefixID := uuid.New()
originASNID := uuid.New() originASNID := uuid.New()

View File

@ -232,25 +232,38 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
// handleStats returns a handler that serves API v1 statistics // handleStats returns a handler that serves API v1 statistics
func (s *Server) handleStats() http.HandlerFunc { func (s *Server) handleStats() http.HandlerFunc {
// HandlerStatsInfo represents handler statistics in the API response
type HandlerStatsInfo struct {
Name string `json:"name"`
QueueLength int `json:"queue_length"`
QueueCapacity int `json:"queue_capacity"`
ProcessedCount uint64 `json:"processed_count"`
DroppedCount uint64 `json:"dropped_count"`
AvgProcessTimeMs float64 `json:"avg_process_time_ms"`
MinProcessTimeMs float64 `json:"min_process_time_ms"`
MaxProcessTimeMs float64 `json:"max_process_time_ms"`
}
// StatsResponse represents the API statistics response // StatsResponse represents the API statistics response
type StatsResponse struct { type StatsResponse struct {
Uptime string `json:"uptime"` Uptime string `json:"uptime"`
TotalMessages uint64 `json:"total_messages"` TotalMessages uint64 `json:"total_messages"`
TotalBytes uint64 `json:"total_bytes"` TotalBytes uint64 `json:"total_bytes"`
MessagesPerSec float64 `json:"messages_per_sec"` MessagesPerSec float64 `json:"messages_per_sec"`
MbitsPerSec float64 `json:"mbits_per_sec"` MbitsPerSec float64 `json:"mbits_per_sec"`
Connected bool `json:"connected"` Connected bool `json:"connected"`
ASNs int `json:"asns"` ASNs int `json:"asns"`
Prefixes int `json:"prefixes"` Prefixes int `json:"prefixes"`
IPv4Prefixes int `json:"ipv4_prefixes"` IPv4Prefixes int `json:"ipv4_prefixes"`
IPv6Prefixes int `json:"ipv6_prefixes"` IPv6Prefixes int `json:"ipv6_prefixes"`
Peerings int `json:"peerings"` Peerings int `json:"peerings"`
DatabaseSizeBytes int64 `json:"database_size_bytes"` DatabaseSizeBytes int64 `json:"database_size_bytes"`
LiveRoutes int `json:"live_routes"` LiveRoutes int `json:"live_routes"`
IPv4Routes int `json:"ipv4_routes"` IPv4Routes int `json:"ipv4_routes"`
IPv6Routes int `json:"ipv6_routes"` IPv6Routes int `json:"ipv6_routes"`
IPv4UpdatesPerSec float64 `json:"ipv4_updates_per_sec"` IPv4UpdatesPerSec float64 `json:"ipv4_updates_per_sec"`
IPv6UpdatesPerSec float64 `json:"ipv6_updates_per_sec"` IPv6UpdatesPerSec float64 `json:"ipv6_updates_per_sec"`
HandlerStats []HandlerStatsInfo `json:"handler_stats"`
} }
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
@ -312,6 +325,23 @@ func (s *Server) handleStats() http.HandlerFunc {
// Get detailed routing table stats // Get detailed routing table stats
rtStats := s.routingTable.GetDetailedStats() rtStats := s.routingTable.GetDetailedStats()
// Get handler stats
handlerStats := s.streamer.GetHandlerStats()
handlerStatsInfo := make([]HandlerStatsInfo, 0, len(handlerStats))
const microsecondsPerMillisecond = 1000.0
for _, hs := range handlerStats {
handlerStatsInfo = append(handlerStatsInfo, HandlerStatsInfo{
Name: hs.Name,
QueueLength: hs.QueueLength,
QueueCapacity: hs.QueueCapacity,
ProcessedCount: hs.ProcessedCount,
DroppedCount: hs.DroppedCount,
AvgProcessTimeMs: float64(hs.AvgProcessTime.Microseconds()) / microsecondsPerMillisecond,
MinProcessTimeMs: float64(hs.MinProcessTime.Microseconds()) / microsecondsPerMillisecond,
MaxProcessTimeMs: float64(hs.MaxProcessTime.Microseconds()) / microsecondsPerMillisecond,
})
}
stats := StatsResponse{ stats := StatsResponse{
Uptime: uptime, Uptime: uptime,
TotalMessages: metrics.TotalMessages, TotalMessages: metrics.TotalMessages,
@ -330,6 +360,7 @@ func (s *Server) handleStats() http.HandlerFunc {
IPv6Routes: rtStats.IPv6Routes, IPv6Routes: rtStats.IPv6Routes,
IPv4UpdatesPerSec: rtStats.IPv4UpdatesRate, IPv4UpdatesPerSec: rtStats.IPv4UpdatesRate,
IPv6UpdatesPerSec: rtStats.IPv6UpdatesRate, IPv6UpdatesPerSec: rtStats.IPv6UpdatesRate,
HandlerStats: handlerStatsInfo,
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")

View File

@ -10,16 +10,16 @@ import (
"log/slog" "log/slog"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"sync" "sync"
"time" "time"
"git.eeqj.de/sneak/routewatch/internal/config"
"git.eeqj.de/sneak/routewatch/internal/routingtable" "git.eeqj.de/sneak/routewatch/internal/routingtable"
) )
const ( const (
snapshotInterval = 10 * time.Minute snapshotInterval = 10 * time.Minute
snapshotFilename = "routewatch-snapshot.json.gz" snapshotFilename = "routingtable.json.gz"
tempFileSuffix = ".tmp" tempFileSuffix = ".tmp"
) )
@ -36,16 +36,15 @@ type Snapshotter struct {
} }
// New creates a new Snapshotter instance // New creates a new Snapshotter instance
func New(rt *routingtable.RoutingTable, logger *slog.Logger) (*Snapshotter, error) { func New(rt *routingtable.RoutingTable, cfg *config.Config, logger *slog.Logger) (*Snapshotter, error) {
stateDir, err := getStateDirectory() stateDir := cfg.GetStateDir()
if err != nil {
return nil, fmt.Errorf("failed to determine state directory: %w", err)
}
// Ensure state directory exists // If state directory is specified, ensure it exists
const stateDirPerms = 0750 if stateDir != "" {
if err := os.MkdirAll(stateDir, stateDirPerms); err != nil { const stateDirPerms = 0750
return nil, fmt.Errorf("failed to create state directory: %w", err) if err := os.MkdirAll(stateDir, stateDirPerms); err != nil {
return nil, fmt.Errorf("failed to create snapshot directory: %w", err)
}
} }
s := &Snapshotter{ s := &Snapshotter{
@ -76,38 +75,6 @@ func (s *Snapshotter) Start(ctx context.Context) {
go s.periodicSnapshot() go s.periodicSnapshot()
} }
// getStateDirectory returns the appropriate state directory based on the OS
func getStateDirectory() (string, error) {
switch runtime.GOOS {
case "darwin":
// macOS: Use ~/Library/Application Support/routewatch
home, err := os.UserHomeDir()
if err != nil {
return "", err
}
return filepath.Join(home, "Library", "Application Support", "routewatch"), nil
case "linux", "freebsd", "openbsd", "netbsd":
// Unix-like: Use /var/lib/routewatch if running as root, otherwise use XDG_STATE_HOME
if os.Geteuid() == 0 {
return "/var/lib/routewatch", nil
}
// Check XDG_STATE_HOME first
if xdgState := os.Getenv("XDG_STATE_HOME"); xdgState != "" {
return filepath.Join(xdgState, "routewatch"), nil
}
// Fall back to ~/.local/state/routewatch
home, err := os.UserHomeDir()
if err != nil {
return "", err
}
return filepath.Join(home, ".local", "state", "routewatch"), nil
default:
return "", fmt.Errorf("unsupported operating system: %s", runtime.GOOS)
}
}
// periodicSnapshot runs periodic snapshots // periodicSnapshot runs periodic snapshots
func (s *Snapshotter) periodicSnapshot() { func (s *Snapshotter) periodicSnapshot() {
defer s.wg.Done() defer s.wg.Done()
@ -130,6 +97,11 @@ func (s *Snapshotter) periodicSnapshot() {
// TakeSnapshot creates a snapshot of the current routing table state // TakeSnapshot creates a snapshot of the current routing table state
func (s *Snapshotter) TakeSnapshot() error { func (s *Snapshotter) TakeSnapshot() error {
// Can't take snapshot without a state directory
if s.stateDir == "" {
return nil
}
// Ensure only one snapshot runs at a time // Ensure only one snapshot runs at a time
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()

View File

@ -195,6 +195,57 @@ func (s *Streamer) GetMetrics() metrics.StreamMetrics {
return s.metrics.GetStreamMetrics() return s.metrics.GetStreamMetrics()
} }
// HandlerStats represents metrics for a single handler
type HandlerStats struct {
Name string
QueueLength int
QueueCapacity int
ProcessedCount uint64
DroppedCount uint64
AvgProcessTime time.Duration
MinProcessTime time.Duration
MaxProcessTime time.Duration
}
// GetHandlerStats returns current handler statistics
func (s *Streamer) GetHandlerStats() []HandlerStats {
s.mu.RLock()
defer s.mu.RUnlock()
stats := make([]HandlerStats, 0, len(s.handlers))
for _, info := range s.handlers {
info.metrics.mu.Lock()
hs := HandlerStats{
Name: fmt.Sprintf("%T", info.handler),
QueueLength: len(info.queue),
QueueCapacity: cap(info.queue),
ProcessedCount: info.metrics.processedCount,
DroppedCount: info.metrics.droppedCount,
MinProcessTime: info.metrics.minTime,
MaxProcessTime: info.metrics.maxTime,
}
// Calculate average time
if info.metrics.processedCount > 0 {
processedCount := info.metrics.processedCount
const maxInt64 = 1<<63 - 1
if processedCount > maxInt64 {
processedCount = maxInt64
}
//nolint:gosec // processedCount is explicitly bounded above
hs.AvgProcessTime = info.metrics.totalTime / time.Duration(processedCount)
}
info.metrics.mu.Unlock()
stats = append(stats, hs)
}
return stats
}
// GetDroppedMessages returns the total number of dropped messages // GetDroppedMessages returns the total number of dropped messages
func (s *Streamer) GetDroppedMessages() uint64 { func (s *Streamer) GetDroppedMessages() uint64 {
return atomic.LoadUint64(&s.totalDropped) return atomic.LoadUint64(&s.totalDropped)

View File

@ -153,6 +153,10 @@
</div> </div>
</div> </div>
<div id="handler-stats-container" class="status-grid">
<!-- Handler stats will be dynamically added here -->
</div>
<script> <script>
function formatBytes(bytes) { function formatBytes(bytes) {
if (bytes === 0) return '0 B'; if (bytes === 0) return '0 B';
@ -166,6 +170,45 @@
return num.toLocaleString(); return num.toLocaleString();
} }
function updateHandlerStats(handlerStats) {
const container = document.getElementById('handler-stats-container');
container.innerHTML = '';
handlerStats.forEach(handler => {
const card = document.createElement('div');
card.className = 'status-card';
// Extract handler name (remove package prefix)
const handlerName = handler.name.split('.').pop();
card.innerHTML = `
<h2>${handlerName}</h2>
<div class="metric">
<span class="metric-label">Queue</span>
<span class="metric-value">${handler.queue_length}/${handler.queue_capacity}</span>
</div>
<div class="metric">
<span class="metric-label">Processed</span>
<span class="metric-value">${formatNumber(handler.processed_count)}</span>
</div>
<div class="metric">
<span class="metric-label">Dropped</span>
<span class="metric-value ${handler.dropped_count > 0 ? 'disconnected' : ''}">${formatNumber(handler.dropped_count)}</span>
</div>
<div class="metric">
<span class="metric-label">Avg Time</span>
<span class="metric-value">${handler.avg_process_time_ms.toFixed(2)} ms</span>
</div>
<div class="metric">
<span class="metric-label">Min/Max Time</span>
<span class="metric-value">${handler.min_process_time_ms.toFixed(2)} / ${handler.max_process_time_ms.toFixed(2)} ms</span>
</div>
`;
container.appendChild(card);
});
}
function updateStatus() { function updateStatus() {
fetch('/api/v1/stats') fetch('/api/v1/stats')
.then(response => response.json()) .then(response => response.json())
@ -203,6 +246,9 @@
document.getElementById('ipv4_updates_per_sec').textContent = data.ipv4_updates_per_sec.toFixed(1); document.getElementById('ipv4_updates_per_sec').textContent = data.ipv4_updates_per_sec.toFixed(1);
document.getElementById('ipv6_updates_per_sec').textContent = data.ipv6_updates_per_sec.toFixed(1); document.getElementById('ipv6_updates_per_sec').textContent = data.ipv6_updates_per_sec.toFixed(1);
// Update handler stats
updateHandlerStats(data.handler_stats || []);
// Clear any errors // Clear any errors
document.getElementById('error').style.display = 'none'; document.getElementById('error').style.display = 'none';
}) })