Refactor to serve all responses from cached files on disk
- StoreOutput now returns output hash for immediate retrieval - Cache misses now serve from disk file after storing (same as hits) - Log served_bytes from actual io.Copy result (avoids stat calls) - Remove ContentLength field usage for cache hits (stream from file) - Fix tests to properly check all return values
This commit is contained in:
@@ -124,26 +124,26 @@ func (s *Handlers) HandleImage() http.HandlerFunc {
|
|||||||
w.Header().Set("ETag", resp.ETag)
|
w.Header().Set("ETag", resp.ETag)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Log cache status and timing
|
// Stream the response
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
|
||||||
|
servedBytes, err := io.Copy(w, resp.Content)
|
||||||
|
if err != nil {
|
||||||
|
s.log.Error("failed to write response",
|
||||||
|
"error", err,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Log cache status and timing after serving
|
||||||
duration := time.Since(startTime)
|
duration := time.Since(startTime)
|
||||||
s.log.Info("image served",
|
s.log.Info("image served",
|
||||||
"cache_key", cacheKey,
|
"cache_key", cacheKey,
|
||||||
"cache_status", resp.CacheStatus,
|
"cache_status", resp.CacheStatus,
|
||||||
"duration_ms", duration.Milliseconds(),
|
"duration_ms", duration.Milliseconds(),
|
||||||
"format", req.Format,
|
"format", req.Format,
|
||||||
"served_bytes", resp.ContentLength,
|
"served_bytes", servedBytes,
|
||||||
"fetched_bytes", resp.FetchedBytes,
|
"fetched_bytes", resp.FetchedBytes,
|
||||||
)
|
)
|
||||||
|
|
||||||
// Stream the response
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
|
|
||||||
_, err = io.Copy(w, resp.Content)
|
|
||||||
if err != nil {
|
|
||||||
s.log.Error("failed to write response",
|
|
||||||
"error", err,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -231,18 +231,18 @@ func (c *Cache) StoreSource(
|
|||||||
return contentHash, nil
|
return contentHash, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// StoreOutput stores processed output content.
|
// StoreOutput stores processed output content and returns the output hash.
|
||||||
func (c *Cache) StoreOutput(
|
func (c *Cache) StoreOutput(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
req *ImageRequest,
|
req *ImageRequest,
|
||||||
sourceMetadataID int64,
|
sourceMetadataID int64,
|
||||||
content io.Reader,
|
content io.Reader,
|
||||||
contentType string,
|
contentType string,
|
||||||
) error {
|
) (string, error) {
|
||||||
// Store content
|
// Store content
|
||||||
outputHash, size, err := c.dstContent.Store(content)
|
outputHash, size, err := c.dstContent.Store(content)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to store output content: %w", err)
|
return "", fmt.Errorf("failed to store output content: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cacheKey := CacheKey(req)
|
cacheKey := CacheKey(req)
|
||||||
@@ -254,7 +254,7 @@ func (c *Cache) StoreOutput(
|
|||||||
ON CONFLICT(content_hash) DO NOTHING
|
ON CONFLICT(content_hash) DO NOTHING
|
||||||
`, outputHash, contentType, size)
|
`, outputHash, contentType, size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to insert output content: %w", err)
|
return "", fmt.Errorf("failed to insert output content: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = c.db.ExecContext(ctx, `
|
_, err = c.db.ExecContext(ctx, `
|
||||||
@@ -266,7 +266,7 @@ func (c *Cache) StoreOutput(
|
|||||||
access_count = request_cache.access_count + 1
|
access_count = request_cache.access_count + 1
|
||||||
`, cacheKey, sourceMetadataID, outputHash, req.Size.Width, req.Size.Height, req.Format, req.Quality, req.FitMode)
|
`, cacheKey, sourceMetadataID, outputHash, req.Size.Width, req.Size.Height, req.Format, req.Quality, req.FitMode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to insert request cache: %w", err)
|
return "", fmt.Errorf("failed to insert request cache: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update hot cache
|
// Update hot cache
|
||||||
@@ -276,7 +276,7 @@ func (c *Cache) StoreOutput(
|
|||||||
c.hotCacheMu.Unlock()
|
c.hotCacheMu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return outputHash, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// StoreNegative stores a negative cache entry for a failed fetch.
|
// StoreNegative stores a negative cache entry for a failed fetch.
|
||||||
|
|||||||
@@ -176,7 +176,7 @@ func TestCache_StoreAndLookup(t *testing.T) {
|
|||||||
|
|
||||||
// Store output content
|
// Store output content
|
||||||
outputContent := []byte("fake webp data")
|
outputContent := []byte("fake webp data")
|
||||||
err = cache.StoreOutput(ctx, req, metaID, bytes.NewReader(outputContent), "image/webp")
|
_, err = cache.StoreOutput(ctx, req, metaID, bytes.NewReader(outputContent), "image/webp")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("StoreOutput() error = %v", err)
|
t.Fatalf("StoreOutput() error = %v", err)
|
||||||
}
|
}
|
||||||
@@ -297,7 +297,7 @@ func TestCache_HotCache(t *testing.T) {
|
|||||||
metaID, _ := cache.GetSourceMetadataID(ctx, req)
|
metaID, _ := cache.GetSourceMetadataID(ctx, req)
|
||||||
|
|
||||||
outputContent := []byte("output data")
|
outputContent := []byte("output data")
|
||||||
err = cache.StoreOutput(ctx, req, metaID, bytes.NewReader(outputContent), "image/webp")
|
_, err = cache.StoreOutput(ctx, req, metaID, bytes.NewReader(outputContent), "image/webp")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("StoreOutput() error = %v", err)
|
t.Fatalf("StoreOutput() error = %v", err)
|
||||||
}
|
}
|
||||||
@@ -343,15 +343,30 @@ func TestCache_GetOutput(t *testing.T) {
|
|||||||
// Store content
|
// Store content
|
||||||
sourceContent := []byte("source")
|
sourceContent := []byte("source")
|
||||||
fetchResult := &FetchResult{ContentType: "image/jpeg", Headers: map[string][]string{}}
|
fetchResult := &FetchResult{ContentType: "image/jpeg", Headers: map[string][]string{}}
|
||||||
_, _ = cache.StoreSource(ctx, req, bytes.NewReader(sourceContent), fetchResult)
|
_, err := cache.StoreSource(ctx, req, bytes.NewReader(sourceContent), fetchResult)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("StoreSource() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
metaID, _ := cache.GetSourceMetadataID(ctx, req)
|
metaID, err := cache.GetSourceMetadataID(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetSourceMetadataID() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
outputContent := []byte("the actual output content")
|
outputContent := []byte("the actual output content")
|
||||||
_ = cache.StoreOutput(ctx, req, metaID, bytes.NewReader(outputContent), "image/webp")
|
outputHash, err := cache.StoreOutput(ctx, req, metaID, bytes.NewReader(outputContent), "image/webp")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("StoreOutput() error = %v", err)
|
||||||
|
}
|
||||||
|
if outputHash == "" {
|
||||||
|
t.Fatal("StoreOutput() returned empty hash")
|
||||||
|
}
|
||||||
|
|
||||||
// Lookup to get hash
|
// Lookup to get hash
|
||||||
result, _ := cache.Lookup(ctx, req)
|
result, err := cache.Lookup(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Lookup() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Get output content
|
// Get output content
|
||||||
reader, err := cache.GetOutput(result.OutputHash)
|
reader, err := cache.GetOutput(result.OutputHash)
|
||||||
|
|||||||
@@ -88,10 +88,9 @@ func (s *Service) Get(ctx context.Context, req *ImageRequest) (*ImageResponse, e
|
|||||||
// Fall through to re-fetch
|
// Fall through to re-fetch
|
||||||
} else {
|
} else {
|
||||||
return &ImageResponse{
|
return &ImageResponse{
|
||||||
Content: reader,
|
Content: reader,
|
||||||
ContentLength: -1, // Unknown until read
|
ContentType: result.ContentType,
|
||||||
ContentType: result.ContentType,
|
CacheStatus: CacheHit,
|
||||||
CacheStatus: CacheHit,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -152,27 +151,28 @@ func (s *Service) fetchAndProcess(ctx context.Context, req *ImageRequest) (*Imag
|
|||||||
return nil, fmt.Errorf("image processing failed: %w", err)
|
return nil, fmt.Errorf("image processing failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read processed data to cache it
|
// Store output content to cache
|
||||||
processedData, err := io.ReadAll(processResult.Content)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to read processed image: %w", err)
|
|
||||||
}
|
|
||||||
_ = processResult.Content.Close()
|
|
||||||
|
|
||||||
// Store output content
|
|
||||||
metaID, err := s.cache.GetSourceMetadataID(ctx, req)
|
metaID, err := s.cache.GetSourceMetadataID(ctx, req)
|
||||||
if err == nil {
|
if err != nil {
|
||||||
err = s.cache.StoreOutput(ctx, req, metaID, bytes.NewReader(processedData), processResult.ContentType)
|
return nil, fmt.Errorf("failed to get source metadata ID: %w", err)
|
||||||
if err != nil {
|
}
|
||||||
s.log.Warn("failed to store output content", "error", err)
|
|
||||||
}
|
outputHash, err := s.cache.StoreOutput(ctx, req, metaID, processResult.Content, processResult.ContentType)
|
||||||
|
_ = processResult.Content.Close()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to store output content: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Serve from the cached file on disk (same path as cache hits)
|
||||||
|
reader, err := s.cache.GetOutput(outputHash)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to read cached output: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &ImageResponse{
|
return &ImageResponse{
|
||||||
Content: io.NopCloser(bytes.NewReader(processedData)),
|
Content: reader,
|
||||||
ContentLength: int64(len(processedData)),
|
ContentType: processResult.ContentType,
|
||||||
ContentType: processResult.ContentType,
|
FetchedBytes: int64(len(sourceData)),
|
||||||
FetchedBytes: int64(len(sourceData)),
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user