Merge feature/snapshot-ls-delta-column
All checks were successful
check / check (push) Successful in 2m37s
All checks were successful
check / check (push) Successful in 2m37s
This commit is contained in:
@@ -434,6 +434,65 @@ func (r *SnapshotRepository) GetSnapshotTotalCompressedSize(ctx context.Context,
|
|||||||
return totalSize, nil
|
return totalSize, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetSnapshotUncompressedChunkSize returns the sum of plaintext sizes of all unique
|
||||||
|
// chunks referenced by a snapshot (via snapshot_files → file_chunks → chunks).
|
||||||
|
func (r *SnapshotRepository) GetSnapshotUncompressedChunkSize(ctx context.Context, snapshotID string) (int64, error) {
|
||||||
|
query := `
|
||||||
|
SELECT COALESCE(SUM(c.size), 0)
|
||||||
|
FROM (
|
||||||
|
SELECT DISTINCT fc.chunk_hash
|
||||||
|
FROM snapshot_files sf
|
||||||
|
JOIN file_chunks fc ON sf.file_id = fc.file_id
|
||||||
|
WHERE sf.snapshot_id = ?
|
||||||
|
) sc
|
||||||
|
JOIN chunks c ON sc.chunk_hash = c.chunk_hash
|
||||||
|
`
|
||||||
|
|
||||||
|
var totalSize int64
|
||||||
|
err := r.db.conn.QueryRowContext(ctx, query, snapshotID).Scan(&totalSize)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("querying uncompressed chunk size: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalSize, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetSnapshotNewChunkSize returns the sum of plaintext sizes of chunks that are
|
||||||
|
// referenced by this snapshot but not by any earlier completed snapshot known to
|
||||||
|
// the local database. The result is the marginal uncompressed data this snapshot
|
||||||
|
// added to the dedup pool — i.e., the delta from prior snapshots.
|
||||||
|
func (r *SnapshotRepository) GetSnapshotNewChunkSize(ctx context.Context, snapshotID string) (int64, error) {
|
||||||
|
query := `
|
||||||
|
WITH this_snap_chunks AS (
|
||||||
|
SELECT DISTINCT fc.chunk_hash
|
||||||
|
FROM snapshot_files sf
|
||||||
|
JOIN file_chunks fc ON sf.file_id = fc.file_id
|
||||||
|
WHERE sf.snapshot_id = ?
|
||||||
|
),
|
||||||
|
prior_chunks AS (
|
||||||
|
SELECT DISTINCT fc.chunk_hash
|
||||||
|
FROM snapshots s
|
||||||
|
JOIN snapshot_files sf ON sf.snapshot_id = s.id
|
||||||
|
JOIN file_chunks fc ON fc.file_id = sf.file_id
|
||||||
|
WHERE s.completed_at IS NOT NULL
|
||||||
|
AND s.id != ?
|
||||||
|
AND s.started_at < (SELECT started_at FROM snapshots WHERE id = ?)
|
||||||
|
)
|
||||||
|
SELECT COALESCE(SUM(c.size), 0)
|
||||||
|
FROM chunks c
|
||||||
|
JOIN this_snap_chunks t ON c.chunk_hash = t.chunk_hash
|
||||||
|
WHERE c.chunk_hash NOT IN (SELECT chunk_hash FROM prior_chunks)
|
||||||
|
`
|
||||||
|
|
||||||
|
var totalSize int64
|
||||||
|
err := r.db.conn.QueryRowContext(ctx, query, snapshotID, snapshotID, snapshotID).Scan(&totalSize)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("querying new chunk size: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalSize, nil
|
||||||
|
}
|
||||||
|
|
||||||
// GetIncompleteSnapshots returns all snapshots that haven't been completed
|
// GetIncompleteSnapshots returns all snapshots that haven't been completed
|
||||||
func (r *SnapshotRepository) GetIncompleteSnapshots(ctx context.Context) ([]*Snapshot, error) {
|
func (r *SnapshotRepository) GetIncompleteSnapshots(ctx context.Context) ([]*Snapshot, error) {
|
||||||
query := `
|
query := `
|
||||||
|
|||||||
@@ -10,11 +10,17 @@ import (
|
|||||||
"sneak.berlin/go/vaultik/internal/types"
|
"sneak.berlin/go/vaultik/internal/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SnapshotInfo contains information about a snapshot
|
// SnapshotInfo contains information about a snapshot.
|
||||||
|
// UncompressedSize and NewChunkSize are populated only when the snapshot
|
||||||
|
// is present in the local database; LocallyTracked indicates whether
|
||||||
|
// those values are meaningful.
|
||||||
type SnapshotInfo struct {
|
type SnapshotInfo struct {
|
||||||
ID types.SnapshotID `json:"id"`
|
ID types.SnapshotID `json:"id"`
|
||||||
Timestamp time.Time `json:"timestamp"`
|
Timestamp time.Time `json:"timestamp"`
|
||||||
CompressedSize int64 `json:"compressed_size"`
|
CompressedSize int64 `json:"compressed_size"`
|
||||||
|
UncompressedSize int64 `json:"uncompressed_size,omitempty"`
|
||||||
|
NewChunkSize int64 `json:"new_chunk_size,omitempty"`
|
||||||
|
LocallyTracked bool `json:"locally_tracked"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// formatBytes formats bytes in a human-readable format
|
// formatBytes formats bytes in a human-readable format
|
||||||
|
|||||||
@@ -481,10 +481,23 @@ func (v *Vaultik) buildSnapshotInfoList(remoteSnapshots map[string]bool, localSn
|
|||||||
totalSize = localSnap.BlobSize
|
totalSize = localSnap.BlobSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uncompressedSize, err := v.Repositories.Snapshots.GetSnapshotUncompressedChunkSize(v.ctx, snapshotID)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("Failed to get uncompressed chunk size", "id", snapshotID, "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
newChunkSize, err := v.Repositories.Snapshots.GetSnapshotNewChunkSize(v.ctx, snapshotID)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("Failed to get new chunk size", "id", snapshotID, "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
snapshots = append(snapshots, SnapshotInfo{
|
snapshots = append(snapshots, SnapshotInfo{
|
||||||
ID: localSnap.ID,
|
ID: localSnap.ID,
|
||||||
Timestamp: localSnap.StartedAt,
|
Timestamp: localSnap.StartedAt,
|
||||||
CompressedSize: totalSize,
|
CompressedSize: totalSize,
|
||||||
|
UncompressedSize: uncompressedSize,
|
||||||
|
NewChunkSize: newChunkSize,
|
||||||
|
LocallyTracked: true,
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
timestamp, err := parseSnapshotTimestamp(snapshotID)
|
timestamp, err := parseSnapshotTimestamp(snapshotID)
|
||||||
@@ -498,6 +511,7 @@ func (v *Vaultik) buildSnapshotInfoList(remoteSnapshots map[string]bool, localSn
|
|||||||
ID: types.SnapshotID(snapshotID),
|
ID: types.SnapshotID(snapshotID),
|
||||||
Timestamp: timestamp,
|
Timestamp: timestamp,
|
||||||
CompressedSize: 0,
|
CompressedSize: 0,
|
||||||
|
LocallyTracked: false,
|
||||||
})
|
})
|
||||||
remoteOnly = append(remoteOnly, snapshotID)
|
remoteOnly = append(remoteOnly, snapshotID)
|
||||||
}
|
}
|
||||||
@@ -593,18 +607,27 @@ func (v *Vaultik) printSnapshotTable(snapshots []SnapshotInfo) error {
|
|||||||
if _, err := fmt.Fprintln(w, "REMOTE SNAPSHOTS:"); err != nil {
|
if _, err := fmt.Fprintln(w, "REMOTE SNAPSHOTS:"); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if _, err := fmt.Fprintln(w, "SNAPSHOT ID\tTIMESTAMP\tCOMPRESSED SIZE"); err != nil {
|
if _, err := fmt.Fprintln(w, "SNAPSHOT ID\tTIMESTAMP\tCOMPRESSED SIZE\tUNCOMPRESSED SIZE\tNEW CHUNK SIZE"); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if _, err := fmt.Fprintln(w, "───────────\t─────────\t───────────────"); err != nil {
|
if _, err := fmt.Fprintln(w, "───────────\t─────────\t───────────────\t─────────────────\t──────────────"); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const remoteOnlyCell = "<remote only>"
|
||||||
for _, snap := range snapshots {
|
for _, snap := range snapshots {
|
||||||
if _, err := fmt.Fprintf(w, "%s\t%s\t%s\n",
|
uncompressed := remoteOnlyCell
|
||||||
|
newChunks := remoteOnlyCell
|
||||||
|
if snap.LocallyTracked {
|
||||||
|
uncompressed = formatBytes(snap.UncompressedSize)
|
||||||
|
newChunks = formatBytes(snap.NewChunkSize)
|
||||||
|
}
|
||||||
|
if _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n",
|
||||||
snap.ID,
|
snap.ID,
|
||||||
snap.Timestamp.Format("2006-01-02 15:04:05"),
|
snap.Timestamp.Format("2006-01-02 15:04:05"),
|
||||||
formatBytes(snap.CompressedSize)); err != nil {
|
formatBytes(snap.CompressedSize),
|
||||||
|
uncompressed,
|
||||||
|
newChunks); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user