Wire up image handler endpoint with service orchestration
- Add image proxy config options (signing_key, whitelist_hosts, allow_http) - Create Service to orchestrate cache, fetcher, and processor - Initialize image service in handlers OnStart hook - Implement HandleImage with URL parsing, signature validation, cache - Implement HandleRobotsTxt for search engine prevention - Parse query params for signature, quality, and fit mode
This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"git.eeqj.de/sneak/smartconfig"
|
||||
"go.uber.org/fx"
|
||||
@@ -35,6 +36,11 @@ type Config struct {
|
||||
SentryDSN string
|
||||
StateDir string
|
||||
DBURL string
|
||||
|
||||
// Image proxy settings
|
||||
SigningKey string // HMAC signing key for URL signatures
|
||||
WhitelistHosts []string // Hosts that don't require signatures
|
||||
AllowHTTP bool // Allow non-TLS upstream (testing only)
|
||||
}
|
||||
|
||||
// New creates a new Config instance by loading configuration from file.
|
||||
@@ -79,6 +85,9 @@ func New(_ fx.Lifecycle, params Params) (*Config, error) {
|
||||
SentryDSN: getString(sc, "sentry_dsn", ""),
|
||||
MetricsUsername: getString(sc, "metrics.username", ""),
|
||||
MetricsPassword: getString(sc, "metrics.password", ""),
|
||||
SigningKey: getString(sc, "signing_key", ""),
|
||||
WhitelistHosts: getStringSlice(sc, "whitelist_hosts"),
|
||||
AllowHTTP: getBool(sc, "allow_http", false),
|
||||
}
|
||||
|
||||
// Build DBURL from StateDir if not explicitly set
|
||||
@@ -132,3 +141,27 @@ func getBool(sc *smartconfig.Config, key string, defaultVal bool) bool {
|
||||
|
||||
return val
|
||||
}
|
||||
|
||||
func getStringSlice(sc *smartconfig.Config, key string) []string {
|
||||
if sc == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
val, err := sc.GetString(key)
|
||||
if err != nil || val == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Parse comma-separated values
|
||||
parts := strings.Split(val, ",")
|
||||
result := make([]string, 0, len(parts))
|
||||
|
||||
for _, part := range parts {
|
||||
trimmed := strings.TrimSpace(part)
|
||||
if trimmed != "" {
|
||||
result = append(result, trimmed)
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -6,9 +6,13 @@ import (
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"go.uber.org/fx"
|
||||
"sneak.berlin/go/pixa/internal/config"
|
||||
"sneak.berlin/go/pixa/internal/database"
|
||||
"sneak.berlin/go/pixa/internal/healthcheck"
|
||||
"sneak.berlin/go/pixa/internal/imgcache"
|
||||
"sneak.berlin/go/pixa/internal/logger"
|
||||
)
|
||||
|
||||
@@ -17,12 +21,18 @@ type Params struct {
|
||||
fx.In
|
||||
Logger *logger.Logger
|
||||
Healthcheck *healthcheck.Healthcheck
|
||||
Database *database.Database
|
||||
Config *config.Config
|
||||
}
|
||||
|
||||
// Handlers provides HTTP request handlers.
|
||||
type Handlers struct {
|
||||
log *slog.Logger
|
||||
hc *healthcheck.Healthcheck
|
||||
db *database.Database
|
||||
config *config.Config
|
||||
imgSvc *imgcache.Service
|
||||
imgCache *imgcache.Cache
|
||||
}
|
||||
|
||||
// New creates a new Handlers instance.
|
||||
@@ -30,17 +40,57 @@ func New(lc fx.Lifecycle, params Params) (*Handlers, error) {
|
||||
s := &Handlers{
|
||||
log: params.Logger.Get(),
|
||||
hc: params.Healthcheck,
|
||||
db: params.Database,
|
||||
config: params.Config,
|
||||
}
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(_ context.Context) error {
|
||||
return nil
|
||||
return s.initImageService()
|
||||
},
|
||||
})
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// initImageService initializes the image cache and service.
|
||||
func (s *Handlers) initImageService() error {
|
||||
// Create the cache
|
||||
cache, err := imgcache.NewCache(s.db.DB(), imgcache.CacheConfig{
|
||||
StateDir: s.config.StateDir,
|
||||
CacheTTL: imgcache.DefaultCacheTTL,
|
||||
NegativeTTL: imgcache.DefaultNegativeTTL,
|
||||
HotCacheSize: imgcache.DefaultHotCacheSize,
|
||||
HotCacheEnabled: true,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.imgCache = cache
|
||||
|
||||
// Create the fetcher config
|
||||
fetcherCfg := imgcache.DefaultFetcherConfig()
|
||||
fetcherCfg.AllowHTTP = s.config.AllowHTTP
|
||||
|
||||
// Create the service
|
||||
svc, err := imgcache.NewService(&imgcache.ServiceConfig{
|
||||
Cache: cache,
|
||||
FetcherConfig: fetcherCfg,
|
||||
SigningKey: s.config.SigningKey,
|
||||
Whitelist: s.config.WhitelistHosts,
|
||||
Logger: s.log,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.imgSvc = svc
|
||||
s.log.Info("image service initialized")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Handlers) respondJSON(w http.ResponseWriter, data interface{}, status int) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(status)
|
||||
@@ -51,3 +101,11 @@ func (s *Handlers) respondJSON(w http.ResponseWriter, data interface{}, status i
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Handlers) respondError(w http.ResponseWriter, message string, status int) {
|
||||
s.respondJSON(w, map[string]interface{}{
|
||||
"error": message,
|
||||
"status": status,
|
||||
"timestamp": time.Now().UTC().Format(time.RFC3339),
|
||||
}, status)
|
||||
}
|
||||
|
||||
@@ -1,28 +1,145 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"sneak.berlin/go/pixa/internal/imgcache"
|
||||
)
|
||||
|
||||
// HandleImage handles the main image proxy route:
|
||||
// /v1/image/<host>/<path>/<width>x<height>.<format>
|
||||
func (s *Handlers) HandleImage() http.HandlerFunc {
|
||||
return func(_ http.ResponseWriter, _ *http.Request) {
|
||||
// FIXME: Implement image proxy handler
|
||||
// - Parse URL to extract host, path, size, format
|
||||
// - Validate signature and expiration
|
||||
// - Check source host whitelist
|
||||
// - Fetch from upstream (with SSRF protection)
|
||||
// - Process image (resize, convert format)
|
||||
// - Cache and serve result
|
||||
panic("unimplemented")
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
// Get the wildcard path from chi
|
||||
pathParam := chi.URLParam(r, "*")
|
||||
|
||||
// Parse the URL path
|
||||
parsed, err := imgcache.ParseImagePath(pathParam)
|
||||
if err != nil {
|
||||
s.log.Warn("failed to parse image URL",
|
||||
"path", pathParam,
|
||||
"error", err,
|
||||
)
|
||||
s.respondError(w, "invalid image URL: "+err.Error(), http.StatusBadRequest)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Convert to ImageRequest
|
||||
req := parsed.ToImageRequest()
|
||||
|
||||
// Parse signature params from query string
|
||||
query := r.URL.Query()
|
||||
req.Signature = query.Get("sig")
|
||||
|
||||
if expStr := query.Get("exp"); expStr != "" {
|
||||
if exp, err := strconv.ParseInt(expStr, 10, 64); err == nil {
|
||||
req.Expires = time.Unix(exp, 0)
|
||||
}
|
||||
}
|
||||
|
||||
// Parse optional quality and fit params
|
||||
if qStr := query.Get("q"); qStr != "" {
|
||||
if q, err := strconv.Atoi(qStr); err == nil && q > 0 && q <= 100 {
|
||||
req.Quality = q
|
||||
}
|
||||
}
|
||||
|
||||
if fit := query.Get("fit"); fit != "" {
|
||||
req.FitMode = imgcache.FitMode(fit)
|
||||
}
|
||||
|
||||
// Default quality if not set
|
||||
if req.Quality == 0 {
|
||||
req.Quality = 85
|
||||
}
|
||||
|
||||
// Default fit mode if not set
|
||||
if req.FitMode == "" {
|
||||
req.FitMode = imgcache.FitCover
|
||||
}
|
||||
|
||||
// Validate signature if required
|
||||
if err := s.imgSvc.ValidateRequest(req); err != nil {
|
||||
s.log.Warn("signature validation failed",
|
||||
"host", req.SourceHost,
|
||||
"path", req.SourcePath,
|
||||
"error", err,
|
||||
)
|
||||
s.respondError(w, "unauthorized", http.StatusUnauthorized)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Get the image (from cache or fetch/process)
|
||||
resp, err := s.imgSvc.Get(ctx, req)
|
||||
if err != nil {
|
||||
s.log.Error("failed to get image",
|
||||
"host", req.SourceHost,
|
||||
"path", req.SourcePath,
|
||||
"error", err,
|
||||
)
|
||||
|
||||
// Check for specific error types
|
||||
if errors.Is(err, imgcache.ErrSSRFBlocked) {
|
||||
s.respondError(w, "forbidden", http.StatusForbidden)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if errors.Is(err, imgcache.ErrUpstreamError) {
|
||||
s.respondError(w, "upstream error", http.StatusBadGateway)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
s.respondError(w, "internal error", http.StatusInternalServerError)
|
||||
|
||||
return
|
||||
}
|
||||
defer func() { _ = resp.Content.Close() }()
|
||||
|
||||
// Set response headers
|
||||
w.Header().Set("Content-Type", resp.ContentType)
|
||||
if resp.ContentLength > 0 {
|
||||
w.Header().Set("Content-Length", strconv.FormatInt(resp.ContentLength, 10))
|
||||
}
|
||||
|
||||
// Cache control headers
|
||||
w.Header().Set("Cache-Control", "public, max-age=31536000, immutable")
|
||||
w.Header().Set("X-Pixa-Cache", string(resp.CacheStatus))
|
||||
|
||||
if resp.ETag != "" {
|
||||
w.Header().Set("ETag", resp.ETag)
|
||||
}
|
||||
|
||||
// Stream the response
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
||||
_, err = io.Copy(w, resp.Content)
|
||||
if err != nil {
|
||||
s.log.Error("failed to write response",
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// HandleRobotsTxt serves robots.txt to prevent search engine crawling.
|
||||
func (s *Handlers) HandleRobotsTxt() http.HandlerFunc {
|
||||
return func(_ http.ResponseWriter, _ *http.Request) {
|
||||
// FIXME: Implement robots.txt handler
|
||||
panic("unimplemented")
|
||||
robotsTxt := []byte("User-agent: *\nDisallow: /\n")
|
||||
|
||||
return func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/plain")
|
||||
w.Header().Set("Content-Length", strconv.Itoa(len(robotsTxt)))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_, _ = w.Write(robotsTxt)
|
||||
}
|
||||
}
|
||||
|
||||
10
internal/imgcache/module.go
Normal file
10
internal/imgcache/module.go
Normal file
@@ -0,0 +1,10 @@
|
||||
package imgcache
|
||||
|
||||
import "time"
|
||||
|
||||
// CacheConfig defaults.
|
||||
const (
|
||||
DefaultCacheTTL = 24 * time.Hour
|
||||
DefaultNegativeTTL = 5 * time.Minute
|
||||
DefaultHotCacheSize = 1000
|
||||
)
|
||||
252
internal/imgcache/service.go
Normal file
252
internal/imgcache/service.go
Normal file
@@ -0,0 +1,252 @@
|
||||
package imgcache
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Service implements the ImageCache interface, orchestrating cache, fetcher, and processor.
|
||||
type Service struct {
|
||||
cache *Cache
|
||||
fetcher *HTTPFetcher
|
||||
processor Processor
|
||||
signer *Signer
|
||||
whitelist *HostWhitelist
|
||||
log *slog.Logger
|
||||
}
|
||||
|
||||
// ServiceConfig holds configuration for the image service.
|
||||
type ServiceConfig struct {
|
||||
// Cache is the cache instance
|
||||
Cache *Cache
|
||||
// FetcherConfig configures the upstream fetcher
|
||||
FetcherConfig *FetcherConfig
|
||||
// SigningKey is the HMAC signing key (empty disables signing)
|
||||
SigningKey string
|
||||
// Whitelist is the list of hosts that don't require signatures
|
||||
Whitelist []string
|
||||
// Logger for logging
|
||||
Logger *slog.Logger
|
||||
}
|
||||
|
||||
// NewService creates a new image service.
|
||||
func NewService(cfg *ServiceConfig) (*Service, error) {
|
||||
if cfg.Cache == nil {
|
||||
return nil, errors.New("cache is required")
|
||||
}
|
||||
|
||||
fetcherCfg := cfg.FetcherConfig
|
||||
if fetcherCfg == nil {
|
||||
fetcherCfg = DefaultFetcherConfig()
|
||||
}
|
||||
|
||||
var signer *Signer
|
||||
if cfg.SigningKey != "" {
|
||||
signer = NewSigner(cfg.SigningKey)
|
||||
}
|
||||
|
||||
log := cfg.Logger
|
||||
if log == nil {
|
||||
log = slog.Default()
|
||||
}
|
||||
|
||||
return &Service{
|
||||
cache: cfg.Cache,
|
||||
fetcher: NewHTTPFetcher(fetcherCfg),
|
||||
processor: NewImageProcessor(),
|
||||
signer: signer,
|
||||
whitelist: NewHostWhitelist(cfg.Whitelist),
|
||||
log: log,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Get retrieves a processed image, fetching and processing if necessary.
|
||||
func (s *Service) Get(ctx context.Context, req *ImageRequest) (*ImageResponse, error) {
|
||||
// Check cache first
|
||||
result, err := s.cache.Lookup(ctx, req)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrNegativeCache) {
|
||||
return nil, fmt.Errorf("upstream returned error (cached)")
|
||||
}
|
||||
|
||||
s.log.Warn("cache lookup failed", "error", err)
|
||||
}
|
||||
|
||||
// Cache hit - serve from cache
|
||||
if result != nil && result.Hit {
|
||||
s.cache.IncrementStats(ctx, true, 0)
|
||||
|
||||
reader, err := s.cache.GetOutput(result.OutputHash)
|
||||
if err != nil {
|
||||
s.log.Error("failed to get cached output", "hash", result.OutputHash, "error", err)
|
||||
// Fall through to re-fetch
|
||||
} else {
|
||||
return &ImageResponse{
|
||||
Content: reader,
|
||||
ContentLength: -1, // Unknown until read
|
||||
ContentType: result.ContentType,
|
||||
CacheStatus: CacheHit,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Cache miss - need to fetch, process, and cache
|
||||
s.cache.IncrementStats(ctx, false, 0)
|
||||
|
||||
response, err := s.fetchAndProcess(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response.CacheStatus = CacheMiss
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
// fetchAndProcess fetches from upstream, processes, and caches the result.
|
||||
func (s *Service) fetchAndProcess(ctx context.Context, req *ImageRequest) (*ImageResponse, error) {
|
||||
// Fetch from upstream
|
||||
sourceURL := req.SourceURL()
|
||||
|
||||
s.log.Debug("fetching from upstream", "url", sourceURL)
|
||||
|
||||
fetchResult, err := s.fetcher.Fetch(ctx, sourceURL)
|
||||
if err != nil {
|
||||
// Store negative cache for certain errors
|
||||
if isNegativeCacheable(err) {
|
||||
statusCode := extractStatusCode(err)
|
||||
_ = s.cache.StoreNegative(ctx, req, statusCode, err.Error())
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("upstream fetch failed: %w", err)
|
||||
}
|
||||
defer func() { _ = fetchResult.Content.Close() }()
|
||||
|
||||
// Read and validate the source content
|
||||
sourceData, err := io.ReadAll(fetchResult.Content)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read upstream response: %w", err)
|
||||
}
|
||||
|
||||
// Validate magic bytes match content type
|
||||
if err := ValidateMagicBytes(sourceData, fetchResult.ContentType); err != nil {
|
||||
return nil, fmt.Errorf("content validation failed: %w", err)
|
||||
}
|
||||
|
||||
// Store source content
|
||||
_, err = s.cache.StoreSource(ctx, req, bytes.NewReader(sourceData), fetchResult)
|
||||
if err != nil {
|
||||
s.log.Warn("failed to store source content", "error", err)
|
||||
// Continue even if caching fails
|
||||
}
|
||||
|
||||
// Process the image
|
||||
processResult, err := s.processor.Process(ctx, bytes.NewReader(sourceData), req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("image processing failed: %w", err)
|
||||
}
|
||||
|
||||
// Read processed data to cache it
|
||||
processedData, err := io.ReadAll(processResult.Content)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read processed image: %w", err)
|
||||
}
|
||||
_ = processResult.Content.Close()
|
||||
|
||||
// Store output content
|
||||
metaID, err := s.cache.GetSourceMetadataID(ctx, req)
|
||||
if err == nil {
|
||||
err = s.cache.StoreOutput(ctx, req, metaID, bytes.NewReader(processedData), processResult.ContentType)
|
||||
if err != nil {
|
||||
s.log.Warn("failed to store output content", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
return &ImageResponse{
|
||||
Content: io.NopCloser(bytes.NewReader(processedData)),
|
||||
ContentLength: int64(len(processedData)),
|
||||
ContentType: processResult.ContentType,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Warm pre-fetches and caches an image without returning it.
|
||||
func (s *Service) Warm(ctx context.Context, req *ImageRequest) error {
|
||||
_, err := s.Get(ctx, req)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Purge removes a cached image.
|
||||
func (s *Service) Purge(_ context.Context, _ *ImageRequest) error {
|
||||
// TODO: Implement purge
|
||||
return errors.New("purge not implemented")
|
||||
}
|
||||
|
||||
// Stats returns cache statistics.
|
||||
func (s *Service) Stats(ctx context.Context) (*CacheStats, error) {
|
||||
return s.cache.Stats(ctx)
|
||||
}
|
||||
|
||||
// ValidateRequest validates the request signature if required.
|
||||
func (s *Service) ValidateRequest(req *ImageRequest) error {
|
||||
// Check if host is whitelisted (no signature required)
|
||||
sourceURL := req.SourceURL()
|
||||
|
||||
parsedURL, err := url.Parse(sourceURL)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid source URL: %w", err)
|
||||
}
|
||||
|
||||
if s.whitelist.IsWhitelisted(parsedURL) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Signature required
|
||||
if s.signer == nil {
|
||||
return errors.New("signing key not configured but host not whitelisted")
|
||||
}
|
||||
|
||||
return s.signer.Verify(req)
|
||||
}
|
||||
|
||||
// GenerateSignedURL generates a signed URL for the given request.
|
||||
func (s *Service) GenerateSignedURL(
|
||||
baseURL string,
|
||||
req *ImageRequest,
|
||||
ttl time.Duration,
|
||||
) (string, error) {
|
||||
if s.signer == nil {
|
||||
return "", errors.New("signing key not configured")
|
||||
}
|
||||
|
||||
path, sig, exp := s.signer.GenerateSignedURL(req, ttl)
|
||||
|
||||
return fmt.Sprintf("%s%s?sig=%s&exp=%d", baseURL, path, sig, exp), nil
|
||||
}
|
||||
|
||||
// HTTP status codes for error responses.
|
||||
const (
|
||||
httpStatusBadGateway = 502
|
||||
httpStatusInternalError = 500
|
||||
)
|
||||
|
||||
// isNegativeCacheable returns true if the error should be cached.
|
||||
func isNegativeCacheable(err error) bool {
|
||||
return errors.Is(err, ErrUpstreamError)
|
||||
}
|
||||
|
||||
// extractStatusCode extracts HTTP status code from error message.
|
||||
func extractStatusCode(err error) int {
|
||||
// Default to 502 Bad Gateway for upstream errors
|
||||
if errors.Is(err, ErrUpstreamError) {
|
||||
return httpStatusBadGateway
|
||||
}
|
||||
|
||||
return httpStatusInternalError
|
||||
}
|
||||
Reference in New Issue
Block a user