// Package reportbuf accumulates telemetry reports in memory // and periodically flushes them to zstd-compressed JSONL files. package reportbuf import ( "bytes" "context" "encoding/json" "fmt" "io/fs" "log/slog" "os" "path/filepath" "sync" "time" "sneak.berlin/go/netwatch/internal/config" "sneak.berlin/go/netwatch/internal/logger" "github.com/klauspost/compress/zstd" "go.uber.org/fx" ) const ( flushSizeThreshold = 10 << 20 // 10 MiB flushInterval = 1 * time.Minute defaultDataDir = "./data/reports" dirPerms fs.FileMode = 0o750 filePerms fs.FileMode = 0o640 ) // Params defines the dependencies for Buffer. type Params struct { fx.In Config *config.Config Logger *logger.Logger } // Buffer accumulates JSON lines in memory and flushes them // to zstd-compressed files on disk. type Buffer struct { buf bytes.Buffer dataDir string done chan struct{} log *slog.Logger mu sync.Mutex } // New creates a Buffer and registers lifecycle hooks to // manage the data directory and flush goroutine. func New( lc fx.Lifecycle, params Params, ) (*Buffer, error) { dir := params.Config.DataDir if dir == "" { dir = defaultDataDir } b := &Buffer{ dataDir: dir, done: make(chan struct{}), log: params.Logger.Get(), } lc.Append(fx.Hook{ OnStart: func(_ context.Context) error { err := os.MkdirAll(b.dataDir, dirPerms) if err != nil { return fmt.Errorf("create data dir: %w", err) } go b.flushLoop() return nil }, OnStop: func(_ context.Context) error { close(b.done) b.flushLocked() return nil }, }) return b, nil } // Append marshals v as a single JSON line and appends it to // the buffer. If the buffer reaches the size threshold, it is // drained and written to disk asynchronously. func (b *Buffer) Append(v any) error { line, err := json.Marshal(v) if err != nil { return fmt.Errorf("marshal report: %w", err) } b.mu.Lock() b.buf.Write(line) b.buf.WriteByte('\n') if b.buf.Len() >= flushSizeThreshold { data := b.drainBuf() b.mu.Unlock() go b.writeFile(data) return nil } b.mu.Unlock() return nil } // flushLoop runs a ticker that periodically flushes buffered // data to disk until the done channel is closed. func (b *Buffer) flushLoop() { ticker := time.NewTicker(flushInterval) defer ticker.Stop() for { select { case <-ticker.C: b.flushLocked() case <-b.done: return } } } // flushLocked acquires the lock, drains the buffer, and // writes the data to a compressed file. func (b *Buffer) flushLocked() { b.mu.Lock() if b.buf.Len() == 0 { b.mu.Unlock() return } data := b.drainBuf() b.mu.Unlock() b.writeFile(data) } // drainBuf copies the buffer contents and resets it. // The caller must hold b.mu. func (b *Buffer) drainBuf() []byte { data := make([]byte, b.buf.Len()) copy(data, b.buf.Bytes()) b.buf.Reset() return data } // writeFile creates a timestamped zstd-compressed JSONL file // in the data directory. func (b *Buffer) writeFile(data []byte) { ts := time.Now().UTC().Format("2006-01-02T15-04-05.000Z") name := fmt.Sprintf("reports-%s.jsonl.zst", ts) path := filepath.Join(b.dataDir, name) f, err := os.OpenFile( //nolint:gosec // path built from controlled dataDir + timestamp path, os.O_WRONLY|os.O_CREATE|os.O_EXCL, filePerms, ) if err != nil { b.log.Error("create report file", "error", err) return } defer func() { _ = f.Close() }() enc, err := zstd.NewWriter(f) if err != nil { b.log.Error("create zstd encoder", "error", err) return } _, writeErr := enc.Write(data) if writeErr != nil { b.log.Error("write compressed data", "error", writeErr) _ = enc.Close() return } closeErr := enc.Close() if closeErr != nil { b.log.Error("close zstd encoder", "error", closeErr) } }