Define ContentHash, VariantKey, and PathHash types to replace raw strings, providing compile-time type safety for storage operations. Update storage layer to use typed parameters, refactor cache to use variant storage keyed by VariantKey, and implement source content reuse on cache misses.
342 lines
10 KiB
Go
package imgcache
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Cache errors returned by lookup operations.
var (
	// ErrCacheMiss indicates the requested item is not in the cache.
	ErrCacheMiss = errors.New("cache miss")
	// ErrNegativeCache indicates a previously failed fetch is still being
	// remembered, so the request should not be retried upstream yet.
	ErrNegativeCache = errors.New("negative cache hit")
)
|
|
|
|
// httpStatusOK is the HTTP status code recorded for a successful fetch
// when source metadata is inserted into the database (see StoreSource).
const httpStatusOK = 200
|
|
|
|
// CacheConfig holds cache configuration.
type CacheConfig struct {
	// StateDir is the root directory under which the on-disk cache
	// subdirectories (cache/sources, cache/variants, cache/metadata) live.
	StateDir string
	// CacheTTL is the lifetime of positive cache entries.
	// NOTE(review): not referenced in this chunk — presumably consumed by
	// expiry logic elsewhere; confirm before removing.
	CacheTTL time.Duration
	// NegativeTTL is how long failed fetches are remembered (see StoreNegative).
	NegativeTTL time.Duration
}
|
|
|
|
// variantMeta stores content type for fast cache hits without reading .meta file.
type variantMeta struct {
	// ContentType is the MIME type of the stored variant.
	ContentType string
	// Size is the variant size in bytes.
	Size int64
}
|
|
|
|
// Cache implements the caching layer for the image proxy.
//
// Processed variants are answered straight from disk (Lookup does no DB
// access on a hit); source content and source metadata are tracked both
// in SQLite and in on-disk storages.
type Cache struct {
	db          *sql.DB
	srcContent  *ContentStorage  // source images by content hash
	variants    *VariantStorage  // processed variants by cache key
	srcMetadata *MetadataStorage // source metadata by host/path
	config      CacheConfig

	// In-memory cache of variant metadata (content type, size) to avoid reading .meta files
	// NOTE(review): metaCache/metaCacheMu are not referenced in this chunk —
	// presumably used by code elsewhere in the package; confirm.
	metaCache   map[VariantKey]variantMeta
	metaCacheMu sync.RWMutex // guards metaCache
}
|
|
|
|
// NewCache creates a new cache instance.
|
|
func NewCache(db *sql.DB, config CacheConfig) (*Cache, error) {
|
|
srcContent, err := NewContentStorage(filepath.Join(config.StateDir, "cache", "sources"))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create source content storage: %w", err)
|
|
}
|
|
|
|
variants, err := NewVariantStorage(filepath.Join(config.StateDir, "cache", "variants"))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create variant storage: %w", err)
|
|
}
|
|
|
|
srcMetadata, err := NewMetadataStorage(filepath.Join(config.StateDir, "cache", "metadata"))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create source metadata storage: %w", err)
|
|
}
|
|
|
|
return &Cache{
|
|
db: db,
|
|
srcContent: srcContent,
|
|
variants: variants,
|
|
srcMetadata: srcMetadata,
|
|
config: config,
|
|
metaCache: make(map[VariantKey]variantMeta),
|
|
}, nil
|
|
}
|
|
|
|
// LookupResult contains the result of a cache lookup.
type LookupResult struct {
	// Hit reports whether a processed variant already exists on disk.
	Hit bool
	// CacheKey is the variant key derived from the request; valid on
	// both hits and misses so callers can store a new variant under it.
	CacheKey VariantKey
	// ContentType and SizeBytes describe the cached variant.
	// NOTE(review): Lookup in this chunk does not populate these two
	// fields — presumably filled elsewhere; confirm.
	ContentType string
	SizeBytes   int64
	// CacheStatus is CacheHit or CacheMiss.
	CacheStatus CacheStatus
}
|
|
|
|
// Lookup checks if a processed variant exists on disk (no DB access for hits).
|
|
func (c *Cache) Lookup(_ context.Context, req *ImageRequest) (*LookupResult, error) {
|
|
cacheKey := CacheKey(req)
|
|
|
|
// Check variant storage directly - no DB needed for cache hits
|
|
if c.variants.Exists(cacheKey) {
|
|
return &LookupResult{
|
|
Hit: true,
|
|
CacheKey: cacheKey,
|
|
CacheStatus: CacheHit,
|
|
}, nil
|
|
}
|
|
|
|
return &LookupResult{
|
|
Hit: false,
|
|
CacheKey: cacheKey,
|
|
CacheStatus: CacheMiss,
|
|
}, nil
|
|
}
|
|
|
|
// GetVariant returns a reader, size, and content type for a cached variant.
|
|
func (c *Cache) GetVariant(cacheKey VariantKey) (io.ReadCloser, int64, string, error) {
|
|
return c.variants.LoadWithMeta(cacheKey)
|
|
}
|
|
|
|
// StoreSource stores fetched source content and metadata.
|
|
func (c *Cache) StoreSource(
|
|
ctx context.Context,
|
|
req *ImageRequest,
|
|
content io.Reader,
|
|
result *FetchResult,
|
|
) (ContentHash, error) {
|
|
// Store content
|
|
contentHash, size, err := c.srcContent.Store(content)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to store source content: %w", err)
|
|
}
|
|
|
|
// Store in database
|
|
pathHash := HashPath(req.SourcePath + "?" + req.SourceQuery)
|
|
headersJSON, _ := json.Marshal(result.Headers)
|
|
|
|
_, err = c.db.ExecContext(ctx, `
|
|
INSERT INTO source_content (content_hash, content_type, size_bytes)
|
|
VALUES (?, ?, ?)
|
|
ON CONFLICT(content_hash) DO NOTHING
|
|
`, contentHash, result.ContentType, size)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to insert source content: %w", err)
|
|
}
|
|
|
|
_, err = c.db.ExecContext(ctx, `
|
|
INSERT INTO source_metadata
|
|
(source_host, source_path, source_query, path_hash,
|
|
content_hash, status_code, content_type, response_headers)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(source_host, source_path, source_query) DO UPDATE SET
|
|
content_hash = excluded.content_hash,
|
|
status_code = excluded.status_code,
|
|
content_type = excluded.content_type,
|
|
response_headers = excluded.response_headers,
|
|
fetched_at = CURRENT_TIMESTAMP
|
|
`, req.SourceHost, req.SourcePath, req.SourceQuery, pathHash,
|
|
contentHash, httpStatusOK, result.ContentType, string(headersJSON))
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to insert source metadata: %w", err)
|
|
}
|
|
|
|
// Store metadata JSON file
|
|
meta := &SourceMetadata{
|
|
Host: req.SourceHost,
|
|
Path: req.SourcePath,
|
|
Query: req.SourceQuery,
|
|
ContentHash: string(contentHash),
|
|
StatusCode: result.StatusCode,
|
|
ContentType: result.ContentType,
|
|
ContentLength: result.ContentLength,
|
|
ResponseHeaders: result.Headers,
|
|
FetchedAt: time.Now().UTC().Unix(),
|
|
FetchDurationMs: result.FetchDurationMs,
|
|
RemoteAddr: result.RemoteAddr,
|
|
}
|
|
|
|
if err := c.srcMetadata.Store(req.SourceHost, pathHash, meta); err != nil {
|
|
// Non-fatal, we have it in the database
|
|
_ = err
|
|
}
|
|
|
|
return contentHash, nil
|
|
}
|
|
|
|
// StoreVariant stores a processed variant by its cache key.
|
|
func (c *Cache) StoreVariant(cacheKey VariantKey, content io.Reader, contentType string) error {
|
|
_, err := c.variants.Store(cacheKey, content, contentType)
|
|
return err
|
|
}
|
|
|
|
// LookupSource checks if we have cached source content for a request.
|
|
// Returns the content hash and content type if found, or empty values if not.
|
|
func (c *Cache) LookupSource(ctx context.Context, req *ImageRequest) (ContentHash, string, error) {
|
|
var hashStr, contentType string
|
|
|
|
err := c.db.QueryRowContext(ctx, `
|
|
SELECT content_hash, content_type FROM source_metadata
|
|
WHERE source_host = ? AND source_path = ? AND source_query = ?
|
|
`, req.SourceHost, req.SourcePath, req.SourceQuery).Scan(&hashStr, &contentType)
|
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return "", "", nil
|
|
}
|
|
|
|
if err != nil {
|
|
return "", "", fmt.Errorf("failed to lookup source: %w", err)
|
|
}
|
|
|
|
contentHash := ContentHash(hashStr)
|
|
|
|
// Verify the content file exists
|
|
if !c.srcContent.Exists(contentHash) {
|
|
return "", "", nil
|
|
}
|
|
|
|
return contentHash, contentType, nil
|
|
}
|
|
|
|
// StoreNegative stores a negative cache entry for a failed fetch.
|
|
func (c *Cache) StoreNegative(ctx context.Context, req *ImageRequest, statusCode int, errMsg string) error {
|
|
expiresAt := time.Now().UTC().Add(c.config.NegativeTTL)
|
|
|
|
_, err := c.db.ExecContext(ctx, `
|
|
INSERT INTO negative_cache (source_host, source_path, source_query, status_code, error_message, expires_at)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(source_host, source_path, source_query) DO UPDATE SET
|
|
status_code = excluded.status_code,
|
|
error_message = excluded.error_message,
|
|
fetched_at = CURRENT_TIMESTAMP,
|
|
expires_at = excluded.expires_at
|
|
`, req.SourceHost, req.SourcePath, req.SourceQuery, statusCode, errMsg, expiresAt)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to insert negative cache: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// checkNegativeCache checks if a request is in the negative cache.
|
|
func (c *Cache) checkNegativeCache(ctx context.Context, req *ImageRequest) (bool, error) {
|
|
var expiresAt time.Time
|
|
|
|
err := c.db.QueryRowContext(ctx, `
|
|
SELECT expires_at FROM negative_cache
|
|
WHERE source_host = ? AND source_path = ? AND source_query = ?
|
|
`, req.SourceHost, req.SourcePath, req.SourceQuery).Scan(&expiresAt)
|
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return false, nil
|
|
}
|
|
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to check negative cache: %w", err)
|
|
}
|
|
|
|
// Check if expired
|
|
if time.Now().After(expiresAt) {
|
|
// Clean up expired entry
|
|
_, _ = c.db.ExecContext(ctx, `
|
|
DELETE FROM negative_cache
|
|
WHERE source_host = ? AND source_path = ? AND source_query = ?
|
|
`, req.SourceHost, req.SourcePath, req.SourceQuery)
|
|
|
|
return false, nil
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// GetSourceMetadataID returns the source metadata ID for a request.
|
|
func (c *Cache) GetSourceMetadataID(ctx context.Context, req *ImageRequest) (int64, error) {
|
|
var id int64
|
|
|
|
err := c.db.QueryRowContext(ctx, `
|
|
SELECT id FROM source_metadata
|
|
WHERE source_host = ? AND source_path = ? AND source_query = ?
|
|
`, req.SourceHost, req.SourcePath, req.SourceQuery).Scan(&id)
|
|
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to get source metadata ID: %w", err)
|
|
}
|
|
|
|
return id, nil
|
|
}
|
|
|
|
// GetSourceContent returns a reader for cached source content by its hash.
|
|
func (c *Cache) GetSourceContent(contentHash ContentHash) (io.ReadCloser, error) {
|
|
return c.srcContent.Load(contentHash)
|
|
}
|
|
|
|
// CleanExpired removes expired entries from the cache.
|
|
func (c *Cache) CleanExpired(ctx context.Context) error {
|
|
// Clean expired negative cache entries
|
|
_, err := c.db.ExecContext(ctx, `
|
|
DELETE FROM negative_cache WHERE expires_at < CURRENT_TIMESTAMP
|
|
`)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to clean negative cache: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stats returns cache statistics.
|
|
func (c *Cache) Stats(ctx context.Context) (*CacheStats, error) {
|
|
var stats CacheStats
|
|
|
|
err := c.db.QueryRowContext(ctx, `
|
|
SELECT hit_count, miss_count, upstream_fetch_count, upstream_fetch_bytes, transform_count
|
|
FROM cache_stats WHERE id = 1
|
|
`).Scan(&stats.HitCount, &stats.MissCount, &stats.TotalItems, &stats.TotalSizeBytes, &stats.HitRate)
|
|
|
|
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
|
return nil, fmt.Errorf("failed to get cache stats: %w", err)
|
|
}
|
|
|
|
// Get actual counts
|
|
_ = c.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM request_cache`).Scan(&stats.TotalItems)
|
|
_ = c.db.QueryRowContext(ctx, `SELECT COALESCE(SUM(size_bytes), 0) FROM output_content`).Scan(&stats.TotalSizeBytes)
|
|
|
|
if stats.HitCount+stats.MissCount > 0 {
|
|
stats.HitRate = float64(stats.HitCount) / float64(stats.HitCount+stats.MissCount)
|
|
}
|
|
|
|
return &stats, nil
|
|
}
|
|
|
|
// IncrementStats increments cache statistics.
|
|
func (c *Cache) IncrementStats(ctx context.Context, hit bool, fetchBytes int64) {
|
|
if hit {
|
|
_, _ = c.db.ExecContext(ctx, `
|
|
UPDATE cache_stats SET hit_count = hit_count + 1, last_updated_at = CURRENT_TIMESTAMP WHERE id = 1
|
|
`)
|
|
} else {
|
|
_, _ = c.db.ExecContext(ctx, `
|
|
UPDATE cache_stats SET miss_count = miss_count + 1, last_updated_at = CURRENT_TIMESTAMP WHERE id = 1
|
|
`)
|
|
}
|
|
|
|
if fetchBytes > 0 {
|
|
_, _ = c.db.ExecContext(ctx, `
|
|
UPDATE cache_stats
|
|
SET upstream_fetch_count = upstream_fetch_count + 1,
|
|
upstream_fetch_bytes = upstream_fetch_bytes + ?,
|
|
last_updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = 1
|
|
`, fetchBytes)
|
|
}
|
|
}
|