Files
pixa/internal/handlers/handlers.go
sneak f244d9c7e0 Add per-host connection limits for upstream fetching
- Add upstream_connections_per_host config option (default: 20)
- Implement per-host semaphores to limit concurrent connections
- Semaphore released when response body is closed
- Prevents overwhelming origin servers with parallel requests
2026-01-08 05:19:20 -08:00

115 lines
2.7 KiB
Go

// Package handlers provides HTTP request handlers.
package handlers
import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"net/http"
	"time"

	"go.uber.org/fx"
	"sneak.berlin/go/pixa/internal/config"
	"sneak.berlin/go/pixa/internal/database"
	"sneak.berlin/go/pixa/internal/healthcheck"
	"sneak.berlin/go/pixa/internal/imgcache"
	"sneak.berlin/go/pixa/internal/logger"
)
// Params is the fx parameter object listing everything New needs in order
// to build a Handlers value. It embeds fx.In, so each exported field is
// treated as a dependency to be supplied by the fx container.
type Params struct {
	fx.In

	// Config carries the application settings read by the handlers.
	Config *config.Config
	// Database is the database wrapper handed to the image cache.
	Database *database.Database
	// Healthcheck is stored on Handlers as-is.
	Healthcheck *healthcheck.Healthcheck
	// Logger is the logging wrapper; New calls Get on it to obtain the
	// *slog.Logger used by the handlers.
	Logger *logger.Logger
}
// Handlers provides HTTP request handlers.
type Handlers struct {
log *slog.Logger
hc *healthcheck.Healthcheck
db *database.Database
config *config.Config
imgSvc *imgcache.Service
imgCache *imgcache.Cache
}
// New builds a Handlers from the injected params and registers an fx
// OnStart hook that initializes the image cache and image service.
//
// The image-related fields are therefore not usable until the fx lifecycle
// has started. The returned error is always nil.
func New(lc fx.Lifecycle, params Params) (*Handlers, error) {
	h := new(Handlers)
	h.log = params.Logger.Get()
	h.hc = params.Healthcheck
	h.db = params.Database
	h.config = params.Config

	// Image service setup is deferred to application start.
	startHook := fx.Hook{
		OnStart: func(context.Context) error {
			return h.initImageService()
		},
	}
	lc.Append(startHook)

	return h, nil
}
// initImageService creates the image cache and the image service built on
// top of it, and stores both on the receiver.
//
// It is invoked from the fx OnStart hook registered in New. The returned
// error wraps the underlying cause and states which of the two construction
// steps failed, so a startup failure can be attributed without a debugger.
func (s *Handlers) initImageService() error {
	// Build the cache first; the service depends on it.
	cache, err := imgcache.NewCache(s.db.DB(), imgcache.CacheConfig{
		StateDir:        s.config.StateDir,
		CacheTTL:        imgcache.DefaultCacheTTL,
		NegativeTTL:     imgcache.DefaultNegativeTTL,
		HotCacheSize:    imgcache.DefaultHotCacheSize,
		HotCacheEnabled: true,
	})
	if err != nil {
		return fmt.Errorf("creating image cache: %w", err)
	}
	s.imgCache = cache

	// Start from the package defaults and overlay the configured values.
	fetcherCfg := imgcache.DefaultFetcherConfig()
	fetcherCfg.AllowHTTP = s.config.AllowHTTP
	// A non-positive setting leaves the fetcher's default limit untouched.
	if s.config.UpstreamConnectionsPerHost > 0 {
		fetcherCfg.MaxConnectionsPerHost = s.config.UpstreamConnectionsPerHost
	}

	svc, err := imgcache.NewService(&imgcache.ServiceConfig{
		Cache:         cache,
		FetcherConfig: fetcherCfg,
		SigningKey:    s.config.SigningKey,
		Whitelist:     s.config.WhitelistHosts,
		Logger:        s.log,
	})
	if err != nil {
		return fmt.Errorf("creating image service: %w", err)
	}
	s.imgSvc = svc

	s.log.Info("image service initialized")

	return nil
}
// respondJSON writes data as a JSON response with the given HTTP status.
//
// The payload is encoded before any header is sent. If encoding fails, the
// error is logged and the client receives a 500 with a small static JSON
// error body, rather than the requested status with an empty or truncated
// body. A nil data value sends only the headers and status, with no body.
// Encoded bodies end in a newline, as json.Encoder output does.
func (s *Handlers) respondJSON(w http.ResponseWriter, data any, status int) {
	var body []byte
	if data != nil {
		encoded, err := json.Marshal(data)
		if err != nil {
			s.log.Error("json encode error", "error", err)
			// Nothing has been written yet, so the status can still change.
			status = http.StatusInternalServerError
			encoded = []byte(`{"error":"internal server error"}`)
		}
		body = append(encoded, '\n')
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	if len(body) == 0 {
		return
	}
	// The status line is already sent; a failed write can only be logged.
	if _, err := w.Write(body); err != nil {
		s.log.Error("json write error", "error", err)
	}
}
// respondError sends a JSON error response with the given HTTP status.
//
// The body is an object with three keys: "error" (the message), "status"
// (the numeric status code) and "timestamp" (the current UTC time in
// RFC 3339 format). Delivery is delegated to respondJSON.
func (s *Handlers) respondError(w http.ResponseWriter, message string, status int) {
	now := time.Now().UTC().Format(time.RFC3339)

	payload := map[string]any{
		"timestamp": now,
		"status":    status,
		"error":     message,
	}

	s.respondJSON(w, payload, status)
}