Add handler queue metrics to status page

- Add GetHandlerStats() method to streamer to expose handler metrics
- Include queue length/capacity, processed/dropped counts, timing stats
- Update API to include handler_stats in response
- Add dynamic handler stats display to status page HTML
- Shows separate status box for each handler with all metrics
This commit is contained in:
2025-07-28 00:32:37 +02:00
parent 6593a7be76
commit 1a0622efaa
3 changed files with 145 additions and 17 deletions

View File

@@ -195,6 +195,57 @@ func (s *Streamer) GetMetrics() metrics.StreamMetrics {
return s.metrics.GetStreamMetrics()
}
// HandlerStats represents metrics for a single handler
type HandlerStats struct {
Name string
QueueLength int
QueueCapacity int
ProcessedCount uint64
DroppedCount uint64
AvgProcessTime time.Duration
MinProcessTime time.Duration
MaxProcessTime time.Duration
}
// GetHandlerStats returns current handler statistics
func (s *Streamer) GetHandlerStats() []HandlerStats {
s.mu.RLock()
defer s.mu.RUnlock()
stats := make([]HandlerStats, 0, len(s.handlers))
for _, info := range s.handlers {
info.metrics.mu.Lock()
hs := HandlerStats{
Name: fmt.Sprintf("%T", info.handler),
QueueLength: len(info.queue),
QueueCapacity: cap(info.queue),
ProcessedCount: info.metrics.processedCount,
DroppedCount: info.metrics.droppedCount,
MinProcessTime: info.metrics.minTime,
MaxProcessTime: info.metrics.maxTime,
}
// Calculate average time
if info.metrics.processedCount > 0 {
processedCount := info.metrics.processedCount
const maxInt64 = 1<<63 - 1
if processedCount > maxInt64 {
processedCount = maxInt64
}
//nolint:gosec // processedCount is explicitly bounded above
hs.AvgProcessTime = info.metrics.totalTime / time.Duration(processedCount)
}
info.metrics.mu.Unlock()
stats = append(stats, hs)
}
return stats
}
// GetDroppedMessages returns the total number of dropped messages
func (s *Streamer) GetDroppedMessages() uint64 {
return atomic.LoadUint64(&s.totalDropped)