Files
sneak b57afeddbd Add backend with buffered zstd-compressed report storage
Introduce the Go backend (netwatch-server) with an HTTP API that
accepts telemetry reports and persists them as zstd-compressed JSONL
files. Reports are buffered in memory and flushed to disk when the
buffer reaches 10 MiB or every 60 seconds.
2026-02-27 12:14:34 +07:00

148 lines
2.7 KiB
Go

// Package server provides the HTTP server lifecycle,
// including startup, routing, signal handling, and graceful
// shutdown.
package server
import (
"context"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"sneak.berlin/go/netwatch/internal/config"
"sneak.berlin/go/netwatch/internal/globals"
"sneak.berlin/go/netwatch/internal/handlers"
"sneak.berlin/go/netwatch/internal/logger"
"sneak.berlin/go/netwatch/internal/middleware"
"github.com/go-chi/chi/v5"
"go.uber.org/fx"
)
// Params defines the dependencies for Server.
//
// Embedding fx.In marks this struct as an fx parameter object, so the
// fields below are populated by the fx container when New is invoked.
type Params struct {
fx.In
// Config is the injected *config.Config.
Config *config.Config
// Globals is the injected *globals.Globals.
Globals *globals.Globals
// Handlers is stored on the Server as its handler set (Server.h).
Handlers *handlers.Handlers
// Logger supplies the *slog.Logger the Server logs with, obtained
// through Logger.Get().
Logger *logger.Logger
// Middleware is stored on the Server as Server.mw.
Middleware *middleware.Middleware
}
// Server is the top-level HTTP server orchestrator.
//
// It is constructed by New, which also registers the fx lifecycle
// hooks that start and stop it.
type Server struct {
// cancelFunc cancels the context serve() blocks on. It is assigned
// in serve() and invoked by the fx OnStop hook and by the signal
// handler to trigger shutdown; it is nil until serve() has run.
cancelFunc context.CancelFunc
// exitCode is the value serve() returns; cleanShutdown sets it to 0.
exitCode int
// h holds the handlers taken from Params.Handlers.
h *handlers.Handlers
// httpServer is the server that cleanShutdown shuts down. It is not
// assigned by New; NOTE(review): presumably set in
// serveUntilShutdown — confirm.
httpServer *http.Server
// log is the structured logger obtained from Params.Logger.Get().
log *slog.Logger
// mw holds the middleware taken from Params.Middleware.
mw *middleware.Middleware
// params keeps the full set of injected dependencies.
params Params
// router is the chi mux that ServeHTTP delegates to. It is not
// assigned by New.
router *chi.Mux
// startupTime is the UTC time recorded by the fx OnStart hook.
startupTime time.Time
}
// New builds a Server from its injected dependencies and wires it
// into the fx lifecycle.
//
// The OnStart hook records the startup time (UTC) and launches run()
// on its own goroutine, so the hook itself returns immediately. The
// OnStop hook invokes the Server's cancel function if one has been
// set. The returned error is always nil.
func New(
	lc fx.Lifecycle,
	params Params,
) (*Server, error) {
	srv := &Server{
		params: params,
		mw:     params.Middleware,
		h:      params.Handlers,
		log:    params.Logger.Get(),
	}

	onStart := func(_ context.Context) error {
		srv.startupTime = time.Now().UTC()

		go srv.run() //nolint:contextcheck // fx OnStart ctx is startup-only; run() creates its own

		return nil
	}

	onStop := func(_ context.Context) error {
		// cancelFunc stays nil until serve() has created its context.
		if cancel := srv.cancelFunc; cancel != nil {
			cancel()
		}

		return nil
	}

	lc.Append(fx.Hook{
		OnStart: onStart,
		OnStop:  onStop,
	})

	return srv, nil
}
// ServeHTTP implements http.Handler by handing every request to the
// Server's chi router.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	mux := s.router
	mux.ServeHTTP(w, r)
}
// run serves until shutdown has completed and then terminates the
// process with the exit code reported by serve. It never returns.
func (s *Server) run() {
	os.Exit(s.serve())
}
// serve runs the server until shutdown is requested and returns the
// process exit code.
//
// It creates a cancellable context and stores the cancel function on
// the Server so the fx OnStop hook can request shutdown, installs
// signal handling, starts serveUntilShutdown on its own goroutine, and
// then blocks until the context is cancelled. Once cancelled it runs
// cleanShutdown and returns s.exitCode.
//
// NOTE(review): s.cancelFunc is written here and read by the OnStop
// hook without synchronization; guarding it would require a change to
// the Server struct.
func (s *Server) serve() int {
	ctx, cancel := context.WithCancel(context.Background())
	s.cancelFunc = cancel

	// Register signal handling before spawning any goroutine so that a
	// signal delivered right after startup is not handled by the
	// default (process-terminating) disposition.
	sigCh := make(chan os.Signal, 1)
	signal.Ignore(syscall.SIGPIPE)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

	go func() {
		// Watch ctx as well so this goroutine exits when shutdown is
		// requested through cancelFunc rather than by a signal. The
		// handlers are intentionally left registered: further signals
		// during the graceful shutdown are absorbed instead of killing
		// the process mid-shutdown.
		select {
		case sig := <-sigCh:
			s.log.Info("signal received", "signal", sig)
			cancel()
		case <-ctx.Done():
		}
	}()

	go s.serveUntilShutdown()

	<-ctx.Done()
	s.cleanShutdown()

	return s.exitCode
}
// shutdownTimeout bounds how long cleanShutdown waits for the HTTP
// server's Shutdown call before its context expires.
const shutdownTimeout = 5 * time.Second
// cleanShutdown sets the exit code to 0 and gracefully shuts down the
// HTTP server, waiting at most shutdownTimeout for it to finish.
//
// A Shutdown failure is logged but does not change the exit code.
// "server stopped" is logged in every case.
func (s *Server) cleanShutdown() {
	s.exitCode = 0

	// httpServer is not assigned by New, and serve() starts
	// serveUntilShutdown on a separate goroutine, so a very early
	// cancellation (for example an fx startup rollback) can get here
	// before any server exists. Skip Shutdown then rather than
	// dereferencing a nil *http.Server.
	if s.httpServer != nil {
		ctx, cancel := context.WithTimeout(
			context.Background(),
			shutdownTimeout,
		)
		defer cancel()

		if err := s.httpServer.Shutdown(ctx); err != nil {
			s.log.Error(
				"server clean shutdown failed",
				"error", err,
			)
		}
	}

	s.log.Info("server stopped")
}