Track wire bytes separately from decompressed stream bytes

The stream stats were showing decompressed data sizes, not actual wire
bandwidth. This change adds wire byte tracking by disabling automatic
gzip decompression in the HTTP client and wrapping the response body
with a counting reader before decompression. Both wire (compressed) and
decompressed bytes are now tracked and exposed in the API responses.
This commit is contained in:
2025-12-27 12:56:57 +07:00
parent 95bbb655ab
commit ab392d874c
3 changed files with 126 additions and 27 deletions

View File

@@ -4,9 +4,11 @@ package streamer
import (
"bufio"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"math"
"math/rand"
"net/http"
@@ -19,6 +21,25 @@ import (
"git.eeqj.de/sneak/routewatch/internal/ristypes"
)
// countingReader wraps an io.Reader and counts bytes read
type countingReader struct {
reader io.Reader
count int64
}
// Read implements io.Reader and counts bytes
func (c *countingReader) Read(p []byte) (int, error) {
n, err := c.reader.Read(p)
atomic.AddInt64(&c.count, int64(n))
return n, err
}
// Count returns the total bytes read
func (c *countingReader) Count() int64 {
return atomic.LoadInt64(&c.count)
}
// Configuration constants for the RIS Live streamer.
const (
risLiveURL = "https://ris-live.ripe.net/v1/stream/?format=json&" +
@@ -103,6 +124,10 @@ func New(logger *logger.Logger, metrics *metrics.Tracker) *Streamer {
logger: logger,
client: &http.Client{
Timeout: 0, // No timeout for streaming
Transport: &http.Transport{
// Disable automatic gzip decompression so we can measure wire bytes
DisableCompression: true,
},
},
handlers: make([]*handlerInfo, 0),
metrics: metrics,
@@ -316,16 +341,18 @@ func (s *Streamer) logMetrics() {
uptime,
"total_messages",
metrics.TotalMessages,
"total_bytes",
"wire_bytes",
metrics.TotalWireBytes,
"wire_mb",
fmt.Sprintf("%.2f", float64(metrics.TotalWireBytes)/bytesPerMB),
"wire_mbps",
fmt.Sprintf("%.2f", metrics.WireBitsPerSec/bitsPerMegabit),
"decompressed_bytes",
metrics.TotalBytes,
"total_mb",
"decompressed_mb",
fmt.Sprintf("%.2f", float64(metrics.TotalBytes)/bytesPerMB),
"messages_per_sec",
fmt.Sprintf("%.2f", metrics.MessagesPerSec),
"bits_per_sec",
fmt.Sprintf("%.0f", metrics.BitsPerSec),
"mbps",
fmt.Sprintf("%.2f", metrics.BitsPerSec/bitsPerMegabit),
"total_dropped",
totalDropped,
)
@@ -438,6 +465,9 @@ func (s *Streamer) stream(ctx context.Context) error {
return fmt.Errorf("failed to create request: %w", err)
}
// Explicitly request gzip compression
req.Header.Set("Accept-Encoding", "gzip")
resp, err := s.client.Do(req)
if err != nil {
return fmt.Errorf("failed to connect to RIS Live: %w", err)
@@ -452,9 +482,28 @@ func (s *Streamer) stream(ctx context.Context) error {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
s.logger.Info("Connected to RIS Live stream")
// Wrap body with counting reader to track actual wire bytes
wireCounter := &countingReader{reader: resp.Body}
// Check if response is gzip-compressed and decompress if needed
var reader io.Reader = wireCounter
if resp.Header.Get("Content-Encoding") == "gzip" {
gzReader, err := gzip.NewReader(wireCounter)
if err != nil {
return fmt.Errorf("failed to create gzip reader: %w", err)
}
defer func() { _ = gzReader.Close() }()
reader = gzReader
s.logger.Info("Connected to RIS Live stream", "compression", "gzip")
} else {
s.logger.Info("Connected to RIS Live stream", "compression", "none")
}
s.metrics.SetConnected(true)
// Track wire bytes for metrics updates
var lastWireBytes int64
// Start metrics logging goroutine
metricsTicker := time.NewTicker(metricsLogInterval)
defer metricsTicker.Stop()
@@ -470,7 +519,27 @@ func (s *Streamer) stream(ctx context.Context) error {
}
}()
scanner := bufio.NewScanner(resp.Body)
// Wire byte update ticker - update metrics with actual wire bytes periodically
wireUpdateTicker := time.NewTicker(time.Second)
defer wireUpdateTicker.Stop()
go func() {
for {
select {
case <-wireUpdateTicker.C:
currentBytes := wireCounter.Count()
delta := currentBytes - lastWireBytes
if delta > 0 {
s.metrics.RecordWireBytes(delta)
lastWireBytes = currentBytes
}
case <-ctx.Done():
return
}
}
}()
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
select {
@@ -486,7 +555,7 @@ func (s *Streamer) stream(ctx context.Context) error {
continue
}
// Update metrics with message size
// Update metrics with decompressed message size
s.updateMetrics(len(line))
// Call raw handler if registered