- Add /ip and /ip/{addr} JSON endpoints returning comprehensive IP info
- Include ASN, netblock, country code, org name, abuse contact, RIR data
- Extend ASN schema with WHOIS fields (country, org, abuse contact, etc)
- Create background WHOIS fetcher for rate-limited ASN info updates
- Store raw WHOIS responses for debugging and data preservation
- Queue on-demand WHOIS lookups when stale data is requested
- Refactor handleIPInfo to serve all IP endpoints consistently
161 lines
4.5 KiB
Go
161 lines
4.5 KiB
Go
// Package metrics provides centralized metrics tracking for the RouteWatch application
|
|
package metrics
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/rcrowley/go-metrics"
|
|
)
|
|
|
|
// Tracker provides centralized metrics tracking
|
|
type Tracker struct {
|
|
mu sync.RWMutex
|
|
registry metrics.Registry
|
|
connectedSince time.Time
|
|
isConnected atomic.Bool
|
|
|
|
// Stream metrics (decompressed data)
|
|
messageCounter metrics.Counter
|
|
byteCounter metrics.Counter
|
|
messageRate metrics.Meter
|
|
byteRate metrics.Meter
|
|
|
|
// Wire bytes metrics (actual bytes on the wire, before decompression)
|
|
wireByteCounter metrics.Counter
|
|
wireByteRate metrics.Meter
|
|
|
|
// Route update metrics
|
|
ipv4UpdateRate metrics.Meter
|
|
ipv6UpdateRate metrics.Meter
|
|
}
|
|
|
|
// New creates a new metrics tracker
|
|
func New() *Tracker {
|
|
registry := metrics.NewRegistry()
|
|
|
|
return &Tracker{
|
|
registry: registry,
|
|
messageCounter: metrics.NewCounter(),
|
|
byteCounter: metrics.NewCounter(),
|
|
messageRate: metrics.NewMeter(),
|
|
byteRate: metrics.NewMeter(),
|
|
wireByteCounter: metrics.NewCounter(),
|
|
wireByteRate: metrics.NewMeter(),
|
|
ipv4UpdateRate: metrics.NewMeter(),
|
|
ipv6UpdateRate: metrics.NewMeter(),
|
|
}
|
|
}
|
|
|
|
// SetConnected updates the connection status
|
|
func (t *Tracker) SetConnected(connected bool) {
|
|
t.isConnected.Store(connected)
|
|
if connected {
|
|
t.mu.Lock()
|
|
t.connectedSince = time.Now()
|
|
t.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
// IsConnected returns the current connection status
|
|
func (t *Tracker) IsConnected() bool {
|
|
return t.isConnected.Load()
|
|
}
|
|
|
|
// RecordMessage records a received message and its decompressed size
|
|
func (t *Tracker) RecordMessage(bytes int64) {
|
|
t.messageCounter.Inc(1)
|
|
t.byteCounter.Inc(bytes)
|
|
t.messageRate.Mark(1)
|
|
t.byteRate.Mark(bytes)
|
|
}
|
|
|
|
// RecordWireBytes records actual bytes received on the wire (before decompression)
|
|
func (t *Tracker) RecordWireBytes(bytes int64) {
|
|
t.wireByteCounter.Inc(bytes)
|
|
t.wireByteRate.Mark(bytes)
|
|
}
|
|
|
|
// GetStreamMetrics returns current streaming metrics
|
|
func (t *Tracker) GetStreamMetrics() StreamMetrics {
|
|
t.mu.RLock()
|
|
connectedSince := t.connectedSince
|
|
t.mu.RUnlock()
|
|
|
|
const bitsPerByte = 8
|
|
|
|
// Safely convert counters to uint64
|
|
msgCount := t.messageCounter.Count()
|
|
byteCount := t.byteCounter.Count()
|
|
wireByteCount := t.wireByteCounter.Count()
|
|
|
|
var totalMessages, totalBytes, totalWireBytes uint64
|
|
if msgCount >= 0 {
|
|
totalMessages = uint64(msgCount)
|
|
}
|
|
if byteCount >= 0 {
|
|
totalBytes = uint64(byteCount)
|
|
}
|
|
if wireByteCount >= 0 {
|
|
totalWireBytes = uint64(wireByteCount)
|
|
}
|
|
|
|
return StreamMetrics{
|
|
TotalMessages: totalMessages,
|
|
TotalBytes: totalBytes,
|
|
TotalWireBytes: totalWireBytes,
|
|
ConnectedSince: connectedSince,
|
|
Connected: t.isConnected.Load(),
|
|
MessagesPerSec: t.messageRate.Rate1(),
|
|
BitsPerSec: t.byteRate.Rate1() * bitsPerByte,
|
|
WireBitsPerSec: t.wireByteRate.Rate1() * bitsPerByte,
|
|
}
|
|
}
|
|
|
|
// RecordIPv4Update records an IPv4 route update
|
|
func (t *Tracker) RecordIPv4Update() {
|
|
t.ipv4UpdateRate.Mark(1)
|
|
}
|
|
|
|
// RecordIPv6Update records an IPv6 route update
|
|
func (t *Tracker) RecordIPv6Update() {
|
|
t.ipv6UpdateRate.Mark(1)
|
|
}
|
|
|
|
// GetRouteMetrics returns current route update metrics
|
|
func (t *Tracker) GetRouteMetrics() RouteMetrics {
|
|
return RouteMetrics{
|
|
IPv4UpdatesPerSec: t.ipv4UpdateRate.Rate1(),
|
|
IPv6UpdatesPerSec: t.ipv6UpdateRate.Rate1(),
|
|
}
|
|
}
|
|
|
|
// StreamMetrics contains streaming statistics
|
|
type StreamMetrics struct {
|
|
// TotalMessages is the total number of messages received since startup
|
|
TotalMessages uint64
|
|
// TotalBytes is the total number of decompressed bytes received since startup
|
|
TotalBytes uint64
|
|
// TotalWireBytes is the total number of bytes received on the wire (before decompression)
|
|
TotalWireBytes uint64
|
|
// ConnectedSince is the time when the current connection was established
|
|
ConnectedSince time.Time
|
|
// Connected indicates whether the stream is currently connected
|
|
Connected bool
|
|
// MessagesPerSec is the rate of messages received per second (1-minute average)
|
|
MessagesPerSec float64
|
|
// BitsPerSec is the rate of decompressed bits received per second (1-minute average)
|
|
BitsPerSec float64
|
|
// WireBitsPerSec is the rate of bits received on the wire per second (1-minute average)
|
|
WireBitsPerSec float64
|
|
}
|
|
|
|
// RouteMetrics contains route update statistics
|
|
type RouteMetrics struct {
|
|
// IPv4UpdatesPerSec is the rate of IPv4 route updates per second (1-minute average)
|
|
IPv4UpdatesPerSec float64
|
|
// IPv6UpdatesPerSec is the rate of IPv6 route updates per second (1-minute average)
|
|
IPv6UpdatesPerSec float64
|
|
}
|