Files
pixa/internal/imgcache/service.go
clawbot d7e1cfaa24
All checks were successful
check / check (push) Successful in 58s
refactor: extract imageprocessor into its own package
Move ImageProcessor, Params, New(), DefaultMaxInputBytes, ErrInputDataTooLarge,
and related types from internal/imgcache/ into a new standalone package
internal/imageprocessor/.

The imageprocessor package defines its own Format, FitMode, Size, Request, and
Result types, making it fully independent with no imports from imgcache. The
imgcache service converts between its own types and imageprocessor types at the
boundary.

Changes:
- New package: internal/imageprocessor/ with imageprocessor.go and tests
- Removed: processor.go and processor_test.go from internal/imgcache/
- Removed: Processor interface and ProcessResult from imgcache.go (now unused)
- Updated: service.go uses *imageprocessor.ImageProcessor directly
- Copied: testdata/red.avif for AVIF decode test

Addresses review feedback on PR #37: image processing is a distinct concern
from the HTTP service layer and belongs in its own package.
2026-03-17 20:32:20 -07:00

445 lines
12 KiB
Go

package imgcache
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log/slog"
"net/url"
"time"
"github.com/dustin/go-humanize"
"sneak.berlin/go/pixa/internal/imageprocessor"
)
// Service implements the ImageCache interface, orchestrating cache, fetcher, and processor.
type Service struct {
	cache           *Cache                         // variant/source/negative cache storage
	fetcher         Fetcher                        // upstream fetcher; injectable for tests via ServiceConfig.Fetcher
	processor       *imageprocessor.ImageProcessor // converts source bytes into requested output variants
	signer          *Signer                        // HMAC signer used to verify non-whitelisted requests
	whitelist       *HostWhitelist                 // hosts that may be requested without a signature
	log             *slog.Logger                   // structured logger (defaults to slog.Default())
	allowHTTP       bool                           // propagated onto each ImageRequest in Get
	maxResponseSize int64                          // byte cap for upstream responses and cached source reads
}
// ServiceConfig holds configuration for the image service.
type ServiceConfig struct {
	// Cache is the cache instance. Required.
	Cache *Cache
	// FetcherConfig configures the upstream fetcher (ignored if Fetcher is set).
	// When nil, DefaultFetcherConfig() supplies limits such as MaxResponseSize.
	FetcherConfig *FetcherConfig
	// Fetcher is an optional custom fetcher (for testing). When set, it is
	// used as-is and FetcherConfig is not used to build a fetcher.
	Fetcher Fetcher
	// SigningKey is the HMAC signing key. Required; there is no unsigned mode.
	SigningKey string
	// Whitelist is the list of hosts that don't require signatures.
	Whitelist []string
	// Logger for logging; defaults to slog.Default() when nil.
	Logger *slog.Logger
}
// NewService creates a new image service.
//
// Cache and SigningKey are mandatory; everything else falls back to a
// sensible default (default fetcher config, HTTP fetcher, default logger).
func NewService(cfg *ServiceConfig) (*Service, error) {
	switch {
	case cfg.Cache == nil:
		return nil, errors.New("cache is required")
	case cfg.SigningKey == "":
		return nil, errors.New("signing key is required")
	}

	// Fall back to default fetcher settings when none were supplied.
	effectiveCfg := cfg.FetcherConfig
	if effectiveCfg == nil {
		effectiveCfg = DefaultFetcherConfig()
	}

	// A caller-supplied fetcher (typically a test double) wins over the
	// standard HTTP fetcher.
	upstream := cfg.Fetcher
	if upstream == nil {
		upstream = NewHTTPFetcher(effectiveCfg)
	}

	logger := cfg.Logger
	if logger == nil {
		logger = slog.Default()
	}

	// NOTE(review): AllowHTTP is intentionally read only from the caller's
	// config, not from the defaults — with a nil FetcherConfig plain HTTP
	// stays disabled. Confirm this asymmetry with DefaultFetcherConfig().
	permitHTTP := false
	if cfg.FetcherConfig != nil {
		permitHTTP = cfg.FetcherConfig.AllowHTTP
	}

	maxBytes := effectiveCfg.MaxResponseSize
	return &Service{
		cache:           cfg.Cache,
		fetcher:         upstream,
		processor:       imageprocessor.New(imageprocessor.Params{MaxInputBytes: maxBytes}),
		signer:          NewSigner(cfg.SigningKey),
		whitelist:       NewHostWhitelist(cfg.Whitelist),
		log:             logger,
		allowHTTP:       permitHTTP,
		maxResponseSize: maxBytes,
	}, nil
}
// ErrNegativeCached is returned when a URL is in the negative cache (recently failed).
// Get wraps it together with ErrUpstreamError, so callers can match either sentinel.
var ErrNegativeCached = errors.New("request is in negative cache (recently failed)")
// Get retrieves a processed image, fetching and processing if necessary.
//
// Lookup order: negative cache (recent upstream failures), then the variant
// cache on disk, then cached source bytes, and finally an upstream fetch.
func (s *Service) Get(ctx context.Context, req *ImageRequest) (*ImageResponse, error) {
	// Propagate AllowHTTP setting to the request
	req.AllowHTTP = s.allowHTTP

	// Recently-failed URLs are refused without touching upstream.
	inNegativeCache, err := s.cache.checkNegativeCache(ctx, req)
	if err != nil {
		s.log.Warn("negative cache check failed", "error", err)
	}
	if inNegativeCache {
		s.log.Debug("negative cache hit",
			"host", req.SourceHost,
			"path", req.SourcePath,
		)
		return nil, fmt.Errorf("%w: %w", ErrUpstreamError, ErrNegativeCached)
	}

	// Variant cache lookup hits disk only, no DB.
	lookup, err := s.cache.Lookup(ctx, req)
	if err != nil {
		s.log.Warn("cache lookup failed", "error", err)
	}
	if lookup != nil && lookup.Hit {
		reader, size, contentType, err := s.cache.GetVariant(lookup.CacheKey)
		if err == nil {
			// Serve directly from disk.
			s.cache.IncrementStats(ctx, true, 0)
			return &ImageResponse{
				Content:       reader,
				ContentLength: size,
				ContentType:   contentType,
				CacheStatus:   CacheHit,
				ETag:          formatETag(lookup.CacheKey),
			}, nil
		}
		s.log.Error("failed to get cached variant", "key", lookup.CacheKey, "error", err)
		// Fall through and regenerate the variant below.
	}

	// Variant miss: rebuild from cached source bytes or a fresh fetch.
	key := CacheKey(req)
	s.cache.IncrementStats(ctx, false, 0)
	resp, err := s.processFromSourceOrFetch(ctx, req, key)
	if err != nil {
		return nil, err
	}
	resp.CacheStatus = CacheMiss
	return resp, nil
}
// loadCachedSource attempts to load source content from cache, returning nil
// if the cached data is unavailable or exceeds maxResponseSize.
func (s *Service) loadCachedSource(contentHash ContentHash) []byte {
	src, err := s.cache.GetSourceContent(contentHash)
	if err != nil {
		s.log.Warn("failed to load cached source, fetching", "error", err)
		return nil
	}
	// Read at most maxResponseSize+1 bytes so an unexpectedly large cached
	// file cannot cause unbounded memory use; the extra byte lets us detect
	// an over-limit file without reading it fully.
	data, err := io.ReadAll(io.LimitReader(src, s.maxResponseSize+1))
	_ = src.Close()
	if err != nil {
		s.log.Warn("failed to read cached source, fetching", "error", err)
		return nil
	}
	if int64(len(data)) > s.maxResponseSize {
		s.log.Warn("cached source exceeds max response size, discarding",
			"hash", contentHash,
			"max_bytes", s.maxResponseSize,
		)
		return nil
	}
	return data
}
// processFromSourceOrFetch processes an image, using cached source content if available.
//
// When no usable cached source exists (lookup miss, load failure, or empty
// data) the request falls through to an upstream fetch.
func (s *Service) processFromSourceOrFetch(
	ctx context.Context,
	req *ImageRequest,
	cacheKey VariantKey,
) (*ImageResponse, error) {
	contentHash, _, err := s.cache.LookupSource(ctx, req)
	if err != nil {
		s.log.Warn("source lookup failed", "error", err)
	}

	var cached []byte
	if contentHash != "" {
		s.log.Debug("using cached source", "hash", contentHash)
		cached = s.loadCachedSource(contentHash)
	}

	if len(cached) == 0 {
		// Nothing usable on disk — fetch from upstream instead.
		return s.fetchAndProcess(ctx, req, cacheKey)
	}
	return s.processAndStore(ctx, req, cacheKey, cached, int64(len(cached)))
}
// fetchAndProcess fetches from upstream, processes, and caches the result.
//
// On a fetch error, eligible failures are recorded in the negative cache so
// subsequent Gets short-circuit. On success the raw source bytes are stored
// in the source cache (best effort) before handing off to processAndStore.
func (s *Service) fetchAndProcess(
	ctx context.Context,
	req *ImageRequest,
	cacheKey VariantKey,
) (*ImageResponse, error) {
	// Fetch from upstream
	sourceURL := req.SourceURL()
	s.log.Debug("fetching from upstream", "url", sourceURL)
	fetchResult, err := s.fetcher.Fetch(ctx, sourceURL)
	if err != nil {
		// Store negative cache for certain errors; the store error is
		// deliberately ignored — negative caching is best effort.
		if isNegativeCacheable(err) {
			statusCode := extractStatusCode(err)
			_ = s.cache.StoreNegative(ctx, req, statusCode, err.Error())
		}
		return nil, fmt.Errorf("upstream fetch failed: %w", err)
	}
	defer func() { _ = fetchResult.Content.Close() }()
	// Read and validate the source content.
	// NOTE(review): this reads the body fully into memory; presumably the
	// fetcher already bounds the response to MaxResponseSize — confirm.
	sourceData, err := io.ReadAll(fetchResult.Content)
	if err != nil {
		return nil, fmt.Errorf("failed to read upstream response: %w", err)
	}
	// Calculate download bitrate (left empty when duration is unknown/zero).
	fetchBytes := int64(len(sourceData))
	var downloadRate string
	if fetchResult.FetchDurationMs > 0 {
		seconds := float64(fetchResult.FetchDurationMs) / 1000.0 //nolint:mnd // ms to seconds
		bitsPerSecond := float64(fetchBytes*8) / seconds //nolint:mnd // bytes to bits
		downloadRate = humanize.SI(bitsPerSecond, "bps")
	}
	// Log upstream fetch details
	s.log.Info("upstream fetched",
		"host", req.SourceHost,
		"path", req.SourcePath,
		"bytes", fetchBytes,
		"fetch_ms", fetchResult.FetchDurationMs,
		"rate", downloadRate,
		"remote_addr", fetchResult.RemoteAddr,
		"http", fetchResult.HTTPVersion,
		"tls", fetchResult.TLSVersion,
		"cipher", fetchResult.TLSCipherSuite,
	)
	// Validate magic bytes match content type; mismatches are rejected
	// before anything is cached or processed.
	if err := ValidateMagicBytes(sourceData, fetchResult.ContentType); err != nil {
		return nil, fmt.Errorf("content validation failed: %w", err)
	}
	// Store source content
	_, err = s.cache.StoreSource(ctx, req, bytes.NewReader(sourceData), fetchResult)
	if err != nil {
		s.log.Warn("failed to store source content", "error", err)
		// Continue even if caching fails
	}
	return s.processAndStore(ctx, req, cacheKey, sourceData, fetchBytes)
}
// processAndStore processes an image and stores the result.
//
// sourceData holds the full original image; fetchBytes is its size (used for
// the size-ratio log line and reported back in the response). The processed
// variant is written to the cache best-effort: a store failure is logged and
// the response is still served from memory.
func (s *Service) processAndStore(
	ctx context.Context,
	req *ImageRequest,
	cacheKey VariantKey,
	sourceData []byte,
	fetchBytes int64,
) (*ImageResponse, error) {
	// Process the image, converting request fields into the standalone
	// imageprocessor package's types at the boundary.
	processStart := time.Now()
	processReq := &imageprocessor.Request{
		Size: imageprocessor.Size{Width: req.Size.Width, Height: req.Size.Height},
		Format: imageprocessor.Format(req.Format),
		Quality: req.Quality,
		FitMode: imageprocessor.FitMode(req.FitMode),
	}
	processResult, err := s.processor.Process(ctx, bytes.NewReader(sourceData), processReq)
	if err != nil {
		return nil, fmt.Errorf("image processing failed: %w", err)
	}
	processDuration := time.Since(processStart)
	// Read processed content fully into memory; close error is ignored
	// since the content has already been consumed.
	processedData, err := io.ReadAll(processResult.Content)
	_ = processResult.Content.Close()
	if err != nil {
		return nil, fmt.Errorf("failed to read processed content: %w", err)
	}
	// Log conversion details (output size as a percentage of input size).
	outputSize := int64(len(processedData))
	var sizePercent float64
	if fetchBytes > 0 {
		sizePercent = float64(outputSize) / float64(fetchBytes) * 100.0 //nolint:mnd // percentage calculation
	}
	s.log.Info("image converted",
		"host", req.SourceHost,
		"path", req.SourcePath,
		"src_format", processResult.InputFormat,
		"dst_format", req.Format,
		"src_bytes", fetchBytes,
		"dst_bytes", outputSize,
		"src_dimensions", fmt.Sprintf("%dx%d", processResult.InputWidth, processResult.InputHeight),
		"dst_dimensions", fmt.Sprintf("%dx%d", processResult.Width, processResult.Height),
		"size_ratio", fmt.Sprintf("%.1f%%", sizePercent),
		"convert_ms", processDuration.Milliseconds(),
		"quality", req.Quality,
		"fit", req.FitMode,
	)
	// Store variant to cache
	if err := s.cache.StoreVariant(cacheKey, bytes.NewReader(processedData), processResult.ContentType); err != nil {
		s.log.Warn("failed to store variant", "error", err)
		// Continue even if caching fails
	}
	return &ImageResponse{
		Content: io.NopCloser(bytes.NewReader(processedData)),
		ContentLength: outputSize,
		ContentType: processResult.ContentType,
		FetchedBytes: fetchBytes,
		ETag: formatETag(cacheKey),
	}, nil
}
// Warm pre-fetches and caches an image without returning it.
// It delegates to Get purely for its caching side effects.
func (s *Service) Warm(ctx context.Context, req *ImageRequest) error {
	if _, err := s.Get(ctx, req); err != nil {
		return err
	}
	return nil
}
// Purge removes a cached image.
//
// Not yet implemented; currently always returns an error.
func (s *Service) Purge(_ context.Context, _ *ImageRequest) error {
	return errors.New("purge not implemented") // TODO: Implement purge
}
// Stats returns cache statistics, delegating directly to the cache layer.
func (s *Service) Stats(ctx context.Context) (*CacheStats, error) {
	return s.cache.Stats(ctx)
}
// ValidateRequest validates the request signature if required.
//
// Requests for whitelisted hosts are accepted without a signature; all
// other hosts must carry a valid HMAC signature.
func (s *Service) ValidateRequest(req *ImageRequest) error {
	parsed, err := url.Parse(req.SourceURL())
	if err != nil {
		return fmt.Errorf("invalid source URL: %w", err)
	}
	if s.whitelist.IsWhitelisted(parsed) {
		return nil
	}
	// Signature required for non-whitelisted hosts
	return s.signer.Verify(req)
}
// GenerateSignedURL generates a signed URL for the given request.
// The signer produces the path, signature, and expiry; they are appended
// to baseURL as `sig` and `exp` query parameters.
func (s *Service) GenerateSignedURL(
	baseURL string,
	req *ImageRequest,
	ttl time.Duration,
) (string, error) {
	path, sig, exp := s.signer.GenerateSignedURL(req, ttl)
	signed := fmt.Sprintf("%s%s?sig=%s&exp=%d", baseURL, path, sig, exp)
	return signed, nil
}
// HTTP status codes for error responses.
const (
	httpStatusBadGateway = 502 // reported for upstream fetch failures
	httpStatusInternalError = 500 // fallback for all other errors
)
// isNegativeCacheable returns true if the error should be cached.
// Only upstream errors qualify; local failures (processing, cache I/O)
// are never negative-cached.
func isNegativeCacheable(err error) bool {
	return errors.Is(err, ErrUpstreamError)
}
// extractStatusCode extracts HTTP status code from error message.
// Upstream errors map to 502 Bad Gateway; everything else maps to 500.
func extractStatusCode(err error) int {
	if !errors.Is(err, ErrUpstreamError) {
		return httpStatusInternalError
	}
	// Default to 502 Bad Gateway for upstream errors
	return httpStatusBadGateway
}
// etagHashLength is the number of hash characters to use for ETags.
const etagHashLength = 16

// formatETag formats a VariantKey as a quoted ETag value.
// The key is truncated to the first 16 characters for a shorter but
// still unique ETag, then wrapped in double quotes per RFC 9110.
func formatETag(key VariantKey) string {
	h := string(key)
	if len(h) > etagHashLength {
		h = h[:etagHashLength]
	}
	return `"` + h + `"`
}