Hot cache entries now store all data needed to serve a cache hit without any database access: - OutputHash (for file lookup) - ContentType (for Content-Type header) - SizeBytes (for Content-Length header) Previously hot cache only stored OutputHash, causing empty Content-Type headers on cached WebP responses.
438 lines
12 KiB
Go
438 lines
12 KiB
Go
package imgcache
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Cache errors.
var (
	// ErrCacheMiss indicates the requested entry is not present in the cache.
	ErrCacheMiss = errors.New("cache miss")
	// ErrNegativeCache indicates the source is negatively cached: a recent
	// fetch failed and the failure has not yet expired (see StoreNegative).
	ErrNegativeCache = errors.New("negative cache hit")
)
|
|
|
|
// httpStatusOK is the HTTP status code recorded in source metadata for a
// successful fetch (mirrors http.StatusOK without importing net/http).
const httpStatusOK = 200
|
|
|
|
// CacheConfig holds cache configuration.
type CacheConfig struct {
	// StateDir is the root directory for on-disk cache state; content and
	// metadata stores are created under StateDir/cache.
	StateDir string
	// CacheTTL bounds the age of positive cache entries. A non-positive
	// value disables the TTL check (entries never expire by age).
	CacheTTL time.Duration
	// NegativeTTL is how long a failed fetch is negatively cached.
	NegativeTTL time.Duration
	// HotCacheSize is the number of entries the in-memory hot cache map is
	// sized for.
	HotCacheSize int
	// HotCacheEnabled turns the in-memory hot cache on or off.
	HotCacheEnabled bool
}
|
|
|
|
// hotCacheEntry stores all data needed to serve a cache hit without DB
// access: the output hash for file lookup, the content type for the
// Content-Type header, and the size for Content-Length.
type hotCacheEntry struct {
	OutputHash  string // key into the destination content store
	ContentType string // MIME type of the processed output
	SizeBytes   int64  // size of the stored output in bytes
}
|
|
|
|
// Cache implements the caching layer for the image proxy.
//
// It combines a SQL database (request/source/output metadata, negative
// cache, stats) with on-disk content stores and an optional in-memory hot
// cache keyed by the request's cache key.
type Cache struct {
	db          *sql.DB
	srcContent  *ContentStorage  // raw upstream bytes, keyed by content hash
	dstContent  *ContentStorage  // processed output bytes, keyed by output hash
	srcMetadata *MetadataStorage // JSON mirror of source_metadata rows
	config      CacheConfig

	hotCache        map[string]hotCacheEntry // cache_key -> entry
	hotCacheMu      sync.RWMutex             // guards hotCache
	hotCacheEnabled bool                     // whether the hot cache is active
}
|
|
|
|
// NewCache creates a new cache instance.
|
|
func NewCache(db *sql.DB, config CacheConfig) (*Cache, error) {
|
|
srcContent, err := NewContentStorage(filepath.Join(config.StateDir, "cache", "src-content"))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create source content storage: %w", err)
|
|
}
|
|
|
|
dstContent, err := NewContentStorage(filepath.Join(config.StateDir, "cache", "dst-content"))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create destination content storage: %w", err)
|
|
}
|
|
|
|
srcMetadata, err := NewMetadataStorage(filepath.Join(config.StateDir, "cache", "src-metadata"))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create source metadata storage: %w", err)
|
|
}
|
|
|
|
c := &Cache{
|
|
db: db,
|
|
srcContent: srcContent,
|
|
dstContent: dstContent,
|
|
srcMetadata: srcMetadata,
|
|
config: config,
|
|
hotCacheEnabled: config.HotCacheEnabled,
|
|
}
|
|
|
|
if config.HotCacheEnabled && config.HotCacheSize > 0 {
|
|
c.hotCache = make(map[string]hotCacheEntry, config.HotCacheSize)
|
|
}
|
|
|
|
return c, nil
|
|
}
|
|
|
|
// LookupResult contains the result of a cache lookup.
type LookupResult struct {
	Hit         bool        // true when a usable cached output was found
	OutputHash  string      // key into the destination content store (valid when Hit)
	ContentType string      // Content-Type of the cached output (valid when Hit)
	SizeBytes   int64       // Content-Length of the cached output (valid when Hit)
	CacheStatus CacheStatus // hit / miss / stale classification
}
|
|
|
|
// Lookup checks if a processed image exists in the cache.
|
|
func (c *Cache) Lookup(ctx context.Context, req *ImageRequest) (*LookupResult, error) {
|
|
cacheKey := CacheKey(req)
|
|
|
|
// Check hot cache first
|
|
if c.hotCacheEnabled {
|
|
c.hotCacheMu.RLock()
|
|
entry, ok := c.hotCache[cacheKey]
|
|
c.hotCacheMu.RUnlock()
|
|
|
|
if ok && c.dstContent.Exists(entry.OutputHash) {
|
|
return &LookupResult{
|
|
Hit: true,
|
|
OutputHash: entry.OutputHash,
|
|
ContentType: entry.ContentType,
|
|
SizeBytes: entry.SizeBytes,
|
|
CacheStatus: CacheHit,
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
// Check negative cache
|
|
negCached, err := c.checkNegativeCache(ctx, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if negCached {
|
|
return nil, ErrNegativeCache
|
|
}
|
|
|
|
// Check database
|
|
var outputHash, contentType string
|
|
var sizeBytes int64
|
|
var fetchedAt time.Time
|
|
|
|
err = c.db.QueryRowContext(ctx, `
|
|
SELECT rc.output_hash, oc.content_type, oc.size_bytes, rc.fetched_at
|
|
FROM request_cache rc
|
|
JOIN output_content oc ON rc.output_hash = oc.content_hash
|
|
WHERE rc.cache_key = ?
|
|
`, cacheKey).Scan(&outputHash, &contentType, &sizeBytes, &fetchedAt)
|
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return &LookupResult{Hit: false, CacheStatus: CacheMiss}, nil
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query cache: %w", err)
|
|
}
|
|
|
|
// Check TTL
|
|
if c.config.CacheTTL > 0 && time.Since(fetchedAt) > c.config.CacheTTL {
|
|
return &LookupResult{Hit: false, CacheStatus: CacheStale}, nil
|
|
}
|
|
|
|
// Verify file exists on disk
|
|
if !c.dstContent.Exists(outputHash) {
|
|
return &LookupResult{Hit: false, CacheStatus: CacheMiss}, nil
|
|
}
|
|
|
|
// Update hot cache
|
|
if c.hotCacheEnabled {
|
|
c.hotCacheMu.Lock()
|
|
c.hotCache[cacheKey] = hotCacheEntry{
|
|
OutputHash: outputHash,
|
|
ContentType: contentType,
|
|
SizeBytes: sizeBytes,
|
|
}
|
|
c.hotCacheMu.Unlock()
|
|
}
|
|
|
|
// Update access count
|
|
_, _ = c.db.ExecContext(ctx, `
|
|
UPDATE request_cache
|
|
SET access_count = access_count + 1
|
|
WHERE cache_key = ?
|
|
`, cacheKey)
|
|
|
|
return &LookupResult{
|
|
Hit: true,
|
|
OutputHash: outputHash,
|
|
ContentType: contentType,
|
|
SizeBytes: sizeBytes,
|
|
CacheStatus: CacheHit,
|
|
}, nil
|
|
}
|
|
|
|
// GetOutput returns a reader for cached output content.
|
|
func (c *Cache) GetOutput(outputHash string) (io.ReadCloser, error) {
|
|
return c.dstContent.Load(outputHash)
|
|
}
|
|
|
|
// GetOutputWithSize returns a reader and size for cached output content.
|
|
func (c *Cache) GetOutputWithSize(outputHash string) (io.ReadCloser, int64, error) {
|
|
return c.dstContent.LoadWithSize(outputHash)
|
|
}
|
|
|
|
// StoreSource stores fetched source content and metadata.
|
|
func (c *Cache) StoreSource(
|
|
ctx context.Context,
|
|
req *ImageRequest,
|
|
content io.Reader,
|
|
result *FetchResult,
|
|
) (contentHash string, err error) {
|
|
// Store content
|
|
contentHash, size, err := c.srcContent.Store(content)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to store source content: %w", err)
|
|
}
|
|
|
|
// Store in database
|
|
pathHash := HashPath(req.SourcePath + "?" + req.SourceQuery)
|
|
headersJSON, _ := json.Marshal(result.Headers)
|
|
|
|
_, err = c.db.ExecContext(ctx, `
|
|
INSERT INTO source_content (content_hash, content_type, size_bytes)
|
|
VALUES (?, ?, ?)
|
|
ON CONFLICT(content_hash) DO NOTHING
|
|
`, contentHash, result.ContentType, size)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to insert source content: %w", err)
|
|
}
|
|
|
|
_, err = c.db.ExecContext(ctx, `
|
|
INSERT INTO source_metadata
|
|
(source_host, source_path, source_query, path_hash,
|
|
content_hash, status_code, content_type, response_headers)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(source_host, source_path, source_query) DO UPDATE SET
|
|
content_hash = excluded.content_hash,
|
|
status_code = excluded.status_code,
|
|
content_type = excluded.content_type,
|
|
response_headers = excluded.response_headers,
|
|
fetched_at = CURRENT_TIMESTAMP
|
|
`, req.SourceHost, req.SourcePath, req.SourceQuery, pathHash,
|
|
contentHash, httpStatusOK, result.ContentType, string(headersJSON))
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to insert source metadata: %w", err)
|
|
}
|
|
|
|
// Store metadata JSON file
|
|
meta := &SourceMetadata{
|
|
Host: req.SourceHost,
|
|
Path: req.SourcePath,
|
|
Query: req.SourceQuery,
|
|
ContentHash: contentHash,
|
|
StatusCode: httpStatusOK,
|
|
ContentType: result.ContentType,
|
|
ResponseHeaders: result.Headers,
|
|
FetchedAt: time.Now().Unix(),
|
|
}
|
|
|
|
if err := c.srcMetadata.Store(req.SourceHost, pathHash, meta); err != nil {
|
|
// Non-fatal, we have it in the database
|
|
_ = err
|
|
}
|
|
|
|
return contentHash, nil
|
|
}
|
|
|
|
// StoreOutput stores processed output content and returns the output hash.
|
|
func (c *Cache) StoreOutput(
|
|
ctx context.Context,
|
|
req *ImageRequest,
|
|
sourceMetadataID int64,
|
|
content io.Reader,
|
|
contentType string,
|
|
) (string, error) {
|
|
// Store content
|
|
outputHash, size, err := c.dstContent.Store(content)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to store output content: %w", err)
|
|
}
|
|
|
|
cacheKey := CacheKey(req)
|
|
|
|
// Store in database
|
|
_, err = c.db.ExecContext(ctx, `
|
|
INSERT INTO output_content (content_hash, content_type, size_bytes)
|
|
VALUES (?, ?, ?)
|
|
ON CONFLICT(content_hash) DO NOTHING
|
|
`, outputHash, contentType, size)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to insert output content: %w", err)
|
|
}
|
|
|
|
_, err = c.db.ExecContext(ctx, `
|
|
INSERT INTO request_cache (cache_key, source_metadata_id, output_hash, width, height, format, quality, fit_mode)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(cache_key) DO UPDATE SET
|
|
output_hash = excluded.output_hash,
|
|
fetched_at = CURRENT_TIMESTAMP,
|
|
access_count = request_cache.access_count + 1
|
|
`, cacheKey, sourceMetadataID, outputHash, req.Size.Width, req.Size.Height, req.Format, req.Quality, req.FitMode)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to insert request cache: %w", err)
|
|
}
|
|
|
|
// Update hot cache
|
|
if c.hotCacheEnabled {
|
|
c.hotCacheMu.Lock()
|
|
c.hotCache[cacheKey] = hotCacheEntry{
|
|
OutputHash: outputHash,
|
|
ContentType: contentType,
|
|
SizeBytes: size,
|
|
}
|
|
c.hotCacheMu.Unlock()
|
|
}
|
|
|
|
return outputHash, nil
|
|
}
|
|
|
|
// StoreNegative stores a negative cache entry for a failed fetch.
|
|
func (c *Cache) StoreNegative(ctx context.Context, req *ImageRequest, statusCode int, errMsg string) error {
|
|
expiresAt := time.Now().Add(c.config.NegativeTTL)
|
|
|
|
_, err := c.db.ExecContext(ctx, `
|
|
INSERT INTO negative_cache (source_host, source_path, source_query, status_code, error_message, expires_at)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(source_host, source_path, source_query) DO UPDATE SET
|
|
status_code = excluded.status_code,
|
|
error_message = excluded.error_message,
|
|
fetched_at = CURRENT_TIMESTAMP,
|
|
expires_at = excluded.expires_at
|
|
`, req.SourceHost, req.SourcePath, req.SourceQuery, statusCode, errMsg, expiresAt)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to insert negative cache: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// checkNegativeCache checks if a request is in the negative cache.
|
|
func (c *Cache) checkNegativeCache(ctx context.Context, req *ImageRequest) (bool, error) {
|
|
var expiresAt time.Time
|
|
|
|
err := c.db.QueryRowContext(ctx, `
|
|
SELECT expires_at FROM negative_cache
|
|
WHERE source_host = ? AND source_path = ? AND source_query = ?
|
|
`, req.SourceHost, req.SourcePath, req.SourceQuery).Scan(&expiresAt)
|
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return false, nil
|
|
}
|
|
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to check negative cache: %w", err)
|
|
}
|
|
|
|
// Check if expired
|
|
if time.Now().After(expiresAt) {
|
|
// Clean up expired entry
|
|
_, _ = c.db.ExecContext(ctx, `
|
|
DELETE FROM negative_cache
|
|
WHERE source_host = ? AND source_path = ? AND source_query = ?
|
|
`, req.SourceHost, req.SourcePath, req.SourceQuery)
|
|
|
|
return false, nil
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// GetSourceMetadataID returns the source metadata ID for a request.
|
|
func (c *Cache) GetSourceMetadataID(ctx context.Context, req *ImageRequest) (int64, error) {
|
|
var id int64
|
|
|
|
err := c.db.QueryRowContext(ctx, `
|
|
SELECT id FROM source_metadata
|
|
WHERE source_host = ? AND source_path = ? AND source_query = ?
|
|
`, req.SourceHost, req.SourcePath, req.SourceQuery).Scan(&id)
|
|
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to get source metadata ID: %w", err)
|
|
}
|
|
|
|
return id, nil
|
|
}
|
|
|
|
// GetSourceContent returns a reader for cached source content by its hash.
|
|
func (c *Cache) GetSourceContent(contentHash string) (io.ReadCloser, error) {
|
|
return c.srcContent.Load(contentHash)
|
|
}
|
|
|
|
// CleanExpired removes expired entries from the cache.
|
|
func (c *Cache) CleanExpired(ctx context.Context) error {
|
|
// Clean expired negative cache entries
|
|
_, err := c.db.ExecContext(ctx, `
|
|
DELETE FROM negative_cache WHERE expires_at < CURRENT_TIMESTAMP
|
|
`)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to clean negative cache: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stats returns cache statistics.
|
|
func (c *Cache) Stats(ctx context.Context) (*CacheStats, error) {
|
|
var stats CacheStats
|
|
|
|
err := c.db.QueryRowContext(ctx, `
|
|
SELECT hit_count, miss_count, upstream_fetch_count, upstream_fetch_bytes, transform_count
|
|
FROM cache_stats WHERE id = 1
|
|
`).Scan(&stats.HitCount, &stats.MissCount, &stats.TotalItems, &stats.TotalSizeBytes, &stats.HitRate)
|
|
|
|
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
|
return nil, fmt.Errorf("failed to get cache stats: %w", err)
|
|
}
|
|
|
|
// Get actual counts
|
|
_ = c.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM request_cache`).Scan(&stats.TotalItems)
|
|
_ = c.db.QueryRowContext(ctx, `SELECT COALESCE(SUM(size_bytes), 0) FROM output_content`).Scan(&stats.TotalSizeBytes)
|
|
|
|
if stats.HitCount+stats.MissCount > 0 {
|
|
stats.HitRate = float64(stats.HitCount) / float64(stats.HitCount+stats.MissCount)
|
|
}
|
|
|
|
return &stats, nil
|
|
}
|
|
|
|
// IncrementStats increments cache statistics.
|
|
func (c *Cache) IncrementStats(ctx context.Context, hit bool, fetchBytes int64) {
|
|
if hit {
|
|
_, _ = c.db.ExecContext(ctx, `
|
|
UPDATE cache_stats SET hit_count = hit_count + 1, last_updated_at = CURRENT_TIMESTAMP WHERE id = 1
|
|
`)
|
|
} else {
|
|
_, _ = c.db.ExecContext(ctx, `
|
|
UPDATE cache_stats SET miss_count = miss_count + 1, last_updated_at = CURRENT_TIMESTAMP WHERE id = 1
|
|
`)
|
|
}
|
|
|
|
if fetchBytes > 0 {
|
|
_, _ = c.db.ExecContext(ctx, `
|
|
UPDATE cache_stats
|
|
SET upstream_fetch_count = upstream_fetch_count + 1,
|
|
upstream_fetch_bytes = upstream_fetch_bytes + ?,
|
|
last_updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = 1
|
|
`, fetchBytes)
|
|
}
|
|
}
|