ImageProcessor.Process used io.ReadAll without a size limit, allowing arbitrarily large inputs to exhaust memory. Add a configurable maxInputBytes limit (default 50 MiB, matching the fetcher limit) and reject inputs that exceed it with ErrInputDataTooLarge. Also bound the cached source content read in the service layer to prevent unexpectedly large cached files from consuming unbounded memory. Extracted loadCachedSource helper to reduce nesting complexity.
437 lines
12 KiB
Go
437 lines
12 KiB
Go
package imgcache
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/url"
|
|
"time"
|
|
|
|
"github.com/dustin/go-humanize"
|
|
)
|
|
|
|
// Service implements the ImageCache interface, orchestrating cache, fetcher, and processor.
type Service struct {
	cache     *Cache         // variant/source/negative cache storage
	fetcher   Fetcher        // upstream fetcher (HTTP by default; injectable for tests)
	processor Processor      // image transcoder/resizer
	signer    *Signer        // HMAC request-signature verifier/generator
	whitelist *HostWhitelist // hosts exempt from signature checks
	log       *slog.Logger
	// allowHTTP is propagated onto each request; set from FetcherConfig.
	allowHTTP bool
	// maxResponseSize bounds both upstream fetches and cached-source reads.
	maxResponseSize int64
}
|
|
|
|
// ServiceConfig holds configuration for the image service.
type ServiceConfig struct {
	// Cache is the cache instance. Required.
	Cache *Cache
	// FetcherConfig configures the upstream fetcher (ignored if Fetcher is set).
	// When nil, DefaultFetcherConfig() supplies limits such as MaxResponseSize.
	FetcherConfig *FetcherConfig
	// Fetcher is an optional custom fetcher (for testing).
	Fetcher Fetcher
	// SigningKey is the HMAC signing key. Required; an empty key is rejected.
	SigningKey string
	// Whitelist is the list of hosts that don't require signatures.
	Whitelist []string
	// Logger for logging; slog.Default() is used when nil.
	Logger *slog.Logger
}
|
|
|
|
// NewService creates a new image service.
|
|
func NewService(cfg *ServiceConfig) (*Service, error) {
|
|
if cfg.Cache == nil {
|
|
return nil, errors.New("cache is required")
|
|
}
|
|
|
|
if cfg.SigningKey == "" {
|
|
return nil, errors.New("signing key is required")
|
|
}
|
|
|
|
// Resolve fetcher config for defaults
|
|
fetcherCfg := cfg.FetcherConfig
|
|
if fetcherCfg == nil {
|
|
fetcherCfg = DefaultFetcherConfig()
|
|
}
|
|
|
|
// Use custom fetcher if provided, otherwise create HTTP fetcher
|
|
var fetcher Fetcher
|
|
if cfg.Fetcher != nil {
|
|
fetcher = cfg.Fetcher
|
|
} else {
|
|
fetcher = NewHTTPFetcher(fetcherCfg)
|
|
}
|
|
|
|
signer := NewSigner(cfg.SigningKey)
|
|
|
|
log := cfg.Logger
|
|
if log == nil {
|
|
log = slog.Default()
|
|
}
|
|
|
|
allowHTTP := false
|
|
if cfg.FetcherConfig != nil {
|
|
allowHTTP = cfg.FetcherConfig.AllowHTTP
|
|
}
|
|
|
|
maxResponseSize := fetcherCfg.MaxResponseSize
|
|
|
|
return &Service{
|
|
cache: cfg.Cache,
|
|
fetcher: fetcher,
|
|
processor: NewImageProcessor(maxResponseSize),
|
|
signer: signer,
|
|
whitelist: NewHostWhitelist(cfg.Whitelist),
|
|
log: log,
|
|
allowHTTP: allowHTTP,
|
|
maxResponseSize: maxResponseSize,
|
|
}, nil
|
|
}
|
|
|
|
// ErrNegativeCached is returned when a URL is in the negative cache (recently failed).
// It is always wrapped together with ErrUpstreamError by Service.Get.
var ErrNegativeCached = errors.New("request is in negative cache (recently failed)")
|
|
|
|
// Get retrieves a processed image, fetching and processing if necessary.
|
|
func (s *Service) Get(ctx context.Context, req *ImageRequest) (*ImageResponse, error) {
|
|
// Propagate AllowHTTP setting to the request
|
|
req.AllowHTTP = s.allowHTTP
|
|
|
|
// Check negative cache first - skip fetching for recently-failed URLs
|
|
negHit, err := s.cache.checkNegativeCache(ctx, req)
|
|
if err != nil {
|
|
s.log.Warn("negative cache check failed", "error", err)
|
|
}
|
|
if negHit {
|
|
s.log.Debug("negative cache hit",
|
|
"host", req.SourceHost,
|
|
"path", req.SourcePath,
|
|
)
|
|
|
|
return nil, fmt.Errorf("%w: %w", ErrUpstreamError, ErrNegativeCached)
|
|
}
|
|
|
|
// Check variant cache first (disk only, no DB)
|
|
result, err := s.cache.Lookup(ctx, req)
|
|
if err != nil {
|
|
s.log.Warn("cache lookup failed", "error", err)
|
|
}
|
|
|
|
// Cache hit - serve directly from disk
|
|
if result != nil && result.Hit {
|
|
reader, size, contentType, err := s.cache.GetVariant(result.CacheKey)
|
|
if err != nil {
|
|
s.log.Error("failed to get cached variant", "key", result.CacheKey, "error", err)
|
|
// Fall through to re-process
|
|
} else {
|
|
s.cache.IncrementStats(ctx, true, 0)
|
|
|
|
return &ImageResponse{
|
|
Content: reader,
|
|
ContentLength: size,
|
|
ContentType: contentType,
|
|
CacheStatus: CacheHit,
|
|
ETag: formatETag(result.CacheKey),
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
// Cache miss - check if we have source content cached
|
|
cacheKey := CacheKey(req)
|
|
s.cache.IncrementStats(ctx, false, 0)
|
|
|
|
response, err := s.processFromSourceOrFetch(ctx, req, cacheKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response.CacheStatus = CacheMiss
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// loadCachedSource attempts to load source content from cache, returning nil
|
|
// if the cached data is unavailable or exceeds maxResponseSize.
|
|
func (s *Service) loadCachedSource(contentHash ContentHash) []byte {
|
|
reader, err := s.cache.GetSourceContent(contentHash)
|
|
if err != nil {
|
|
s.log.Warn("failed to load cached source, fetching", "error", err)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Bound the read to maxResponseSize to prevent unbounded memory use
|
|
// from unexpectedly large cached files.
|
|
limited := io.LimitReader(reader, s.maxResponseSize+1)
|
|
data, err := io.ReadAll(limited)
|
|
_ = reader.Close()
|
|
|
|
if err != nil {
|
|
s.log.Warn("failed to read cached source, fetching", "error", err)
|
|
|
|
return nil
|
|
}
|
|
|
|
if int64(len(data)) > s.maxResponseSize {
|
|
s.log.Warn("cached source exceeds max response size, discarding",
|
|
"hash", contentHash,
|
|
"max_bytes", s.maxResponseSize,
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
return data
|
|
}
|
|
|
|
// processFromSourceOrFetch processes an image, using cached source content if available.
|
|
func (s *Service) processFromSourceOrFetch(
|
|
ctx context.Context,
|
|
req *ImageRequest,
|
|
cacheKey VariantKey,
|
|
) (*ImageResponse, error) {
|
|
// Check if we have cached source content
|
|
contentHash, _, err := s.cache.LookupSource(ctx, req)
|
|
if err != nil {
|
|
s.log.Warn("source lookup failed", "error", err)
|
|
}
|
|
|
|
var sourceData []byte
|
|
var fetchBytes int64
|
|
|
|
if contentHash != "" {
|
|
s.log.Debug("using cached source", "hash", contentHash)
|
|
sourceData = s.loadCachedSource(contentHash)
|
|
}
|
|
|
|
// Fetch from upstream if we don't have source data or it's empty
|
|
if len(sourceData) == 0 {
|
|
resp, err := s.fetchAndProcess(ctx, req, cacheKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// Process using cached source
|
|
fetchBytes = int64(len(sourceData))
|
|
|
|
return s.processAndStore(ctx, req, cacheKey, sourceData, fetchBytes)
|
|
}
|
|
|
|
// fetchAndProcess fetches from upstream, processes, and caches the result.
|
|
func (s *Service) fetchAndProcess(
|
|
ctx context.Context,
|
|
req *ImageRequest,
|
|
cacheKey VariantKey,
|
|
) (*ImageResponse, error) {
|
|
// Fetch from upstream
|
|
sourceURL := req.SourceURL()
|
|
|
|
s.log.Debug("fetching from upstream", "url", sourceURL)
|
|
|
|
fetchResult, err := s.fetcher.Fetch(ctx, sourceURL)
|
|
if err != nil {
|
|
// Store negative cache for certain errors
|
|
if isNegativeCacheable(err) {
|
|
statusCode := extractStatusCode(err)
|
|
_ = s.cache.StoreNegative(ctx, req, statusCode, err.Error())
|
|
}
|
|
|
|
return nil, fmt.Errorf("upstream fetch failed: %w", err)
|
|
}
|
|
defer func() { _ = fetchResult.Content.Close() }()
|
|
|
|
// Read and validate the source content
|
|
sourceData, err := io.ReadAll(fetchResult.Content)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read upstream response: %w", err)
|
|
}
|
|
|
|
// Calculate download bitrate
|
|
fetchBytes := int64(len(sourceData))
|
|
var downloadRate string
|
|
|
|
if fetchResult.FetchDurationMs > 0 {
|
|
seconds := float64(fetchResult.FetchDurationMs) / 1000.0 //nolint:mnd // ms to seconds
|
|
bitsPerSecond := float64(fetchBytes*8) / seconds //nolint:mnd // bytes to bits
|
|
downloadRate = humanize.SI(bitsPerSecond, "bps")
|
|
}
|
|
|
|
// Log upstream fetch details
|
|
s.log.Info("upstream fetched",
|
|
"host", req.SourceHost,
|
|
"path", req.SourcePath,
|
|
"bytes", fetchBytes,
|
|
"fetch_ms", fetchResult.FetchDurationMs,
|
|
"rate", downloadRate,
|
|
"remote_addr", fetchResult.RemoteAddr,
|
|
"http", fetchResult.HTTPVersion,
|
|
"tls", fetchResult.TLSVersion,
|
|
"cipher", fetchResult.TLSCipherSuite,
|
|
)
|
|
|
|
// Validate magic bytes match content type
|
|
if err := ValidateMagicBytes(sourceData, fetchResult.ContentType); err != nil {
|
|
return nil, fmt.Errorf("content validation failed: %w", err)
|
|
}
|
|
|
|
// Store source content
|
|
_, err = s.cache.StoreSource(ctx, req, bytes.NewReader(sourceData), fetchResult)
|
|
if err != nil {
|
|
s.log.Warn("failed to store source content", "error", err)
|
|
// Continue even if caching fails
|
|
}
|
|
|
|
return s.processAndStore(ctx, req, cacheKey, sourceData, fetchBytes)
|
|
}
|
|
|
|
// processAndStore processes an image and stores the result.
|
|
func (s *Service) processAndStore(
|
|
ctx context.Context,
|
|
req *ImageRequest,
|
|
cacheKey VariantKey,
|
|
sourceData []byte,
|
|
fetchBytes int64,
|
|
) (*ImageResponse, error) {
|
|
// Process the image
|
|
processStart := time.Now()
|
|
|
|
processResult, err := s.processor.Process(ctx, bytes.NewReader(sourceData), req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("image processing failed: %w", err)
|
|
}
|
|
|
|
processDuration := time.Since(processStart)
|
|
|
|
// Read processed content
|
|
processedData, err := io.ReadAll(processResult.Content)
|
|
_ = processResult.Content.Close()
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read processed content: %w", err)
|
|
}
|
|
|
|
// Log conversion details
|
|
outputSize := int64(len(processedData))
|
|
|
|
var sizePercent float64
|
|
if fetchBytes > 0 {
|
|
sizePercent = float64(outputSize) / float64(fetchBytes) * 100.0 //nolint:mnd // percentage calculation
|
|
}
|
|
|
|
s.log.Info("image converted",
|
|
"host", req.SourceHost,
|
|
"path", req.SourcePath,
|
|
"src_format", processResult.InputFormat,
|
|
"dst_format", req.Format,
|
|
"src_bytes", fetchBytes,
|
|
"dst_bytes", outputSize,
|
|
"src_dimensions", fmt.Sprintf("%dx%d", processResult.InputWidth, processResult.InputHeight),
|
|
"dst_dimensions", fmt.Sprintf("%dx%d", processResult.Width, processResult.Height),
|
|
"size_ratio", fmt.Sprintf("%.1f%%", sizePercent),
|
|
"convert_ms", processDuration.Milliseconds(),
|
|
"quality", req.Quality,
|
|
"fit", req.FitMode,
|
|
)
|
|
|
|
// Store variant to cache
|
|
if err := s.cache.StoreVariant(cacheKey, bytes.NewReader(processedData), processResult.ContentType); err != nil {
|
|
s.log.Warn("failed to store variant", "error", err)
|
|
// Continue even if caching fails
|
|
}
|
|
|
|
return &ImageResponse{
|
|
Content: io.NopCloser(bytes.NewReader(processedData)),
|
|
ContentLength: outputSize,
|
|
ContentType: processResult.ContentType,
|
|
FetchedBytes: fetchBytes,
|
|
ETag: formatETag(cacheKey),
|
|
}, nil
|
|
}
|
|
|
|
// Warm pre-fetches and caches an image without returning it.
|
|
func (s *Service) Warm(ctx context.Context, req *ImageRequest) error {
|
|
_, err := s.Get(ctx, req)
|
|
|
|
return err
|
|
}
|
|
|
|
// Purge removes a cached image.
//
// Not yet implemented: both arguments are ignored and an error is always
// returned.
func (s *Service) Purge(_ context.Context, _ *ImageRequest) error {
	// TODO: Implement purge
	return errors.New("purge not implemented")
}
|
|
|
|
// Stats returns cache statistics.
//
// Thin delegate to the underlying cache.
func (s *Service) Stats(ctx context.Context) (*CacheStats, error) {
	return s.cache.Stats(ctx)
}
|
|
|
|
// ValidateRequest validates the request signature if required.
|
|
func (s *Service) ValidateRequest(req *ImageRequest) error {
|
|
// Check if host is whitelisted (no signature required)
|
|
sourceURL := req.SourceURL()
|
|
|
|
parsedURL, err := url.Parse(sourceURL)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid source URL: %w", err)
|
|
}
|
|
|
|
if s.whitelist.IsWhitelisted(parsedURL) {
|
|
return nil
|
|
}
|
|
|
|
// Signature required for non-whitelisted hosts
|
|
return s.signer.Verify(req)
|
|
}
|
|
|
|
// GenerateSignedURL generates a signed URL for the given request.
|
|
func (s *Service) GenerateSignedURL(
|
|
baseURL string,
|
|
req *ImageRequest,
|
|
ttl time.Duration,
|
|
) (string, error) {
|
|
path, sig, exp := s.signer.GenerateSignedURL(req, ttl)
|
|
|
|
return fmt.Sprintf("%s%s?sig=%s&exp=%d", baseURL, path, sig, exp), nil
|
|
}
|
|
|
|
// HTTP status codes for error responses.
const (
	httpStatusBadGateway    = 502 // upstream fetch failures
	httpStatusInternalError = 500 // all other (non-upstream) errors
)
|
|
|
|
// isNegativeCacheable returns true if the error should be cached.
//
// Only upstream errors are negative-cached; local/internal failures are not.
func isNegativeCacheable(err error) bool {
	return errors.Is(err, ErrUpstreamError)
}
|
|
|
|
// extractStatusCode extracts HTTP status code from error message.
|
|
func extractStatusCode(err error) int {
|
|
// Default to 502 Bad Gateway for upstream errors
|
|
if errors.Is(err, ErrUpstreamError) {
|
|
return httpStatusBadGateway
|
|
}
|
|
|
|
return httpStatusInternalError
|
|
}
|
|
|
|
// etagHashLength is the number of hash characters to use for ETags.
// 16 hex characters (64 bits) keeps ETags short while staying effectively unique.
const etagHashLength = 16
|
|
|
|
// formatETag formats a VariantKey as a quoted ETag value.
|
|
func formatETag(key VariantKey) string {
|
|
hash := string(key)
|
|
// Use first 16 characters of hash for a shorter but still unique ETag
|
|
if len(hash) > etagHashLength {
|
|
hash = hash[:etagHashLength]
|
|
}
|
|
|
|
return `"` + hash + `"`
|
|
}
|