// Package metrics provides centralized metrics tracking for the RouteWatch application package metrics import ( "sync" "sync/atomic" "time" "github.com/rcrowley/go-metrics" ) // Tracker provides centralized metrics tracking type Tracker struct { mu sync.RWMutex registry metrics.Registry connectedSince time.Time isConnected atomic.Bool reconnectCount atomic.Uint64 // Stream metrics (decompressed data) messageCounter metrics.Counter byteCounter metrics.Counter messageRate metrics.Meter byteRate metrics.Meter // Wire bytes metrics (actual bytes on the wire, before decompression) wireByteCounter metrics.Counter wireByteRate metrics.Meter // Route update metrics ipv4UpdateRate metrics.Meter ipv6UpdateRate metrics.Meter // Announcement/withdrawal metrics announcementCounter metrics.Counter withdrawalCounter metrics.Counter churnRate metrics.Meter // combined announcements + withdrawals per second // BGP peer tracking bgpPeerCount atomic.Int32 } // New creates a new metrics tracker func New() *Tracker { registry := metrics.NewRegistry() return &Tracker{ registry: registry, messageCounter: metrics.NewCounter(), byteCounter: metrics.NewCounter(), messageRate: metrics.NewMeter(), byteRate: metrics.NewMeter(), wireByteCounter: metrics.NewCounter(), wireByteRate: metrics.NewMeter(), ipv4UpdateRate: metrics.NewMeter(), ipv6UpdateRate: metrics.NewMeter(), announcementCounter: metrics.NewCounter(), withdrawalCounter: metrics.NewCounter(), churnRate: metrics.NewMeter(), } } // SetConnected updates the connection status func (t *Tracker) SetConnected(connected bool) { wasConnected := t.isConnected.Swap(connected) if connected { t.mu.Lock() t.connectedSince = time.Now() t.mu.Unlock() // Increment reconnect count (but not for the initial connection) if wasConnected || t.reconnectCount.Load() > 0 { t.reconnectCount.Add(1) } } } // GetReconnectCount returns the number of reconnections since startup func (t *Tracker) GetReconnectCount() uint64 { return t.reconnectCount.Load() } // IsConnected returns the current connection status func (t *Tracker) IsConnected() bool { return t.isConnected.Load() } // RecordMessage records a received message and its decompressed size func (t *Tracker) RecordMessage(bytes int64) { t.messageCounter.Inc(1) t.byteCounter.Inc(bytes) t.messageRate.Mark(1) t.byteRate.Mark(bytes) } // RecordWireBytes records actual bytes received on the wire (before decompression) func (t *Tracker) RecordWireBytes(bytes int64) { t.wireByteCounter.Inc(bytes) t.wireByteRate.Mark(bytes) } // GetStreamMetrics returns current streaming metrics func (t *Tracker) GetStreamMetrics() StreamMetrics { t.mu.RLock() connectedSince := t.connectedSince t.mu.RUnlock() const bitsPerByte = 8 // Safely convert counters to uint64 msgCount := t.messageCounter.Count() byteCount := t.byteCounter.Count() wireByteCount := t.wireByteCounter.Count() var totalMessages, totalBytes, totalWireBytes uint64 if msgCount >= 0 { totalMessages = uint64(msgCount) } if byteCount >= 0 { totalBytes = uint64(byteCount) } if wireByteCount >= 0 { totalWireBytes = uint64(wireByteCount) } return StreamMetrics{ TotalMessages: totalMessages, TotalBytes: totalBytes, TotalWireBytes: totalWireBytes, ConnectedSince: connectedSince, Connected: t.isConnected.Load(), MessagesPerSec: t.messageRate.Rate1(), BitsPerSec: t.byteRate.Rate1() * bitsPerByte, WireBitsPerSec: t.wireByteRate.Rate1() * bitsPerByte, ReconnectCount: t.reconnectCount.Load(), } } // RecordIPv4Update records an IPv4 route update func (t *Tracker) RecordIPv4Update() { t.ipv4UpdateRate.Mark(1) } // RecordIPv6Update records an IPv6 route update func (t *Tracker) RecordIPv6Update() { t.ipv6UpdateRate.Mark(1) } // RecordAnnouncement records a route announcement func (t *Tracker) RecordAnnouncement() { t.announcementCounter.Inc(1) t.churnRate.Mark(1) } // RecordWithdrawal records a route withdrawal func (t *Tracker) RecordWithdrawal() { t.withdrawalCounter.Inc(1) t.churnRate.Mark(1) } // SetBGPPeerCount updates the current BGP peer count func (t *Tracker) SetBGPPeerCount(count int) { // BGP peer count is always small (< 1000), so int32 is safe if count > 0 && count < 1<<31 { t.bgpPeerCount.Store(int32(count)) //nolint:gosec // count is validated } } // GetBGPPeerCount returns the current BGP peer count func (t *Tracker) GetBGPPeerCount() int { return int(t.bgpPeerCount.Load()) } // GetAnnouncementCount returns the total announcement count func (t *Tracker) GetAnnouncementCount() uint64 { count := t.announcementCounter.Count() if count < 0 { return 0 } return uint64(count) } // GetWithdrawalCount returns the total withdrawal count func (t *Tracker) GetWithdrawalCount() uint64 { count := t.withdrawalCounter.Count() if count < 0 { return 0 } return uint64(count) } // GetChurnRate returns the route churn rate per second func (t *Tracker) GetChurnRate() float64 { return t.churnRate.Rate1() } // GetRouteMetrics returns current route update metrics func (t *Tracker) GetRouteMetrics() RouteMetrics { return RouteMetrics{ IPv4UpdatesPerSec: t.ipv4UpdateRate.Rate1(), IPv6UpdatesPerSec: t.ipv6UpdateRate.Rate1(), } } // StreamMetrics contains streaming statistics type StreamMetrics struct { // TotalMessages is the total number of messages received since startup TotalMessages uint64 // TotalBytes is the total number of decompressed bytes received since startup TotalBytes uint64 // TotalWireBytes is the total number of bytes received on the wire (before decompression) TotalWireBytes uint64 // ConnectedSince is the time when the current connection was established ConnectedSince time.Time // Connected indicates whether the stream is currently connected Connected bool // MessagesPerSec is the rate of messages received per second (1-minute average) MessagesPerSec float64 // BitsPerSec is the rate of decompressed bits received per second (1-minute average) BitsPerSec float64 // WireBitsPerSec is the rate of bits received on the wire per second (1-minute average) WireBitsPerSec float64 // ReconnectCount is the number of reconnections since startup ReconnectCount uint64 } // RouteMetrics contains route update statistics type RouteMetrics struct { // IPv4UpdatesPerSec is the rate of IPv4 route updates per second (1-minute average) IPv4UpdatesPerSec float64 // IPv6UpdatesPerSec is the rate of IPv6 route updates per second (1-minute average) IPv6UpdatesPerSec float64 }