package database import ( "context" "database/sql" "fmt" ) type BlobChunkRepository struct { db *DB } func NewBlobChunkRepository(db *DB) *BlobChunkRepository { return &BlobChunkRepository{db: db} } func (r *BlobChunkRepository) Create(ctx context.Context, tx *sql.Tx, bc *BlobChunk) error { query := ` INSERT INTO blob_chunks (blob_id, chunk_hash, offset, length) VALUES (?, ?, ?, ?) ` var err error if tx != nil { _, err = tx.ExecContext(ctx, query, bc.BlobID, bc.ChunkHash, bc.Offset, bc.Length) } else { _, err = r.db.ExecWithLog(ctx, query, bc.BlobID, bc.ChunkHash, bc.Offset, bc.Length) } if err != nil { return fmt.Errorf("inserting blob_chunk: %w", err) } return nil } func (r *BlobChunkRepository) GetByBlobID(ctx context.Context, blobID string) ([]*BlobChunk, error) { query := ` SELECT blob_id, chunk_hash, offset, length FROM blob_chunks WHERE blob_id = ? ORDER BY offset ` rows, err := r.db.conn.QueryContext(ctx, query, blobID) if err != nil { return nil, fmt.Errorf("querying blob chunks: %w", err) } defer CloseRows(rows) var blobChunks []*BlobChunk for rows.Next() { var bc BlobChunk err := rows.Scan(&bc.BlobID, &bc.ChunkHash, &bc.Offset, &bc.Length) if err != nil { return nil, fmt.Errorf("scanning blob chunk: %w", err) } blobChunks = append(blobChunks, &bc) } return blobChunks, rows.Err() } func (r *BlobChunkRepository) GetByChunkHash(ctx context.Context, chunkHash string) (*BlobChunk, error) { query := ` SELECT blob_id, chunk_hash, offset, length FROM blob_chunks WHERE chunk_hash = ? LIMIT 1 ` LogSQL("GetByChunkHash", query, chunkHash) var bc BlobChunk err := r.db.conn.QueryRowContext(ctx, query, chunkHash).Scan( &bc.BlobID, &bc.ChunkHash, &bc.Offset, &bc.Length, ) if err == sql.ErrNoRows { LogSQL("GetByChunkHash", "No rows found", chunkHash) return nil, nil } if err != nil { LogSQL("GetByChunkHash", "Error", chunkHash, err) return nil, fmt.Errorf("querying blob chunk: %w", err) } LogSQL("GetByChunkHash", "Found blob", chunkHash, "blob", bc.BlobID) return &bc, nil } // GetByChunkHashTx retrieves a blob chunk within a transaction func (r *BlobChunkRepository) GetByChunkHashTx(ctx context.Context, tx *sql.Tx, chunkHash string) (*BlobChunk, error) { query := ` SELECT blob_id, chunk_hash, offset, length FROM blob_chunks WHERE chunk_hash = ? LIMIT 1 ` LogSQL("GetByChunkHashTx", query, chunkHash) var bc BlobChunk err := tx.QueryRowContext(ctx, query, chunkHash).Scan( &bc.BlobID, &bc.ChunkHash, &bc.Offset, &bc.Length, ) if err == sql.ErrNoRows { LogSQL("GetByChunkHashTx", "No rows found", chunkHash) return nil, nil } if err != nil { LogSQL("GetByChunkHashTx", "Error", chunkHash, err) return nil, fmt.Errorf("querying blob chunk: %w", err) } LogSQL("GetByChunkHashTx", "Found blob", chunkHash, "blob", bc.BlobID) return &bc, nil } // DeleteOrphaned deletes blob_chunks entries where either the blob or chunk no longer exists func (r *BlobChunkRepository) DeleteOrphaned(ctx context.Context) error { // Delete blob_chunks where the blob doesn't exist query1 := ` DELETE FROM blob_chunks WHERE NOT EXISTS ( SELECT 1 FROM blobs WHERE blobs.id = blob_chunks.blob_id ) ` if _, err := r.db.ExecWithLog(ctx, query1); err != nil { return fmt.Errorf("deleting blob_chunks with missing blobs: %w", err) } // Delete blob_chunks where the chunk doesn't exist query2 := ` DELETE FROM blob_chunks WHERE NOT EXISTS ( SELECT 1 FROM chunks WHERE chunks.chunk_hash = blob_chunks.chunk_hash ) ` if _, err := r.db.ExecWithLog(ctx, query2); err != nil { return fmt.Errorf("deleting blob_chunks with missing chunks: %w", err) } return nil }