- Add unified compression/encryption package in internal/blobgen - Update DATAMODEL.md to reflect current schema implementation - Refactor snapshot cleanup into well-named methods for clarity - Add snapshot_id to uploads table to track new blobs per snapshot - Fix blob count reporting for incremental backups - Add DeleteOrphaned method to BlobChunkRepository - Fix cleanup order to respect foreign key constraints - Update tests to reflect schema changes
153 lines
3.7 KiB
Go
153 lines
3.7 KiB
Go
package database
|
|
|
|
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)
|
|
|
|
type BlobChunkRepository struct {
|
|
db *DB
|
|
}
|
|
|
|
func NewBlobChunkRepository(db *DB) *BlobChunkRepository {
|
|
return &BlobChunkRepository{db: db}
|
|
}
|
|
|
|
func (r *BlobChunkRepository) Create(ctx context.Context, tx *sql.Tx, bc *BlobChunk) error {
|
|
query := `
|
|
INSERT INTO blob_chunks (blob_id, chunk_hash, offset, length)
|
|
VALUES (?, ?, ?, ?)
|
|
`
|
|
|
|
var err error
|
|
if tx != nil {
|
|
_, err = tx.ExecContext(ctx, query, bc.BlobID, bc.ChunkHash, bc.Offset, bc.Length)
|
|
} else {
|
|
_, err = r.db.ExecWithLog(ctx, query, bc.BlobID, bc.ChunkHash, bc.Offset, bc.Length)
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("inserting blob_chunk: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *BlobChunkRepository) GetByBlobID(ctx context.Context, blobID string) ([]*BlobChunk, error) {
|
|
query := `
|
|
SELECT blob_id, chunk_hash, offset, length
|
|
FROM blob_chunks
|
|
WHERE blob_id = ?
|
|
ORDER BY offset
|
|
`
|
|
|
|
rows, err := r.db.conn.QueryContext(ctx, query, blobID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying blob chunks: %w", err)
|
|
}
|
|
defer CloseRows(rows)
|
|
|
|
var blobChunks []*BlobChunk
|
|
for rows.Next() {
|
|
var bc BlobChunk
|
|
err := rows.Scan(&bc.BlobID, &bc.ChunkHash, &bc.Offset, &bc.Length)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scanning blob chunk: %w", err)
|
|
}
|
|
blobChunks = append(blobChunks, &bc)
|
|
}
|
|
|
|
return blobChunks, rows.Err()
|
|
}
|
|
|
|
func (r *BlobChunkRepository) GetByChunkHash(ctx context.Context, chunkHash string) (*BlobChunk, error) {
|
|
query := `
|
|
SELECT blob_id, chunk_hash, offset, length
|
|
FROM blob_chunks
|
|
WHERE chunk_hash = ?
|
|
LIMIT 1
|
|
`
|
|
|
|
LogSQL("GetByChunkHash", query, chunkHash)
|
|
var bc BlobChunk
|
|
err := r.db.conn.QueryRowContext(ctx, query, chunkHash).Scan(
|
|
&bc.BlobID,
|
|
&bc.ChunkHash,
|
|
&bc.Offset,
|
|
&bc.Length,
|
|
)
|
|
|
|
if err == sql.ErrNoRows {
|
|
LogSQL("GetByChunkHash", "No rows found", chunkHash)
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
LogSQL("GetByChunkHash", "Error", chunkHash, err)
|
|
return nil, fmt.Errorf("querying blob chunk: %w", err)
|
|
}
|
|
|
|
LogSQL("GetByChunkHash", "Found blob", chunkHash, "blob", bc.BlobID)
|
|
return &bc, nil
|
|
}
|
|
|
|
// GetByChunkHashTx retrieves a blob chunk within a transaction
|
|
func (r *BlobChunkRepository) GetByChunkHashTx(ctx context.Context, tx *sql.Tx, chunkHash string) (*BlobChunk, error) {
|
|
query := `
|
|
SELECT blob_id, chunk_hash, offset, length
|
|
FROM blob_chunks
|
|
WHERE chunk_hash = ?
|
|
LIMIT 1
|
|
`
|
|
|
|
LogSQL("GetByChunkHashTx", query, chunkHash)
|
|
var bc BlobChunk
|
|
err := tx.QueryRowContext(ctx, query, chunkHash).Scan(
|
|
&bc.BlobID,
|
|
&bc.ChunkHash,
|
|
&bc.Offset,
|
|
&bc.Length,
|
|
)
|
|
|
|
if err == sql.ErrNoRows {
|
|
LogSQL("GetByChunkHashTx", "No rows found", chunkHash)
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
LogSQL("GetByChunkHashTx", "Error", chunkHash, err)
|
|
return nil, fmt.Errorf("querying blob chunk: %w", err)
|
|
}
|
|
|
|
LogSQL("GetByChunkHashTx", "Found blob", chunkHash, "blob", bc.BlobID)
|
|
return &bc, nil
|
|
}
|
|
|
|
// DeleteOrphaned deletes blob_chunks entries where either the blob or chunk no longer exists
|
|
func (r *BlobChunkRepository) DeleteOrphaned(ctx context.Context) error {
|
|
// Delete blob_chunks where the blob doesn't exist
|
|
query1 := `
|
|
DELETE FROM blob_chunks
|
|
WHERE NOT EXISTS (
|
|
SELECT 1 FROM blobs
|
|
WHERE blobs.id = blob_chunks.blob_id
|
|
)
|
|
`
|
|
if _, err := r.db.ExecWithLog(ctx, query1); err != nil {
|
|
return fmt.Errorf("deleting blob_chunks with missing blobs: %w", err)
|
|
}
|
|
|
|
// Delete blob_chunks where the chunk doesn't exist
|
|
query2 := `
|
|
DELETE FROM blob_chunks
|
|
WHERE NOT EXISTS (
|
|
SELECT 1 FROM chunks
|
|
WHERE chunks.chunk_hash = blob_chunks.chunk_hash
|
|
)
|
|
`
|
|
if _, err := r.db.ExecWithLog(ctx, query2); err != nil {
|
|
return fmt.Errorf("deleting blob_chunks with missing chunks: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|