Add backend with buffered zstd-compressed report storage
Introduce the Go backend (netwatch-server) with an HTTP API that accepts telemetry reports and persists them as zstd-compressed JSONL files. Reports are buffered in memory and flushed to disk when the buffer reaches 10 MiB or every 60 seconds.
This commit is contained in:
199
backend/internal/reportbuf/reportbuf.go
Normal file
199
backend/internal/reportbuf/reportbuf.go
Normal file
@@ -0,0 +1,199 @@
|
||||
// Package reportbuf accumulates telemetry reports in memory
|
||||
// and periodically flushes them to zstd-compressed JSONL files.
|
||||
package reportbuf
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"sneak.berlin/go/netwatch/internal/config"
|
||||
"sneak.berlin/go/netwatch/internal/logger"
|
||||
|
||||
"github.com/klauspost/compress/zstd"
|
||||
"go.uber.org/fx"
|
||||
)
|
||||
|
||||
// Tunables for in-memory buffering and the on-disk layout.
const (
	// flushSizeThreshold is the buffered size in bytes (10 MiB)
	// at which Append drains the buffer for writing.
	flushSizeThreshold = 10 * 1024 * 1024

	// flushInterval is the period of the background flush ticker.
	flushInterval = time.Minute

	// defaultDataDir is used when the configured data directory
	// is empty.
	defaultDataDir = "./data/reports"

	// dirPerms is the mode used when creating the data directory.
	dirPerms fs.FileMode = 0o750

	// filePerms is the mode used when creating report files.
	filePerms fs.FileMode = 0o640
)
|
||||
|
||||
// Params defines the dependencies for Buffer.
|
||||
type Params struct {
|
||||
fx.In
|
||||
|
||||
Config *config.Config
|
||||
Logger *logger.Logger
|
||||
}
|
||||
|
||||
// Buffer accumulates JSON lines in memory and flushes them
|
||||
// to zstd-compressed files on disk.
|
||||
type Buffer struct {
|
||||
buf bytes.Buffer
|
||||
dataDir string
|
||||
done chan struct{}
|
||||
log *slog.Logger
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// New creates a Buffer and registers lifecycle hooks to
|
||||
// manage the data directory and flush goroutine.
|
||||
func New(
|
||||
lc fx.Lifecycle,
|
||||
params Params,
|
||||
) (*Buffer, error) {
|
||||
dir := params.Config.DataDir
|
||||
if dir == "" {
|
||||
dir = defaultDataDir
|
||||
}
|
||||
|
||||
b := &Buffer{
|
||||
dataDir: dir,
|
||||
done: make(chan struct{}),
|
||||
log: params.Logger.Get(),
|
||||
}
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(_ context.Context) error {
|
||||
err := os.MkdirAll(b.dataDir, dirPerms)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create data dir: %w", err)
|
||||
}
|
||||
|
||||
go b.flushLoop()
|
||||
|
||||
return nil
|
||||
},
|
||||
OnStop: func(_ context.Context) error {
|
||||
close(b.done)
|
||||
b.flushLocked()
|
||||
|
||||
return nil
|
||||
},
|
||||
})
|
||||
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// Append marshals v as a single JSON line and appends it to
|
||||
// the buffer. If the buffer reaches the size threshold, it is
|
||||
// drained and written to disk asynchronously.
|
||||
func (b *Buffer) Append(v any) error {
|
||||
line, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal report: %w", err)
|
||||
}
|
||||
|
||||
b.mu.Lock()
|
||||
b.buf.Write(line)
|
||||
b.buf.WriteByte('\n')
|
||||
|
||||
if b.buf.Len() >= flushSizeThreshold {
|
||||
data := b.drainBuf()
|
||||
b.mu.Unlock()
|
||||
|
||||
go b.writeFile(data)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
b.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// flushLoop runs a ticker that periodically flushes buffered
|
||||
// data to disk until the done channel is closed.
|
||||
func (b *Buffer) flushLoop() {
|
||||
ticker := time.NewTicker(flushInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
b.flushLocked()
|
||||
case <-b.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// flushLocked acquires the lock, drains the buffer, and
|
||||
// writes the data to a compressed file.
|
||||
func (b *Buffer) flushLocked() {
|
||||
b.mu.Lock()
|
||||
|
||||
if b.buf.Len() == 0 {
|
||||
b.mu.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
data := b.drainBuf()
|
||||
b.mu.Unlock()
|
||||
|
||||
b.writeFile(data)
|
||||
}
|
||||
|
||||
// drainBuf copies the buffer contents and resets it.
|
||||
// The caller must hold b.mu.
|
||||
func (b *Buffer) drainBuf() []byte {
|
||||
data := make([]byte, b.buf.Len())
|
||||
copy(data, b.buf.Bytes())
|
||||
b.buf.Reset()
|
||||
|
||||
return data
|
||||
}
|
||||
|
||||
// writeFile creates a timestamped zstd-compressed JSONL file
|
||||
// in the data directory.
|
||||
func (b *Buffer) writeFile(data []byte) {
|
||||
ts := time.Now().UTC().Format("2006-01-02T15-04-05.000Z")
|
||||
name := fmt.Sprintf("reports-%s.jsonl.zst", ts)
|
||||
path := filepath.Join(b.dataDir, name)
|
||||
|
||||
f, err := os.OpenFile( //nolint:gosec // path built from controlled dataDir + timestamp
|
||||
path,
|
||||
os.O_WRONLY|os.O_CREATE|os.O_EXCL,
|
||||
filePerms,
|
||||
)
|
||||
if err != nil {
|
||||
b.log.Error("create report file", "error", err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
defer func() { _ = f.Close() }()
|
||||
|
||||
enc, err := zstd.NewWriter(f)
|
||||
if err != nil {
|
||||
b.log.Error("create zstd encoder", "error", err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
_, writeErr := enc.Write(data)
|
||||
if writeErr != nil {
|
||||
b.log.Error("write compressed data", "error", writeErr)
|
||||
|
||||
_ = enc.Close()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
closeErr := enc.Close()
|
||||
if closeErr != nil {
|
||||
b.log.Error("close zstd encoder", "error", closeErr)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user