Files
pixa/internal/imgcache/cache.go
sneak 15d9439e3d Add fetch/conversion metrics and improve logging
FetchResult now includes:
- StatusCode: HTTP status from upstream
- FetchDurationMs: time to fetch from upstream
- RemoteAddr: upstream server address

SourceMetadata now stores:
- ContentLength: size from upstream
- FetchDurationMs: fetch timing
- RemoteAddr: for debugging

Image conversion log now includes:
- host: source hostname (was missing)
- path: source path (renamed from file)
- convert_ms: image processing time
- quality: requested quality setting
- fit: requested fit mode
2026-01-08 12:34:26 -08:00

441 lines
12 KiB
Go

package imgcache
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"path/filepath"
"sync"
"time"
)
// Sentinel errors exposed by the cache layer. Callers should compare against
// them with errors.Is.
var (
// ErrCacheMiss signals that no cached entry was found.
// NOTE(review): nothing in this section returns it; Lookup reports misses
// through LookupResult instead. Confirm whether other files use it.
ErrCacheMiss = errors.New("cache miss")
// ErrNegativeCache is returned by Lookup when the request's source has an
// unexpired entry in the negative cache.
ErrNegativeCache = errors.New("negative cache hit")
)
// httpStatusOK is the HTTP 200 (OK) status code, used when recording a
// successful upstream fetch in the source_metadata table.
const httpStatusOK = 200
// CacheConfig holds cache configuration.
type CacheConfig struct {
// StateDir is the base directory; NewCache creates its storage under
// StateDir/cache/{src-content,dst-content,src-metadata}.
StateDir string
// CacheTTL is the maximum age of a request_cache row before Lookup reports
// it as stale. A zero or negative value disables the TTL check.
CacheTTL time.Duration
// NegativeTTL is how long an entry written by StoreNegative stays valid.
NegativeTTL time.Duration
// HotCacheSize sizes the in-memory hot cache: NewCache uses it as the map's
// initial capacity and only allocates the map when it is positive.
HotCacheSize int
// HotCacheEnabled turns the in-memory hot cache on.
HotCacheEnabled bool
}
// hotCacheEntry stores all data needed to serve a cache hit without DB access.
// It mirrors the fields Lookup would otherwise read from the database.
type hotCacheEntry struct {
// OutputHash is the content hash of the processed output.
OutputHash string
// ContentType is the content type recorded for the output.
ContentType string
// SizeBytes is the stored size of the output.
SizeBytes int64
}
// Cache implements the caching layer for the image proxy. It combines a SQL
// database (index and statistics), on-disk content stores, and an optional
// in-memory hot cache keyed by request cache key.
type Cache struct {
// db holds the request_cache, output_content, source_content,
// source_metadata, negative_cache and cache_stats tables.
db *sql.DB
// srcContent stores fetched source bytes, addressed by content hash.
srcContent *ContentStorage
// dstContent stores processed output bytes, addressed by content hash.
dstContent *ContentStorage
// srcMetadata stores per-source metadata files alongside the database rows.
srcMetadata *MetadataStorage
// config is the configuration passed to NewCache.
config CacheConfig
hotCache map[string]hotCacheEntry // cache_key -> entry
// hotCacheMu guards hotCache.
hotCacheMu sync.RWMutex
// hotCacheEnabled is set by NewCache from the configuration and gates
// every hot cache read and write.
hotCacheEnabled bool
}
// NewCache creates a new cache instance backed by db and by on-disk storage
// rooted at config.StateDir/cache.
//
// Three stores are created: "src-content" (fetched sources), "dst-content"
// (processed outputs) and "src-metadata" (source metadata files). An error
// from any of them is wrapped and returned.
//
// The hot cache is only considered enabled when config.HotCacheEnabled is set
// AND config.HotCacheSize is positive, so that hotCacheEnabled always implies
// an allocated map. Enabling it without allocating the map would make the
// first hot cache write panic with "assignment to entry in nil map".
func NewCache(db *sql.DB, config CacheConfig) (*Cache, error) {
	cacheDir := filepath.Join(config.StateDir, "cache")

	srcContent, err := NewContentStorage(filepath.Join(cacheDir, "src-content"))
	if err != nil {
		return nil, fmt.Errorf("failed to create source content storage: %w", err)
	}

	dstContent, err := NewContentStorage(filepath.Join(cacheDir, "dst-content"))
	if err != nil {
		return nil, fmt.Errorf("failed to create destination content storage: %w", err)
	}

	srcMetadata, err := NewMetadataStorage(filepath.Join(cacheDir, "src-metadata"))
	if err != nil {
		return nil, fmt.Errorf("failed to create source metadata storage: %w", err)
	}

	// A non-positive size means there is nothing to cache in memory.
	hotEnabled := config.HotCacheEnabled && config.HotCacheSize > 0

	c := &Cache{
		db:              db,
		srcContent:      srcContent,
		dstContent:      dstContent,
		srcMetadata:     srcMetadata,
		config:          config,
		hotCacheEnabled: hotEnabled,
	}

	if hotEnabled {
		c.hotCache = make(map[string]hotCacheEntry, config.HotCacheSize)
	}

	return c, nil
}
// LookupResult contains the result of a cache lookup.
type LookupResult struct {
// Hit is true when a usable cached output was found.
Hit bool
// OutputHash is the content hash of the cached output; set only on a hit.
OutputHash string
// ContentType is the cached output's content type; set only on a hit.
ContentType string
// SizeBytes is the cached output's size; set only on a hit.
SizeBytes int64
// CacheStatus is CacheHit, CacheMiss or CacheStale as determined by Lookup.
CacheStatus CacheStatus
}
// Lookup checks if a processed image exists in the cache.
//
// Resolution order:
//  1. the in-memory hot cache (if enabled), verified against the output file;
//  2. the negative cache, which yields ErrNegativeCache when an unexpired
//     entry exists;
//  3. the request_cache/output_content tables, subject to config.CacheTTL and
//     to the output file still existing on disk.
//
// A miss or stale entry is reported through LookupResult (Hit == false) with a
// nil error; a non-nil error means the negative-cache check or the database
// query failed.
//
// NOTE(review): a hot cache hit returns before the negative cache and the TTL
// are consulted, so hot entries are served regardless of age. Confirm this is
// intended.
func (c *Cache) Lookup(ctx context.Context, req *ImageRequest) (*LookupResult, error) {
	cacheKey := CacheKey(req)

	// Check hot cache first. Reading from a nil map is safe, so no allocation
	// check is needed on this path.
	if c.hotCacheEnabled {
		c.hotCacheMu.RLock()
		entry, ok := c.hotCache[cacheKey]
		c.hotCacheMu.RUnlock()

		if ok && c.dstContent.Exists(entry.OutputHash) {
			return &LookupResult{
				Hit:         true,
				OutputHash:  entry.OutputHash,
				ContentType: entry.ContentType,
				SizeBytes:   entry.SizeBytes,
				CacheStatus: CacheHit,
			}, nil
		}
	}

	// Check negative cache
	negCached, err := c.checkNegativeCache(ctx, req)
	if err != nil {
		return nil, err
	}
	if negCached {
		return nil, ErrNegativeCache
	}

	// Check database
	var outputHash, contentType string
	var sizeBytes int64
	var fetchedAt time.Time

	err = c.db.QueryRowContext(ctx, `
SELECT rc.output_hash, oc.content_type, oc.size_bytes, rc.fetched_at
FROM request_cache rc
JOIN output_content oc ON rc.output_hash = oc.content_hash
WHERE rc.cache_key = ?
`, cacheKey).Scan(&outputHash, &contentType, &sizeBytes, &fetchedAt)
	if errors.Is(err, sql.ErrNoRows) {
		return &LookupResult{Hit: false, CacheStatus: CacheMiss}, nil
	}
	if err != nil {
		return nil, fmt.Errorf("failed to query cache: %w", err)
	}

	// Check TTL (a non-positive CacheTTL disables expiry).
	if c.config.CacheTTL > 0 && time.Since(fetchedAt) > c.config.CacheTTL {
		return &LookupResult{Hit: false, CacheStatus: CacheStale}, nil
	}

	// Verify file exists on disk
	if !c.dstContent.Exists(outputHash) {
		return &LookupResult{Hit: false, CacheStatus: CacheMiss}, nil
	}

	// Update hot cache. The nil check prevents a panic when the hot cache is
	// enabled but its map was never allocated (HotCacheSize <= 0).
	if c.hotCacheEnabled && c.hotCache != nil {
		c.hotCacheMu.Lock()
		// Keep the map within its configured size: when a new key would
		// exceed the limit, drop one arbitrary entry to make room.
		if _, present := c.hotCache[cacheKey]; !present && len(c.hotCache) >= c.config.HotCacheSize {
			for victim := range c.hotCache {
				delete(c.hotCache, victim)
				break
			}
		}
		c.hotCache[cacheKey] = hotCacheEntry{
			OutputHash:  outputHash,
			ContentType: contentType,
			SizeBytes:   sizeBytes,
		}
		c.hotCacheMu.Unlock()
	}

	// Update access count. Best-effort: a failed counter update must not turn
	// a valid cache hit into an error, so the result is deliberately ignored.
	_, _ = c.db.ExecContext(ctx, `
UPDATE request_cache
SET access_count = access_count + 1
WHERE cache_key = ?
`, cacheKey)

	return &LookupResult{
		Hit:         true,
		OutputHash:  outputHash,
		ContentType: contentType,
		SizeBytes:   sizeBytes,
		CacheStatus: CacheHit,
	}, nil
}
// GetOutput returns a reader for the cached output content identified by
// outputHash. It delegates to the destination content storage's Load and
// passes the reader and error through unchanged. The result is an
// io.ReadCloser, so the caller is expected to close it.
func (c *Cache) GetOutput(outputHash string) (io.ReadCloser, error) {
return c.dstContent.Load(outputHash)
}
// GetOutputWithSize returns a reader and size for the cached output content
// identified by outputHash. It delegates to the destination content storage's
// LoadWithSize and passes all three results through unchanged. The reader is
// an io.ReadCloser, so the caller is expected to close it.
func (c *Cache) GetOutputWithSize(outputHash string) (io.ReadCloser, int64, error) {
return c.dstContent.LoadWithSize(outputHash)
}
// StoreSource stores fetched source content and metadata.
//
// It performs, in order:
//  1. writes content to the source content storage, obtaining its hash/size;
//  2. inserts a source_content row (no-op if the hash already exists);
//  3. upserts the source_metadata row for (host, path, query);
//  4. writes a metadata file via the source metadata storage (best-effort).
//
// The upstream status code recorded in the database is result.StatusCode, the
// same value written to the metadata file, so both records agree. When the
// fetcher left StatusCode at zero, httpStatusOK is recorded instead.
//
// It returns the content hash of the stored source, or a wrapped error from
// the content store, header serialization, or either database statement.
func (c *Cache) StoreSource(
	ctx context.Context,
	req *ImageRequest,
	content io.Reader,
	result *FetchResult,
) (contentHash string, err error) {
	// Store content
	contentHash, size, err := c.srcContent.Store(content)
	if err != nil {
		return "", fmt.Errorf("failed to store source content: %w", err)
	}

	// Store in database
	pathHash := HashPath(req.SourcePath + "?" + req.SourceQuery)

	headersJSON, err := json.Marshal(result.Headers)
	if err != nil {
		return "", fmt.Errorf("failed to marshal response headers: %w", err)
	}

	// Fall back to 200 only when the fetch result carries no status at all.
	statusCode := result.StatusCode
	if statusCode == 0 {
		statusCode = httpStatusOK
	}

	_, err = c.db.ExecContext(ctx, `
INSERT INTO source_content (content_hash, content_type, size_bytes)
VALUES (?, ?, ?)
ON CONFLICT(content_hash) DO NOTHING
`, contentHash, result.ContentType, size)
	if err != nil {
		return "", fmt.Errorf("failed to insert source content: %w", err)
	}

	_, err = c.db.ExecContext(ctx, `
INSERT INTO source_metadata
(source_host, source_path, source_query, path_hash,
content_hash, status_code, content_type, response_headers)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(source_host, source_path, source_query) DO UPDATE SET
content_hash = excluded.content_hash,
status_code = excluded.status_code,
content_type = excluded.content_type,
response_headers = excluded.response_headers,
fetched_at = CURRENT_TIMESTAMP
`, req.SourceHost, req.SourcePath, req.SourceQuery, pathHash,
		contentHash, statusCode, result.ContentType, string(headersJSON))
	if err != nil {
		return "", fmt.Errorf("failed to insert source metadata: %w", err)
	}

	// Store metadata JSON file
	meta := &SourceMetadata{
		Host:            req.SourceHost,
		Path:            req.SourcePath,
		Query:           req.SourceQuery,
		ContentHash:     contentHash,
		StatusCode:      statusCode,
		ContentType:     result.ContentType,
		ContentLength:   result.ContentLength,
		ResponseHeaders: result.Headers,
		FetchedAt:       time.Now().Unix(),
		FetchDurationMs: result.FetchDurationMs,
		RemoteAddr:      result.RemoteAddr,
	}

	// Non-fatal: the same information is already in the database, so a failed
	// metadata file write is deliberately ignored.
	_ = c.srcMetadata.Store(req.SourceHost, pathHash, meta)

	return contentHash, nil
}
// StoreOutput stores processed output content and returns the output hash.
//
// The content is written to the destination content storage, an
// output_content row is inserted (no-op if the hash already exists), and the
// request_cache row for the request's cache key is upserted to point at the
// new output. On success the hot cache, when enabled and allocated, is
// updated as well.
//
// sourceMetadataID is the source_metadata row the output was derived from;
// contentType is recorded for the output. Errors from the content store or
// either database statement are wrapped and returned.
func (c *Cache) StoreOutput(
	ctx context.Context,
	req *ImageRequest,
	sourceMetadataID int64,
	content io.Reader,
	contentType string,
) (string, error) {
	// Store content
	outputHash, size, err := c.dstContent.Store(content)
	if err != nil {
		return "", fmt.Errorf("failed to store output content: %w", err)
	}

	cacheKey := CacheKey(req)

	// Store in database
	_, err = c.db.ExecContext(ctx, `
INSERT INTO output_content (content_hash, content_type, size_bytes)
VALUES (?, ?, ?)
ON CONFLICT(content_hash) DO NOTHING
`, outputHash, contentType, size)
	if err != nil {
		return "", fmt.Errorf("failed to insert output content: %w", err)
	}

	_, err = c.db.ExecContext(ctx, `
INSERT INTO request_cache (cache_key, source_metadata_id, output_hash, width, height, format, quality, fit_mode)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(cache_key) DO UPDATE SET
output_hash = excluded.output_hash,
fetched_at = CURRENT_TIMESTAMP,
access_count = request_cache.access_count + 1
`, cacheKey, sourceMetadataID, outputHash, req.Size.Width, req.Size.Height, req.Format, req.Quality, req.FitMode)
	if err != nil {
		return "", fmt.Errorf("failed to insert request cache: %w", err)
	}

	// Update hot cache. The nil check prevents a panic when the hot cache is
	// enabled but its map was never allocated (HotCacheSize <= 0).
	if c.hotCacheEnabled && c.hotCache != nil {
		c.hotCacheMu.Lock()
		// Keep the map within its configured size: when a new key would
		// exceed the limit, drop one arbitrary entry to make room.
		if _, present := c.hotCache[cacheKey]; !present && len(c.hotCache) >= c.config.HotCacheSize {
			for victim := range c.hotCache {
				delete(c.hotCache, victim)
				break
			}
		}
		c.hotCache[cacheKey] = hotCacheEntry{
			OutputHash:  outputHash,
			ContentType: contentType,
			SizeBytes:   size,
		}
		c.hotCacheMu.Unlock()
	}

	return outputHash, nil
}
// StoreNegative stores a negative cache entry for a failed fetch.
//
// The entry is keyed by (source host, path, query) and records statusCode and
// errMsg. It expires config.NegativeTTL from now; an existing entry for the
// same key is overwritten with the new values and expiry.
//
// The expiry is normalised to UTC before it is written. CleanExpired compares
// expires_at against SQLite's CURRENT_TIMESTAMP, which is always UTC; storing
// a local-time value could make that comparison off by the zone offset.
// Reading the value back as a time.Time is unaffected by the zone used.
func (c *Cache) StoreNegative(ctx context.Context, req *ImageRequest, statusCode int, errMsg string) error {
	expiresAt := time.Now().UTC().Add(c.config.NegativeTTL)

	_, err := c.db.ExecContext(ctx, `
INSERT INTO negative_cache (source_host, source_path, source_query, status_code, error_message, expires_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(source_host, source_path, source_query) DO UPDATE SET
status_code = excluded.status_code,
error_message = excluded.error_message,
fetched_at = CURRENT_TIMESTAMP,
expires_at = excluded.expires_at
`, req.SourceHost, req.SourcePath, req.SourceQuery, statusCode, errMsg, expiresAt)
	if err != nil {
		return fmt.Errorf("failed to insert negative cache: %w", err)
	}

	return nil
}
// checkNegativeCache reports whether the request's source currently has a
// live negative cache entry.
//
// It returns (false, nil) when no row exists or the row has expired, and
// (true, nil) when an unexpired row exists. An expired row is deleted on the
// way out; that delete is best-effort and its outcome is ignored. Any other
// query failure is wrapped and returned.
func (c *Cache) checkNegativeCache(ctx context.Context, req *ImageRequest) (bool, error) {
	var expiry time.Time

	row := c.db.QueryRowContext(ctx, `
SELECT expires_at FROM negative_cache
WHERE source_host = ? AND source_path = ? AND source_query = ?
`, req.SourceHost, req.SourcePath, req.SourceQuery)

	switch scanErr := row.Scan(&expiry); {
	case errors.Is(scanErr, sql.ErrNoRows):
		return false, nil
	case scanErr != nil:
		return false, fmt.Errorf("failed to check negative cache: %w", scanErr)
	}

	// Still within its lifetime: the negative entry applies.
	if !time.Now().After(expiry) {
		return true, nil
	}

	// Expired: remove the row opportunistically and report no negative hit.
	_, _ = c.db.ExecContext(ctx, `
DELETE FROM negative_cache
WHERE source_host = ? AND source_path = ? AND source_query = ?
`, req.SourceHost, req.SourcePath, req.SourceQuery)

	return false, nil
}
// GetSourceMetadataID looks up the id of the source_metadata row matching the
// request's source host, path and query.
//
// Any scan failure, including the absence of a matching row, is wrapped and
// returned together with a zero id.
func (c *Cache) GetSourceMetadataID(ctx context.Context, req *ImageRequest) (int64, error) {
	row := c.db.QueryRowContext(ctx, `
SELECT id FROM source_metadata
WHERE source_host = ? AND source_path = ? AND source_query = ?
`, req.SourceHost, req.SourcePath, req.SourceQuery)

	var metadataID int64
	if scanErr := row.Scan(&metadataID); scanErr != nil {
		return 0, fmt.Errorf("failed to get source metadata ID: %w", scanErr)
	}

	return metadataID, nil
}
// GetSourceContent returns a reader for cached source content by its hash.
// It delegates to the source content storage's Load and passes the reader and
// error through unchanged. The result is an io.ReadCloser, so the caller is
// expected to close it.
func (c *Cache) GetSourceContent(contentHash string) (io.ReadCloser, error) {
return c.srcContent.Load(contentHash)
}
// CleanExpired purges negative cache rows whose expires_at lies before the
// database's CURRENT_TIMESTAMP. Only the negative_cache table is touched.
// A failure of the delete statement is wrapped and returned.
func (c *Cache) CleanExpired(ctx context.Context) error {
	const purgeNegative = `
DELETE FROM negative_cache WHERE expires_at < CURRENT_TIMESTAMP
`

	if _, execErr := c.db.ExecContext(ctx, purgeNegative); execErr != nil {
		return fmt.Errorf("failed to clean negative cache: %w", execErr)
	}

	return nil
}
// Stats returns cache statistics.
//
// HitCount and MissCount come from the cache_stats row with id = 1; a missing
// row is tolerated and leaves both at zero. TotalItems is the number of rows
// in request_cache and TotalSizeBytes is the sum of output_content.size_bytes
// (zero when the table is empty). HitRate is HitCount / (HitCount+MissCount),
// or zero when no lookups have been recorded.
//
// Only the two counters that have a matching destination are selected from
// cache_stats. Scanning upstream_fetch_count, upstream_fetch_bytes and
// transform_count into TotalItems, TotalSizeBytes and HitRate would store
// unrelated values in those fields.
// NOTE(review): CacheStats is declared elsewhere; if it has fields for the
// upstream/transform counters they are not populated here.
//
// Any query failure is wrapped and returned rather than yielding partially
// filled statistics.
func (c *Cache) Stats(ctx context.Context) (*CacheStats, error) {
	var stats CacheStats

	err := c.db.QueryRowContext(ctx, `
SELECT hit_count, miss_count
FROM cache_stats WHERE id = 1
`).Scan(&stats.HitCount, &stats.MissCount)
	if err != nil && !errors.Is(err, sql.ErrNoRows) {
		return nil, fmt.Errorf("failed to get cache stats: %w", err)
	}

	// Get actual counts
	err = c.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM request_cache`).Scan(&stats.TotalItems)
	if err != nil {
		return nil, fmt.Errorf("failed to count cached items: %w", err)
	}

	err = c.db.QueryRowContext(ctx, `SELECT COALESCE(SUM(size_bytes), 0) FROM output_content`).Scan(&stats.TotalSizeBytes)
	if err != nil {
		return nil, fmt.Errorf("failed to sum cached output size: %w", err)
	}

	if lookups := stats.HitCount + stats.MissCount; lookups > 0 {
		stats.HitRate = float64(stats.HitCount) / float64(lookups)
	}

	return &stats, nil
}
// IncrementStats bumps the counters in the cache_stats row with id = 1.
//
// Exactly one of hit_count or miss_count is incremented, chosen by hit. When
// fetchBytes is positive, upstream_fetch_count is also incremented and
// fetchBytes is added to upstream_fetch_bytes. Every statement is best-effort:
// the method has no return value and database errors are discarded.
func (c *Cache) IncrementStats(ctx context.Context, hit bool, fetchBytes int64) {
	// Default to the miss counter and switch to the hit counter when needed.
	counterSQL := `
UPDATE cache_stats SET miss_count = miss_count + 1, last_updated_at = CURRENT_TIMESTAMP WHERE id = 1
`
	if hit {
		counterSQL = `
UPDATE cache_stats SET hit_count = hit_count + 1, last_updated_at = CURRENT_TIMESTAMP WHERE id = 1
`
	}
	_, _ = c.db.ExecContext(ctx, counterSQL)

	// Nothing was fetched upstream, so the fetch counters stay untouched.
	if fetchBytes <= 0 {
		return
	}

	_, _ = c.db.ExecContext(ctx, `
UPDATE cache_stats
SET upstream_fetch_count = upstream_fetch_count + 1,
upstream_fetch_bytes = upstream_fetch_bytes + ?,
last_updated_at = CURRENT_TIMESTAMP
WHERE id = 1
`, fetchBytes)
}