270 lines
6.2 KiB
Go
270 lines
6.2 KiB
Go
// Package netmon provides network interface monitoring with historical data.
//
//nolint:mnd
|
|
package netmon
|
|
|
|
import (
	"context"
	"log/slog"
	"slices"
	"strings"
	"sync"
	"time"

	"github.com/dustin/go-humanize"
	psnet "github.com/shirou/gopsutil/v3/net"
)
|
|
|
|
const (
	// ringBufferSize is the number of samples retained per interface. With
	// one sample per sampleInterval this is 60 seconds of history.
	ringBufferSize = 60

	// sampleInterval is the period between two consecutive samples.
	sampleInterval = time.Second

	// rateWindowSeconds is the window, in seconds, over which reported
	// rates are averaged.
	rateWindowSeconds = 5

	// bitsPerByte converts byte rates into bit rates.
	bitsPerByte = 8
)
|
|
|
|
// Sample is one reading of an interface's byte counters, together with the
// time at which the reading was recorded.
type Sample struct {
	// BytesSent is the sent-bytes counter value copied from the counter
	// source when the sample was taken.
	BytesSent uint64
	// BytesRecv is the received-bytes counter value copied from the counter
	// source when the sample was taken.
	BytesRecv uint64
	// Timestamp is the wall-clock time assigned to the sample.
	Timestamp time.Time
}
|
|
|
|
// InterfaceStats holds the ring buffer for a single interface
|
|
type InterfaceStats struct {
|
|
samples [ringBufferSize]Sample
|
|
head int // Points to the oldest sample
|
|
count int // Number of valid samples
|
|
lastSample Sample
|
|
}
|
|
|
|
// Stats is a snapshot of one interface: its latest byte counters and the
// send/receive throughput derived from recent samples.
type Stats struct {
	// Name is the interface name.
	Name string
	// BytesSent is the sent-bytes counter from the latest sample.
	BytesSent uint64
	// BytesRecv is the received-bytes counter from the latest sample.
	BytesRecv uint64
	// BitsSentRate is the send throughput in bits per second.
	BitsSentRate uint64
	// BitsRecvRate is the receive throughput in bits per second.
	BitsRecvRate uint64
}
|
|
|
|
// FormatSentRate returns the send rate as a human-readable string
|
|
func (s *Stats) FormatSentRate() string {
|
|
return humanize.SI(float64(s.BitsSentRate), "bit/s")
|
|
}
|
|
|
|
// FormatRecvRate returns the receive rate as a human-readable string
|
|
func (s *Stats) FormatRecvRate() string {
|
|
return humanize.SI(float64(s.BitsRecvRate), "bit/s")
|
|
}
|
|
|
|
// Monitor tracks network statistics for all interfaces
|
|
type Monitor struct {
|
|
mu sync.RWMutex
|
|
interfaces map[string]*InterfaceStats
|
|
logger *slog.Logger
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// New creates a new network monitor
|
|
func New(logger *slog.Logger) *Monitor {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
m := &Monitor{
|
|
interfaces: make(map[string]*InterfaceStats),
|
|
logger: logger,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
return m
|
|
}
|
|
|
|
// Start begins monitoring network interfaces
|
|
func (m *Monitor) Start() {
|
|
m.wg.Add(1)
|
|
go m.monitorLoop()
|
|
}
|
|
|
|
// Stop stops the monitor
|
|
func (m *Monitor) Stop() {
|
|
m.cancel()
|
|
m.wg.Wait()
|
|
}
|
|
|
|
// GetStats returns current stats for all interfaces
|
|
func (m *Monitor) GetStats() []Stats {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
var stats []Stats
|
|
for name, ifaceStats := range m.interfaces {
|
|
// Skip interfaces with no samples
|
|
if ifaceStats.count == 0 {
|
|
continue
|
|
}
|
|
|
|
// Calculate rates over available samples (up to 5 seconds)
|
|
rate := m.calculateRate(ifaceStats, rateWindowSeconds)
|
|
|
|
stats = append(stats, Stats{
|
|
Name: name,
|
|
BytesSent: ifaceStats.lastSample.BytesSent,
|
|
BytesRecv: ifaceStats.lastSample.BytesRecv,
|
|
BitsSentRate: uint64(
|
|
rate.sentRate * bitsPerByte,
|
|
), // Convert to bits/sec
|
|
BitsRecvRate: uint64(
|
|
rate.recvRate * bitsPerByte,
|
|
), // Convert to bits/sec
|
|
})
|
|
}
|
|
|
|
return stats
|
|
}
|
|
|
|
// GetStatsForInterface returns stats for a specific interface
|
|
func (m *Monitor) GetStatsForInterface(name string) (*Stats, bool) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
ifaceStats, ok := m.interfaces[name]
|
|
if !ok || ifaceStats.count == 0 {
|
|
return nil, false
|
|
}
|
|
|
|
rate := m.calculateRate(ifaceStats, 5)
|
|
|
|
return &Stats{
|
|
Name: name,
|
|
BytesSent: ifaceStats.lastSample.BytesSent,
|
|
BytesRecv: ifaceStats.lastSample.BytesRecv,
|
|
BitsSentRate: uint64(rate.sentRate * 8), // Convert to bits/sec
|
|
BitsRecvRate: uint64(rate.recvRate * 8), // Convert to bits/sec
|
|
}, true
|
|
}
|
|
|
|
// rateInfo carries the throughput computed for one interface.
type rateInfo struct {
	// sentRate is the send throughput in bytes per second.
	sentRate float64
	// recvRate is the receive throughput in bytes per second.
	recvRate float64
}
|
|
|
|
// calculateRate calculates the average rate over the last n seconds
|
|
func (m *Monitor) calculateRate(
|
|
ifaceStats *InterfaceStats,
|
|
seconds int,
|
|
) rateInfo {
|
|
if ifaceStats.count <= 1 {
|
|
return rateInfo{}
|
|
}
|
|
|
|
// Determine how many samples to use (up to requested seconds)
|
|
samplesToUse := seconds
|
|
if samplesToUse > ifaceStats.count-1 {
|
|
samplesToUse = ifaceStats.count - 1
|
|
}
|
|
if samplesToUse > ringBufferSize-1 {
|
|
samplesToUse = ringBufferSize - 1
|
|
}
|
|
|
|
// Get the most recent sample
|
|
newestIdx := (ifaceStats.head + ifaceStats.count - 1) % ringBufferSize
|
|
newest := ifaceStats.samples[newestIdx]
|
|
|
|
// Get the sample from n seconds ago
|
|
oldestIdx := (ifaceStats.head + ifaceStats.count - 1 - samplesToUse) % ringBufferSize
|
|
oldest := ifaceStats.samples[oldestIdx]
|
|
|
|
// Calculate time difference
|
|
timeDiff := newest.Timestamp.Sub(oldest.Timestamp).Seconds()
|
|
if timeDiff <= 0 {
|
|
return rateInfo{}
|
|
}
|
|
|
|
// Calculate rates
|
|
bytesSentDiff := float64(newest.BytesSent - oldest.BytesSent)
|
|
bytesRecvDiff := float64(newest.BytesRecv - oldest.BytesRecv)
|
|
|
|
return rateInfo{
|
|
sentRate: bytesSentDiff / timeDiff,
|
|
recvRate: bytesRecvDiff / timeDiff,
|
|
}
|
|
}
|
|
|
|
// monitorLoop continuously samples network statistics
|
|
func (m *Monitor) monitorLoop() {
|
|
defer m.wg.Done()
|
|
|
|
ticker := time.NewTicker(sampleInterval)
|
|
defer ticker.Stop()
|
|
|
|
// Take initial sample
|
|
m.takeSample()
|
|
|
|
for {
|
|
select {
|
|
case <-m.ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
m.takeSample()
|
|
}
|
|
}
|
|
}
|
|
|
|
// takeSample captures current network statistics
|
|
func (m *Monitor) takeSample() {
|
|
counters, err := psnet.IOCounters(true)
|
|
if err != nil {
|
|
m.logger.Warn("failed to get network counters", "error", err)
|
|
return
|
|
}
|
|
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
now := time.Now()
|
|
currentInterfaces := make(map[string]bool)
|
|
|
|
for _, counter := range counters {
|
|
// Skip loopback and docker interfaces
|
|
if counter.Name == "lo" ||
|
|
strings.HasPrefix(counter.Name, "docker") {
|
|
continue
|
|
}
|
|
|
|
currentInterfaces[counter.Name] = true
|
|
|
|
// Get or create interface stats
|
|
ifaceStats, exists := m.interfaces[counter.Name]
|
|
if !exists {
|
|
ifaceStats = &InterfaceStats{}
|
|
m.interfaces[counter.Name] = ifaceStats
|
|
}
|
|
|
|
// Create new sample
|
|
sample := Sample{
|
|
BytesSent: counter.BytesSent,
|
|
BytesRecv: counter.BytesRecv,
|
|
Timestamp: now,
|
|
}
|
|
|
|
// Add to ring buffer
|
|
if ifaceStats.count < ringBufferSize {
|
|
// Buffer not full yet
|
|
idx := ifaceStats.count
|
|
ifaceStats.samples[idx] = sample
|
|
ifaceStats.count++
|
|
} else {
|
|
// Buffer is full, overwrite oldest
|
|
ifaceStats.samples[ifaceStats.head] = sample
|
|
ifaceStats.head = (ifaceStats.head + 1) % ringBufferSize
|
|
}
|
|
|
|
ifaceStats.lastSample = sample
|
|
}
|
|
|
|
// Remove interfaces that no longer exist
|
|
for name := range m.interfaces {
|
|
if !currentInterfaces[name] {
|
|
delete(m.interfaces, name)
|
|
}
|
|
}
|
|
}
|