Stats() was scanning 5 SQL columns (hit_count, miss_count, upstream_fetch_count, upstream_fetch_bytes, transform_count) into mismatched struct fields, causing HitRate to contain the integer transform_count instead of a 0.0-1.0 ratio. Simplify the query to only fetch hit_count and miss_count, then compute TotalItems, TotalSizeBytes, and HitRate correctly. Fixes #4
344 lines · 10 KiB · Go
package imgcache
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Cache errors.
var (
	// ErrCacheMiss indicates the requested item is not present in the cache.
	ErrCacheMiss = errors.New("cache miss")
	// ErrNegativeCache indicates a previous fetch of this item failed and
	// that failure itself is cached (see StoreNegative / checkNegativeCache).
	ErrNegativeCache = errors.New("negative cache hit")
)
|
|
|
|
// HTTP status code for successful fetch. Recorded in source_metadata rows
// written by StoreSource (equivalent to net/http's StatusOK).
const httpStatusOK = 200
|
|
|
|
// CacheConfig holds cache configuration.
type CacheConfig struct {
	// StateDir is the root directory for on-disk cache state; storage areas
	// are created under StateDir/cache (see NewCache).
	StateDir string
	// CacheTTL — presumably the lifetime of positive cache entries; it is
	// not referenced in this file, so confirm usage elsewhere.
	CacheTTL time.Duration
	// NegativeTTL is how long a failed fetch stays in the negative cache
	// (used by StoreNegative to compute expires_at).
	NegativeTTL time.Duration
}
|
|
|
|
// variantMeta stores content type for fast cache hits without reading .meta file.
type variantMeta struct {
	ContentType string // MIME type of the stored variant
	Size        int64  // size of the variant in bytes
}
|
|
|
|
// Cache implements the caching layer for the image proxy.
//
// It combines three on-disk stores with a SQL database: source images are
// content-addressed, processed variants are keyed by cache key, and source
// metadata is keyed by host/path. The database holds the authoritative
// metadata and hit/miss statistics.
type Cache struct {
	db          *sql.DB
	srcContent  *ContentStorage  // source images by content hash
	variants    *VariantStorage  // processed variants by cache key
	srcMetadata *MetadataStorage // source metadata by host/path
	config      CacheConfig

	// In-memory cache of variant metadata (content type, size) to avoid reading .meta files
	metaCache   map[VariantKey]variantMeta
	metaCacheMu sync.RWMutex // guards metaCache
}
|
|
|
|
// NewCache creates a new cache instance.
|
|
func NewCache(db *sql.DB, config CacheConfig) (*Cache, error) {
|
|
srcContent, err := NewContentStorage(filepath.Join(config.StateDir, "cache", "sources"))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create source content storage: %w", err)
|
|
}
|
|
|
|
variants, err := NewVariantStorage(filepath.Join(config.StateDir, "cache", "variants"))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create variant storage: %w", err)
|
|
}
|
|
|
|
srcMetadata, err := NewMetadataStorage(filepath.Join(config.StateDir, "cache", "metadata"))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create source metadata storage: %w", err)
|
|
}
|
|
|
|
return &Cache{
|
|
db: db,
|
|
srcContent: srcContent,
|
|
variants: variants,
|
|
srcMetadata: srcMetadata,
|
|
config: config,
|
|
metaCache: make(map[VariantKey]variantMeta),
|
|
}, nil
|
|
}
|
|
|
|
// LookupResult contains the result of a cache lookup.
type LookupResult struct {
	Hit         bool       // true when the variant already exists on disk
	CacheKey    VariantKey // key derived from the request via CacheKey()
	ContentType string     // not populated by Lookup; obtain via GetVariant
	SizeBytes   int64      // not populated by Lookup; obtain via GetVariant
	CacheStatus CacheStatus
}
|
|
|
|
// Lookup checks if a processed variant exists on disk (no DB access for hits).
|
|
func (c *Cache) Lookup(_ context.Context, req *ImageRequest) (*LookupResult, error) {
|
|
cacheKey := CacheKey(req)
|
|
|
|
// Check variant storage directly - no DB needed for cache hits
|
|
if c.variants.Exists(cacheKey) {
|
|
return &LookupResult{
|
|
Hit: true,
|
|
CacheKey: cacheKey,
|
|
CacheStatus: CacheHit,
|
|
}, nil
|
|
}
|
|
|
|
return &LookupResult{
|
|
Hit: false,
|
|
CacheKey: cacheKey,
|
|
CacheStatus: CacheMiss,
|
|
}, nil
|
|
}
|
|
|
|
// GetVariant returns a reader, size, and content type for a cached variant.
// The caller must close the returned reader.
func (c *Cache) GetVariant(cacheKey VariantKey) (io.ReadCloser, int64, string, error) {
	// Pure delegation to variant storage; no database access.
	return c.variants.LoadWithMeta(cacheKey)
}
|
|
|
|
// StoreSource stores fetched source content and metadata.
|
|
func (c *Cache) StoreSource(
|
|
ctx context.Context,
|
|
req *ImageRequest,
|
|
content io.Reader,
|
|
result *FetchResult,
|
|
) (ContentHash, error) {
|
|
// Store content
|
|
contentHash, size, err := c.srcContent.Store(content)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to store source content: %w", err)
|
|
}
|
|
|
|
// Store in database
|
|
pathHash := HashPath(req.SourcePath + "?" + req.SourceQuery)
|
|
headersJSON, _ := json.Marshal(result.Headers)
|
|
|
|
_, err = c.db.ExecContext(ctx, `
|
|
INSERT INTO source_content (content_hash, content_type, size_bytes)
|
|
VALUES (?, ?, ?)
|
|
ON CONFLICT(content_hash) DO NOTHING
|
|
`, contentHash, result.ContentType, size)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to insert source content: %w", err)
|
|
}
|
|
|
|
_, err = c.db.ExecContext(ctx, `
|
|
INSERT INTO source_metadata
|
|
(source_host, source_path, source_query, path_hash,
|
|
content_hash, status_code, content_type, response_headers)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(source_host, source_path, source_query) DO UPDATE SET
|
|
content_hash = excluded.content_hash,
|
|
status_code = excluded.status_code,
|
|
content_type = excluded.content_type,
|
|
response_headers = excluded.response_headers,
|
|
fetched_at = CURRENT_TIMESTAMP
|
|
`, req.SourceHost, req.SourcePath, req.SourceQuery, pathHash,
|
|
contentHash, httpStatusOK, result.ContentType, string(headersJSON))
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to insert source metadata: %w", err)
|
|
}
|
|
|
|
// Store metadata JSON file
|
|
meta := &SourceMetadata{
|
|
Host: req.SourceHost,
|
|
Path: req.SourcePath,
|
|
Query: req.SourceQuery,
|
|
ContentHash: string(contentHash),
|
|
StatusCode: result.StatusCode,
|
|
ContentType: result.ContentType,
|
|
ContentLength: result.ContentLength,
|
|
ResponseHeaders: result.Headers,
|
|
FetchedAt: time.Now().UTC().Unix(),
|
|
FetchDurationMs: result.FetchDurationMs,
|
|
RemoteAddr: result.RemoteAddr,
|
|
}
|
|
|
|
if err := c.srcMetadata.Store(req.SourceHost, pathHash, meta); err != nil {
|
|
// Non-fatal, we have it in the database
|
|
_ = err
|
|
}
|
|
|
|
return contentHash, nil
|
|
}
|
|
|
|
// StoreVariant stores a processed variant by its cache key.
|
|
func (c *Cache) StoreVariant(cacheKey VariantKey, content io.Reader, contentType string) error {
|
|
_, err := c.variants.Store(cacheKey, content, contentType)
|
|
return err
|
|
}
|
|
|
|
// LookupSource checks if we have cached source content for a request.
|
|
// Returns the content hash and content type if found, or empty values if not.
|
|
func (c *Cache) LookupSource(ctx context.Context, req *ImageRequest) (ContentHash, string, error) {
|
|
var hashStr, contentType string
|
|
|
|
err := c.db.QueryRowContext(ctx, `
|
|
SELECT content_hash, content_type FROM source_metadata
|
|
WHERE source_host = ? AND source_path = ? AND source_query = ?
|
|
`, req.SourceHost, req.SourcePath, req.SourceQuery).Scan(&hashStr, &contentType)
|
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return "", "", nil
|
|
}
|
|
|
|
if err != nil {
|
|
return "", "", fmt.Errorf("failed to lookup source: %w", err)
|
|
}
|
|
|
|
contentHash := ContentHash(hashStr)
|
|
|
|
// Verify the content file exists
|
|
if !c.srcContent.Exists(contentHash) {
|
|
return "", "", nil
|
|
}
|
|
|
|
return contentHash, contentType, nil
|
|
}
|
|
|
|
// StoreNegative stores a negative cache entry for a failed fetch.
|
|
func (c *Cache) StoreNegative(ctx context.Context, req *ImageRequest, statusCode int, errMsg string) error {
|
|
expiresAt := time.Now().UTC().Add(c.config.NegativeTTL)
|
|
|
|
_, err := c.db.ExecContext(ctx, `
|
|
INSERT INTO negative_cache (source_host, source_path, source_query, status_code, error_message, expires_at)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(source_host, source_path, source_query) DO UPDATE SET
|
|
status_code = excluded.status_code,
|
|
error_message = excluded.error_message,
|
|
fetched_at = CURRENT_TIMESTAMP,
|
|
expires_at = excluded.expires_at
|
|
`, req.SourceHost, req.SourcePath, req.SourceQuery, statusCode, errMsg, expiresAt)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to insert negative cache: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// checkNegativeCache checks if a request is in the negative cache.
|
|
func (c *Cache) checkNegativeCache(ctx context.Context, req *ImageRequest) (bool, error) {
|
|
var expiresAt time.Time
|
|
|
|
err := c.db.QueryRowContext(ctx, `
|
|
SELECT expires_at FROM negative_cache
|
|
WHERE source_host = ? AND source_path = ? AND source_query = ?
|
|
`, req.SourceHost, req.SourcePath, req.SourceQuery).Scan(&expiresAt)
|
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return false, nil
|
|
}
|
|
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to check negative cache: %w", err)
|
|
}
|
|
|
|
// Check if expired
|
|
if time.Now().After(expiresAt) {
|
|
// Clean up expired entry
|
|
_, _ = c.db.ExecContext(ctx, `
|
|
DELETE FROM negative_cache
|
|
WHERE source_host = ? AND source_path = ? AND source_query = ?
|
|
`, req.SourceHost, req.SourcePath, req.SourceQuery)
|
|
|
|
return false, nil
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// GetSourceMetadataID returns the source metadata ID for a request.
|
|
func (c *Cache) GetSourceMetadataID(ctx context.Context, req *ImageRequest) (int64, error) {
|
|
var id int64
|
|
|
|
err := c.db.QueryRowContext(ctx, `
|
|
SELECT id FROM source_metadata
|
|
WHERE source_host = ? AND source_path = ? AND source_query = ?
|
|
`, req.SourceHost, req.SourcePath, req.SourceQuery).Scan(&id)
|
|
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to get source metadata ID: %w", err)
|
|
}
|
|
|
|
return id, nil
|
|
}
|
|
|
|
// GetSourceContent returns a reader for cached source content by its hash.
// The caller must close the returned reader.
func (c *Cache) GetSourceContent(contentHash ContentHash) (io.ReadCloser, error) {
	// Pure delegation to content-addressed storage; no database access.
	return c.srcContent.Load(contentHash)
}
|
|
|
|
// CleanExpired removes expired entries from the cache.
|
|
func (c *Cache) CleanExpired(ctx context.Context) error {
|
|
// Clean expired negative cache entries
|
|
_, err := c.db.ExecContext(ctx, `
|
|
DELETE FROM negative_cache WHERE expires_at < CURRENT_TIMESTAMP
|
|
`)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to clean negative cache: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stats returns cache statistics.
|
|
func (c *Cache) Stats(ctx context.Context) (*CacheStats, error) {
|
|
var stats CacheStats
|
|
|
|
// Fetch hit/miss counts from the stats table
|
|
err := c.db.QueryRowContext(ctx, `
|
|
SELECT hit_count, miss_count
|
|
FROM cache_stats WHERE id = 1
|
|
`).Scan(&stats.HitCount, &stats.MissCount)
|
|
|
|
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
|
return nil, fmt.Errorf("failed to get cache stats: %w", err)
|
|
}
|
|
|
|
// Get actual item count and total size from content tables
|
|
_ = c.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM request_cache`).Scan(&stats.TotalItems)
|
|
_ = c.db.QueryRowContext(ctx, `SELECT COALESCE(SUM(size_bytes), 0) FROM output_content`).Scan(&stats.TotalSizeBytes)
|
|
|
|
// Compute hit rate as a ratio
|
|
if stats.HitCount+stats.MissCount > 0 {
|
|
stats.HitRate = float64(stats.HitCount) / float64(stats.HitCount+stats.MissCount)
|
|
}
|
|
|
|
return &stats, nil
|
|
}
|
|
|
|
// IncrementStats increments cache statistics.
|
|
func (c *Cache) IncrementStats(ctx context.Context, hit bool, fetchBytes int64) {
|
|
if hit {
|
|
_, _ = c.db.ExecContext(ctx, `
|
|
UPDATE cache_stats SET hit_count = hit_count + 1, last_updated_at = CURRENT_TIMESTAMP WHERE id = 1
|
|
`)
|
|
} else {
|
|
_, _ = c.db.ExecContext(ctx, `
|
|
UPDATE cache_stats SET miss_count = miss_count + 1, last_updated_at = CURRENT_TIMESTAMP WHERE id = 1
|
|
`)
|
|
}
|
|
|
|
if fetchBytes > 0 {
|
|
_, _ = c.db.ExecContext(ctx, `
|
|
UPDATE cache_stats
|
|
SET upstream_fetch_count = upstream_fetch_count + 1,
|
|
upstream_fetch_bytes = upstream_fetch_bytes + ?,
|
|
last_updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = 1
|
|
`, fetchBytes)
|
|
}
|
|
}
|