Add cache service with hot cache and negative caching
Implements cache lookup with in-memory hot path, source/output storage, negative caching for failed fetches, TTL expiration, and statistics tracking.
This commit is contained in:
412
internal/imgcache/cache.go
Normal file
412
internal/imgcache/cache.go
Normal file
@@ -0,0 +1,412 @@
|
||||
package imgcache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Cache errors.
var (
	// ErrCacheMiss indicates the requested item is not present in the cache.
	ErrCacheMiss = errors.New("cache miss")
	// ErrNegativeCache indicates a recent failed fetch for this source is
	// recorded, so the request should not be retried until it expires.
	ErrNegativeCache = errors.New("negative cache hit")
)

// HTTP status code for successful fetch.
const httpStatusOK = 200
|
||||
|
||||
// CacheConfig holds cache configuration.
type CacheConfig struct {
	// StateDir is the root directory under which on-disk cache
	// subdirectories (cache/src-content, cache/dst-content,
	// cache/src-metadata) are created.
	StateDir string
	// CacheTTL is the maximum age of a cached output entry; 0 disables
	// TTL-based expiration.
	CacheTTL time.Duration
	// NegativeTTL is how long a failed fetch is remembered before the
	// source may be retried.
	NegativeTTL time.Duration
	// HotCacheSize is the initial capacity hint for the in-memory hot
	// cache map. NOTE(review): it is used only for allocation, not as an
	// enforced bound — the hot cache can grow past this size.
	HotCacheSize int
	// HotCacheEnabled toggles the in-memory hot cache.
	HotCacheEnabled bool
}
|
||||
|
||||
// Cache implements the caching layer for the image proxy.
type Cache struct {
	db          *sql.DB
	srcContent  *ContentStorage  // fetched upstream bytes, keyed by content hash
	dstContent  *ContentStorage  // processed output bytes, keyed by content hash
	srcMetadata *MetadataStorage // JSON mirror of source_metadata rows
	config      CacheConfig

	// hotCache maps cache_key -> output_hash for lookups that skip the
	// database entirely. Guarded by hotCacheMu.
	hotCache   map[string]string
	hotCacheMu sync.RWMutex
	// hotCacheEnabled mirrors config.HotCacheEnabled so the hot path
	// avoids reading the config struct on every lookup.
	hotCacheEnabled bool
}
|
||||
|
||||
// NewCache creates a new cache instance.
|
||||
func NewCache(db *sql.DB, config CacheConfig) (*Cache, error) {
|
||||
srcContent, err := NewContentStorage(filepath.Join(config.StateDir, "cache", "src-content"))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create source content storage: %w", err)
|
||||
}
|
||||
|
||||
dstContent, err := NewContentStorage(filepath.Join(config.StateDir, "cache", "dst-content"))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create destination content storage: %w", err)
|
||||
}
|
||||
|
||||
srcMetadata, err := NewMetadataStorage(filepath.Join(config.StateDir, "cache", "src-metadata"))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create source metadata storage: %w", err)
|
||||
}
|
||||
|
||||
c := &Cache{
|
||||
db: db,
|
||||
srcContent: srcContent,
|
||||
dstContent: dstContent,
|
||||
srcMetadata: srcMetadata,
|
||||
config: config,
|
||||
hotCacheEnabled: config.HotCacheEnabled,
|
||||
}
|
||||
|
||||
if config.HotCacheEnabled && config.HotCacheSize > 0 {
|
||||
c.hotCache = make(map[string]string, config.HotCacheSize)
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// LookupResult contains the result of a cache lookup.
type LookupResult struct {
	// Hit reports whether a usable cached output was found.
	Hit bool
	// OutputHash identifies the processed content in output storage.
	OutputHash string
	// ContentType is the MIME type of the cached output. NOTE(review):
	// it is left empty on hot-cache hits, which only record the hash.
	ContentType string
	// CacheStatus classifies the lookup outcome (hit, miss, stale).
	CacheStatus CacheStatus
}
|
||||
|
||||
// Lookup checks if a processed image exists in the cache.
|
||||
func (c *Cache) Lookup(ctx context.Context, req *ImageRequest) (*LookupResult, error) {
|
||||
cacheKey := CacheKey(req)
|
||||
|
||||
// Check hot cache first
|
||||
if c.hotCacheEnabled {
|
||||
c.hotCacheMu.RLock()
|
||||
outputHash, ok := c.hotCache[cacheKey]
|
||||
c.hotCacheMu.RUnlock()
|
||||
|
||||
if ok && c.dstContent.Exists(outputHash) {
|
||||
return &LookupResult{
|
||||
Hit: true,
|
||||
OutputHash: outputHash,
|
||||
CacheStatus: CacheHit,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Check negative cache
|
||||
negCached, err := c.checkNegativeCache(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if negCached {
|
||||
return nil, ErrNegativeCache
|
||||
}
|
||||
|
||||
// Check database
|
||||
var outputHash, contentType string
|
||||
var fetchedAt time.Time
|
||||
|
||||
err = c.db.QueryRowContext(ctx, `
|
||||
SELECT rc.output_hash, oc.content_type, rc.fetched_at
|
||||
FROM request_cache rc
|
||||
JOIN output_content oc ON rc.output_hash = oc.content_hash
|
||||
WHERE rc.cache_key = ?
|
||||
`, cacheKey).Scan(&outputHash, &contentType, &fetchedAt)
|
||||
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return &LookupResult{Hit: false, CacheStatus: CacheMiss}, nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query cache: %w", err)
|
||||
}
|
||||
|
||||
// Check TTL
|
||||
if c.config.CacheTTL > 0 && time.Since(fetchedAt) > c.config.CacheTTL {
|
||||
return &LookupResult{Hit: false, CacheStatus: CacheStale}, nil
|
||||
}
|
||||
|
||||
// Verify file exists on disk
|
||||
if !c.dstContent.Exists(outputHash) {
|
||||
return &LookupResult{Hit: false, CacheStatus: CacheMiss}, nil
|
||||
}
|
||||
|
||||
// Update hot cache
|
||||
if c.hotCacheEnabled {
|
||||
c.hotCacheMu.Lock()
|
||||
c.hotCache[cacheKey] = outputHash
|
||||
c.hotCacheMu.Unlock()
|
||||
}
|
||||
|
||||
// Update access count
|
||||
_, _ = c.db.ExecContext(ctx, `
|
||||
UPDATE request_cache
|
||||
SET access_count = access_count + 1
|
||||
WHERE cache_key = ?
|
||||
`, cacheKey)
|
||||
|
||||
return &LookupResult{
|
||||
Hit: true,
|
||||
OutputHash: outputHash,
|
||||
ContentType: contentType,
|
||||
CacheStatus: CacheHit,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetOutput returns a reader for cached output content.
// The caller is responsible for closing the returned reader.
func (c *Cache) GetOutput(outputHash string) (io.ReadCloser, error) {
	return c.dstContent.Load(outputHash)
}
|
||||
|
||||
// StoreSource stores fetched source content and metadata.
|
||||
func (c *Cache) StoreSource(
|
||||
ctx context.Context,
|
||||
req *ImageRequest,
|
||||
content io.Reader,
|
||||
result *FetchResult,
|
||||
) (contentHash string, err error) {
|
||||
// Store content
|
||||
contentHash, size, err := c.srcContent.Store(content)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to store source content: %w", err)
|
||||
}
|
||||
|
||||
// Store in database
|
||||
pathHash := HashPath(req.SourcePath + "?" + req.SourceQuery)
|
||||
headersJSON, _ := json.Marshal(result.Headers)
|
||||
|
||||
_, err = c.db.ExecContext(ctx, `
|
||||
INSERT INTO source_content (content_hash, content_type, size_bytes)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT(content_hash) DO NOTHING
|
||||
`, contentHash, result.ContentType, size)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to insert source content: %w", err)
|
||||
}
|
||||
|
||||
_, err = c.db.ExecContext(ctx, `
|
||||
INSERT INTO source_metadata
|
||||
(source_host, source_path, source_query, path_hash,
|
||||
content_hash, status_code, content_type, response_headers)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(source_host, source_path, source_query) DO UPDATE SET
|
||||
content_hash = excluded.content_hash,
|
||||
status_code = excluded.status_code,
|
||||
content_type = excluded.content_type,
|
||||
response_headers = excluded.response_headers,
|
||||
fetched_at = CURRENT_TIMESTAMP
|
||||
`, req.SourceHost, req.SourcePath, req.SourceQuery, pathHash,
|
||||
contentHash, httpStatusOK, result.ContentType, string(headersJSON))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to insert source metadata: %w", err)
|
||||
}
|
||||
|
||||
// Store metadata JSON file
|
||||
meta := &SourceMetadata{
|
||||
Host: req.SourceHost,
|
||||
Path: req.SourcePath,
|
||||
Query: req.SourceQuery,
|
||||
ContentHash: contentHash,
|
||||
StatusCode: httpStatusOK,
|
||||
ContentType: result.ContentType,
|
||||
ResponseHeaders: result.Headers,
|
||||
FetchedAt: time.Now().Unix(),
|
||||
}
|
||||
|
||||
if err := c.srcMetadata.Store(req.SourceHost, pathHash, meta); err != nil {
|
||||
// Non-fatal, we have it in the database
|
||||
_ = err
|
||||
}
|
||||
|
||||
return contentHash, nil
|
||||
}
|
||||
|
||||
// StoreOutput stores processed output content.
|
||||
func (c *Cache) StoreOutput(
|
||||
ctx context.Context,
|
||||
req *ImageRequest,
|
||||
sourceMetadataID int64,
|
||||
content io.Reader,
|
||||
contentType string,
|
||||
) error {
|
||||
// Store content
|
||||
outputHash, size, err := c.dstContent.Store(content)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to store output content: %w", err)
|
||||
}
|
||||
|
||||
cacheKey := CacheKey(req)
|
||||
|
||||
// Store in database
|
||||
_, err = c.db.ExecContext(ctx, `
|
||||
INSERT INTO output_content (content_hash, content_type, size_bytes)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT(content_hash) DO NOTHING
|
||||
`, outputHash, contentType, size)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to insert output content: %w", err)
|
||||
}
|
||||
|
||||
_, err = c.db.ExecContext(ctx, `
|
||||
INSERT INTO request_cache (cache_key, source_metadata_id, output_hash, width, height, format, quality, fit_mode)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(cache_key) DO UPDATE SET
|
||||
output_hash = excluded.output_hash,
|
||||
fetched_at = CURRENT_TIMESTAMP,
|
||||
access_count = request_cache.access_count + 1
|
||||
`, cacheKey, sourceMetadataID, outputHash, req.Size.Width, req.Size.Height, req.Format, req.Quality, req.FitMode)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to insert request cache: %w", err)
|
||||
}
|
||||
|
||||
// Update hot cache
|
||||
if c.hotCacheEnabled {
|
||||
c.hotCacheMu.Lock()
|
||||
c.hotCache[cacheKey] = outputHash
|
||||
c.hotCacheMu.Unlock()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// StoreNegative stores a negative cache entry for a failed fetch.
//
// The entry records the upstream status code and error message and expires
// after config.NegativeTTL. Re-recording a failure for the same source
// upserts the row and pushes the expiry forward.
func (c *Cache) StoreNegative(ctx context.Context, req *ImageRequest, statusCode int, errMsg string) error {
	expiresAt := time.Now().Add(c.config.NegativeTTL)

	_, err := c.db.ExecContext(ctx, `
		INSERT INTO negative_cache (source_host, source_path, source_query, status_code, error_message, expires_at)
		VALUES (?, ?, ?, ?, ?, ?)
		ON CONFLICT(source_host, source_path, source_query) DO UPDATE SET
			status_code = excluded.status_code,
			error_message = excluded.error_message,
			fetched_at = CURRENT_TIMESTAMP,
			expires_at = excluded.expires_at
	`, req.SourceHost, req.SourcePath, req.SourceQuery, statusCode, errMsg, expiresAt)
	if err != nil {
		return fmt.Errorf("failed to insert negative cache: %w", err)
	}

	return nil
}
|
||||
|
||||
// checkNegativeCache checks if a request is in the negative cache.
//
// Returns true only when an unexpired entry exists for the source. An
// expired entry is deleted opportunistically and reported as absent, so
// the caller proceeds to re-fetch.
func (c *Cache) checkNegativeCache(ctx context.Context, req *ImageRequest) (bool, error) {
	var expiresAt time.Time

	err := c.db.QueryRowContext(ctx, `
		SELECT expires_at FROM negative_cache
		WHERE source_host = ? AND source_path = ? AND source_query = ?
	`, req.SourceHost, req.SourcePath, req.SourceQuery).Scan(&expiresAt)

	// No row means no recorded failure — not an error.
	if errors.Is(err, sql.ErrNoRows) {
		return false, nil
	}

	if err != nil {
		return false, fmt.Errorf("failed to check negative cache: %w", err)
	}

	// Check if expired
	if time.Now().After(expiresAt) {
		// Clean up the expired entry; best-effort, a failed delete just
		// leaves it for the next check or CleanExpired.
		_, _ = c.db.ExecContext(ctx, `
			DELETE FROM negative_cache
			WHERE source_host = ? AND source_path = ? AND source_query = ?
		`, req.SourceHost, req.SourcePath, req.SourceQuery)

		return false, nil
	}

	return true, nil
}
|
||||
|
||||
// GetSourceMetadataID returns the source metadata ID for a request.
// It returns an error (wrapping sql.ErrNoRows) when no metadata row
// exists for the source, i.e. StoreSource has not run for it yet.
func (c *Cache) GetSourceMetadataID(ctx context.Context, req *ImageRequest) (int64, error) {
	var id int64

	err := c.db.QueryRowContext(ctx, `
		SELECT id FROM source_metadata
		WHERE source_host = ? AND source_path = ? AND source_query = ?
	`, req.SourceHost, req.SourcePath, req.SourceQuery).Scan(&id)

	if err != nil {
		return 0, fmt.Errorf("failed to get source metadata ID: %w", err)
	}

	return id, nil
}
|
||||
|
||||
// GetSourceContent returns a reader for cached source content by its hash.
// The caller is responsible for closing the returned reader.
func (c *Cache) GetSourceContent(contentHash string) (io.ReadCloser, error) {
	return c.srcContent.Load(contentHash)
}
|
||||
|
||||
// CleanExpired removes expired entries from the cache.
//
// NOTE(review): only negative_cache rows past expires_at are purged here.
// Stale request_cache rows are filtered at lookup time via CacheTTL but
// never deleted — TODO: consider purging those (and orphaned content
// files) as well.
func (c *Cache) CleanExpired(ctx context.Context) error {
	// Clean expired negative cache entries
	_, err := c.db.ExecContext(ctx, `
		DELETE FROM negative_cache WHERE expires_at < CURRENT_TIMESTAMP
	`)
	if err != nil {
		return fmt.Errorf("failed to clean negative cache: %w", err)
	}

	return nil
}
|
||||
|
||||
// Stats returns cache statistics.
|
||||
func (c *Cache) Stats(ctx context.Context) (*CacheStats, error) {
|
||||
var stats CacheStats
|
||||
|
||||
err := c.db.QueryRowContext(ctx, `
|
||||
SELECT hit_count, miss_count, upstream_fetch_count, upstream_fetch_bytes, transform_count
|
||||
FROM cache_stats WHERE id = 1
|
||||
`).Scan(&stats.HitCount, &stats.MissCount, &stats.TotalItems, &stats.TotalSizeBytes, &stats.HitRate)
|
||||
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, fmt.Errorf("failed to get cache stats: %w", err)
|
||||
}
|
||||
|
||||
// Get actual counts
|
||||
_ = c.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM request_cache`).Scan(&stats.TotalItems)
|
||||
_ = c.db.QueryRowContext(ctx, `SELECT COALESCE(SUM(size_bytes), 0) FROM output_content`).Scan(&stats.TotalSizeBytes)
|
||||
|
||||
if stats.HitCount+stats.MissCount > 0 {
|
||||
stats.HitRate = float64(stats.HitCount) / float64(stats.HitCount+stats.MissCount)
|
||||
}
|
||||
|
||||
return &stats, nil
|
||||
}
|
||||
|
||||
// IncrementStats increments cache statistics.
|
||||
func (c *Cache) IncrementStats(ctx context.Context, hit bool, fetchBytes int64) {
|
||||
if hit {
|
||||
_, _ = c.db.ExecContext(ctx, `
|
||||
UPDATE cache_stats SET hit_count = hit_count + 1, last_updated_at = CURRENT_TIMESTAMP WHERE id = 1
|
||||
`)
|
||||
} else {
|
||||
_, _ = c.db.ExecContext(ctx, `
|
||||
UPDATE cache_stats SET miss_count = miss_count + 1, last_updated_at = CURRENT_TIMESTAMP WHERE id = 1
|
||||
`)
|
||||
}
|
||||
|
||||
if fetchBytes > 0 {
|
||||
_, _ = c.db.ExecContext(ctx, `
|
||||
UPDATE cache_stats
|
||||
SET upstream_fetch_count = upstream_fetch_count + 1,
|
||||
upstream_fetch_bytes = upstream_fetch_bytes + ?,
|
||||
last_updated_at = CURRENT_TIMESTAMP
|
||||
WHERE id = 1
|
||||
`, fetchBytes)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user