400 lines
11 KiB
Go
400 lines
11 KiB
Go
package imgcache
|
|
|
|
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/url"
	"time"

	"github.com/dustin/go-humanize"
)
|
|
|
|
// Service implements the ImageCache interface, orchestrating cache, fetcher, and processor.
type Service struct {
	cache     *Cache         // variant, source, and negative caches
	fetcher   Fetcher        // upstream fetcher (a custom one may be injected for tests)
	processor Processor      // image processor used on cache misses
	signer    *Signer        // HMAC signer used by ValidateRequest and GenerateSignedURL
	whitelist *HostWhitelist // hosts exempt from signature verification
	log       *slog.Logger   // structured logger; never nil after NewService
}
|
|
|
|
// ServiceConfig holds configuration for the image service.
type ServiceConfig struct {
	// Cache is the cache instance. Required.
	Cache *Cache
	// FetcherConfig configures the upstream fetcher (ignored if Fetcher is set).
	// Nil falls back to DefaultFetcherConfig.
	FetcherConfig *FetcherConfig
	// Fetcher is an optional custom fetcher (for testing). When set,
	// FetcherConfig is ignored.
	Fetcher Fetcher
	// SigningKey is the HMAC signing key. Required (empty is rejected by
	// NewService).
	SigningKey string
	// Whitelist is the list of hosts that don't require signatures.
	Whitelist []string
	// Logger for logging. Nil falls back to slog.Default.
	Logger *slog.Logger
}
|
|
|
|
// NewService creates a new image service.
|
|
func NewService(cfg *ServiceConfig) (*Service, error) {
|
|
if cfg.Cache == nil {
|
|
return nil, errors.New("cache is required")
|
|
}
|
|
|
|
if cfg.SigningKey == "" {
|
|
return nil, errors.New("signing key is required")
|
|
}
|
|
|
|
// Use custom fetcher if provided, otherwise create HTTP fetcher
|
|
var fetcher Fetcher
|
|
if cfg.Fetcher != nil {
|
|
fetcher = cfg.Fetcher
|
|
} else {
|
|
fetcherCfg := cfg.FetcherConfig
|
|
if fetcherCfg == nil {
|
|
fetcherCfg = DefaultFetcherConfig()
|
|
}
|
|
fetcher = NewHTTPFetcher(fetcherCfg)
|
|
}
|
|
|
|
signer := NewSigner(cfg.SigningKey)
|
|
|
|
log := cfg.Logger
|
|
if log == nil {
|
|
log = slog.Default()
|
|
}
|
|
|
|
return &Service{
|
|
cache: cfg.Cache,
|
|
fetcher: fetcher,
|
|
processor: NewImageProcessor(),
|
|
signer: signer,
|
|
whitelist: NewHostWhitelist(cfg.Whitelist),
|
|
log: log,
|
|
}, nil
|
|
}
|
|
|
|
// ErrNegativeCached is returned when a URL is in the negative cache (recently failed).
// Get wraps it together with ErrUpstreamError, so callers can match either
// sentinel with errors.Is.
var ErrNegativeCached = errors.New("request is in negative cache (recently failed)")
|
|
|
|
// Get retrieves a processed image, fetching and processing if necessary.
//
// Lookup order: negative cache, then the on-disk variant cache, then cached
// source content or an upstream fetch (via processFromSourceOrFetch). Cache
// infrastructure errors are logged and treated as misses rather than failing
// the request; only negative-cache hits and downstream fetch/process errors
// surface to the caller.
func (s *Service) Get(ctx context.Context, req *ImageRequest) (*ImageResponse, error) {
	// Check negative cache first - skip fetching for recently-failed URLs
	negHit, err := s.cache.checkNegativeCache(ctx, req)
	if err != nil {
		// Best effort: a failing negative cache must not block serving.
		s.log.Warn("negative cache check failed", "error", err)
	}
	if negHit {
		s.log.Debug("negative cache hit",
			"host", req.SourceHost,
			"path", req.SourcePath,
		)
		// Wrap both sentinels so errors.Is matches either one.
		return nil, fmt.Errorf("%w: %w", ErrUpstreamError, ErrNegativeCached)
	}

	// Check variant cache first (disk only, no DB)
	result, err := s.cache.Lookup(ctx, req)
	if err != nil {
		s.log.Warn("cache lookup failed", "error", err)
	}

	// Cache hit - serve directly from disk
	if result != nil && result.Hit {
		reader, size, contentType, err := s.cache.GetVariant(result.CacheKey)
		if err != nil {
			s.log.Error("failed to get cached variant", "key", result.CacheKey, "error", err)
			// Fall through to re-process
		} else {
			// Hit counted with zero fetched bytes (nothing left the disk cache).
			s.cache.IncrementStats(ctx, true, 0)

			return &ImageResponse{
				Content:       reader,
				ContentLength: size,
				ContentType:   contentType,
				CacheStatus:   CacheHit,
				ETag:          formatETag(result.CacheKey),
			}, nil
		}
	}

	// Cache miss - check if we have source content cached
	cacheKey := CacheKey(req)
	s.cache.IncrementStats(ctx, false, 0)

	response, err := s.processFromSourceOrFetch(ctx, req, cacheKey)
	if err != nil {
		return nil, err
	}

	response.CacheStatus = CacheMiss

	return response, nil
}
|
|
|
|
// processFromSourceOrFetch processes an image, using cached source content if available.
|
|
func (s *Service) processFromSourceOrFetch(
|
|
ctx context.Context,
|
|
req *ImageRequest,
|
|
cacheKey VariantKey,
|
|
) (*ImageResponse, error) {
|
|
// Check if we have cached source content
|
|
contentHash, _, err := s.cache.LookupSource(ctx, req)
|
|
if err != nil {
|
|
s.log.Warn("source lookup failed", "error", err)
|
|
}
|
|
|
|
var sourceData []byte
|
|
var fetchBytes int64
|
|
|
|
if contentHash != "" {
|
|
// We have cached source - load it
|
|
s.log.Debug("using cached source", "hash", contentHash)
|
|
|
|
reader, err := s.cache.GetSourceContent(contentHash)
|
|
if err != nil {
|
|
s.log.Warn("failed to load cached source, fetching", "error", err)
|
|
// Fall through to fetch
|
|
} else {
|
|
sourceData, err = io.ReadAll(reader)
|
|
_ = reader.Close()
|
|
|
|
if err != nil {
|
|
s.log.Warn("failed to read cached source, fetching", "error", err)
|
|
// Fall through to fetch
|
|
}
|
|
}
|
|
}
|
|
|
|
// Fetch from upstream if we don't have source data or it's empty
|
|
if len(sourceData) == 0 {
|
|
resp, err := s.fetchAndProcess(ctx, req, cacheKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// Process using cached source
|
|
fetchBytes = int64(len(sourceData))
|
|
|
|
return s.processAndStore(ctx, req, cacheKey, sourceData, fetchBytes)
|
|
}
|
|
|
|
// fetchAndProcess fetches from upstream, processes, and caches the result.
//
// Fetch failures that qualify (see isNegativeCacheable) are recorded in the
// negative cache so repeated requests for a broken URL short-circuit in Get.
// The fetched bytes are magic-byte validated before any caching or processing.
func (s *Service) fetchAndProcess(
	ctx context.Context,
	req *ImageRequest,
	cacheKey VariantKey,
) (*ImageResponse, error) {
	// Fetch from upstream
	sourceURL := req.SourceURL()

	s.log.Debug("fetching from upstream", "url", sourceURL)

	fetchResult, err := s.fetcher.Fetch(ctx, sourceURL)
	if err != nil {
		// Store negative cache for certain errors
		if isNegativeCacheable(err) {
			statusCode := extractStatusCode(err)
			// Best effort: a negative-cache store failure is not fatal.
			_ = s.cache.StoreNegative(ctx, req, statusCode, err.Error())
		}

		return nil, fmt.Errorf("upstream fetch failed: %w", err)
	}

	defer func() { _ = fetchResult.Content.Close() }()

	// Read and validate the source content
	sourceData, err := io.ReadAll(fetchResult.Content)
	if err != nil {
		return nil, fmt.Errorf("failed to read upstream response: %w", err)
	}

	// Calculate download bitrate (left empty when duration is unknown/zero).
	fetchBytes := int64(len(sourceData))
	var downloadRate string

	if fetchResult.FetchDurationMs > 0 {
		seconds := float64(fetchResult.FetchDurationMs) / 1000.0 //nolint:mnd // ms to seconds
		bitsPerSecond := float64(fetchBytes*8) / seconds //nolint:mnd // bytes to bits
		downloadRate = humanize.SI(bitsPerSecond, "bps")
	}

	// Log upstream fetch details
	s.log.Info("upstream fetched",
		"host", req.SourceHost,
		"path", req.SourcePath,
		"bytes", fetchBytes,
		"fetch_ms", fetchResult.FetchDurationMs,
		"rate", downloadRate,
		"remote_addr", fetchResult.RemoteAddr,
		"http", fetchResult.HTTPVersion,
		"tls", fetchResult.TLSVersion,
		"cipher", fetchResult.TLSCipherSuite,
	)

	// Validate magic bytes match content type before trusting the payload.
	if err := ValidateMagicBytes(sourceData, fetchResult.ContentType); err != nil {
		return nil, fmt.Errorf("content validation failed: %w", err)
	}

	// Store source content so future variants can skip the upstream fetch.
	_, err = s.cache.StoreSource(ctx, req, bytes.NewReader(sourceData), fetchResult)
	if err != nil {
		s.log.Warn("failed to store source content", "error", err)
		// Continue even if caching fails
	}

	return s.processAndStore(ctx, req, cacheKey, sourceData, fetchBytes)
}
|
|
|
|
// processAndStore processes an image and stores the result.
//
// sourceData is the full source image; fetchBytes is its size (reported in
// the response and used for the size-ratio log line). The processed variant
// is stored under cacheKey; a store failure is logged but the response is
// still returned so the client is served.
func (s *Service) processAndStore(
	ctx context.Context,
	req *ImageRequest,
	cacheKey VariantKey,
	sourceData []byte,
	fetchBytes int64,
) (*ImageResponse, error) {
	// Process the image, timing the conversion for the log line below.
	processStart := time.Now()

	processResult, err := s.processor.Process(ctx, bytes.NewReader(sourceData), req)
	if err != nil {
		return nil, fmt.Errorf("image processing failed: %w", err)
	}

	processDuration := time.Since(processStart)

	// Read processed content fully into memory (served and cached from here).
	processedData, err := io.ReadAll(processResult.Content)
	_ = processResult.Content.Close()

	if err != nil {
		return nil, fmt.Errorf("failed to read processed content: %w", err)
	}

	// Log conversion details
	outputSize := int64(len(processedData))

	// Guard against division by zero when the source size is unknown.
	var sizePercent float64
	if fetchBytes > 0 {
		sizePercent = float64(outputSize) / float64(fetchBytes) * 100.0 //nolint:mnd // percentage calculation
	}

	s.log.Info("image converted",
		"host", req.SourceHost,
		"path", req.SourcePath,
		"src_format", processResult.InputFormat,
		"dst_format", req.Format,
		"src_bytes", fetchBytes,
		"dst_bytes", outputSize,
		"src_dimensions", fmt.Sprintf("%dx%d", processResult.InputWidth, processResult.InputHeight),
		"dst_dimensions", fmt.Sprintf("%dx%d", processResult.Width, processResult.Height),
		"size_ratio", fmt.Sprintf("%.1f%%", sizePercent),
		"convert_ms", processDuration.Milliseconds(),
		"quality", req.Quality,
		"fit", req.FitMode,
	)

	// Store variant to cache
	if err := s.cache.StoreVariant(cacheKey, bytes.NewReader(processedData), processResult.ContentType); err != nil {
		s.log.Warn("failed to store variant", "error", err)
		// Continue even if caching fails
	}

	return &ImageResponse{
		Content:       io.NopCloser(bytes.NewReader(processedData)),
		ContentLength: outputSize,
		ContentType:   processResult.ContentType,
		FetchedBytes:  fetchBytes,
		ETag:          formatETag(cacheKey),
	}, nil
}
|
|
|
|
// Warm pre-fetches and caches an image without returning it.
|
|
func (s *Service) Warm(ctx context.Context, req *ImageRequest) error {
|
|
_, err := s.Get(ctx, req)
|
|
|
|
return err
|
|
}
|
|
|
|
// Purge removes a cached image.
//
// Not yet implemented: it currently always returns an error and has no side
// effects.
func (s *Service) Purge(_ context.Context, _ *ImageRequest) error {
	// TODO: Implement purge
	return errors.New("purge not implemented")
}
|
|
|
|
// Stats returns cache statistics.
//
// It delegates directly to the underlying cache.
func (s *Service) Stats(ctx context.Context) (*CacheStats, error) {
	return s.cache.Stats(ctx)
}
|
|
|
|
// ValidateRequest validates the request signature if required.
|
|
func (s *Service) ValidateRequest(req *ImageRequest) error {
|
|
// Check if host is whitelisted (no signature required)
|
|
sourceURL := req.SourceURL()
|
|
|
|
parsedURL, err := url.Parse(sourceURL)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid source URL: %w", err)
|
|
}
|
|
|
|
if s.whitelist.IsWhitelisted(parsedURL) {
|
|
return nil
|
|
}
|
|
|
|
// Signature required for non-whitelisted hosts
|
|
return s.signer.Verify(req)
|
|
}
|
|
|
|
// GenerateSignedURL generates a signed URL for the given request.
|
|
func (s *Service) GenerateSignedURL(
|
|
baseURL string,
|
|
req *ImageRequest,
|
|
ttl time.Duration,
|
|
) (string, error) {
|
|
path, sig, exp := s.signer.GenerateSignedURL(req, ttl)
|
|
|
|
return fmt.Sprintf("%s%s?sig=%s&exp=%d", baseURL, path, sig, exp), nil
|
|
}
|
|
|
|
// HTTP status codes for error responses, aliased from net/http so the
// intent is explicit rather than encoded in magic numbers.
const (
	httpStatusBadGateway    = http.StatusBadGateway
	httpStatusInternalError = http.StatusInternalServerError
)
|
|
|
|
// isNegativeCacheable returns true if the error should be cached.
//
// Only upstream errors (wrapped with ErrUpstreamError) are negative-cached;
// local failures are not, so they can be retried immediately.
func isNegativeCacheable(err error) bool {
	return errors.Is(err, ErrUpstreamError)
}
|
|
|
|
// extractStatusCode extracts HTTP status code from error message.
|
|
func extractStatusCode(err error) int {
|
|
// Default to 502 Bad Gateway for upstream errors
|
|
if errors.Is(err, ErrUpstreamError) {
|
|
return httpStatusBadGateway
|
|
}
|
|
|
|
return httpStatusInternalError
|
|
}
|
|
|
|
// etagHashLength is the number of hash characters to use for ETags.
// Truncating the variant key to 16 characters keeps ETag headers short
// while remaining effectively unique in practice.
const etagHashLength = 16
|
|
|
|
// formatETag formats a VariantKey as a quoted ETag value.
|
|
func formatETag(key VariantKey) string {
|
|
hash := string(key)
|
|
// Use first 16 characters of hash for a shorter but still unique ETag
|
|
if len(hash) > etagHashLength {
|
|
hash = hash[:etagHashLength]
|
|
}
|
|
|
|
return `"` + hash + `"`
|
|
}
|