diff --git a/internal/imgcache/cache.go b/internal/imgcache/cache.go new file mode 100644 index 0000000..2a6a628 --- /dev/null +++ b/internal/imgcache/cache.go @@ -0,0 +1,412 @@ +package imgcache + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "io" + "path/filepath" + "sync" + "time" +) + +// Sentinel cache errors; compare with errors.Is. ErrNegativeCache is returned by Lookup when the source has an unexpired negative-cache entry. +var ( + ErrCacheMiss = errors.New("cache miss") + ErrNegativeCache = errors.New("negative cache hit") +) + +// httpStatusOK is the status code StoreSource records for a successfully fetched source. +const httpStatusOK = 200 + +// CacheConfig holds the settings a Cache is constructed with. +type CacheConfig struct { + StateDir string // root directory; the on-disk stores live under <StateDir>/cache + CacheTTL time.Duration // maximum age of a request_cache row in Lookup; values <= 0 skip the age check + NegativeTTL time.Duration // lifetime StoreNegative gives a negative-cache entry + HotCacheSize int // initial capacity hint for the hot-cache map (not an entry limit) + HotCacheEnabled bool // turns the in-memory hot cache on +} + +// Cache implements the caching layer for the image proxy, combining a SQL database with on-disk content and metadata stores. +type Cache struct { + db *sql.DB + srcContent *ContentStorage + dstContent *ContentStorage + srcMetadata *MetadataStorage + config CacheConfig + hotCache map[string]string // cache_key -> output_hash; nil unless NewCache allocated it + hotCacheMu sync.RWMutex // guards hotCache + hotCacheEnabled bool +} + +// NewCache creates a new cache instance. 
+func NewCache(db *sql.DB, config CacheConfig) (*Cache, error) { + srcContent, err := NewContentStorage(filepath.Join(config.StateDir, "cache", "src-content")) + if err != nil { + return nil, fmt.Errorf("failed to create source content storage: %w", err) + } + + dstContent, err := NewContentStorage(filepath.Join(config.StateDir, "cache", "dst-content")) + if err != nil { + return nil, fmt.Errorf("failed to create destination content storage: %w", err) + } + + srcMetadata, err := NewMetadataStorage(filepath.Join(config.StateDir, "cache", "src-metadata")) + if err != nil { + return nil, fmt.Errorf("failed to create source metadata storage: %w", err) + } + + c := &Cache{ + db: db, + srcContent: srcContent, + dstContent: dstContent, + srcMetadata: srcMetadata, + config: config, + hotCacheEnabled: config.HotCacheEnabled, + } + + if config.HotCacheEnabled && config.HotCacheSize > 0 { + c.hotCache = make(map[string]string, config.HotCacheSize) + } + + return c, nil +} + +// LookupResult contains the result of a cache lookup. +type LookupResult struct { + Hit bool + OutputHash string + ContentType string + CacheStatus CacheStatus +} + +// Lookup checks if a processed image exists in the cache. 
+func (c *Cache) Lookup(ctx context.Context, req *ImageRequest) (*LookupResult, error) { + cacheKey := CacheKey(req) + + // Check hot cache first + if c.hotCacheEnabled { + c.hotCacheMu.RLock() + outputHash, ok := c.hotCache[cacheKey] + c.hotCacheMu.RUnlock() + + if ok && c.dstContent.Exists(outputHash) { + return &LookupResult{ + Hit: true, + OutputHash: outputHash, + CacheStatus: CacheHit, + }, nil + } + } + + // Check negative cache + negCached, err := c.checkNegativeCache(ctx, req) + if err != nil { + return nil, err + } + + if negCached { + return nil, ErrNegativeCache + } + + // Check database + var outputHash, contentType string + var fetchedAt time.Time + + err = c.db.QueryRowContext(ctx, ` + SELECT rc.output_hash, oc.content_type, rc.fetched_at + FROM request_cache rc + JOIN output_content oc ON rc.output_hash = oc.content_hash + WHERE rc.cache_key = ? + `, cacheKey).Scan(&outputHash, &contentType, &fetchedAt) + + if errors.Is(err, sql.ErrNoRows) { + return &LookupResult{Hit: false, CacheStatus: CacheMiss}, nil + } + + if err != nil { + return nil, fmt.Errorf("failed to query cache: %w", err) + } + + // Check TTL + if c.config.CacheTTL > 0 && time.Since(fetchedAt) > c.config.CacheTTL { + return &LookupResult{Hit: false, CacheStatus: CacheStale}, nil + } + + // Verify file exists on disk + if !c.dstContent.Exists(outputHash) { + return &LookupResult{Hit: false, CacheStatus: CacheMiss}, nil + } + + // Update hot cache + if c.hotCacheEnabled { + c.hotCacheMu.Lock() + c.hotCache[cacheKey] = outputHash + c.hotCacheMu.Unlock() + } + + // Update access count + _, _ = c.db.ExecContext(ctx, ` + UPDATE request_cache + SET access_count = access_count + 1 + WHERE cache_key = ? + `, cacheKey) + + return &LookupResult{ + Hit: true, + OutputHash: outputHash, + ContentType: contentType, + CacheStatus: CacheHit, + }, nil +} + +// GetOutput returns a reader for cached output content. 
+func (c *Cache) GetOutput(outputHash string) (io.ReadCloser, error) { + return c.dstContent.Load(outputHash) +} + +// StoreSource stores fetched source content and metadata. +func (c *Cache) StoreSource( + ctx context.Context, + req *ImageRequest, + content io.Reader, + result *FetchResult, +) (contentHash string, err error) { + // Store content + contentHash, size, err := c.srcContent.Store(content) + if err != nil { + return "", fmt.Errorf("failed to store source content: %w", err) + } + + // Store in database + pathHash := HashPath(req.SourcePath + "?" + req.SourceQuery) + headersJSON, _ := json.Marshal(result.Headers) + + _, err = c.db.ExecContext(ctx, ` + INSERT INTO source_content (content_hash, content_type, size_bytes) + VALUES (?, ?, ?) + ON CONFLICT(content_hash) DO NOTHING + `, contentHash, result.ContentType, size) + if err != nil { + return "", fmt.Errorf("failed to insert source content: %w", err) + } + + _, err = c.db.ExecContext(ctx, ` + INSERT INTO source_metadata + (source_host, source_path, source_query, path_hash, + content_hash, status_code, content_type, response_headers) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
+ ON CONFLICT(source_host, source_path, source_query) DO UPDATE SET + content_hash = excluded.content_hash, + status_code = excluded.status_code, + content_type = excluded.content_type, + response_headers = excluded.response_headers, + fetched_at = CURRENT_TIMESTAMP + `, req.SourceHost, req.SourcePath, req.SourceQuery, pathHash, + contentHash, httpStatusOK, result.ContentType, string(headersJSON)) + if err != nil { + return "", fmt.Errorf("failed to insert source metadata: %w", err) + } + + // Store metadata JSON file + meta := &SourceMetadata{ + Host: req.SourceHost, + Path: req.SourcePath, + Query: req.SourceQuery, + ContentHash: contentHash, + StatusCode: httpStatusOK, + ContentType: result.ContentType, + ResponseHeaders: result.Headers, + FetchedAt: time.Now().Unix(), + } + + if err := c.srcMetadata.Store(req.SourceHost, pathHash, meta); err != nil { + // Non-fatal, we have it in the database + _ = err + } + + return contentHash, nil +} + +// StoreOutput stores processed output content. +func (c *Cache) StoreOutput( + ctx context.Context, + req *ImageRequest, + sourceMetadataID int64, + content io.Reader, + contentType string, +) error { + // Store content + outputHash, size, err := c.dstContent.Store(content) + if err != nil { + return fmt.Errorf("failed to store output content: %w", err) + } + + cacheKey := CacheKey(req) + + // Store in database + _, err = c.db.ExecContext(ctx, ` + INSERT INTO output_content (content_hash, content_type, size_bytes) + VALUES (?, ?, ?) + ON CONFLICT(content_hash) DO NOTHING + `, outputHash, contentType, size) + if err != nil { + return fmt.Errorf("failed to insert output content: %w", err) + } + + _, err = c.db.ExecContext(ctx, ` + INSERT INTO request_cache (cache_key, source_metadata_id, output_hash, width, height, format, quality, fit_mode) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
+ ON CONFLICT(cache_key) DO UPDATE SET + output_hash = excluded.output_hash, + fetched_at = CURRENT_TIMESTAMP, + access_count = request_cache.access_count + 1 + `, cacheKey, sourceMetadataID, outputHash, req.Size.Width, req.Size.Height, req.Format, req.Quality, req.FitMode) + if err != nil { + return fmt.Errorf("failed to insert request cache: %w", err) + } + + // Update hot cache + if c.hotCacheEnabled { + c.hotCacheMu.Lock() + c.hotCache[cacheKey] = outputHash + c.hotCacheMu.Unlock() + } + + return nil +} + +// StoreNegative stores a negative cache entry for a failed fetch. +func (c *Cache) StoreNegative(ctx context.Context, req *ImageRequest, statusCode int, errMsg string) error { + expiresAt := time.Now().Add(c.config.NegativeTTL) + + _, err := c.db.ExecContext(ctx, ` + INSERT INTO negative_cache (source_host, source_path, source_query, status_code, error_message, expires_at) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(source_host, source_path, source_query) DO UPDATE SET + status_code = excluded.status_code, + error_message = excluded.error_message, + fetched_at = CURRENT_TIMESTAMP, + expires_at = excluded.expires_at + `, req.SourceHost, req.SourcePath, req.SourceQuery, statusCode, errMsg, expiresAt) + if err != nil { + return fmt.Errorf("failed to insert negative cache: %w", err) + } + + return nil +} + +// checkNegativeCache checks if a request is in the negative cache. +func (c *Cache) checkNegativeCache(ctx context.Context, req *ImageRequest) (bool, error) { + var expiresAt time.Time + + err := c.db.QueryRowContext(ctx, ` + SELECT expires_at FROM negative_cache + WHERE source_host = ? AND source_path = ? AND source_query = ? 
+ `, req.SourceHost, req.SourcePath, req.SourceQuery).Scan(&expiresAt) + + if errors.Is(err, sql.ErrNoRows) { + return false, nil + } + + if err != nil { + return false, fmt.Errorf("failed to check negative cache: %w", err) + } + + // Check if expired + if time.Now().After(expiresAt) { + // Clean up expired entry + _, _ = c.db.ExecContext(ctx, ` + DELETE FROM negative_cache + WHERE source_host = ? AND source_path = ? AND source_query = ? + `, req.SourceHost, req.SourcePath, req.SourceQuery) + + return false, nil + } + + return true, nil +} + +// GetSourceMetadataID returns the source metadata ID for a request. +func (c *Cache) GetSourceMetadataID(ctx context.Context, req *ImageRequest) (int64, error) { + var id int64 + + err := c.db.QueryRowContext(ctx, ` + SELECT id FROM source_metadata + WHERE source_host = ? AND source_path = ? AND source_query = ? + `, req.SourceHost, req.SourcePath, req.SourceQuery).Scan(&id) + + if err != nil { + return 0, fmt.Errorf("failed to get source metadata ID: %w", err) + } + + return id, nil +} + +// GetSourceContent returns a reader for cached source content by its hash. +func (c *Cache) GetSourceContent(contentHash string) (io.ReadCloser, error) { + return c.srcContent.Load(contentHash) +} + +// CleanExpired removes expired entries from the cache. +func (c *Cache) CleanExpired(ctx context.Context) error { + // Clean expired negative cache entries + _, err := c.db.ExecContext(ctx, ` + DELETE FROM negative_cache WHERE expires_at < CURRENT_TIMESTAMP + `) + if err != nil { + return fmt.Errorf("failed to clean negative cache: %w", err) + } + + return nil +} + +// Stats returns cache statistics. 
+func (c *Cache) Stats(ctx context.Context) (*CacheStats, error) { + var stats CacheStats + + err := c.db.QueryRowContext(ctx, ` + SELECT hit_count, miss_count, upstream_fetch_count, upstream_fetch_bytes, transform_count + FROM cache_stats WHERE id = 1 + `).Scan(&stats.HitCount, &stats.MissCount, &stats.TotalItems, &stats.TotalSizeBytes, &stats.HitRate) + + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("failed to get cache stats: %w", err) + } + + // Get actual counts + _ = c.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM request_cache`).Scan(&stats.TotalItems) + _ = c.db.QueryRowContext(ctx, `SELECT COALESCE(SUM(size_bytes), 0) FROM output_content`).Scan(&stats.TotalSizeBytes) + + if stats.HitCount+stats.MissCount > 0 { + stats.HitRate = float64(stats.HitCount) / float64(stats.HitCount+stats.MissCount) + } + + return &stats, nil +} + +// IncrementStats increments cache statistics. +func (c *Cache) IncrementStats(ctx context.Context, hit bool, fetchBytes int64) { + if hit { + _, _ = c.db.ExecContext(ctx, ` + UPDATE cache_stats SET hit_count = hit_count + 1, last_updated_at = CURRENT_TIMESTAMP WHERE id = 1 + `) + } else { + _, _ = c.db.ExecContext(ctx, ` + UPDATE cache_stats SET miss_count = miss_count + 1, last_updated_at = CURRENT_TIMESTAMP WHERE id = 1 + `) + } + + if fetchBytes > 0 { + _, _ = c.db.ExecContext(ctx, ` + UPDATE cache_stats + SET upstream_fetch_count = upstream_fetch_count + 1, + upstream_fetch_bytes = upstream_fetch_bytes + ?, + last_updated_at = CURRENT_TIMESTAMP + WHERE id = 1 + `, fetchBytes) + } +}