From fd2d108f9c2c591e712ec694b464045a406b5002 Mon Sep 17 00:00:00 2001 From: sneak Date: Thu, 8 Jan 2026 04:01:53 -0800 Subject: [PATCH] Wire up image handler endpoint with service orchestration - Add image proxy config options (signing_key, whitelist_hosts, allow_http) - Create Service to orchestrate cache, fetcher, and processor - Initialize image service in handlers OnStart hook - Implement HandleImage with URL parsing, signature validation, cache - Implement HandleRobotsTxt for search engine prevention - Parse query params for signature, quality, and fit mode --- internal/config/config.go | 33 +++++ internal/handlers/handlers.go | 68 ++++++++- internal/handlers/image.go | 141 +++++++++++++++++-- internal/imgcache/module.go | 10 ++ internal/imgcache/service.go | 252 ++++++++++++++++++++++++++++++++++ 5 files changed, 487 insertions(+), 17 deletions(-) create mode 100644 internal/imgcache/module.go create mode 100644 internal/imgcache/service.go diff --git a/internal/config/config.go b/internal/config/config.go index ef47eba..8093e58 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "git.eeqj.de/sneak/smartconfig" "go.uber.org/fx" @@ -35,6 +36,11 @@ type Config struct { SentryDSN string StateDir string DBURL string + + // Image proxy settings + SigningKey string // HMAC signing key for URL signatures + WhitelistHosts []string // Hosts that don't require signatures + AllowHTTP bool // Allow non-TLS upstream (testing only) } // New creates a new Config instance by loading configuration from file. 
@@ -79,6 +85,9 @@ func New(_ fx.Lifecycle, params Params) (*Config, error) {
 		SentryDSN:       getString(sc, "sentry_dsn", ""),
 		MetricsUsername: getString(sc, "metrics.username", ""),
 		MetricsPassword: getString(sc, "metrics.password", ""),
+		SigningKey:      getString(sc, "signing_key", ""),
+		WhitelistHosts:  getStringSlice(sc, "whitelist_hosts"),
+		AllowHTTP:       getBool(sc, "allow_http", false),
 	}
 
 	// Build DBURL from StateDir if not explicitly set
@@ -132,3 +141,27 @@ func getBool(sc *smartconfig.Config, key string, defaultVal bool) bool {
 
 	return val
 }
+
+// getStringSlice reads key as one comma-separated string and returns its
+// trimmed, non-empty items; nil when sc is nil, the lookup fails or it is empty.
+func getStringSlice(sc *smartconfig.Config, key string) []string {
+	if sc == nil {
+		return nil
+	}
+
+	raw, err := sc.GetString(key)
+	if raw == "" || err != nil {
+		return nil
+	}
+
+	fields := strings.Split(raw, ",")
+	items := make([]string, 0, len(fields))
+
+	for i := range fields {
+		if item := strings.TrimSpace(fields[i]); item != "" {
+			items = append(items, item)
+		}
+	}
+
+	return items
+}
diff --git a/internal/handlers/handlers.go b/internal/handlers/handlers.go
index 37b03d6..97eecbb 100644
--- a/internal/handlers/handlers.go
+++ b/internal/handlers/handlers.go
@@ -6,9 +6,13 @@ import (
 	"encoding/json"
 	"log/slog"
 	"net/http"
+	"time"
 
 	"go.uber.org/fx"
+	"sneak.berlin/go/pixa/internal/config"
+	"sneak.berlin/go/pixa/internal/database"
 	"sneak.berlin/go/pixa/internal/healthcheck"
+	"sneak.berlin/go/pixa/internal/imgcache"
 	"sneak.berlin/go/pixa/internal/logger"
 )
 
@@ -17,30 +21,76 @@ type Params struct {
 	fx.In
 	Logger      *logger.Logger
 	Healthcheck *healthcheck.Healthcheck
+	Database    *database.Database
+	Config      *config.Config
 }
 
 // Handlers provides HTTP request handlers.
 type Handlers struct {
-	log *slog.Logger
-	hc  *healthcheck.Healthcheck
+	log      *slog.Logger
+	hc       *healthcheck.Healthcheck
+	db       *database.Database
+	config   *config.Config
+	imgSvc   *imgcache.Service
+	imgCache *imgcache.Cache
 }
 
 // New creates a new Handlers instance.
func New(lc fx.Lifecycle, params Params) (*Handlers, error) { s := &Handlers{ - log: params.Logger.Get(), - hc: params.Healthcheck, + log: params.Logger.Get(), + hc: params.Healthcheck, + db: params.Database, + config: params.Config, } lc.Append(fx.Hook{ OnStart: func(_ context.Context) error { - return nil + return s.initImageService() }, }) return s, nil } +// initImageService initializes the image cache and service. +func (s *Handlers) initImageService() error { + // Create the cache + cache, err := imgcache.NewCache(s.db.DB(), imgcache.CacheConfig{ + StateDir: s.config.StateDir, + CacheTTL: imgcache.DefaultCacheTTL, + NegativeTTL: imgcache.DefaultNegativeTTL, + HotCacheSize: imgcache.DefaultHotCacheSize, + HotCacheEnabled: true, + }) + if err != nil { + return err + } + + s.imgCache = cache + + // Create the fetcher config + fetcherCfg := imgcache.DefaultFetcherConfig() + fetcherCfg.AllowHTTP = s.config.AllowHTTP + + // Create the service + svc, err := imgcache.NewService(&imgcache.ServiceConfig{ + Cache: cache, + FetcherConfig: fetcherCfg, + SigningKey: s.config.SigningKey, + Whitelist: s.config.WhitelistHosts, + Logger: s.log, + }) + if err != nil { + return err + } + + s.imgSvc = svc + s.log.Info("image service initialized") + + return nil +} + func (s *Handlers) respondJSON(w http.ResponseWriter, data interface{}, status int) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) @@ -51,3 +101,11 @@ func (s *Handlers) respondJSON(w http.ResponseWriter, data interface{}, status i } } } + +func (s *Handlers) respondError(w http.ResponseWriter, message string, status int) { + s.respondJSON(w, map[string]interface{}{ + "error": message, + "status": status, + "timestamp": time.Now().UTC().Format(time.RFC3339), + }, status) +} diff --git a/internal/handlers/image.go b/internal/handlers/image.go index a5f058f..08a9076 100644 --- a/internal/handlers/image.go +++ b/internal/handlers/image.go @@ -1,28 +1,145 @@ package handlers import ( + "errors" 
+	"io"
 	"net/http"
+	"strconv"
+	"time"
+
+	"github.com/go-chi/chi/v5"
+	"sneak.berlin/go/pixa/internal/imgcache"
 )
 
 // HandleImage handles the main image proxy route:
 // /v1/image///x.
 func (s *Handlers) HandleImage() http.HandlerFunc {
-	return func(_ http.ResponseWriter, _ *http.Request) {
-		// FIXME: Implement image proxy handler
-		// - Parse URL to extract host, path, size, format
-		// - Validate signature and expiration
-		// - Check source host whitelist
-		// - Fetch from upstream (with SSRF protection)
-		// - Process image (resize, convert format)
-		// - Cache and serve result
-		panic("unimplemented")
+	return func(w http.ResponseWriter, r *http.Request) {
+		ctx := r.Context()
+
+		// chi exposes whatever the route wildcard matched under the name "*".
+		rawPath := chi.URLParam(r, "*")
+
+		imgPath, err := imgcache.ParseImagePath(rawPath)
+		if err != nil {
+			s.log.Warn("failed to parse image URL",
+				"path", rawPath,
+				"error", err,
+			)
+			s.respondError(w, "invalid image URL: "+err.Error(), http.StatusBadRequest)
+
+			return
+		}
+
+		// Turn the parsed path into the request understood by the image service.
+		req := imgPath.ToImageRequest()
+
+		// The signature and its expiry arrive as query parameters. An "exp"
+		// that is not a base-10 integer is ignored, leaving req.Expires as is.
+		params := r.URL.Query()
+		req.Signature = params.Get("sig")
+
+		if rawExp := params.Get("exp"); rawExp != "" {
+			unixSecs, convErr := strconv.ParseInt(rawExp, 10, 64)
+			if convErr == nil {
+				req.Expires = time.Unix(unixSecs, 0)
+			}
+		}
+
+		// "q" overrides the quality only when it is an integer in 1..100.
+		if rawQuality := params.Get("q"); rawQuality != "" {
+			quality, convErr := strconv.Atoi(rawQuality)
+			if convErr == nil && quality >= 1 && quality <= 100 {
+				req.Quality = quality
+			}
+		}
+
+		// "fit" is taken verbatim whenever it is non-empty.
+		if fit := params.Get("fit"); fit != "" {
+			req.FitMode = imgcache.FitMode(fit)
+		}
+
+		// Anything still unset falls back to quality 85 and imgcache.FitCover.
+		if req.Quality == 0 {
+			req.Quality = 85
+		}
+
+		if req.FitMode == "" {
+			req.FitMode = imgcache.FitCover
+		}
+
+		// Every validation failure is answered with a plain 401; the cause is
+		// written to the log only.
+		if err := s.imgSvc.ValidateRequest(req); err != nil {
+			s.log.Warn("signature validation failed",
+				"host", req.SourceHost,
+				"path", req.SourcePath,
+				"error", err,
+			)
+			s.respondError(w, "unauthorized", http.StatusUnauthorized)
+
+			return
+		}
+
+		// Get serves from the cache when it can and otherwise fetches and
+		// processes the source image.
+		img, err := s.imgSvc.Get(ctx, req)
+		if err != nil {
+			s.log.Error("failed to get image",
+				"host", req.SourceHost,
+				"path", req.SourcePath,
+				"error", err,
+			)
+
+			// Known sentinel errors get their own status; the rest become 500.
+			switch {
+			case errors.Is(err, imgcache.ErrSSRFBlocked):
+				s.respondError(w, "forbidden", http.StatusForbidden)
+			case errors.Is(err, imgcache.ErrUpstreamError):
+				s.respondError(w, "upstream error", http.StatusBadGateway)
+			default:
+				s.respondError(w, "internal error", http.StatusInternalServerError)
+			}
+
+			return
+		}
+		defer func() { _ = img.Content.Close() }()
+
+		// All headers must be set before WriteHeader sends them.
+		hdr := w.Header()
+		hdr.Set("Content-Type", img.ContentType)
+		hdr.Set("Cache-Control", "public, max-age=31536000, immutable")
+		hdr.Set("X-Pixa-Cache", string(img.CacheStatus))
+
+		// Content-Length and ETag are only sent when the response carries them.
+		if img.ContentLength > 0 {
+			hdr.Set("Content-Length", strconv.FormatInt(img.ContentLength, 10))
+		}
+
+		if img.ETag != "" {
+			hdr.Set("ETag", img.ETag)
+		}
+
+		w.WriteHeader(http.StatusOK)
+
+		// The status line is already out, so a failed copy can only be logged.
+		_, err = io.Copy(w, img.Content)
+		if err != nil {
+			s.log.Error("failed to write response",
+				"error", err,
+			)
+		}
 	}
 }
 
 // HandleRobotsTxt serves robots.txt to prevent search engine crawling.
func (s *Handlers) HandleRobotsTxt() http.HandlerFunc { - return func(_ http.ResponseWriter, _ *http.Request) { - // FIXME: Implement robots.txt handler - panic("unimplemented") + robotsTxt := []byte("User-agent: *\nDisallow: /\n") + + return func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/plain") + w.Header().Set("Content-Length", strconv.Itoa(len(robotsTxt))) + w.WriteHeader(http.StatusOK) + _, _ = w.Write(robotsTxt) } } diff --git a/internal/imgcache/module.go b/internal/imgcache/module.go new file mode 100644 index 0000000..9e045c7 --- /dev/null +++ b/internal/imgcache/module.go @@ -0,0 +1,10 @@ +package imgcache + +import "time" + +// CacheConfig defaults. +const ( + DefaultCacheTTL = 24 * time.Hour + DefaultNegativeTTL = 5 * time.Minute + DefaultHotCacheSize = 1000 +) diff --git a/internal/imgcache/service.go b/internal/imgcache/service.go new file mode 100644 index 0000000..6bc8641 --- /dev/null +++ b/internal/imgcache/service.go @@ -0,0 +1,252 @@ +package imgcache + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "log/slog" + "net/url" + "time" +) + +// Service implements the ImageCache interface, orchestrating cache, fetcher, and processor. +type Service struct { + cache *Cache + fetcher *HTTPFetcher + processor Processor + signer *Signer + whitelist *HostWhitelist + log *slog.Logger +} + +// ServiceConfig holds configuration for the image service. +type ServiceConfig struct { + // Cache is the cache instance + Cache *Cache + // FetcherConfig configures the upstream fetcher + FetcherConfig *FetcherConfig + // SigningKey is the HMAC signing key (empty disables signing) + SigningKey string + // Whitelist is the list of hosts that don't require signatures + Whitelist []string + // Logger for logging + Logger *slog.Logger +} + +// NewService creates a new image service. 
+func NewService(cfg *ServiceConfig) (*Service, error) {
+	if cfg.Cache == nil {
+		return nil, errors.New("cache is required")
+	}
+
+	fetcherCfg := cfg.FetcherConfig
+	if fetcherCfg == nil {
+		fetcherCfg = DefaultFetcherConfig()
+	}
+
+	var signer *Signer // stays nil when no signing key is configured
+	if cfg.SigningKey != "" {
+		signer = NewSigner(cfg.SigningKey)
+	}
+
+	log := cfg.Logger
+	if log == nil {
+		log = slog.Default()
+	}
+
+	return &Service{
+		cache:     cfg.Cache,
+		fetcher:   NewHTTPFetcher(fetcherCfg),
+		processor: NewImageProcessor(),
+		signer:    signer,
+		whitelist: NewHostWhitelist(cfg.Whitelist),
+		log:       log,
+	}, nil
+}
+
+// Get returns the processed image for req: cached output when available,
+// otherwise freshly fetched and processed. A negative-cache hit yields an
+// error wrapping ErrUpstreamError and ErrNegativeCache without refetching.
+func (s *Service) Get(ctx context.Context, req *ImageRequest) (*ImageResponse, error) {
+	result, err := s.cache.Lookup(ctx, req)
+	if errors.Is(err, ErrNegativeCache) {
+		return nil, fmt.Errorf("upstream returned error (cached): %w: %w", ErrUpstreamError, err)
+	}
+
+	if err != nil {
+		s.log.Warn("cache lookup failed", "error", err)
+	}
+
+	if result != nil && result.Hit {
+		reader, err := s.cache.GetOutput(result.OutputHash)
+		if err == nil {
+			// Count the hit only when the cached bytes are actually served.
+			s.cache.IncrementStats(ctx, true, 0)
+
+			return &ImageResponse{
+				Content:       reader,
+				ContentLength: -1, // Unknown until read
+				ContentType:   result.ContentType,
+				CacheStatus:   CacheHit,
+			}, nil
+		}
+
+		// Fall through to re-fetch; the request is then counted as a miss.
+		s.log.Error("failed to get cached output", "hash", result.OutputHash, "error", err)
+	}
+
+	s.cache.IncrementStats(ctx, false, 0)
+
+	response, err := s.fetchAndProcess(ctx, req)
+	if err != nil {
+		return nil, err
+	}
+
+	response.CacheStatus = CacheMiss
+
+	return response, nil
+}
+
+// fetchAndProcess fetches from upstream, processes, and caches the result.
+func (s *Service) fetchAndProcess(ctx context.Context, req *ImageRequest) (*ImageResponse, error) {
+	sourceURL := req.SourceURL()
+
+	s.log.Debug("fetching from upstream", "url", sourceURL)
+
+	fetchResult, err := s.fetcher.Fetch(ctx, sourceURL)
+	if err != nil {
+		// Record upstream errors in the negative cache; a failed store is only logged.
+		if isNegativeCacheable(err) {
+			if storeErr := s.cache.StoreNegative(ctx, req, extractStatusCode(err), err.Error()); storeErr != nil {
+				s.log.Warn("failed to store negative cache entry", "error", storeErr)
+			}
+		}
+
+		return nil, fmt.Errorf("upstream fetch failed: %w", err)
+	}
+	defer func() { _ = fetchResult.Content.Close() }()
+
+	sourceData, err := io.ReadAll(fetchResult.Content)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read upstream response: %w", err)
+	}
+
+	if err := ValidateMagicBytes(sourceData, fetchResult.ContentType); err != nil {
+		return nil, fmt.Errorf("content validation failed: %w", err)
+	}
+
+	// Caching is best effort: a failed store is logged and the image still served.
+	if _, err := s.cache.StoreSource(ctx, req, bytes.NewReader(sourceData), fetchResult); err != nil {
+		s.log.Warn("failed to store source content", "error", err)
+	}
+
+	processResult, err := s.processor.Process(ctx, bytes.NewReader(sourceData), req)
+	if err != nil {
+		return nil, fmt.Errorf("image processing failed: %w", err)
+	}
+	// Deferred so the processor output is also closed when reading it fails.
+	defer func() { _ = processResult.Content.Close() }()
+
+	processedData, err := io.ReadAll(processResult.Content)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read processed image: %w", err)
+	}
+
+	// StoreOutput needs the source metadata ID; without it the output is not cached.
+	metaID, err := s.cache.GetSourceMetadataID(ctx, req)
+	if err != nil {
+		s.log.Warn("failed to look up source metadata", "error", err)
+	} else {
+		err = s.cache.StoreOutput(ctx, req, metaID, bytes.NewReader(processedData), processResult.ContentType)
+		if err != nil {
+			s.log.Warn("failed to store output content", "error", err)
+		}
+	}
+
+	return &ImageResponse{
+		Content:       io.NopCloser(bytes.NewReader(processedData)),
+		ContentLength: int64(len(processedData)),
+		ContentType:   processResult.ContentType,
+	}, nil
+}
+
+// Warm runs Get for req so the result is cached, then discards the image.
+func (s *Service) Warm(ctx context.Context, req *ImageRequest) error {
+	resp, err := s.Get(ctx, req)
+	if err == nil {
+		_ = resp.Content.Close() // the body is unused; close it so it is not leaked
+	}
+
+	return err
+}
+
+// Purge is a placeholder for cache removal; it always returns an error.
+func (s *Service) Purge(_ context.Context, _ *ImageRequest) error {
+	// TODO: Implement purge
+	return errors.New("purge not implemented")
+}
+
+// Stats returns the statistics reported by the underlying cache.
+func (s *Service) Stats(ctx context.Context) (*CacheStats, error) {
+	return s.cache.Stats(ctx)
+}
+
+// ValidateRequest reports whether req may be served; nil means it is allowed.
+func (s *Service) ValidateRequest(req *ImageRequest) error {
+	// Whitelisted hosts are accepted without looking at the signature.
+	sourceURL := req.SourceURL()
+
+	parsedURL, err := url.Parse(sourceURL)
+	if err != nil {
+		return fmt.Errorf("invalid source URL: %w", err)
+	}
+
+	if s.whitelist.IsWhitelisted(parsedURL) {
+		return nil
+	}
+
+	// Every other host needs a configured signer and a signature it verifies.
+	if s.signer == nil {
+		return errors.New("signing key not configured but host not whitelisted")
+	}
+
+	return s.signer.Verify(req)
+}
+
+// GenerateSignedURL returns baseURL plus the signed path with sig and exp query parameters.
+func (s *Service) GenerateSignedURL(
+	baseURL string,
+	req *ImageRequest,
+	ttl time.Duration,
+) (string, error) {
+	if s.signer == nil {
+		return "", errors.New("signing key not configured")
+	}
+
+	path, sig, exp := s.signer.GenerateSignedURL(req, ttl)
+
+	return fmt.Sprintf("%s%s?sig=%s&exp=%d", baseURL, path, sig, exp), nil
+}
+
+// HTTP status codes recorded with negative cache entries.
+const (
+	httpStatusBadGateway    = 502
+	httpStatusInternalError = 500
+)
+
+// isNegativeCacheable reports whether err is an upstream error and so gets negative-cached.
+func isNegativeCacheable(err error) bool {
+	return errors.Is(err, ErrUpstreamError)
+}
+
+// extractStatusCode maps err to the status code stored with its negative cache entry.
+func extractStatusCode(err error) int {
+	// Only upstream errors map to 502 Bad Gateway; anything else is a 500.
+	if !errors.Is(err, ErrUpstreamError) {
+		return httpStatusInternalError
+	}
+
+	return httpStatusBadGateway
+}