The bug: fully-deduplicated snapshots (every chunk already in storage from a prior run) had an empty snapshot_blobs table. The metadata-export pipeline then dropped all blob/blob_chunks rows from the exported database, leaving file_chunks references to chunks whose blobs were no longer recorded. Restore fails on every file with "chunk X not found in any blob". Fix: at CompleteSnapshot time, run an INSERT OR IGNORE that links every blob holding a chunk referenced by this snapshot's files into snapshot_blobs. New blobs uploaded during the snapshot are already recorded (no-op for them); dedup-referenced blobs are added. The cleanup query in deleteOrphanedBlobs already restricts to snapshot_blobs entries for the current snapshot — so once snapshot_blobs is correctly populated, the exported database contains the full set of blob/blob_chunks rows needed for restore. Regression test: TestDedupOnlySnapshotRestores creates two identical snapshots (the second uploads zero new blobs) and restores the second. Without the fix, restore fails on every file.
586 lines
16 KiB
Go
586 lines
16 KiB
Go
package database
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"time"
|
|
|
|
"sneak.berlin/go/vaultik/internal/types"
|
|
)
|
|
|
|
type SnapshotRepository struct {
|
|
db *DB
|
|
}
|
|
|
|
func NewSnapshotRepository(db *DB) *SnapshotRepository {
|
|
return &SnapshotRepository{db: db}
|
|
}
|
|
|
|
func (r *SnapshotRepository) Create(ctx context.Context, tx *sql.Tx, snapshot *Snapshot) error {
|
|
query := `
|
|
INSERT INTO snapshots (id, hostname, vaultik_version, vaultik_git_revision, started_at, completed_at,
|
|
file_count, chunk_count, blob_count, total_size, blob_size, blob_uncompressed_size,
|
|
compression_ratio, compression_level, upload_bytes, upload_duration_ms)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
`
|
|
|
|
var completedAt *int64
|
|
if snapshot.CompletedAt != nil {
|
|
ts := snapshot.CompletedAt.Unix()
|
|
completedAt = &ts
|
|
}
|
|
|
|
var err error
|
|
if tx != nil {
|
|
_, err = tx.ExecContext(ctx, query, snapshot.ID, snapshot.Hostname, snapshot.VaultikVersion, snapshot.VaultikGitRevision, snapshot.StartedAt.Unix(),
|
|
completedAt, snapshot.FileCount, snapshot.ChunkCount, snapshot.BlobCount, snapshot.TotalSize, snapshot.BlobSize, snapshot.BlobUncompressedSize,
|
|
snapshot.CompressionRatio, snapshot.CompressionLevel, snapshot.UploadBytes, snapshot.UploadDurationMs)
|
|
} else {
|
|
_, err = r.db.ExecWithLog(ctx, query, snapshot.ID, snapshot.Hostname, snapshot.VaultikVersion, snapshot.VaultikGitRevision, snapshot.StartedAt.Unix(),
|
|
completedAt, snapshot.FileCount, snapshot.ChunkCount, snapshot.BlobCount, snapshot.TotalSize, snapshot.BlobSize, snapshot.BlobUncompressedSize,
|
|
snapshot.CompressionRatio, snapshot.CompressionLevel, snapshot.UploadBytes, snapshot.UploadDurationMs)
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("inserting snapshot: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *SnapshotRepository) UpdateCounts(ctx context.Context, tx *sql.Tx, snapshotID string, fileCount, chunkCount, blobCount, totalSize, blobSize int64) error {
|
|
compressionRatio := 1.0
|
|
if totalSize > 0 {
|
|
compressionRatio = float64(blobSize) / float64(totalSize)
|
|
}
|
|
|
|
query := `
|
|
UPDATE snapshots
|
|
SET file_count = ?,
|
|
chunk_count = ?,
|
|
blob_count = ?,
|
|
total_size = ?,
|
|
blob_size = ?,
|
|
compression_ratio = ?
|
|
WHERE id = ?
|
|
`
|
|
|
|
var err error
|
|
if tx != nil {
|
|
_, err = tx.ExecContext(ctx, query, fileCount, chunkCount, blobCount, totalSize, blobSize, compressionRatio, snapshotID)
|
|
} else {
|
|
_, err = r.db.ExecWithLog(ctx, query, fileCount, chunkCount, blobCount, totalSize, blobSize, compressionRatio, snapshotID)
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("updating snapshot: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UpdateExtendedStats updates extended statistics for a snapshot
|
|
func (r *SnapshotRepository) UpdateExtendedStats(ctx context.Context, tx *sql.Tx, snapshotID string, blobUncompressedSize int64, compressionLevel int, uploadDurationMs int64) error {
|
|
// Calculate compression ratio based on uncompressed vs compressed sizes
|
|
var compressionRatio float64
|
|
if blobUncompressedSize > 0 {
|
|
// Get current blob_size from DB to calculate ratio
|
|
var blobSize int64
|
|
queryGet := `SELECT blob_size FROM snapshots WHERE id = ?`
|
|
if tx != nil {
|
|
err := tx.QueryRowContext(ctx, queryGet, snapshotID).Scan(&blobSize)
|
|
if err != nil {
|
|
return fmt.Errorf("getting blob size: %w", err)
|
|
}
|
|
} else {
|
|
err := r.db.conn.QueryRowContext(ctx, queryGet, snapshotID).Scan(&blobSize)
|
|
if err != nil {
|
|
return fmt.Errorf("getting blob size: %w", err)
|
|
}
|
|
}
|
|
compressionRatio = float64(blobSize) / float64(blobUncompressedSize)
|
|
} else {
|
|
compressionRatio = 1.0
|
|
}
|
|
|
|
query := `
|
|
UPDATE snapshots
|
|
SET blob_uncompressed_size = ?,
|
|
compression_ratio = ?,
|
|
compression_level = ?,
|
|
upload_bytes = blob_size,
|
|
upload_duration_ms = ?
|
|
WHERE id = ?
|
|
`
|
|
|
|
var err error
|
|
if tx != nil {
|
|
_, err = tx.ExecContext(ctx, query, blobUncompressedSize, compressionRatio, compressionLevel, uploadDurationMs, snapshotID)
|
|
} else {
|
|
_, err = r.db.ExecWithLog(ctx, query, blobUncompressedSize, compressionRatio, compressionLevel, uploadDurationMs, snapshotID)
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("updating extended stats: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *SnapshotRepository) GetByID(ctx context.Context, snapshotID string) (*Snapshot, error) {
|
|
query := `
|
|
SELECT id, hostname, vaultik_version, vaultik_git_revision, started_at, completed_at,
|
|
file_count, chunk_count, blob_count, total_size, blob_size, blob_uncompressed_size,
|
|
compression_ratio, compression_level, upload_bytes, upload_duration_ms
|
|
FROM snapshots
|
|
WHERE id = ?
|
|
`
|
|
|
|
var snapshot Snapshot
|
|
var startedAtUnix int64
|
|
var completedAtUnix *int64
|
|
|
|
err := r.db.conn.QueryRowContext(ctx, query, snapshotID).Scan(
|
|
&snapshot.ID,
|
|
&snapshot.Hostname,
|
|
&snapshot.VaultikVersion,
|
|
&snapshot.VaultikGitRevision,
|
|
&startedAtUnix,
|
|
&completedAtUnix,
|
|
&snapshot.FileCount,
|
|
&snapshot.ChunkCount,
|
|
&snapshot.BlobCount,
|
|
&snapshot.TotalSize,
|
|
&snapshot.BlobSize,
|
|
&snapshot.BlobUncompressedSize,
|
|
&snapshot.CompressionRatio,
|
|
&snapshot.CompressionLevel,
|
|
&snapshot.UploadBytes,
|
|
&snapshot.UploadDurationMs,
|
|
)
|
|
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying snapshot: %w", err)
|
|
}
|
|
|
|
snapshot.StartedAt = time.Unix(startedAtUnix, 0).UTC()
|
|
if completedAtUnix != nil {
|
|
t := time.Unix(*completedAtUnix, 0).UTC()
|
|
snapshot.CompletedAt = &t
|
|
}
|
|
|
|
return &snapshot, nil
|
|
}
|
|
|
|
func (r *SnapshotRepository) ListRecent(ctx context.Context, limit int) ([]*Snapshot, error) {
|
|
query := `
|
|
SELECT id, hostname, vaultik_version, vaultik_git_revision, started_at, completed_at, file_count, chunk_count, blob_count, total_size, blob_size, compression_ratio
|
|
FROM snapshots
|
|
ORDER BY started_at DESC
|
|
LIMIT ?
|
|
`
|
|
|
|
rows, err := r.db.conn.QueryContext(ctx, query, limit)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying snapshots: %w", err)
|
|
}
|
|
defer CloseRows(rows)
|
|
|
|
var snapshots []*Snapshot
|
|
for rows.Next() {
|
|
var snapshot Snapshot
|
|
var startedAtUnix int64
|
|
var completedAtUnix *int64
|
|
|
|
err := rows.Scan(
|
|
&snapshot.ID,
|
|
&snapshot.Hostname,
|
|
&snapshot.VaultikVersion,
|
|
&snapshot.VaultikGitRevision,
|
|
&startedAtUnix,
|
|
&completedAtUnix,
|
|
&snapshot.FileCount,
|
|
&snapshot.ChunkCount,
|
|
&snapshot.BlobCount,
|
|
&snapshot.TotalSize,
|
|
&snapshot.BlobSize,
|
|
&snapshot.CompressionRatio,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scanning snapshot: %w", err)
|
|
}
|
|
|
|
snapshot.StartedAt = time.Unix(startedAtUnix, 0)
|
|
if completedAtUnix != nil {
|
|
t := time.Unix(*completedAtUnix, 0)
|
|
snapshot.CompletedAt = &t
|
|
}
|
|
|
|
snapshots = append(snapshots, &snapshot)
|
|
}
|
|
|
|
return snapshots, rows.Err()
|
|
}
|
|
|
|
// MarkComplete marks a snapshot as completed with the current timestamp
|
|
func (r *SnapshotRepository) MarkComplete(ctx context.Context, tx *sql.Tx, snapshotID string) error {
|
|
query := `
|
|
UPDATE snapshots
|
|
SET completed_at = ?
|
|
WHERE id = ?
|
|
`
|
|
|
|
completedAt := time.Now().UTC().Unix()
|
|
|
|
var err error
|
|
if tx != nil {
|
|
_, err = tx.ExecContext(ctx, query, completedAt, snapshotID)
|
|
} else {
|
|
_, err = r.db.ExecWithLog(ctx, query, completedAt, snapshotID)
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("marking snapshot complete: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// AddFile adds a file to a snapshot
|
|
func (r *SnapshotRepository) AddFile(ctx context.Context, tx *sql.Tx, snapshotID string, filePath string) error {
|
|
query := `
|
|
INSERT OR IGNORE INTO snapshot_files (snapshot_id, file_id)
|
|
SELECT ?, id FROM files WHERE path = ?
|
|
`
|
|
|
|
var err error
|
|
if tx != nil {
|
|
_, err = tx.ExecContext(ctx, query, snapshotID, filePath)
|
|
} else {
|
|
_, err = r.db.ExecWithLog(ctx, query, snapshotID, filePath)
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("adding file to snapshot: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// AddFileByID adds a file to a snapshot by file ID
|
|
func (r *SnapshotRepository) AddFileByID(ctx context.Context, tx *sql.Tx, snapshotID string, fileID types.FileID) error {
|
|
query := `
|
|
INSERT OR IGNORE INTO snapshot_files (snapshot_id, file_id)
|
|
VALUES (?, ?)
|
|
`
|
|
|
|
var err error
|
|
if tx != nil {
|
|
_, err = tx.ExecContext(ctx, query, snapshotID, fileID.String())
|
|
} else {
|
|
_, err = r.db.ExecWithLog(ctx, query, snapshotID, fileID.String())
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("adding file to snapshot: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// AddFilesByIDBatch adds multiple files to a snapshot in batched inserts
|
|
func (r *SnapshotRepository) AddFilesByIDBatch(ctx context.Context, tx *sql.Tx, snapshotID string, fileIDs []types.FileID) error {
|
|
if len(fileIDs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Each entry has 2 values, so batch at 400 to be safe
|
|
const batchSize = 400
|
|
|
|
for i := 0; i < len(fileIDs); i += batchSize {
|
|
end := i + batchSize
|
|
if end > len(fileIDs) {
|
|
end = len(fileIDs)
|
|
}
|
|
batch := fileIDs[i:end]
|
|
|
|
query := "INSERT OR IGNORE INTO snapshot_files (snapshot_id, file_id) VALUES "
|
|
args := make([]interface{}, 0, len(batch)*2)
|
|
for j, fileID := range batch {
|
|
if j > 0 {
|
|
query += ", "
|
|
}
|
|
query += "(?, ?)"
|
|
args = append(args, snapshotID, fileID.String())
|
|
}
|
|
|
|
var err error
|
|
if tx != nil {
|
|
_, err = tx.ExecContext(ctx, query, args...)
|
|
} else {
|
|
_, err = r.db.ExecWithLog(ctx, query, args...)
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("batch adding files to snapshot: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// PopulateReferencedBlobs ensures snapshot_blobs contains an entry for
|
|
// every blob that holds a chunk referenced by any file in the snapshot.
|
|
// This is necessary because the AddBlob hook only runs when a blob is
|
|
// newly uploaded during a snapshot — fully-deduplicated snapshots (where
|
|
// every chunk already exists in storage from a prior run) would otherwise
|
|
// have an empty snapshot_blobs set and be impossible to restore.
|
|
//
|
|
// Returns the number of rows inserted (i.e. blobs that were previously
|
|
// referenced indirectly via file_chunks but not yet recorded in
|
|
// snapshot_blobs for this snapshot).
|
|
func (r *SnapshotRepository) PopulateReferencedBlobs(ctx context.Context, tx *sql.Tx, snapshotID string) (int64, error) {
|
|
query := `
|
|
INSERT OR IGNORE INTO snapshot_blobs (snapshot_id, blob_id, blob_hash)
|
|
SELECT DISTINCT ?, blobs.id, blobs.blob_hash
|
|
FROM blobs
|
|
JOIN blob_chunks ON blob_chunks.blob_id = blobs.id
|
|
JOIN file_chunks ON file_chunks.chunk_hash = blob_chunks.chunk_hash
|
|
JOIN snapshot_files ON snapshot_files.file_id = file_chunks.file_id
|
|
WHERE snapshot_files.snapshot_id = ?
|
|
AND blobs.blob_hash IS NOT NULL
|
|
`
|
|
|
|
var result sql.Result
|
|
var err error
|
|
if tx != nil {
|
|
result, err = tx.ExecContext(ctx, query, snapshotID, snapshotID)
|
|
} else {
|
|
result, err = r.db.ExecWithLog(ctx, query, snapshotID, snapshotID)
|
|
}
|
|
if err != nil {
|
|
return 0, fmt.Errorf("populating referenced blobs: %w", err)
|
|
}
|
|
|
|
n, _ := result.RowsAffected()
|
|
return n, nil
|
|
}
|
|
|
|
// AddBlob adds a blob to a snapshot
|
|
func (r *SnapshotRepository) AddBlob(ctx context.Context, tx *sql.Tx, snapshotID string, blobID types.BlobID, blobHash types.BlobHash) error {
|
|
query := `
|
|
INSERT OR IGNORE INTO snapshot_blobs (snapshot_id, blob_id, blob_hash)
|
|
VALUES (?, ?, ?)
|
|
`
|
|
|
|
var err error
|
|
if tx != nil {
|
|
_, err = tx.ExecContext(ctx, query, snapshotID, blobID.String(), blobHash.String())
|
|
} else {
|
|
_, err = r.db.ExecWithLog(ctx, query, snapshotID, blobID.String(), blobHash.String())
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("adding blob to snapshot: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetBlobHashes returns all blob hashes for a snapshot
|
|
func (r *SnapshotRepository) GetBlobHashes(ctx context.Context, snapshotID string) ([]string, error) {
|
|
query := `
|
|
SELECT sb.blob_hash
|
|
FROM snapshot_blobs sb
|
|
WHERE sb.snapshot_id = ?
|
|
ORDER BY sb.blob_hash
|
|
`
|
|
|
|
rows, err := r.db.conn.QueryContext(ctx, query, snapshotID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying blob hashes: %w", err)
|
|
}
|
|
defer CloseRows(rows)
|
|
|
|
var blobs []string
|
|
for rows.Next() {
|
|
var blobHash string
|
|
if err := rows.Scan(&blobHash); err != nil {
|
|
return nil, fmt.Errorf("scanning blob hash: %w", err)
|
|
}
|
|
blobs = append(blobs, blobHash)
|
|
}
|
|
|
|
return blobs, rows.Err()
|
|
}
|
|
|
|
// GetSnapshotTotalCompressedSize returns the total compressed size of all blobs referenced by a snapshot
|
|
func (r *SnapshotRepository) GetSnapshotTotalCompressedSize(ctx context.Context, snapshotID string) (int64, error) {
|
|
query := `
|
|
SELECT COALESCE(SUM(b.compressed_size), 0)
|
|
FROM snapshot_blobs sb
|
|
JOIN blobs b ON sb.blob_hash = b.blob_hash
|
|
WHERE sb.snapshot_id = ?
|
|
`
|
|
|
|
var totalSize int64
|
|
err := r.db.conn.QueryRowContext(ctx, query, snapshotID).Scan(&totalSize)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("querying total compressed size: %w", err)
|
|
}
|
|
|
|
return totalSize, nil
|
|
}
|
|
|
|
// GetIncompleteSnapshots returns all snapshots that haven't been completed
|
|
func (r *SnapshotRepository) GetIncompleteSnapshots(ctx context.Context) ([]*Snapshot, error) {
|
|
query := `
|
|
SELECT id, hostname, vaultik_version, vaultik_git_revision, started_at, completed_at, file_count, chunk_count, blob_count, total_size, blob_size, compression_ratio
|
|
FROM snapshots
|
|
WHERE completed_at IS NULL
|
|
ORDER BY started_at DESC
|
|
`
|
|
|
|
rows, err := r.db.conn.QueryContext(ctx, query)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying incomplete snapshots: %w", err)
|
|
}
|
|
defer CloseRows(rows)
|
|
|
|
var snapshots []*Snapshot
|
|
for rows.Next() {
|
|
var snapshot Snapshot
|
|
var startedAtUnix int64
|
|
var completedAtUnix *int64
|
|
|
|
err := rows.Scan(
|
|
&snapshot.ID,
|
|
&snapshot.Hostname,
|
|
&snapshot.VaultikVersion,
|
|
&snapshot.VaultikGitRevision,
|
|
&startedAtUnix,
|
|
&completedAtUnix,
|
|
&snapshot.FileCount,
|
|
&snapshot.ChunkCount,
|
|
&snapshot.BlobCount,
|
|
&snapshot.TotalSize,
|
|
&snapshot.BlobSize,
|
|
&snapshot.CompressionRatio,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scanning snapshot: %w", err)
|
|
}
|
|
|
|
snapshot.StartedAt = time.Unix(startedAtUnix, 0)
|
|
if completedAtUnix != nil {
|
|
t := time.Unix(*completedAtUnix, 0)
|
|
snapshot.CompletedAt = &t
|
|
}
|
|
|
|
snapshots = append(snapshots, &snapshot)
|
|
}
|
|
|
|
return snapshots, rows.Err()
|
|
}
|
|
|
|
// GetIncompleteByHostname returns all incomplete snapshots for a specific hostname
|
|
func (r *SnapshotRepository) GetIncompleteByHostname(ctx context.Context, hostname string) ([]*Snapshot, error) {
|
|
query := `
|
|
SELECT id, hostname, vaultik_version, vaultik_git_revision, started_at, completed_at, file_count, chunk_count, blob_count, total_size, blob_size, compression_ratio
|
|
FROM snapshots
|
|
WHERE completed_at IS NULL AND hostname = ?
|
|
ORDER BY started_at DESC
|
|
`
|
|
|
|
rows, err := r.db.conn.QueryContext(ctx, query, hostname)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying incomplete snapshots: %w", err)
|
|
}
|
|
defer CloseRows(rows)
|
|
|
|
var snapshots []*Snapshot
|
|
for rows.Next() {
|
|
var snapshot Snapshot
|
|
var startedAtUnix int64
|
|
var completedAtUnix *int64
|
|
|
|
err := rows.Scan(
|
|
&snapshot.ID,
|
|
&snapshot.Hostname,
|
|
&snapshot.VaultikVersion,
|
|
&snapshot.VaultikGitRevision,
|
|
&startedAtUnix,
|
|
&completedAtUnix,
|
|
&snapshot.FileCount,
|
|
&snapshot.ChunkCount,
|
|
&snapshot.BlobCount,
|
|
&snapshot.TotalSize,
|
|
&snapshot.BlobSize,
|
|
&snapshot.CompressionRatio,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scanning snapshot: %w", err)
|
|
}
|
|
|
|
snapshot.StartedAt = time.Unix(startedAtUnix, 0).UTC()
|
|
if completedAtUnix != nil {
|
|
t := time.Unix(*completedAtUnix, 0).UTC()
|
|
snapshot.CompletedAt = &t
|
|
}
|
|
|
|
snapshots = append(snapshots, &snapshot)
|
|
}
|
|
|
|
return snapshots, rows.Err()
|
|
}
|
|
|
|
// Delete removes a snapshot record
|
|
func (r *SnapshotRepository) Delete(ctx context.Context, snapshotID string) error {
|
|
query := `DELETE FROM snapshots WHERE id = ?`
|
|
|
|
_, err := r.db.ExecWithLog(ctx, query, snapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting snapshot: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeleteSnapshotFiles removes all snapshot_files entries for a snapshot
|
|
func (r *SnapshotRepository) DeleteSnapshotFiles(ctx context.Context, snapshotID string) error {
|
|
query := `DELETE FROM snapshot_files WHERE snapshot_id = ?`
|
|
|
|
_, err := r.db.ExecWithLog(ctx, query, snapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting snapshot files: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeleteSnapshotBlobs removes all snapshot_blobs entries for a snapshot
|
|
func (r *SnapshotRepository) DeleteSnapshotBlobs(ctx context.Context, snapshotID string) error {
|
|
query := `DELETE FROM snapshot_blobs WHERE snapshot_id = ?`
|
|
|
|
_, err := r.db.ExecWithLog(ctx, query, snapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting snapshot blobs: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeleteSnapshotUploads removes all uploads entries for a snapshot
|
|
func (r *SnapshotRepository) DeleteSnapshotUploads(ctx context.Context, snapshotID string) error {
|
|
query := `DELETE FROM uploads WHERE snapshot_id = ?`
|
|
|
|
_, err := r.db.ExecWithLog(ctx, query, snapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting snapshot uploads: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|