Files
pixa/internal/imgcache/service.go
sneak 77c6744383 Add upstream connection info and download metrics to logging
- Capture TLS version, cipher suite, HTTP version, and remote addr
- Add download bitrate using go-humanize SI formatting
- Use consistent WxH format for dimensions (not struct notation)
- Rename input/output to src/dst for consistency
- Add separate "upstream fetched" log with connection details
2026-01-08 12:47:31 -08:00

328 lines
8.8 KiB
Go

package imgcache
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log/slog"
"net/url"
"time"
"github.com/dustin/go-humanize"
)
// Service implements the ImageCache interface, orchestrating cache, fetcher, and processor.
// Instances are built by NewService, which populates every field.
type Service struct {
// cache answers lookups and stores source, output, and negative entries.
cache *Cache
// fetcher retrieves source images from upstream.
fetcher Fetcher
// processor converts fetched source data into the requested output.
processor Processor
// signer verifies and generates request signatures; nil when no signing key is configured.
signer *Signer
// whitelist decides which source hosts are accepted without a signature.
whitelist *HostWhitelist
// log receives the service's structured log output.
log *slog.Logger
}
// ServiceConfig holds configuration for the image service.
// It is consumed by NewService.
type ServiceConfig struct {
// Cache is the cache instance. It is required: NewService returns an error when it is nil.
Cache *Cache
// FetcherConfig configures the upstream fetcher (ignored if Fetcher is set).
// When both Fetcher and FetcherConfig are nil, DefaultFetcherConfig() is used.
FetcherConfig *FetcherConfig
// Fetcher is an optional custom fetcher (for testing)
Fetcher Fetcher
// SigningKey is the HMAC signing key (empty disables signing).
// With signing disabled, only whitelisted hosts pass ValidateRequest.
SigningKey string
// Whitelist is the list of hosts that don't require signatures
Whitelist []string
// Logger for logging. When nil, slog.Default() is used.
Logger *slog.Logger
}
// NewService creates a new image service from cfg.
//
// cfg and cfg.Cache are required; an error is returned when either is nil.
// The remaining fields are optional:
//   - Fetcher: used as-is when set; otherwise an HTTP fetcher is created from
//     FetcherConfig, falling back to DefaultFetcherConfig() when that is nil.
//   - SigningKey: a signer is created only when the key is non-empty.
//   - Logger: slog.Default() is used when nil.
func NewService(cfg *ServiceConfig) (*Service, error) {
	// Guard against a nil config so callers get an error instead of a panic.
	if cfg == nil {
		return nil, errors.New("config is required")
	}

	if cfg.Cache == nil {
		return nil, errors.New("cache is required")
	}

	// Use custom fetcher if provided, otherwise create HTTP fetcher
	fetcher := cfg.Fetcher
	if fetcher == nil {
		fetcherCfg := cfg.FetcherConfig
		if fetcherCfg == nil {
			fetcherCfg = DefaultFetcherConfig()
		}

		fetcher = NewHTTPFetcher(fetcherCfg)
	}

	// A nil signer means signing is disabled.
	var signer *Signer
	if cfg.SigningKey != "" {
		signer = NewSigner(cfg.SigningKey)
	}

	log := cfg.Logger
	if log == nil {
		log = slog.Default()
	}

	return &Service{
		cache:     cfg.Cache,
		fetcher:   fetcher,
		processor: NewImageProcessor(),
		signer:    signer,
		whitelist: NewHostWhitelist(cfg.Whitelist),
		log:       log,
	}, nil
}
// Get retrieves a processed image, fetching and processing if necessary.
//
// The cache is consulted first. A negative-cache entry short-circuits with an
// error; any other lookup failure is logged and handled as a miss. On a hit
// the cached output is returned with CacheStatus set to CacheHit. Otherwise
// the image is fetched, processed, and cached via fetchAndProcess, and the
// response is returned with CacheStatus set to CacheMiss.
//
// The caller owns the returned ImageResponse.Content.
func (s *Service) Get(ctx context.Context, req *ImageRequest) (*ImageResponse, error) {
	// Check cache first
	result, err := s.cache.Lookup(ctx, req)
	if err != nil {
		if errors.Is(err, ErrNegativeCache) {
			return nil, errors.New("upstream returned error (cached)")
		}

		s.log.Warn("cache lookup failed", "error", err)
	}

	// Cache hit - serve from cache
	if result != nil && result.Hit {
		reader, size, err := s.cache.GetOutputWithSize(result.OutputHash)
		if err == nil {
			// Count the hit only once the output is known to be readable, so
			// a failed read is not recorded as both a hit and a miss.
			s.cache.IncrementStats(ctx, true, 0)

			return &ImageResponse{
				Content:       reader,
				ContentLength: size,
				ContentType:   result.ContentType,
				CacheStatus:   CacheHit,
				ETag:          formatETag(result.OutputHash),
			}, nil
		}

		// Fall through to re-fetch
		s.log.Error("failed to get cached output", "hash", result.OutputHash, "error", err)
	}

	// Cache miss - need to fetch, process, and cache
	s.cache.IncrementStats(ctx, false, 0)

	response, err := s.fetchAndProcess(ctx, req)
	if err != nil {
		return nil, err
	}

	response.CacheStatus = CacheMiss

	return response, nil
}
// fetchAndProcess fetches from upstream, processes, and caches the result.
//
// Steps, in order:
//  1. Fetch the source URL; upstream errors that qualify are recorded in the
//     negative cache (best effort) before the error is returned.
//  2. Read the whole upstream body into memory and log transfer details.
//  3. Validate the magic bytes against the reported content type.
//  4. Store the source in the cache (failure is logged, not fatal).
//  5. Process the image and log conversion details.
//  6. Store the processed output and reopen it from the cache, so the
//     response is served from the same path as a cache hit.
//
// The returned response does not have CacheStatus set; the caller does that.
func (s *Service) fetchAndProcess(ctx context.Context, req *ImageRequest) (*ImageResponse, error) {
	// Fetch from upstream
	sourceURL := req.SourceURL()

	s.log.Debug("fetching from upstream", "url", sourceURL)

	fetchResult, err := s.fetcher.Fetch(ctx, sourceURL)
	if err != nil {
		// Store negative cache for certain errors
		if isNegativeCacheable(err) {
			statusCode := extractStatusCode(err)
			// Best effort: a failed negative-cache write must not mask the
			// fetch error, but it should not vanish silently either.
			if storeErr := s.cache.StoreNegative(ctx, req, statusCode, err.Error()); storeErr != nil {
				s.log.Warn("failed to store negative cache entry", "error", storeErr)
			}
		}

		return nil, fmt.Errorf("upstream fetch failed: %w", err)
	}
	defer func() { _ = fetchResult.Content.Close() }()

	// Read and validate the source content
	sourceData, err := io.ReadAll(fetchResult.Content)
	if err != nil {
		return nil, fmt.Errorf("failed to read upstream response: %w", err)
	}

	// Calculate download bitrate; left empty when no duration was measured.
	fetchBytes := int64(len(sourceData))

	var downloadRate string

	if fetchResult.FetchDurationMs > 0 {
		seconds := float64(fetchResult.FetchDurationMs) / 1000.0 //nolint:mnd // ms to seconds
		bitsPerSecond := float64(fetchBytes*8) / seconds         //nolint:mnd // bytes to bits
		downloadRate = humanize.SI(bitsPerSecond, "bps")
	}

	// Log upstream fetch details
	s.log.Info("upstream fetched",
		"host", req.SourceHost,
		"path", req.SourcePath,
		"bytes", fetchBytes,
		"fetch_ms", fetchResult.FetchDurationMs,
		"rate", downloadRate,
		"remote_addr", fetchResult.RemoteAddr,
		"http", fetchResult.HTTPVersion,
		"tls", fetchResult.TLSVersion,
		"cipher", fetchResult.TLSCipherSuite,
	)

	// Validate magic bytes match content type
	if err := ValidateMagicBytes(sourceData, fetchResult.ContentType); err != nil {
		return nil, fmt.Errorf("content validation failed: %w", err)
	}

	// Store source content
	_, err = s.cache.StoreSource(ctx, req, bytes.NewReader(sourceData), fetchResult)
	if err != nil {
		s.log.Warn("failed to store source content", "error", err)
		// Continue even if caching fails
	}

	// Process the image
	processStart := time.Now()

	processResult, err := s.processor.Process(ctx, bytes.NewReader(sourceData), req)
	if err != nil {
		return nil, fmt.Errorf("image processing failed: %w", err)
	}

	processDuration := time.Since(processStart)

	// Log conversion details
	outputSize := processResult.ContentLength

	// Guard the division so an empty source logs 0.0% rather than NaN/Inf.
	var sizePercent float64
	if fetchBytes > 0 {
		sizePercent = float64(outputSize) / float64(fetchBytes) * 100.0 //nolint:mnd // percentage calculation
	}

	s.log.Info("image converted",
		"host", req.SourceHost,
		"path", req.SourcePath,
		"src_format", processResult.InputFormat,
		"dst_format", req.Format,
		"src_bytes", fetchBytes,
		"dst_bytes", outputSize,
		"src_dimensions", fmt.Sprintf("%dx%d", processResult.InputWidth, processResult.InputHeight),
		"dst_dimensions", fmt.Sprintf("%dx%d", processResult.Width, processResult.Height),
		"size_ratio", fmt.Sprintf("%.1f%%", sizePercent),
		"convert_ms", processDuration.Milliseconds(),
		"quality", req.Quality,
		"fit", req.FitMode,
	)

	// Store output content to cache
	metaID, err := s.cache.GetSourceMetadataID(ctx, req)
	if err != nil {
		// Release the processed output; nothing else will close it on this path.
		_ = processResult.Content.Close()

		return nil, fmt.Errorf("failed to get source metadata ID: %w", err)
	}

	outputHash, err := s.cache.StoreOutput(ctx, req, metaID, processResult.Content, processResult.ContentType)
	_ = processResult.Content.Close()

	if err != nil {
		return nil, fmt.Errorf("failed to store output content: %w", err)
	}

	// Serve from the cached file on disk (same path as cache hits)
	reader, size, err := s.cache.GetOutputWithSize(outputHash)
	if err != nil {
		return nil, fmt.Errorf("failed to read cached output: %w", err)
	}

	return &ImageResponse{
		Content:       reader,
		ContentLength: size,
		ContentType:   processResult.ContentType,
		FetchedBytes:  fetchBytes,
		ETag:          formatETag(outputHash),
	}, nil
}
// Warm pre-fetches and caches an image without returning it.
//
// It runs the same path as Get and then releases the response body, so that
// warming the cache does not leave the returned reader open. The content is
// closed only when it implements io.Closer; a close error is ignored because
// the image has already been cached by that point.
func (s *Service) Warm(ctx context.Context, req *ImageRequest) error {
	resp, err := s.Get(ctx, req)
	if err != nil {
		return err
	}

	if resp != nil {
		if closer, ok := resp.Content.(io.Closer); ok {
			_ = closer.Close()
		}
	}

	return nil
}
// Purge is meant to remove a cached image. It is currently a stub: both
// arguments are ignored and an error is always returned.
func (s *Service) Purge(_ context.Context, _ *ImageRequest) error {
	// TODO: Implement purge
	notImplemented := errors.New("purge not implemented")

	return notImplemented
}
// Stats reports cache statistics by delegating to the underlying cache.
func (s *Service) Stats(ctx context.Context) (*CacheStats, error) {
	stats, err := s.cache.Stats(ctx)

	return stats, err
}
// ValidateRequest decides whether req may be served.
//
// Requests whose source URL is whitelisted pass without a signature. Every
// other request must carry a signature that the configured signer accepts;
// when no signer is configured such requests are rejected. An error is also
// returned when the source URL cannot be parsed.
func (s *Service) ValidateRequest(req *ImageRequest) error {
	target, err := url.Parse(req.SourceURL())
	if err != nil {
		return fmt.Errorf("invalid source URL: %w", err)
	}

	switch {
	case s.whitelist.IsWhitelisted(target):
		// Whitelisted hosts need no signature.
		return nil
	case s.signer == nil:
		// A signature is required but there is nothing to verify it with.
		return errors.New("signing key not configured but host not whitelisted")
	default:
		return s.signer.Verify(req)
	}
}
// GenerateSignedURL builds a signed URL for req that is valid for ttl.
//
// The result is baseURL followed by the signed path and the "sig" and "exp"
// query parameters produced by the signer. An error is returned when no
// signing key is configured.
func (s *Service) GenerateSignedURL(
	baseURL string,
	req *ImageRequest,
	ttl time.Duration,
) (string, error) {
	signer := s.signer
	if signer == nil {
		return "", errors.New("signing key not configured")
	}

	signedPath, signature, expiry := signer.GenerateSignedURL(req, ttl)
	signedURL := fmt.Sprintf("%s%s?sig=%s&exp=%d", baseURL, signedPath, signature, expiry)

	return signedURL, nil
}
// HTTP status codes for error responses.
const (
	// httpStatusInternalError is the fallback code for non-upstream errors.
	httpStatusInternalError = 500
	// httpStatusBadGateway is the code reported for upstream errors.
	httpStatusBadGateway = 502
)
// isNegativeCacheable reports whether err should be recorded in the negative
// cache. Only errors matching ErrUpstreamError qualify.
func isNegativeCacheable(err error) bool {
	upstreamFailure := errors.Is(err, ErrUpstreamError)

	return upstreamFailure
}
// extractStatusCode maps err to an HTTP status code. The error message is not
// inspected: errors matching ErrUpstreamError yield 502 Bad Gateway and
// everything else yields 500 Internal Server Error.
func extractStatusCode(err error) int {
	if !errors.Is(err, ErrUpstreamError) {
		return httpStatusInternalError
	}

	// Default to 502 Bad Gateway for upstream errors
	return httpStatusBadGateway
}
// etagHashLength is the maximum number of leading hash characters kept in an ETag.
const etagHashLength = 16

// formatETag returns hash as a double-quoted ETag value. Hashes longer than
// etagHashLength are cut down to that many leading characters; shorter hashes
// are used unchanged.
func formatETag(hash string) string {
	short := hash
	if len(short) > etagHashLength {
		short = short[:etagHashLength]
	}

	return fmt.Sprintf("\"%s\"", short)
}