The remote snapshot table now shows two additional columns: the total plaintext size of all chunks referenced by each snapshot, and the plaintext size of chunks newly referenced by that snapshot (chunks not referenced by any earlier completed snapshot known to the local DB). The latter is the marginal data introduced by each backup, which is useful for spotting which snapshots actually added bytes and which were deduplicated against prior state. Both new columns are computed from the local database only. Snapshots that exist in remote storage but not in the local DB show "<remote only>" in those cells; their COMPRESSED SIZE column still reflects the value fetched from the remote manifest.
645 lines
18 KiB
Go
645 lines
18 KiB
Go
package database
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"time"
|
|
|
|
"sneak.berlin/go/vaultik/internal/types"
|
|
)
|
|
|
|
type SnapshotRepository struct {
|
|
db *DB
|
|
}
|
|
|
|
func NewSnapshotRepository(db *DB) *SnapshotRepository {
|
|
return &SnapshotRepository{db: db}
|
|
}
|
|
|
|
func (r *SnapshotRepository) Create(ctx context.Context, tx *sql.Tx, snapshot *Snapshot) error {
|
|
query := `
|
|
INSERT INTO snapshots (id, hostname, vaultik_version, vaultik_git_revision, started_at, completed_at,
|
|
file_count, chunk_count, blob_count, total_size, blob_size, blob_uncompressed_size,
|
|
compression_ratio, compression_level, upload_bytes, upload_duration_ms)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
`
|
|
|
|
var completedAt *int64
|
|
if snapshot.CompletedAt != nil {
|
|
ts := snapshot.CompletedAt.Unix()
|
|
completedAt = &ts
|
|
}
|
|
|
|
var err error
|
|
if tx != nil {
|
|
_, err = tx.ExecContext(ctx, query, snapshot.ID, snapshot.Hostname, snapshot.VaultikVersion, snapshot.VaultikGitRevision, snapshot.StartedAt.Unix(),
|
|
completedAt, snapshot.FileCount, snapshot.ChunkCount, snapshot.BlobCount, snapshot.TotalSize, snapshot.BlobSize, snapshot.BlobUncompressedSize,
|
|
snapshot.CompressionRatio, snapshot.CompressionLevel, snapshot.UploadBytes, snapshot.UploadDurationMs)
|
|
} else {
|
|
_, err = r.db.ExecWithLog(ctx, query, snapshot.ID, snapshot.Hostname, snapshot.VaultikVersion, snapshot.VaultikGitRevision, snapshot.StartedAt.Unix(),
|
|
completedAt, snapshot.FileCount, snapshot.ChunkCount, snapshot.BlobCount, snapshot.TotalSize, snapshot.BlobSize, snapshot.BlobUncompressedSize,
|
|
snapshot.CompressionRatio, snapshot.CompressionLevel, snapshot.UploadBytes, snapshot.UploadDurationMs)
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("inserting snapshot: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *SnapshotRepository) UpdateCounts(ctx context.Context, tx *sql.Tx, snapshotID string, fileCount, chunkCount, blobCount, totalSize, blobSize int64) error {
|
|
compressionRatio := 1.0
|
|
if totalSize > 0 {
|
|
compressionRatio = float64(blobSize) / float64(totalSize)
|
|
}
|
|
|
|
query := `
|
|
UPDATE snapshots
|
|
SET file_count = ?,
|
|
chunk_count = ?,
|
|
blob_count = ?,
|
|
total_size = ?,
|
|
blob_size = ?,
|
|
compression_ratio = ?
|
|
WHERE id = ?
|
|
`
|
|
|
|
var err error
|
|
if tx != nil {
|
|
_, err = tx.ExecContext(ctx, query, fileCount, chunkCount, blobCount, totalSize, blobSize, compressionRatio, snapshotID)
|
|
} else {
|
|
_, err = r.db.ExecWithLog(ctx, query, fileCount, chunkCount, blobCount, totalSize, blobSize, compressionRatio, snapshotID)
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("updating snapshot: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UpdateExtendedStats updates extended statistics for a snapshot
|
|
func (r *SnapshotRepository) UpdateExtendedStats(ctx context.Context, tx *sql.Tx, snapshotID string, blobUncompressedSize int64, compressionLevel int, uploadDurationMs int64) error {
|
|
// Calculate compression ratio based on uncompressed vs compressed sizes
|
|
var compressionRatio float64
|
|
if blobUncompressedSize > 0 {
|
|
// Get current blob_size from DB to calculate ratio
|
|
var blobSize int64
|
|
queryGet := `SELECT blob_size FROM snapshots WHERE id = ?`
|
|
if tx != nil {
|
|
err := tx.QueryRowContext(ctx, queryGet, snapshotID).Scan(&blobSize)
|
|
if err != nil {
|
|
return fmt.Errorf("getting blob size: %w", err)
|
|
}
|
|
} else {
|
|
err := r.db.conn.QueryRowContext(ctx, queryGet, snapshotID).Scan(&blobSize)
|
|
if err != nil {
|
|
return fmt.Errorf("getting blob size: %w", err)
|
|
}
|
|
}
|
|
compressionRatio = float64(blobSize) / float64(blobUncompressedSize)
|
|
} else {
|
|
compressionRatio = 1.0
|
|
}
|
|
|
|
query := `
|
|
UPDATE snapshots
|
|
SET blob_uncompressed_size = ?,
|
|
compression_ratio = ?,
|
|
compression_level = ?,
|
|
upload_bytes = blob_size,
|
|
upload_duration_ms = ?
|
|
WHERE id = ?
|
|
`
|
|
|
|
var err error
|
|
if tx != nil {
|
|
_, err = tx.ExecContext(ctx, query, blobUncompressedSize, compressionRatio, compressionLevel, uploadDurationMs, snapshotID)
|
|
} else {
|
|
_, err = r.db.ExecWithLog(ctx, query, blobUncompressedSize, compressionRatio, compressionLevel, uploadDurationMs, snapshotID)
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("updating extended stats: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *SnapshotRepository) GetByID(ctx context.Context, snapshotID string) (*Snapshot, error) {
|
|
query := `
|
|
SELECT id, hostname, vaultik_version, vaultik_git_revision, started_at, completed_at,
|
|
file_count, chunk_count, blob_count, total_size, blob_size, blob_uncompressed_size,
|
|
compression_ratio, compression_level, upload_bytes, upload_duration_ms
|
|
FROM snapshots
|
|
WHERE id = ?
|
|
`
|
|
|
|
var snapshot Snapshot
|
|
var startedAtUnix int64
|
|
var completedAtUnix *int64
|
|
|
|
err := r.db.conn.QueryRowContext(ctx, query, snapshotID).Scan(
|
|
&snapshot.ID,
|
|
&snapshot.Hostname,
|
|
&snapshot.VaultikVersion,
|
|
&snapshot.VaultikGitRevision,
|
|
&startedAtUnix,
|
|
&completedAtUnix,
|
|
&snapshot.FileCount,
|
|
&snapshot.ChunkCount,
|
|
&snapshot.BlobCount,
|
|
&snapshot.TotalSize,
|
|
&snapshot.BlobSize,
|
|
&snapshot.BlobUncompressedSize,
|
|
&snapshot.CompressionRatio,
|
|
&snapshot.CompressionLevel,
|
|
&snapshot.UploadBytes,
|
|
&snapshot.UploadDurationMs,
|
|
)
|
|
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying snapshot: %w", err)
|
|
}
|
|
|
|
snapshot.StartedAt = time.Unix(startedAtUnix, 0).UTC()
|
|
if completedAtUnix != nil {
|
|
t := time.Unix(*completedAtUnix, 0).UTC()
|
|
snapshot.CompletedAt = &t
|
|
}
|
|
|
|
return &snapshot, nil
|
|
}
|
|
|
|
func (r *SnapshotRepository) ListRecent(ctx context.Context, limit int) ([]*Snapshot, error) {
|
|
query := `
|
|
SELECT id, hostname, vaultik_version, vaultik_git_revision, started_at, completed_at, file_count, chunk_count, blob_count, total_size, blob_size, compression_ratio
|
|
FROM snapshots
|
|
ORDER BY started_at DESC
|
|
LIMIT ?
|
|
`
|
|
|
|
rows, err := r.db.conn.QueryContext(ctx, query, limit)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying snapshots: %w", err)
|
|
}
|
|
defer CloseRows(rows)
|
|
|
|
var snapshots []*Snapshot
|
|
for rows.Next() {
|
|
var snapshot Snapshot
|
|
var startedAtUnix int64
|
|
var completedAtUnix *int64
|
|
|
|
err := rows.Scan(
|
|
&snapshot.ID,
|
|
&snapshot.Hostname,
|
|
&snapshot.VaultikVersion,
|
|
&snapshot.VaultikGitRevision,
|
|
&startedAtUnix,
|
|
&completedAtUnix,
|
|
&snapshot.FileCount,
|
|
&snapshot.ChunkCount,
|
|
&snapshot.BlobCount,
|
|
&snapshot.TotalSize,
|
|
&snapshot.BlobSize,
|
|
&snapshot.CompressionRatio,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scanning snapshot: %w", err)
|
|
}
|
|
|
|
snapshot.StartedAt = time.Unix(startedAtUnix, 0)
|
|
if completedAtUnix != nil {
|
|
t := time.Unix(*completedAtUnix, 0)
|
|
snapshot.CompletedAt = &t
|
|
}
|
|
|
|
snapshots = append(snapshots, &snapshot)
|
|
}
|
|
|
|
return snapshots, rows.Err()
|
|
}
|
|
|
|
// MarkComplete marks a snapshot as completed with the current timestamp
|
|
func (r *SnapshotRepository) MarkComplete(ctx context.Context, tx *sql.Tx, snapshotID string) error {
|
|
query := `
|
|
UPDATE snapshots
|
|
SET completed_at = ?
|
|
WHERE id = ?
|
|
`
|
|
|
|
completedAt := time.Now().UTC().Unix()
|
|
|
|
var err error
|
|
if tx != nil {
|
|
_, err = tx.ExecContext(ctx, query, completedAt, snapshotID)
|
|
} else {
|
|
_, err = r.db.ExecWithLog(ctx, query, completedAt, snapshotID)
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("marking snapshot complete: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// AddFile adds a file to a snapshot
|
|
func (r *SnapshotRepository) AddFile(ctx context.Context, tx *sql.Tx, snapshotID string, filePath string) error {
|
|
query := `
|
|
INSERT OR IGNORE INTO snapshot_files (snapshot_id, file_id)
|
|
SELECT ?, id FROM files WHERE path = ?
|
|
`
|
|
|
|
var err error
|
|
if tx != nil {
|
|
_, err = tx.ExecContext(ctx, query, snapshotID, filePath)
|
|
} else {
|
|
_, err = r.db.ExecWithLog(ctx, query, snapshotID, filePath)
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("adding file to snapshot: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// AddFileByID adds a file to a snapshot by file ID
|
|
func (r *SnapshotRepository) AddFileByID(ctx context.Context, tx *sql.Tx, snapshotID string, fileID types.FileID) error {
|
|
query := `
|
|
INSERT OR IGNORE INTO snapshot_files (snapshot_id, file_id)
|
|
VALUES (?, ?)
|
|
`
|
|
|
|
var err error
|
|
if tx != nil {
|
|
_, err = tx.ExecContext(ctx, query, snapshotID, fileID.String())
|
|
} else {
|
|
_, err = r.db.ExecWithLog(ctx, query, snapshotID, fileID.String())
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("adding file to snapshot: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// AddFilesByIDBatch adds multiple files to a snapshot in batched inserts
|
|
func (r *SnapshotRepository) AddFilesByIDBatch(ctx context.Context, tx *sql.Tx, snapshotID string, fileIDs []types.FileID) error {
|
|
if len(fileIDs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Each entry has 2 values, so batch at 400 to be safe
|
|
const batchSize = 400
|
|
|
|
for i := 0; i < len(fileIDs); i += batchSize {
|
|
end := i + batchSize
|
|
if end > len(fileIDs) {
|
|
end = len(fileIDs)
|
|
}
|
|
batch := fileIDs[i:end]
|
|
|
|
query := "INSERT OR IGNORE INTO snapshot_files (snapshot_id, file_id) VALUES "
|
|
args := make([]interface{}, 0, len(batch)*2)
|
|
for j, fileID := range batch {
|
|
if j > 0 {
|
|
query += ", "
|
|
}
|
|
query += "(?, ?)"
|
|
args = append(args, snapshotID, fileID.String())
|
|
}
|
|
|
|
var err error
|
|
if tx != nil {
|
|
_, err = tx.ExecContext(ctx, query, args...)
|
|
} else {
|
|
_, err = r.db.ExecWithLog(ctx, query, args...)
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("batch adding files to snapshot: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// PopulateReferencedBlobs ensures snapshot_blobs contains an entry for
|
|
// every blob that holds a chunk referenced by any file in the snapshot.
|
|
// This is necessary because the AddBlob hook only runs when a blob is
|
|
// newly uploaded during a snapshot — fully-deduplicated snapshots (where
|
|
// every chunk already exists in storage from a prior run) would otherwise
|
|
// have an empty snapshot_blobs set and be impossible to restore.
|
|
//
|
|
// Returns the number of rows inserted (i.e. blobs that were previously
|
|
// referenced indirectly via file_chunks but not yet recorded in
|
|
// snapshot_blobs for this snapshot).
|
|
func (r *SnapshotRepository) PopulateReferencedBlobs(ctx context.Context, tx *sql.Tx, snapshotID string) (int64, error) {
|
|
query := `
|
|
INSERT OR IGNORE INTO snapshot_blobs (snapshot_id, blob_id, blob_hash)
|
|
SELECT DISTINCT ?, blobs.id, blobs.blob_hash
|
|
FROM blobs
|
|
JOIN blob_chunks ON blob_chunks.blob_id = blobs.id
|
|
JOIN file_chunks ON file_chunks.chunk_hash = blob_chunks.chunk_hash
|
|
JOIN snapshot_files ON snapshot_files.file_id = file_chunks.file_id
|
|
WHERE snapshot_files.snapshot_id = ?
|
|
AND blobs.blob_hash IS NOT NULL
|
|
`
|
|
|
|
var result sql.Result
|
|
var err error
|
|
if tx != nil {
|
|
result, err = tx.ExecContext(ctx, query, snapshotID, snapshotID)
|
|
} else {
|
|
result, err = r.db.ExecWithLog(ctx, query, snapshotID, snapshotID)
|
|
}
|
|
if err != nil {
|
|
return 0, fmt.Errorf("populating referenced blobs: %w", err)
|
|
}
|
|
|
|
n, _ := result.RowsAffected()
|
|
return n, nil
|
|
}
|
|
|
|
// AddBlob adds a blob to a snapshot
|
|
func (r *SnapshotRepository) AddBlob(ctx context.Context, tx *sql.Tx, snapshotID string, blobID types.BlobID, blobHash types.BlobHash) error {
|
|
query := `
|
|
INSERT OR IGNORE INTO snapshot_blobs (snapshot_id, blob_id, blob_hash)
|
|
VALUES (?, ?, ?)
|
|
`
|
|
|
|
var err error
|
|
if tx != nil {
|
|
_, err = tx.ExecContext(ctx, query, snapshotID, blobID.String(), blobHash.String())
|
|
} else {
|
|
_, err = r.db.ExecWithLog(ctx, query, snapshotID, blobID.String(), blobHash.String())
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("adding blob to snapshot: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetBlobHashes returns all blob hashes for a snapshot
|
|
func (r *SnapshotRepository) GetBlobHashes(ctx context.Context, snapshotID string) ([]string, error) {
|
|
query := `
|
|
SELECT sb.blob_hash
|
|
FROM snapshot_blobs sb
|
|
WHERE sb.snapshot_id = ?
|
|
ORDER BY sb.blob_hash
|
|
`
|
|
|
|
rows, err := r.db.conn.QueryContext(ctx, query, snapshotID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying blob hashes: %w", err)
|
|
}
|
|
defer CloseRows(rows)
|
|
|
|
var blobs []string
|
|
for rows.Next() {
|
|
var blobHash string
|
|
if err := rows.Scan(&blobHash); err != nil {
|
|
return nil, fmt.Errorf("scanning blob hash: %w", err)
|
|
}
|
|
blobs = append(blobs, blobHash)
|
|
}
|
|
|
|
return blobs, rows.Err()
|
|
}
|
|
|
|
// GetSnapshotTotalCompressedSize returns the total compressed size of all blobs referenced by a snapshot
|
|
func (r *SnapshotRepository) GetSnapshotTotalCompressedSize(ctx context.Context, snapshotID string) (int64, error) {
|
|
query := `
|
|
SELECT COALESCE(SUM(b.compressed_size), 0)
|
|
FROM snapshot_blobs sb
|
|
JOIN blobs b ON sb.blob_hash = b.blob_hash
|
|
WHERE sb.snapshot_id = ?
|
|
`
|
|
|
|
var totalSize int64
|
|
err := r.db.conn.QueryRowContext(ctx, query, snapshotID).Scan(&totalSize)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("querying total compressed size: %w", err)
|
|
}
|
|
|
|
return totalSize, nil
|
|
}
|
|
|
|
// GetSnapshotUncompressedChunkSize returns the sum of plaintext sizes of all unique
|
|
// chunks referenced by a snapshot (via snapshot_files → file_chunks → chunks).
|
|
func (r *SnapshotRepository) GetSnapshotUncompressedChunkSize(ctx context.Context, snapshotID string) (int64, error) {
|
|
query := `
|
|
SELECT COALESCE(SUM(c.size), 0)
|
|
FROM (
|
|
SELECT DISTINCT fc.chunk_hash
|
|
FROM snapshot_files sf
|
|
JOIN file_chunks fc ON sf.file_id = fc.file_id
|
|
WHERE sf.snapshot_id = ?
|
|
) sc
|
|
JOIN chunks c ON sc.chunk_hash = c.chunk_hash
|
|
`
|
|
|
|
var totalSize int64
|
|
err := r.db.conn.QueryRowContext(ctx, query, snapshotID).Scan(&totalSize)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("querying uncompressed chunk size: %w", err)
|
|
}
|
|
|
|
return totalSize, nil
|
|
}
|
|
|
|
// GetSnapshotNewChunkSize returns the sum of plaintext sizes of chunks that are
|
|
// referenced by this snapshot but not by any earlier completed snapshot known to
|
|
// the local database. The result is the marginal uncompressed data this snapshot
|
|
// added to the dedup pool — i.e., the delta from prior snapshots.
|
|
func (r *SnapshotRepository) GetSnapshotNewChunkSize(ctx context.Context, snapshotID string) (int64, error) {
|
|
query := `
|
|
WITH this_snap_chunks AS (
|
|
SELECT DISTINCT fc.chunk_hash
|
|
FROM snapshot_files sf
|
|
JOIN file_chunks fc ON sf.file_id = fc.file_id
|
|
WHERE sf.snapshot_id = ?
|
|
),
|
|
prior_chunks AS (
|
|
SELECT DISTINCT fc.chunk_hash
|
|
FROM snapshots s
|
|
JOIN snapshot_files sf ON sf.snapshot_id = s.id
|
|
JOIN file_chunks fc ON fc.file_id = sf.file_id
|
|
WHERE s.completed_at IS NOT NULL
|
|
AND s.id != ?
|
|
AND s.started_at < (SELECT started_at FROM snapshots WHERE id = ?)
|
|
)
|
|
SELECT COALESCE(SUM(c.size), 0)
|
|
FROM chunks c
|
|
JOIN this_snap_chunks t ON c.chunk_hash = t.chunk_hash
|
|
WHERE c.chunk_hash NOT IN (SELECT chunk_hash FROM prior_chunks)
|
|
`
|
|
|
|
var totalSize int64
|
|
err := r.db.conn.QueryRowContext(ctx, query, snapshotID, snapshotID, snapshotID).Scan(&totalSize)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("querying new chunk size: %w", err)
|
|
}
|
|
|
|
return totalSize, nil
|
|
}
|
|
|
|
// GetIncompleteSnapshots returns all snapshots that haven't been completed
|
|
func (r *SnapshotRepository) GetIncompleteSnapshots(ctx context.Context) ([]*Snapshot, error) {
|
|
query := `
|
|
SELECT id, hostname, vaultik_version, vaultik_git_revision, started_at, completed_at, file_count, chunk_count, blob_count, total_size, blob_size, compression_ratio
|
|
FROM snapshots
|
|
WHERE completed_at IS NULL
|
|
ORDER BY started_at DESC
|
|
`
|
|
|
|
rows, err := r.db.conn.QueryContext(ctx, query)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying incomplete snapshots: %w", err)
|
|
}
|
|
defer CloseRows(rows)
|
|
|
|
var snapshots []*Snapshot
|
|
for rows.Next() {
|
|
var snapshot Snapshot
|
|
var startedAtUnix int64
|
|
var completedAtUnix *int64
|
|
|
|
err := rows.Scan(
|
|
&snapshot.ID,
|
|
&snapshot.Hostname,
|
|
&snapshot.VaultikVersion,
|
|
&snapshot.VaultikGitRevision,
|
|
&startedAtUnix,
|
|
&completedAtUnix,
|
|
&snapshot.FileCount,
|
|
&snapshot.ChunkCount,
|
|
&snapshot.BlobCount,
|
|
&snapshot.TotalSize,
|
|
&snapshot.BlobSize,
|
|
&snapshot.CompressionRatio,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scanning snapshot: %w", err)
|
|
}
|
|
|
|
snapshot.StartedAt = time.Unix(startedAtUnix, 0)
|
|
if completedAtUnix != nil {
|
|
t := time.Unix(*completedAtUnix, 0)
|
|
snapshot.CompletedAt = &t
|
|
}
|
|
|
|
snapshots = append(snapshots, &snapshot)
|
|
}
|
|
|
|
return snapshots, rows.Err()
|
|
}
|
|
|
|
// GetIncompleteByHostname returns all incomplete snapshots for a specific hostname
|
|
func (r *SnapshotRepository) GetIncompleteByHostname(ctx context.Context, hostname string) ([]*Snapshot, error) {
|
|
query := `
|
|
SELECT id, hostname, vaultik_version, vaultik_git_revision, started_at, completed_at, file_count, chunk_count, blob_count, total_size, blob_size, compression_ratio
|
|
FROM snapshots
|
|
WHERE completed_at IS NULL AND hostname = ?
|
|
ORDER BY started_at DESC
|
|
`
|
|
|
|
rows, err := r.db.conn.QueryContext(ctx, query, hostname)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("querying incomplete snapshots: %w", err)
|
|
}
|
|
defer CloseRows(rows)
|
|
|
|
var snapshots []*Snapshot
|
|
for rows.Next() {
|
|
var snapshot Snapshot
|
|
var startedAtUnix int64
|
|
var completedAtUnix *int64
|
|
|
|
err := rows.Scan(
|
|
&snapshot.ID,
|
|
&snapshot.Hostname,
|
|
&snapshot.VaultikVersion,
|
|
&snapshot.VaultikGitRevision,
|
|
&startedAtUnix,
|
|
&completedAtUnix,
|
|
&snapshot.FileCount,
|
|
&snapshot.ChunkCount,
|
|
&snapshot.BlobCount,
|
|
&snapshot.TotalSize,
|
|
&snapshot.BlobSize,
|
|
&snapshot.CompressionRatio,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scanning snapshot: %w", err)
|
|
}
|
|
|
|
snapshot.StartedAt = time.Unix(startedAtUnix, 0).UTC()
|
|
if completedAtUnix != nil {
|
|
t := time.Unix(*completedAtUnix, 0).UTC()
|
|
snapshot.CompletedAt = &t
|
|
}
|
|
|
|
snapshots = append(snapshots, &snapshot)
|
|
}
|
|
|
|
return snapshots, rows.Err()
|
|
}
|
|
|
|
// Delete removes a snapshot record
|
|
func (r *SnapshotRepository) Delete(ctx context.Context, snapshotID string) error {
|
|
query := `DELETE FROM snapshots WHERE id = ?`
|
|
|
|
_, err := r.db.ExecWithLog(ctx, query, snapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting snapshot: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeleteSnapshotFiles removes all snapshot_files entries for a snapshot
|
|
func (r *SnapshotRepository) DeleteSnapshotFiles(ctx context.Context, snapshotID string) error {
|
|
query := `DELETE FROM snapshot_files WHERE snapshot_id = ?`
|
|
|
|
_, err := r.db.ExecWithLog(ctx, query, snapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting snapshot files: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeleteSnapshotBlobs removes all snapshot_blobs entries for a snapshot
|
|
func (r *SnapshotRepository) DeleteSnapshotBlobs(ctx context.Context, snapshotID string) error {
|
|
query := `DELETE FROM snapshot_blobs WHERE snapshot_id = ?`
|
|
|
|
_, err := r.db.ExecWithLog(ctx, query, snapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting snapshot blobs: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeleteSnapshotUploads removes all uploads entries for a snapshot
|
|
func (r *SnapshotRepository) DeleteSnapshotUploads(ctx context.Context, snapshotID string) error {
|
|
query := `DELETE FROM uploads WHERE snapshot_id = ?`
|
|
|
|
_, err := r.db.ExecWithLog(ctx, query, snapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting snapshot uploads: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|