hdmistat/internal/netmon/netmon.go
2025-07-24 14:32:50 +02:00

260 lines
6.2 KiB
Go

// Package netmon provides network interface monitoring with historical data.
package netmon
import (
"context"
"log/slog"
"strings"
"sync"
"time"
"github.com/dustin/go-humanize"
psnet "github.com/shirou/gopsutil/v3/net"
)
// Sampling and rate-calculation parameters shared by the monitor.
const (
ringBufferSize = 60 // Samples kept per interface: 60 seconds of history at one sample per second
sampleInterval = time.Second // Time between two consecutive samples
rateWindowSeconds = 5 // Window size, in seconds, for rate calculation
bitsPerByte = 8 // Multiplier used to turn byte rates into bit rates
)
// Sample is one reading of an interface's traffic counters together with the
// time at which the reading was recorded.
type Sample struct {
	// BytesSent and BytesRecv are the byte counters as they stood when the
	// sample was taken.
	BytesSent, BytesRecv uint64
	// Timestamp is the moment the sample was recorded.
	Timestamp time.Time
}
// InterfaceStats holds the ring buffer of samples for a single interface.
// While the buffer is filling, samples are appended at index count and head
// stays at zero; once it is full, each new sample overwrites the slot at head
// and head advances by one, wrapping at ringBufferSize.
type InterfaceStats struct {
samples [ringBufferSize]Sample // Fixed-size storage used as a ring buffer
head int // Points to the oldest sample
count int // Number of valid samples, never more than ringBufferSize
lastSample Sample // Copy of the most recently recorded sample
}
// Stats is a snapshot of one interface: its name, its latest byte totals and
// its current throughput.
type Stats struct {
	// Name is the interface name.
	Name string
	// BytesSent and BytesRecv are the byte counters from the latest sample.
	BytesSent, BytesRecv uint64
	// BitsSentRate and BitsRecvRate are the transfer rates in bits per second.
	BitsSentRate, BitsRecvRate uint64
}
// FormatSentRate renders the send rate (bits per second) as a human-readable
// string with the unit "bit/s".
func (s *Stats) FormatSentRate() string {
	bitsPerSecond := float64(s.BitsSentRate)
	return humanize.SI(bitsPerSecond, "bit/s")
}
// FormatRecvRate renders the receive rate (bits per second) as a
// human-readable string with the unit "bit/s".
func (s *Stats) FormatRecvRate() string {
	bitsPerSecond := float64(s.BitsRecvRate)
	return humanize.SI(bitsPerSecond, "bit/s")
}
// Monitor tracks network statistics for all interfaces. A background
// goroutine started by Start records samples; the accessor methods read the
// recorded history under the read lock.
type Monitor struct {
mu sync.RWMutex // Guards interfaces and the InterfaceStats values it points to
interfaces map[string]*InterfaceStats // Sample history keyed by interface name
logger *slog.Logger // Receives a warning when reading the counters fails
ctx context.Context // Cancelled by Stop to end the sampling goroutine
cancel context.CancelFunc // Cancels ctx
wg sync.WaitGroup // Lets Stop wait for the sampling goroutine to exit
}
// New creates a network monitor that reports sampling problems to logger.
//
// A nil logger is replaced with slog.Default() so that a failed counter read
// in the background goroutine logs a warning instead of dereferencing a nil
// pointer. The returned monitor holds no samples and does nothing until Start
// is called.
func New(logger *slog.Logger) *Monitor {
	if logger == nil {
		logger = slog.Default()
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &Monitor{
		interfaces: make(map[string]*InterfaceStats),
		logger:     logger,
		ctx:        ctx,
		cancel:     cancel,
	}
}
// Start launches the background goroutine that samples the network
// interfaces. The goroutine is registered with the monitor's wait group
// before it is started.
func (m *Monitor) Start() {
	m.wg.Add(1)
	go func() {
		m.monitorLoop()
	}()
}
// Stop cancels the monitor's context and then blocks until the sampling
// goroutine has exited.
func (m *Monitor) Stop() {
	// The deferred wait runs after the cancellation below.
	defer m.wg.Wait()
	m.cancel()
}
// GetStats returns a snapshot of byte totals and current rates for every
// interface that has at least one recorded sample. The result follows map
// iteration order, so its ordering is unspecified; it is nil when no
// interface qualifies.
func (m *Monitor) GetStats() []Stats {
	m.mu.RLock()
	defer m.mu.RUnlock()

	var out []Stats
	for ifName, history := range m.interfaces {
		if history.count > 0 {
			// Average over the available samples, up to rateWindowSeconds.
			avg := m.calculateRate(history, rateWindowSeconds)
			latest := history.lastSample
			out = append(out, Stats{
				Name:         ifName,
				BytesSent:    latest.BytesSent,
				BytesRecv:    latest.BytesRecv,
				BitsSentRate: uint64(avg.sentRate * bitsPerByte), // bytes/s -> bits/s
				BitsRecvRate: uint64(avg.recvRate * bitsPerByte), // bytes/s -> bits/s
			})
		}
	}
	return out
}
// GetStatsForInterface returns the byte totals and current rates for the
// interface called name. The boolean is false, and the pointer nil, when the
// interface is unknown or has no recorded sample yet.
//
// Rates are averaged over the same rateWindowSeconds window and converted
// with the same bitsPerByte factor as GetStats, so both accessors always
// report identical figures for an interface.
func (m *Monitor) GetStatsForInterface(name string) (*Stats, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	ifaceStats, ok := m.interfaces[name]
	if !ok || ifaceStats.count == 0 {
		return nil, false
	}

	rate := m.calculateRate(ifaceStats, rateWindowSeconds)
	return &Stats{
		Name:         name,
		BytesSent:    ifaceStats.lastSample.BytesSent,
		BytesRecv:    ifaceStats.lastSample.BytesRecv,
		BitsSentRate: uint64(rate.sentRate * bitsPerByte), // Convert to bits/sec
		BitsRecvRate: uint64(rate.recvRate * bitsPerByte), // Convert to bits/sec
	}, true
}
// rateInfo carries the send and receive rates computed for one interface.
type rateInfo struct {
	sentRate, recvRate float64 // bytes per second
}
// calculateRate returns the average send and receive rates, in bytes per
// second, over roughly the last `seconds` seconds of history.
//
// The window is counted in samples (the monitor records one per second) and
// is clamped to the history actually stored. The rate is the counter
// difference between the newest sample and the sample at the start of the
// window, divided by the difference of their timestamps.
//
// A zero rateInfo is returned when seconds is not positive, when fewer than
// two samples exist, or when the two timestamps are not strictly increasing.
// If a counter is lower in the newest sample than in the older one (the
// counter was reset), that direction reports a rate of zero rather than the
// huge value an unsigned subtraction would wrap around to.
func (m *Monitor) calculateRate(ifaceStats *InterfaceStats, seconds int) rateInfo {
	if seconds <= 0 || ifaceStats.count <= 1 {
		return rateInfo{}
	}

	// Number of sample intervals to look back: never further than the oldest
	// stored sample, and never more than the buffer can hold.
	span := seconds
	if limit := ifaceStats.count - 1; span > limit {
		span = limit
	}
	if span > ringBufferSize-1 {
		span = ringBufferSize - 1
	}

	// head is the oldest sample, so the newest sits count-1 slots after it.
	newestPos := ifaceStats.head + ifaceStats.count - 1
	newest := ifaceStats.samples[newestPos%ringBufferSize]
	oldest := ifaceStats.samples[(newestPos-span)%ringBufferSize]

	elapsed := newest.Timestamp.Sub(oldest.Timestamp).Seconds()
	if elapsed <= 0 {
		return rateInfo{}
	}

	// Counters are unsigned: subtract only when they did not go backwards.
	var sentDelta, recvDelta uint64
	if newest.BytesSent >= oldest.BytesSent {
		sentDelta = newest.BytesSent - oldest.BytesSent
	}
	if newest.BytesRecv >= oldest.BytesRecv {
		recvDelta = newest.BytesRecv - oldest.BytesRecv
	}

	return rateInfo{
		sentRate: float64(sentDelta) / elapsed,
		recvRate: float64(recvDelta) / elapsed,
	}
}
// monitorLoop is the body of the sampling goroutine. It records one sample
// immediately, then one on every tick of a sampleInterval ticker, and returns
// once the monitor's context is cancelled. It marks the wait group done on
// exit.
func (m *Monitor) monitorLoop() {
	defer m.wg.Done()

	tick := time.NewTicker(sampleInterval)
	defer tick.Stop()

	// Record a first sample right away instead of waiting for the first tick.
	m.takeSample()

	done := m.ctx.Done()
	for {
		select {
		case <-tick.C:
			m.takeSample()
		case <-done:
			return
		}
	}
}
// takeSample reads the per-interface I/O counters once and appends one sample
// to the history of every monitored interface.
//
// Interfaces named "lo" or starting with "docker" are ignored. Interfaces
// seen for the first time get a fresh history; interfaces that are missing
// from the current reading have their history dropped. If the counters
// cannot be read, a warning is logged and the history is left untouched.
func (m *Monitor) takeSample() {
	counters, err := psnet.IOCounters(true)
	// Timestamp the reading right away so that time spent waiting for the
	// write lock below is not counted as part of the sampling interval.
	now := time.Now()
	if err != nil {
		m.logger.Warn("failed to get network counters", "error", err)
		return
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	seen := make(map[string]struct{}, len(counters))
	for _, counter := range counters {
		// Skip loopback and docker interfaces.
		if counter.Name == "lo" || strings.HasPrefix(counter.Name, "docker") {
			continue
		}
		seen[counter.Name] = struct{}{}

		// Get or create the history for this interface.
		ifaceStats, exists := m.interfaces[counter.Name]
		if !exists {
			ifaceStats = &InterfaceStats{}
			m.interfaces[counter.Name] = ifaceStats
		}

		sample := Sample{
			BytesSent: counter.BytesSent,
			BytesRecv: counter.BytesRecv,
			Timestamp: now,
		}

		if ifaceStats.count < ringBufferSize {
			// Still filling: head stays at 0 and samples are appended in order.
			ifaceStats.samples[ifaceStats.count] = sample
			ifaceStats.count++
		} else {
			// Full: overwrite the oldest slot and advance head past it.
			ifaceStats.samples[ifaceStats.head] = sample
			ifaceStats.head = (ifaceStats.head + 1) % ringBufferSize
		}
		ifaceStats.lastSample = sample
	}

	// Drop interfaces that are absent from this reading.
	for name := range m.interfaces {
		if _, ok := seen[name]; !ok {
			delete(m.interfaces, name)
		}
	}
}