diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index f3d5c87..e6c8614 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -16,12 +16,16 @@ type Tracker struct { connectedSince time.Time isConnected atomic.Bool - // Stream metrics + // Stream metrics (decompressed data) messageCounter metrics.Counter byteCounter metrics.Counter messageRate metrics.Meter byteRate metrics.Meter + // Wire bytes metrics (actual bytes on the wire, before decompression) + wireByteCounter metrics.Counter + wireByteRate metrics.Meter + // Route update metrics ipv4UpdateRate metrics.Meter ipv6UpdateRate metrics.Meter @@ -32,13 +36,15 @@ func New() *Tracker { registry := metrics.NewRegistry() return &Tracker{ - registry: registry, - messageCounter: metrics.NewCounter(), - byteCounter: metrics.NewCounter(), - messageRate: metrics.NewMeter(), - byteRate: metrics.NewMeter(), - ipv4UpdateRate: metrics.NewMeter(), - ipv6UpdateRate: metrics.NewMeter(), + registry: registry, + messageCounter: metrics.NewCounter(), + byteCounter: metrics.NewCounter(), + messageRate: metrics.NewMeter(), + byteRate: metrics.NewMeter(), + wireByteCounter: metrics.NewCounter(), + wireByteRate: metrics.NewMeter(), + ipv4UpdateRate: metrics.NewMeter(), + ipv6UpdateRate: metrics.NewMeter(), } } @@ -57,7 +63,7 @@ func (t *Tracker) IsConnected() bool { return t.isConnected.Load() } -// RecordMessage records a received message and its size +// RecordMessage records a received message and its decompressed size func (t *Tracker) RecordMessage(bytes int64) { t.messageCounter.Inc(1) t.byteCounter.Inc(bytes) @@ -65,6 +71,12 @@ func (t *Tracker) RecordMessage(bytes int64) { t.byteRate.Mark(bytes) } +// RecordWireBytes records actual bytes received on the wire (before decompression) +func (t *Tracker) RecordWireBytes(bytes int64) { + t.wireByteCounter.Inc(bytes) + t.wireByteRate.Mark(bytes) +} + // GetStreamMetrics returns current streaming metrics func (t *Tracker) GetStreamMetrics() StreamMetrics { t.mu.RLock() @@ -76,22 +88,28 @@ func (t *Tracker) GetStreamMetrics() StreamMetrics { // Safely convert counters to uint64 msgCount := t.messageCounter.Count() byteCount := t.byteCounter.Count() + wireByteCount := t.wireByteCounter.Count() - var totalMessages, totalBytes uint64 + var totalMessages, totalBytes, totalWireBytes uint64 if msgCount >= 0 { totalMessages = uint64(msgCount) } if byteCount >= 0 { totalBytes = uint64(byteCount) } + if wireByteCount >= 0 { + totalWireBytes = uint64(wireByteCount) + } return StreamMetrics{ - TotalMessages: totalMessages, - TotalBytes: totalBytes, - ConnectedSince: connectedSince, - Connected: t.isConnected.Load(), - MessagesPerSec: t.messageRate.Rate1(), - BitsPerSec: t.byteRate.Rate1() * bitsPerByte, + TotalMessages: totalMessages, + TotalBytes: totalBytes, + TotalWireBytes: totalWireBytes, + ConnectedSince: connectedSince, + Connected: t.isConnected.Load(), + MessagesPerSec: t.messageRate.Rate1(), + BitsPerSec: t.byteRate.Rate1() * bitsPerByte, + WireBitsPerSec: t.wireByteRate.Rate1() * bitsPerByte, } } @@ -117,16 +135,20 @@ func (t *Tracker) GetRouteMetrics() RouteMetrics { type StreamMetrics struct { // TotalMessages is the total number of messages received since startup TotalMessages uint64 - // TotalBytes is the total number of bytes received since startup + // TotalBytes is the total number of decompressed bytes received since startup TotalBytes uint64 + // TotalWireBytes is the total number of bytes received on the wire (before decompression) + TotalWireBytes uint64 // ConnectedSince is the time when the current connection was established ConnectedSince time.Time // Connected indicates whether the stream is currently connected Connected bool // MessagesPerSec is the rate of messages received per second (1-minute average) MessagesPerSec float64 - // BitsPerSec is the rate of bits received per second (1-minute average) + // BitsPerSec is the rate of decompressed bits received per second (1-minute average) BitsPerSec float64 + // WireBitsPerSec is the rate of bits received on the wire per second (1-minute average) + WireBitsPerSec float64 } // RouteMetrics contains route update statistics diff --git a/internal/server/handlers.go b/internal/server/handlers.go index ba7219e..38a4692 100644 --- a/internal/server/handlers.go +++ b/internal/server/handlers.go @@ -65,8 +65,10 @@ func (s *Server) handleStatusJSON() http.HandlerFunc { Uptime string `json:"uptime"` TotalMessages uint64 `json:"total_messages"` TotalBytes uint64 `json:"total_bytes"` + TotalWireBytes uint64 `json:"total_wire_bytes"` MessagesPerSec float64 `json:"messages_per_sec"` MbitsPerSec float64 `json:"mbits_per_sec"` + WireMbitsPerSec float64 `json:"wire_mbits_per_sec"` Connected bool `json:"connected"` GoVersion string `json:"go_version"` Goroutines int `json:"goroutines"` @@ -150,8 +152,10 @@ func (s *Server) handleStatusJSON() http.HandlerFunc { Uptime: uptime, TotalMessages: metrics.TotalMessages, TotalBytes: metrics.TotalBytes, + TotalWireBytes: metrics.TotalWireBytes, MessagesPerSec: metrics.MessagesPerSec, MbitsPerSec: metrics.BitsPerSec / bitsPerMegabit, + WireMbitsPerSec: metrics.WireBitsPerSec / bitsPerMegabit, Connected: metrics.Connected, GoVersion: runtime.Version(), Goroutines: runtime.NumGoroutine(), @@ -199,8 +203,10 @@ func (s *Server) handleStats() http.HandlerFunc { Uptime string `json:"uptime"` TotalMessages uint64 `json:"total_messages"` TotalBytes uint64 `json:"total_bytes"` + TotalWireBytes uint64 `json:"total_wire_bytes"` MessagesPerSec float64 `json:"messages_per_sec"` MbitsPerSec float64 `json:"mbits_per_sec"` + WireMbitsPerSec float64 `json:"wire_mbits_per_sec"` Connected bool `json:"connected"` GoVersion string `json:"go_version"` Goroutines int `json:"goroutines"` @@ -311,8 +317,10 @@ func (s *Server) handleStats() http.HandlerFunc { Uptime: uptime, TotalMessages: metrics.TotalMessages, TotalBytes: metrics.TotalBytes, + TotalWireBytes: metrics.TotalWireBytes, MessagesPerSec: metrics.MessagesPerSec, MbitsPerSec: metrics.BitsPerSec / bitsPerMegabit, + WireMbitsPerSec: metrics.WireBitsPerSec / bitsPerMegabit, Connected: metrics.Connected, GoVersion: runtime.Version(), Goroutines: runtime.NumGoroutine(), diff --git a/internal/streamer/streamer.go b/internal/streamer/streamer.go index 48c0fe2..f4f9bb9 100644 --- a/internal/streamer/streamer.go +++ b/internal/streamer/streamer.go @@ -4,9 +4,11 @@ package streamer import ( "bufio" + "compress/gzip" "context" "encoding/json" "fmt" + "io" "math" "math/rand" "net/http" @@ -19,6 +21,25 @@ import ( "git.eeqj.de/sneak/routewatch/internal/ristypes" ) +// countingReader wraps an io.Reader and counts bytes read +type countingReader struct { + reader io.Reader + count int64 +} + +// Read implements io.Reader and counts bytes +func (c *countingReader) Read(p []byte) (int, error) { + n, err := c.reader.Read(p) + atomic.AddInt64(&c.count, int64(n)) + + return n, err +} + +// Count returns the total bytes read +func (c *countingReader) Count() int64 { + return atomic.LoadInt64(&c.count) +} + // Configuration constants for the RIS Live streamer. const ( risLiveURL = "https://ris-live.ripe.net/v1/stream/?format=json&" + @@ -103,6 +124,10 @@ func New(logger *logger.Logger, metrics *metrics.Tracker) *Streamer { logger: logger, client: &http.Client{ Timeout: 0, // No timeout for streaming + Transport: &http.Transport{ + // Disable automatic gzip decompression so we can measure wire bytes + DisableCompression: true, + }, }, handlers: make([]*handlerInfo, 0), metrics: metrics, @@ -316,16 +341,18 @@ func (s *Streamer) logMetrics() { uptime, "total_messages", metrics.TotalMessages, - "total_bytes", + "wire_bytes", + metrics.TotalWireBytes, + "wire_mb", + fmt.Sprintf("%.2f", float64(metrics.TotalWireBytes)/bytesPerMB), + "wire_mbps", + fmt.Sprintf("%.2f", metrics.WireBitsPerSec/bitsPerMegabit), + "decompressed_bytes", metrics.TotalBytes, - "total_mb", + "decompressed_mb", fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB), "messages_per_sec", fmt.Sprintf("%.2f", metrics.MessagesPerSec), - "bits_per_sec", - fmt.Sprintf("%.0f", metrics.BitsPerSec), - "mbps", - fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit), "total_dropped", totalDropped, ) @@ -438,6 +465,9 @@ func (s *Streamer) stream(ctx context.Context) error { return fmt.Errorf("failed to create request: %w", err) } + // Explicitly request gzip compression + req.Header.Set("Accept-Encoding", "gzip") + resp, err := s.client.Do(req) if err != nil { return fmt.Errorf("failed to connect to RIS Live: %w", err) @@ -452,9 +482,28 @@ func (s *Streamer) stream(ctx context.Context) error { return fmt.Errorf("unexpected status code: %d", resp.StatusCode) } - s.logger.Info("Connected to RIS Live stream") + // Wrap body with counting reader to track actual wire bytes + wireCounter := &countingReader{reader: resp.Body} + + // Check if response is gzip-compressed and decompress if needed + var reader io.Reader = wireCounter + if resp.Header.Get("Content-Encoding") == "gzip" { + gzReader, err := gzip.NewReader(wireCounter) + if err != nil { + return fmt.Errorf("failed to create gzip reader: %w", err) + } + defer func() { _ = gzReader.Close() }() + reader = gzReader + s.logger.Info("Connected to RIS Live stream", "compression", "gzip") + } else { + s.logger.Info("Connected to RIS Live stream", "compression", "none") + } + s.metrics.SetConnected(true) + // Track wire bytes for metrics updates + var lastWireBytes int64 + // Start metrics logging goroutine metricsTicker := time.NewTicker(metricsLogInterval) defer metricsTicker.Stop() @@ -470,7 +519,27 @@ func (s *Streamer) stream(ctx context.Context) error { } }() - scanner := bufio.NewScanner(resp.Body) + // Wire byte update ticker - update metrics with actual wire bytes periodically + wireUpdateTicker := time.NewTicker(time.Second) + defer wireUpdateTicker.Stop() + + go func() { + for { + select { + case <-wireUpdateTicker.C: + currentBytes := wireCounter.Count() + delta := currentBytes - lastWireBytes + if delta > 0 { + s.metrics.RecordWireBytes(delta) + lastWireBytes = currentBytes + } + case <-ctx.Done(): + return + } + } + }() + + scanner := bufio.NewScanner(reader) for scanner.Scan() { select { @@ -486,7 +555,7 @@ func (s *Streamer) stream(ctx context.Context) error { continue } - // Update metrics with message size + // Update metrics with decompressed message size s.updateMetrics(len(line)) // Call raw handler if registered