diff --git a/internal/server/server.go b/internal/server/server.go index 99b5ac3..24cc7b3 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -232,25 +232,38 @@ func (s *Server) handleStatusJSON() http.HandlerFunc { // handleStats returns a handler that serves API v1 statistics func (s *Server) handleStats() http.HandlerFunc { + // HandlerStatsInfo represents handler statistics in the API response + type HandlerStatsInfo struct { + Name string `json:"name"` + QueueLength int `json:"queue_length"` + QueueCapacity int `json:"queue_capacity"` + ProcessedCount uint64 `json:"processed_count"` + DroppedCount uint64 `json:"dropped_count"` + AvgProcessTimeMs float64 `json:"avg_process_time_ms"` + MinProcessTimeMs float64 `json:"min_process_time_ms"` + MaxProcessTimeMs float64 `json:"max_process_time_ms"` + } + // StatsResponse represents the API statistics response type StatsResponse struct { - Uptime string `json:"uptime"` - TotalMessages uint64 `json:"total_messages"` - TotalBytes uint64 `json:"total_bytes"` - MessagesPerSec float64 `json:"messages_per_sec"` - MbitsPerSec float64 `json:"mbits_per_sec"` - Connected bool `json:"connected"` - ASNs int `json:"asns"` - Prefixes int `json:"prefixes"` - IPv4Prefixes int `json:"ipv4_prefixes"` - IPv6Prefixes int `json:"ipv6_prefixes"` - Peerings int `json:"peerings"` - DatabaseSizeBytes int64 `json:"database_size_bytes"` - LiveRoutes int `json:"live_routes"` - IPv4Routes int `json:"ipv4_routes"` - IPv6Routes int `json:"ipv6_routes"` - IPv4UpdatesPerSec float64 `json:"ipv4_updates_per_sec"` - IPv6UpdatesPerSec float64 `json:"ipv6_updates_per_sec"` + Uptime string `json:"uptime"` + TotalMessages uint64 `json:"total_messages"` + TotalBytes uint64 `json:"total_bytes"` + MessagesPerSec float64 `json:"messages_per_sec"` + MbitsPerSec float64 `json:"mbits_per_sec"` + Connected bool `json:"connected"` + ASNs int `json:"asns"` + Prefixes int `json:"prefixes"` + IPv4Prefixes int `json:"ipv4_prefixes"` + IPv6Prefixes int `json:"ipv6_prefixes"` + Peerings int `json:"peerings"` + DatabaseSizeBytes int64 `json:"database_size_bytes"` + LiveRoutes int `json:"live_routes"` + IPv4Routes int `json:"ipv4_routes"` + IPv6Routes int `json:"ipv6_routes"` + IPv4UpdatesPerSec float64 `json:"ipv4_updates_per_sec"` + IPv6UpdatesPerSec float64 `json:"ipv6_updates_per_sec"` + HandlerStats []HandlerStatsInfo `json:"handler_stats"` } return func(w http.ResponseWriter, r *http.Request) { @@ -312,6 +325,23 @@ func (s *Server) handleStats() http.HandlerFunc { // Get detailed routing table stats rtStats := s.routingTable.GetDetailedStats() + // Get handler stats + handlerStats := s.streamer.GetHandlerStats() + handlerStatsInfo := make([]HandlerStatsInfo, 0, len(handlerStats)) + const microsecondsPerMillisecond = 1000.0 + for _, hs := range handlerStats { + handlerStatsInfo = append(handlerStatsInfo, HandlerStatsInfo{ + Name: hs.Name, + QueueLength: hs.QueueLength, + QueueCapacity: hs.QueueCapacity, + ProcessedCount: hs.ProcessedCount, + DroppedCount: hs.DroppedCount, + AvgProcessTimeMs: float64(hs.AvgProcessTime.Microseconds()) / microsecondsPerMillisecond, + MinProcessTimeMs: float64(hs.MinProcessTime.Microseconds()) / microsecondsPerMillisecond, + MaxProcessTimeMs: float64(hs.MaxProcessTime.Microseconds()) / microsecondsPerMillisecond, + }) + } + stats := StatsResponse{ Uptime: uptime, TotalMessages: metrics.TotalMessages, @@ -330,6 +360,7 @@ func (s *Server) handleStats() http.HandlerFunc { IPv6Routes: rtStats.IPv6Routes, IPv4UpdatesPerSec: rtStats.IPv4UpdatesRate, IPv6UpdatesPerSec: rtStats.IPv6UpdatesRate, + HandlerStats: handlerStatsInfo, } w.Header().Set("Content-Type", "application/json") diff --git a/internal/streamer/streamer.go b/internal/streamer/streamer.go index 8698af0..fe7f6b0 100644 --- a/internal/streamer/streamer.go +++ b/internal/streamer/streamer.go @@ -195,6 +195,57 @@ func (s *Streamer) GetMetrics() metrics.StreamMetrics { return s.metrics.GetStreamMetrics() } +// HandlerStats represents metrics for a single handler +type HandlerStats struct { + Name string + QueueLength int + QueueCapacity int + ProcessedCount uint64 + DroppedCount uint64 + AvgProcessTime time.Duration + MinProcessTime time.Duration + MaxProcessTime time.Duration +} + +// GetHandlerStats returns current handler statistics +func (s *Streamer) GetHandlerStats() []HandlerStats { + s.mu.RLock() + defer s.mu.RUnlock() + + stats := make([]HandlerStats, 0, len(s.handlers)) + + for _, info := range s.handlers { + info.metrics.mu.Lock() + + hs := HandlerStats{ + Name: fmt.Sprintf("%T", info.handler), + QueueLength: len(info.queue), + QueueCapacity: cap(info.queue), + ProcessedCount: info.metrics.processedCount, + DroppedCount: info.metrics.droppedCount, + MinProcessTime: info.metrics.minTime, + MaxProcessTime: info.metrics.maxTime, + } + + // Calculate average time + if info.metrics.processedCount > 0 { + processedCount := info.metrics.processedCount + const maxInt64 = 1<<63 - 1 + if processedCount > maxInt64 { + processedCount = maxInt64 + } + //nolint:gosec // processedCount is explicitly bounded above + hs.AvgProcessTime = info.metrics.totalTime / time.Duration(processedCount) + } + + info.metrics.mu.Unlock() + + stats = append(stats, hs) + } + + return stats +} + // GetDroppedMessages returns the total number of dropped messages func (s *Streamer) GetDroppedMessages() uint64 { return atomic.LoadUint64(&s.totalDropped) diff --git a/internal/templates/status.html b/internal/templates/status.html index 33163c0..7519da3 100644 --- a/internal/templates/status.html +++ b/internal/templates/status.html @@ -153,6 +153,10 @@ +