Files
vaultik/internal/database/snapshots.go
sneak 0e9c96c8b5 Add uncompressed-size and new-chunk-size columns to snapshot list
The remote snapshot table now shows the total plaintext size of all
chunks referenced by each snapshot, plus the plaintext size of chunks
newly referenced by that snapshot (chunks not in any earlier completed
snapshot known to the local DB). The latter is the marginal data
introduced by each backup — useful for spotting which snapshots
actually added bytes vs. dedup'd against prior state.

Both new columns are computed from the local database only. Snapshots
that exist in remote storage but not in the local DB show
"<remote only>" in those cells; their COMPRESSED SIZE column still
reflects the value fetched from the remote manifest.
2026-06-17 06:33:59 +02:00

645 lines
18 KiB
Go

package database
import (
"context"
"database/sql"
"fmt"
"time"
"sneak.berlin/go/vaultik/internal/types"
)
// SnapshotRepository provides the persistence operations for snapshot records
// and for the snapshot_files, snapshot_blobs and uploads rows tied to them.
type SnapshotRepository struct {
// db is the database wrapper that statements run through when the caller
// does not supply its own transaction.
db *DB
}
// NewSnapshotRepository returns a SnapshotRepository whose queries run
// against db.
func NewSnapshotRepository(db *DB) *SnapshotRepository {
	repo := new(SnapshotRepository)
	repo.db = db
	return repo
}
// Create inserts a new row into the snapshots table from snapshot.
//
// StartedAt and, when set, CompletedAt are stored as Unix seconds; a nil
// CompletedAt is bound as a nil pointer. The statement runs on tx when one is
// supplied and otherwise goes through the repository's own database handle.
func (r *SnapshotRepository) Create(ctx context.Context, tx *sql.Tx, snapshot *Snapshot) error {
	const insertSnapshot = `
INSERT INTO snapshots (id, hostname, vaultik_version, vaultik_git_revision, started_at, completed_at,
file_count, chunk_count, blob_count, total_size, blob_size, blob_uncompressed_size,
compression_ratio, compression_level, upload_bytes, upload_duration_ms)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
	// Translate the optional completion time into optional Unix seconds.
	var completedSecs *int64
	if done := snapshot.CompletedAt; done != nil {
		secs := done.Unix()
		completedSecs = &secs
	}

	// One value per placeholder, in column order.
	params := []interface{}{
		snapshot.ID,
		snapshot.Hostname,
		snapshot.VaultikVersion,
		snapshot.VaultikGitRevision,
		snapshot.StartedAt.Unix(),
		completedSecs,
		snapshot.FileCount,
		snapshot.ChunkCount,
		snapshot.BlobCount,
		snapshot.TotalSize,
		snapshot.BlobSize,
		snapshot.BlobUncompressedSize,
		snapshot.CompressionRatio,
		snapshot.CompressionLevel,
		snapshot.UploadBytes,
		snapshot.UploadDurationMs,
	}

	var execErr error
	if tx == nil {
		_, execErr = r.db.ExecWithLog(ctx, insertSnapshot, params...)
	} else {
		_, execErr = tx.ExecContext(ctx, insertSnapshot, params...)
	}
	if execErr != nil {
		return fmt.Errorf("inserting snapshot: %w", execErr)
	}
	return nil
}
// UpdateCounts overwrites the file, chunk and blob counts and the total and
// blob sizes of the snapshot identified by snapshotID.
//
// compression_ratio is stored as blobSize/totalSize, or 1.0 when totalSize is
// not positive. The statement runs on tx when one is supplied and otherwise
// goes through the repository's own database handle.
func (r *SnapshotRepository) UpdateCounts(ctx context.Context, tx *sql.Tx, snapshotID string, fileCount, chunkCount, blobCount, totalSize, blobSize int64) error {
	const updateCounts = `
UPDATE snapshots
SET file_count = ?,
chunk_count = ?,
blob_count = ?,
total_size = ?,
blob_size = ?,
compression_ratio = ?
WHERE id = ?
`
	// Guard the division: an empty snapshot reports a neutral ratio.
	ratio := 1.0
	if totalSize > 0 {
		ratio = float64(blobSize) / float64(totalSize)
	}

	params := []interface{}{fileCount, chunkCount, blobCount, totalSize, blobSize, ratio, snapshotID}

	var execErr error
	if tx == nil {
		_, execErr = r.db.ExecWithLog(ctx, updateCounts, params...)
	} else {
		_, execErr = tx.ExecContext(ctx, updateCounts, params...)
	}
	if execErr != nil {
		return fmt.Errorf("updating snapshot: %w", execErr)
	}
	return nil
}
// UpdateExtendedStats stores the extended statistics of a snapshot: the
// uncompressed blob size, the compression level, the upload duration and a
// recomputed compression ratio. upload_bytes is set to the row's blob_size.
//
// When blobUncompressedSize is positive the ratio is the row's stored
// blob_size divided by blobUncompressedSize, which requires reading the row
// first; a failure of that read is returned as an error. Otherwise the ratio
// is 1.0 and no read is performed. All statements run on tx when one is
// supplied and otherwise go through the repository's own database handle.
func (r *SnapshotRepository) UpdateExtendedStats(ctx context.Context, tx *sql.Tx, snapshotID string, blobUncompressedSize int64, compressionLevel int, uploadDurationMs int64) error {
	// With no uncompressed size to divide by, the ratio stays at 1.0.
	ratio := 1.0
	if blobUncompressedSize > 0 {
		// The numerator is the blob_size already recorded on the row.
		const selectBlobSize = `SELECT blob_size FROM snapshots WHERE id = ?`

		var storedBlobSize int64
		var lookupErr error
		if tx == nil {
			lookupErr = r.db.conn.QueryRowContext(ctx, selectBlobSize, snapshotID).Scan(&storedBlobSize)
		} else {
			lookupErr = tx.QueryRowContext(ctx, selectBlobSize, snapshotID).Scan(&storedBlobSize)
		}
		if lookupErr != nil {
			return fmt.Errorf("getting blob size: %w", lookupErr)
		}
		ratio = float64(storedBlobSize) / float64(blobUncompressedSize)
	}

	const updateStats = `
UPDATE snapshots
SET blob_uncompressed_size = ?,
compression_ratio = ?,
compression_level = ?,
upload_bytes = blob_size,
upload_duration_ms = ?
WHERE id = ?
`
	params := []interface{}{blobUncompressedSize, ratio, compressionLevel, uploadDurationMs, snapshotID}

	var execErr error
	if tx == nil {
		_, execErr = r.db.ExecWithLog(ctx, updateStats, params...)
	} else {
		_, execErr = tx.ExecContext(ctx, updateStats, params...)
	}
	if execErr != nil {
		return fmt.Errorf("updating extended stats: %w", execErr)
	}
	return nil
}
// GetByID loads the snapshot row whose id equals snapshotID, including the
// extended statistics columns.
//
// It returns (nil, nil) when no such row exists. StartedAt and CompletedAt
// are rebuilt from the stored Unix seconds as UTC times; CompletedAt stays
// nil when the column is NULL.
func (r *SnapshotRepository) GetByID(ctx context.Context, snapshotID string) (*Snapshot, error) {
	const selectByID = `
SELECT id, hostname, vaultik_version, vaultik_git_revision, started_at, completed_at,
file_count, chunk_count, blob_count, total_size, blob_size, blob_uncompressed_size,
compression_ratio, compression_level, upload_bytes, upload_duration_ms
FROM snapshots
WHERE id = ?
`
	found := new(Snapshot)
	var startedSecs int64
	var completedSecs *int64 // nil when completed_at is NULL

	row := r.db.conn.QueryRowContext(ctx, selectByID, snapshotID)
	err := row.Scan(
		&found.ID,
		&found.Hostname,
		&found.VaultikVersion,
		&found.VaultikGitRevision,
		&startedSecs,
		&completedSecs,
		&found.FileCount,
		&found.ChunkCount,
		&found.BlobCount,
		&found.TotalSize,
		&found.BlobSize,
		&found.BlobUncompressedSize,
		&found.CompressionRatio,
		&found.CompressionLevel,
		&found.UploadBytes,
		&found.UploadDurationMs,
	)
	switch {
	case err == sql.ErrNoRows:
		// A missing snapshot is reported as "nothing found", not as an error.
		return nil, nil
	case err != nil:
		return nil, fmt.Errorf("querying snapshot: %w", err)
	}

	found.StartedAt = time.Unix(startedSecs, 0).UTC()
	if completedSecs != nil {
		finished := time.Unix(*completedSecs, 0).UTC()
		found.CompletedAt = &finished
	}
	return found, nil
}
// ListRecent returns up to limit snapshots ordered by started_at, newest
// first.
//
// Only the core columns are selected, so BlobUncompressedSize,
// CompressionLevel, UploadBytes and UploadDurationMs keep their zero values
// on the returned snapshots. StartedAt and CompletedAt are rebuilt from the
// stored Unix seconds as UTC times, matching GetByID and
// GetIncompleteByHostname; CompletedAt stays nil when the column is NULL.
// The result is a nil slice when no rows match.
func (r *SnapshotRepository) ListRecent(ctx context.Context, limit int) ([]*Snapshot, error) {
	query := `
SELECT id, hostname, vaultik_version, vaultik_git_revision, started_at, completed_at, file_count, chunk_count, blob_count, total_size, blob_size, compression_ratio
FROM snapshots
ORDER BY started_at DESC
LIMIT ?
`
	rows, err := r.db.conn.QueryContext(ctx, query, limit)
	if err != nil {
		return nil, fmt.Errorf("querying snapshots: %w", err)
	}
	defer CloseRows(rows)

	var snapshots []*Snapshot
	for rows.Next() {
		var snapshot Snapshot
		var startedAtUnix int64
		var completedAtUnix *int64 // nil when completed_at is NULL
		err := rows.Scan(
			&snapshot.ID,
			&snapshot.Hostname,
			&snapshot.VaultikVersion,
			&snapshot.VaultikGitRevision,
			&startedAtUnix,
			&completedAtUnix,
			&snapshot.FileCount,
			&snapshot.ChunkCount,
			&snapshot.BlobCount,
			&snapshot.TotalSize,
			&snapshot.BlobSize,
			&snapshot.CompressionRatio,
		)
		if err != nil {
			return nil, fmt.Errorf("scanning snapshot: %w", err)
		}
		// time.Unix yields a local-zone Time; normalize to UTC so every
		// accessor in this repository hands out timestamps in the same zone.
		snapshot.StartedAt = time.Unix(startedAtUnix, 0).UTC()
		if completedAtUnix != nil {
			t := time.Unix(*completedAtUnix, 0).UTC()
			snapshot.CompletedAt = &t
		}
		snapshots = append(snapshots, &snapshot)
	}
	return snapshots, rows.Err()
}
// MarkComplete stamps the snapshot identified by snapshotID as completed by
// setting completed_at to the current time in Unix seconds.
//
// The statement runs on tx when one is supplied and otherwise goes through
// the repository's own database handle.
func (r *SnapshotRepository) MarkComplete(ctx context.Context, tx *sql.Tx, snapshotID string) error {
	const setCompleted = `
UPDATE snapshots
SET completed_at = ?
WHERE id = ?
`
	nowSecs := time.Now().UTC().Unix()

	var execErr error
	switch tx {
	case nil:
		_, execErr = r.db.ExecWithLog(ctx, setCompleted, nowSecs, snapshotID)
	default:
		_, execErr = tx.ExecContext(ctx, setCompleted, nowSecs, snapshotID)
	}
	if execErr != nil {
		return fmt.Errorf("marking snapshot complete: %w", execErr)
	}
	return nil
}
// AddFile links the file stored under filePath to the snapshot by inserting
// a snapshot_files row.
//
// The file ID is looked up from the files table by path inside the same
// statement, so a path with no files row inserts nothing and is not an
// error. INSERT OR IGNORE is used, so an already-existing link is left
// untouched. The statement runs on tx when one is supplied and otherwise
// goes through the repository's own database handle.
func (r *SnapshotRepository) AddFile(ctx context.Context, tx *sql.Tx, snapshotID string, filePath string) error {
	const linkByPath = `
INSERT OR IGNORE INTO snapshot_files (snapshot_id, file_id)
SELECT ?, id FROM files WHERE path = ?
`
	var execErr error
	if tx == nil {
		_, execErr = r.db.ExecWithLog(ctx, linkByPath, snapshotID, filePath)
	} else {
		_, execErr = tx.ExecContext(ctx, linkByPath, snapshotID, filePath)
	}
	if execErr != nil {
		return fmt.Errorf("adding file to snapshot: %w", execErr)
	}
	return nil
}
// AddFileByID links the file with the given ID to the snapshot by inserting
// a snapshot_files row.
//
// INSERT OR IGNORE is used, so an already-existing link is left untouched.
// The statement runs on tx when one is supplied and otherwise goes through
// the repository's own database handle.
func (r *SnapshotRepository) AddFileByID(ctx context.Context, tx *sql.Tx, snapshotID string, fileID types.FileID) error {
	const linkByID = `
INSERT OR IGNORE INTO snapshot_files (snapshot_id, file_id)
VALUES (?, ?)
`
	// The ID is bound in its string form.
	idText := fileID.String()

	var execErr error
	if tx == nil {
		_, execErr = r.db.ExecWithLog(ctx, linkByID, snapshotID, idText)
	} else {
		_, execErr = tx.ExecContext(ctx, linkByID, snapshotID, idText)
	}
	if execErr != nil {
		return fmt.Errorf("adding file to snapshot: %w", execErr)
	}
	return nil
}
// AddFilesByIDBatch links every file in fileIDs to the snapshot using
// multi-row INSERT OR IGNORE statements against snapshot_files.
//
// Every row binds two parameters, so rows are grouped 400 at a time to keep
// each statement's parameter count bounded (800 at most). An empty fileIDs
// executes nothing and returns nil. Processing stops at the first failing
// batch; earlier batches are not undone by this method. Statements run on tx
// when one is supplied and otherwise go through the repository's own
// database handle.
func (r *SnapshotRepository) AddFilesByIDBatch(ctx context.Context, tx *sql.Tx, snapshotID string, fileIDs []types.FileID) error {
	const (
		batchSize    = 400
		insertPrefix = "INSERT OR IGNORE INTO snapshot_files (snapshot_id, file_id) VALUES "
		rowTemplate  = "(?, ?)"
		rowSeparator = ", "
	)

	for start := 0; start < len(fileIDs); start += batchSize {
		stop := start + batchSize
		if stop > len(fileIDs) {
			stop = len(fileIDs)
		}
		group := fileIDs[start:stop]

		// Assemble "prefix (?, ?), (?, ?), ..." in a single pre-sized buffer.
		stmt := make([]byte, 0, len(insertPrefix)+len(group)*(len(rowTemplate)+len(rowSeparator)))
		stmt = append(stmt, insertPrefix...)
		params := make([]interface{}, 0, 2*len(group))
		for idx, id := range group {
			if idx != 0 {
				stmt = append(stmt, rowSeparator...)
			}
			stmt = append(stmt, rowTemplate...)
			params = append(params, snapshotID, id.String())
		}

		var execErr error
		if tx == nil {
			_, execErr = r.db.ExecWithLog(ctx, string(stmt), params...)
		} else {
			_, execErr = tx.ExecContext(ctx, string(stmt), params...)
		}
		if execErr != nil {
			return fmt.Errorf("batch adding files to snapshot: %w", execErr)
		}
	}
	return nil
}
// PopulateReferencedBlobs ensures snapshot_blobs contains an entry for
// every blob that holds a chunk referenced by any file in the snapshot.
// This is necessary because the AddBlob hook only runs when a blob is
// newly uploaded during a snapshot — fully-deduplicated snapshots (where
// every chunk already exists in storage from a prior run) would otherwise
// have an empty snapshot_blobs set and be impossible to restore.
//
// Blobs whose blob_hash is NULL are skipped, and INSERT OR IGNORE leaves
// rows that already exist untouched. The statement runs on tx when one is
// supplied and otherwise goes through the repository's own database handle.
//
// Returns the number of rows inserted (i.e. blobs that were previously
// referenced indirectly via file_chunks but not yet recorded in
// snapshot_blobs for this snapshot). An error is returned if the statement
// fails or if the driver cannot report the affected-row count; in the latter
// case the insert itself has already been executed.
func (r *SnapshotRepository) PopulateReferencedBlobs(ctx context.Context, tx *sql.Tx, snapshotID string) (int64, error) {
	query := `
INSERT OR IGNORE INTO snapshot_blobs (snapshot_id, blob_id, blob_hash)
SELECT DISTINCT ?, blobs.id, blobs.blob_hash
FROM blobs
JOIN blob_chunks ON blob_chunks.blob_id = blobs.id
JOIN file_chunks ON file_chunks.chunk_hash = blob_chunks.chunk_hash
JOIN snapshot_files ON snapshot_files.file_id = file_chunks.file_id
WHERE snapshot_files.snapshot_id = ?
AND blobs.blob_hash IS NOT NULL
`
	// The snapshot ID is bound twice: once as the inserted snapshot_id value
	// and once as the filter on snapshot_files.
	var result sql.Result
	var err error
	if tx != nil {
		result, err = tx.ExecContext(ctx, query, snapshotID, snapshotID)
	} else {
		result, err = r.db.ExecWithLog(ctx, query, snapshotID, snapshotID)
	}
	if err != nil {
		return 0, fmt.Errorf("populating referenced blobs: %w", err)
	}

	// Do not report a failed count as "0 rows inserted".
	inserted, err := result.RowsAffected()
	if err != nil {
		return 0, fmt.Errorf("counting populated blobs: %w", err)
	}
	return inserted, nil
}
// AddBlob records that the snapshot references the given blob by inserting a
// snapshot_blobs row holding the blob's ID and hash in their string forms.
//
// INSERT OR IGNORE is used, so an already-existing row is left untouched.
// The statement runs on tx when one is supplied and otherwise goes through
// the repository's own database handle.
func (r *SnapshotRepository) AddBlob(ctx context.Context, tx *sql.Tx, snapshotID string, blobID types.BlobID, blobHash types.BlobHash) error {
	const linkBlob = `
INSERT OR IGNORE INTO snapshot_blobs (snapshot_id, blob_id, blob_hash)
VALUES (?, ?, ?)
`
	idText := blobID.String()
	hashText := blobHash.String()

	var execErr error
	if tx == nil {
		_, execErr = r.db.ExecWithLog(ctx, linkBlob, snapshotID, idText, hashText)
	} else {
		_, execErr = tx.ExecContext(ctx, linkBlob, snapshotID, idText, hashText)
	}
	if execErr != nil {
		return fmt.Errorf("adding blob to snapshot: %w", execErr)
	}
	return nil
}
// GetBlobHashes returns the blob_hash of every snapshot_blobs row belonging
// to the snapshot, sorted by hash. The result is a nil slice when the
// snapshot has no such rows.
func (r *SnapshotRepository) GetBlobHashes(ctx context.Context, snapshotID string) ([]string, error) {
	const selectHashes = `
SELECT sb.blob_hash
FROM snapshot_blobs sb
WHERE sb.snapshot_id = ?
ORDER BY sb.blob_hash
`
	rows, queryErr := r.db.conn.QueryContext(ctx, selectHashes, snapshotID)
	if queryErr != nil {
		return nil, fmt.Errorf("querying blob hashes: %w", queryErr)
	}
	defer CloseRows(rows)

	var hashes []string
	for rows.Next() {
		var hash string
		scanErr := rows.Scan(&hash)
		if scanErr != nil {
			return nil, fmt.Errorf("scanning blob hash: %w", scanErr)
		}
		hashes = append(hashes, hash)
	}
	// rows.Err reports any failure that ended the iteration early.
	return hashes, rows.Err()
}
// GetSnapshotTotalCompressedSize sums blobs.compressed_size over the blobs
// listed in snapshot_blobs for the snapshot, matching the two tables on
// blob_hash. It returns 0 when nothing matches.
func (r *SnapshotRepository) GetSnapshotTotalCompressedSize(ctx context.Context, snapshotID string) (int64, error) {
	const sumCompressed = `
SELECT COALESCE(SUM(b.compressed_size), 0)
FROM snapshot_blobs sb
JOIN blobs b ON sb.blob_hash = b.blob_hash
WHERE sb.snapshot_id = ?
`
	var compressedBytes int64
	row := r.db.conn.QueryRowContext(ctx, sumCompressed, snapshotID)
	if scanErr := row.Scan(&compressedBytes); scanErr != nil {
		return 0, fmt.Errorf("querying total compressed size: %w", scanErr)
	}
	return compressedBytes, nil
}
// GetSnapshotUncompressedChunkSize sums chunks.size over the distinct chunk
// hashes reachable from the snapshot (snapshot_files → file_chunks → chunks),
// so a chunk shared by several files is counted once. It returns 0 when the
// snapshot references no chunks.
func (r *SnapshotRepository) GetSnapshotUncompressedChunkSize(ctx context.Context, snapshotID string) (int64, error) {
	const sumChunkSizes = `
SELECT COALESCE(SUM(c.size), 0)
FROM (
SELECT DISTINCT fc.chunk_hash
FROM snapshot_files sf
JOIN file_chunks fc ON sf.file_id = fc.file_id
WHERE sf.snapshot_id = ?
) sc
JOIN chunks c ON sc.chunk_hash = c.chunk_hash
`
	var plainBytes int64
	row := r.db.conn.QueryRowContext(ctx, sumChunkSizes, snapshotID)
	if scanErr := row.Scan(&plainBytes); scanErr != nil {
		return 0, fmt.Errorf("querying uncompressed chunk size: %w", scanErr)
	}
	return plainBytes, nil
}
// GetSnapshotNewChunkSize sums chunks.size over the chunks this snapshot
// references that no earlier completed snapshot in the local database
// references — the marginal plaintext data the snapshot contributed.
//
// "Earlier" means a strictly smaller started_at than this snapshot's;
// snapshots with completed_at NULL are not considered prior state. It
// returns 0 when the snapshot introduced no new chunks.
func (r *SnapshotRepository) GetSnapshotNewChunkSize(ctx context.Context, snapshotID string) (int64, error) {
	const sumNewChunks = `
WITH this_snap_chunks AS (
SELECT DISTINCT fc.chunk_hash
FROM snapshot_files sf
JOIN file_chunks fc ON sf.file_id = fc.file_id
WHERE sf.snapshot_id = ?
),
prior_chunks AS (
SELECT DISTINCT fc.chunk_hash
FROM snapshots s
JOIN snapshot_files sf ON sf.snapshot_id = s.id
JOIN file_chunks fc ON fc.file_id = sf.file_id
WHERE s.completed_at IS NOT NULL
AND s.id != ?
AND s.started_at < (SELECT started_at FROM snapshots WHERE id = ?)
)
SELECT COALESCE(SUM(c.size), 0)
FROM chunks c
JOIN this_snap_chunks t ON c.chunk_hash = t.chunk_hash
WHERE c.chunk_hash NOT IN (SELECT chunk_hash FROM prior_chunks)
`
	// The snapshot ID fills all three placeholders: this snapshot's chunk
	// set, the self-exclusion, and the started_at cutoff lookup.
	row := r.db.conn.QueryRowContext(ctx, sumNewChunks, snapshotID, snapshotID, snapshotID)

	var newBytes int64
	if scanErr := row.Scan(&newBytes); scanErr != nil {
		return 0, fmt.Errorf("querying new chunk size: %w", scanErr)
	}
	return newBytes, nil
}
// GetIncompleteSnapshots returns every snapshot whose completed_at is NULL,
// ordered by started_at, newest first.
//
// Only the core columns are selected, so BlobUncompressedSize,
// CompressionLevel, UploadBytes and UploadDurationMs keep their zero values
// on the returned snapshots. StartedAt is rebuilt from the stored Unix
// seconds as a UTC time, matching GetIncompleteByHostname and GetByID. The
// result is a nil slice when no rows match.
func (r *SnapshotRepository) GetIncompleteSnapshots(ctx context.Context) ([]*Snapshot, error) {
	query := `
SELECT id, hostname, vaultik_version, vaultik_git_revision, started_at, completed_at, file_count, chunk_count, blob_count, total_size, blob_size, compression_ratio
FROM snapshots
WHERE completed_at IS NULL
ORDER BY started_at DESC
`
	rows, err := r.db.conn.QueryContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("querying incomplete snapshots: %w", err)
	}
	defer CloseRows(rows)

	var snapshots []*Snapshot
	for rows.Next() {
		var snapshot Snapshot
		var startedAtUnix int64
		var completedAtUnix *int64 // nil when completed_at is NULL
		err := rows.Scan(
			&snapshot.ID,
			&snapshot.Hostname,
			&snapshot.VaultikVersion,
			&snapshot.VaultikGitRevision,
			&startedAtUnix,
			&completedAtUnix,
			&snapshot.FileCount,
			&snapshot.ChunkCount,
			&snapshot.BlobCount,
			&snapshot.TotalSize,
			&snapshot.BlobSize,
			&snapshot.CompressionRatio,
		)
		if err != nil {
			return nil, fmt.Errorf("scanning snapshot: %w", err)
		}
		// time.Unix yields a local-zone Time; normalize to UTC so every
		// accessor in this repository hands out timestamps in the same zone.
		snapshot.StartedAt = time.Unix(startedAtUnix, 0).UTC()
		if completedAtUnix != nil {
			t := time.Unix(*completedAtUnix, 0).UTC()
			snapshot.CompletedAt = &t
		}
		snapshots = append(snapshots, &snapshot)
	}
	return snapshots, rows.Err()
}
// GetIncompleteByHostname returns every snapshot for hostname whose
// completed_at is NULL, ordered by started_at, newest first.
//
// Only the core columns are selected, so the extended statistics fields keep
// their zero values. StartedAt is rebuilt from the stored Unix seconds as a
// UTC time. The result is a nil slice when no rows match.
func (r *SnapshotRepository) GetIncompleteByHostname(ctx context.Context, hostname string) ([]*Snapshot, error) {
	const selectIncomplete = `
SELECT id, hostname, vaultik_version, vaultik_git_revision, started_at, completed_at, file_count, chunk_count, blob_count, total_size, blob_size, compression_ratio
FROM snapshots
WHERE completed_at IS NULL AND hostname = ?
ORDER BY started_at DESC
`
	rows, queryErr := r.db.conn.QueryContext(ctx, selectIncomplete, hostname)
	if queryErr != nil {
		return nil, fmt.Errorf("querying incomplete snapshots: %w", queryErr)
	}
	defer CloseRows(rows)

	var pending []*Snapshot
	for rows.Next() {
		entry := new(Snapshot)
		var startedSecs int64
		var completedSecs *int64 // nil when completed_at is NULL

		scanErr := rows.Scan(
			&entry.ID,
			&entry.Hostname,
			&entry.VaultikVersion,
			&entry.VaultikGitRevision,
			&startedSecs,
			&completedSecs,
			&entry.FileCount,
			&entry.ChunkCount,
			&entry.BlobCount,
			&entry.TotalSize,
			&entry.BlobSize,
			&entry.CompressionRatio,
		)
		if scanErr != nil {
			return nil, fmt.Errorf("scanning snapshot: %w", scanErr)
		}

		entry.StartedAt = time.Unix(startedSecs, 0).UTC()
		if completedSecs != nil {
			finished := time.Unix(*completedSecs, 0).UTC()
			entry.CompletedAt = &finished
		}
		pending = append(pending, entry)
	}
	return pending, rows.Err()
}
// Delete removes the snapshots row whose id equals snapshotID. The statement
// itself targets only the snapshots table and always runs through the
// repository's own database handle.
func (r *SnapshotRepository) Delete(ctx context.Context, snapshotID string) error {
	const deleteSnapshot = `DELETE FROM snapshots WHERE id = ?`
	if _, execErr := r.db.ExecWithLog(ctx, deleteSnapshot, snapshotID); execErr != nil {
		return fmt.Errorf("deleting snapshot: %w", execErr)
	}
	return nil
}
// DeleteSnapshotFiles removes every snapshot_files row that belongs to the
// snapshot. The statement always runs through the repository's own database
// handle.
func (r *SnapshotRepository) DeleteSnapshotFiles(ctx context.Context, snapshotID string) error {
	const deleteFiles = `DELETE FROM snapshot_files WHERE snapshot_id = ?`
	if _, execErr := r.db.ExecWithLog(ctx, deleteFiles, snapshotID); execErr != nil {
		return fmt.Errorf("deleting snapshot files: %w", execErr)
	}
	return nil
}
// DeleteSnapshotBlobs removes every snapshot_blobs row that belongs to the
// snapshot. The statement always runs through the repository's own database
// handle.
func (r *SnapshotRepository) DeleteSnapshotBlobs(ctx context.Context, snapshotID string) error {
	const deleteBlobs = `DELETE FROM snapshot_blobs WHERE snapshot_id = ?`
	if _, execErr := r.db.ExecWithLog(ctx, deleteBlobs, snapshotID); execErr != nil {
		return fmt.Errorf("deleting snapshot blobs: %w", execErr)
	}
	return nil
}
// DeleteSnapshotUploads removes every uploads row that belongs to the
// snapshot. The statement always runs through the repository's own database
// handle.
func (r *SnapshotRepository) DeleteSnapshotUploads(ctx context.Context, snapshotID string) error {
	const deleteUploads = `DELETE FROM uploads WHERE snapshot_id = ?`
	if _, execErr := r.db.ExecWithLog(ctx, deleteUploads, snapshotID); execErr != nil {
		return fmt.Errorf("deleting snapshot uploads: %w", execErr)
	}
	return nil
}