Compare commits
30 Commits
fix/ctime-
...
2e2b02a056
| Author | SHA1 | Date | |
|---|---|---|---|
| 2e2b02a056 | |||
| 0b95cb4308 | |||
| 4a3e61f8e1 | |||
| 6fbcac0cd8 | |||
| 34f73f72d8 | |||
| ee240faa32 | |||
| f719ab3adc | |||
| 1a8baf7491 | |||
| 7d5d3fa598 | |||
| ac5d2f4a0d | |||
| b250ddfa94 | |||
| fe3ad13a91 | |||
| ebd6619638 | |||
| 20d3a9ac8c | |||
| 0889cf2804 | |||
| f9ebb4bf25 | |||
| 9f2d722734 | |||
| 6821215b0e | |||
| f97a1dc2eb | |||
| 18c14d1507 | |||
| 65da291ddf | |||
| dcf3ec399a | |||
| 495dede1bc | |||
| 1c72a37bc8 | |||
| 60b6746db9 | |||
| f28c8a73b7 | |||
| 1c0f5b8eb2 | |||
| 689109a2b8 | |||
| ac2f21a89d | |||
| 8c59f55096 |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -1,5 +1,5 @@
|
||||
# Binary
|
||||
vaultik
|
||||
/vaultik
|
||||
|
||||
# Test artifacts
|
||||
*.out
|
||||
|
||||
55
.goreleaser.yaml
Normal file
55
.goreleaser.yaml
Normal file
@@ -0,0 +1,55 @@
|
||||
version: 2
|
||||
|
||||
project_name: vaultik
|
||||
|
||||
before:
|
||||
hooks:
|
||||
- go mod tidy
|
||||
|
||||
builds:
|
||||
- id: vaultik
|
||||
main: ./cmd/vaultik
|
||||
binary: vaultik
|
||||
env:
|
||||
- CGO_ENABLED=0
|
||||
goos:
|
||||
- linux
|
||||
- darwin
|
||||
goarch:
|
||||
- amd64
|
||||
- arm64
|
||||
ldflags:
|
||||
- -s -w
|
||||
- -X 'git.eeqj.de/sneak/vaultik/internal/globals.Version={{ .Version }}'
|
||||
- -X 'git.eeqj.de/sneak/vaultik/internal/globals.Commit={{ .Commit }}'
|
||||
|
||||
archives:
|
||||
- id: default
|
||||
name_template: "{{ .ProjectName }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}"
|
||||
formats:
|
||||
- tar.gz
|
||||
files:
|
||||
- LICENSE
|
||||
- README.md
|
||||
|
||||
checksum:
|
||||
name_template: "checksums.txt"
|
||||
algorithm: sha256
|
||||
|
||||
snapshot:
|
||||
version_template: "{{ incpatch .Version }}-next"
|
||||
|
||||
changelog:
|
||||
sort: asc
|
||||
use: git
|
||||
filters:
|
||||
exclude:
|
||||
- "^docs:"
|
||||
- "^test:"
|
||||
- "^chore:"
|
||||
- "Merge pull request"
|
||||
- "Merge branch"
|
||||
|
||||
release:
|
||||
draft: true
|
||||
prerelease: auto
|
||||
13
AGENTS.md
13
AGENTS.md
@@ -38,10 +38,9 @@ Version: 2025-06-08
|
||||
1. Before committing, tests must pass (`make test`), linting must pass
|
||||
(`make lint`), and code must be formatted (`make fmt`). For go, those
|
||||
makefile targets should use `go fmt` and `go test -v ./...` and
|
||||
`golangci-lint run`. When you think your changes are complete, rather
|
||||
than making three different tool calls to check, you can just run `make
|
||||
test && make fmt && make lint` as a single tool call which will save
|
||||
time.
|
||||
`golangci-lint run`. Each Makefile target does exactly one thing — to
|
||||
run lint + fmt-check + test together (the standard pre-commit gate),
|
||||
use `make check`.
|
||||
|
||||
2. Always write a `Makefile` with the default target being `test`, and with
|
||||
a `fmt` target that formats the code. The `test` target should run all
|
||||
@@ -103,3 +102,9 @@ Version: 2025-06-08
|
||||
build files are acceptable in the root, but source code and other files
|
||||
should be organized in appropriate subdirectories.
|
||||
|
||||
13. Pre-1.0: NEVER write database migrations. There are no live databases
|
||||
anywhere — every user's local index can be rebuilt from a fresh full
|
||||
backup. When the schema changes, just change `schema.sql` (and any code
|
||||
that touches the affected tables). The local index is disposable until
|
||||
1.0 ships and is tagged.
|
||||
|
||||
|
||||
@@ -53,8 +53,8 @@ The database tracks five primary entities and their relationships:
|
||||
### Entity Descriptions
|
||||
|
||||
#### File (`database.File`)
|
||||
Represents a file or directory in the backup system. Stores metadata needed for restoration:
|
||||
- Path, timestamps (mtime, ctime)
|
||||
Represents a file, directory, or symlink in the backup system. Stores metadata needed for restoration:
|
||||
- Path, source_path (for restore path stripping), mtime
|
||||
- Size, mode, ownership (uid, gid)
|
||||
- Symlink target (if applicable)
|
||||
|
||||
@@ -95,7 +95,7 @@ Maps chunks to their position within blobs:
|
||||
|
||||
#### Snapshot (`database.Snapshot`)
|
||||
Represents a point-in-time backup:
|
||||
- `ID`: Format is `{hostname}-{YYYYMMDD}-{HHMMSS}Z`
|
||||
- `ID`: Format is `{hostname}_{snapshot-name}_{RFC3339}` (e.g. `server1_home_2025-06-01T12:00:00Z`)
|
||||
- Tracks file count, chunk count, blob count, sizes, compression ratio
|
||||
- `CompletedAt`: Null until snapshot finishes successfully
|
||||
|
||||
@@ -127,7 +127,7 @@ fx.New(
|
||||
config.Module, // 5. Config
|
||||
database.Module, // 6. Database + Repositories
|
||||
log.Module, // 7. Logger initialization
|
||||
s3.Module, // 8. S3 client
|
||||
storage.Module, // 8. Storage backend (S3/file/rclone)
|
||||
snapshot.Module, // 9. SnapshotManager + ScannerFactory
|
||||
fx.Provide(vaultik.New), // 10. Vaultik orchestrator
|
||||
)
|
||||
@@ -161,7 +161,7 @@ type Vaultik struct {
|
||||
Config *config.Config
|
||||
DB *database.DB
|
||||
Repositories *database.Repositories
|
||||
S3Client *s3.Client
|
||||
Storage storage.Storer
|
||||
ScannerFactory snapshot.ScannerFactory
|
||||
SnapshotManager *snapshot.SnapshotManager
|
||||
Shutdowner fx.Shutdowner
|
||||
@@ -341,12 +341,11 @@ CreateSnapshot(opts)
|
||||
└─► SnapshotManager.ExportSnapshotMetadata()
|
||||
│
|
||||
├─► Copy database to temp file
|
||||
├─► Clean to only current snapshot data
|
||||
├─► Dump to SQL
|
||||
├─► Compress with zstd
|
||||
├─► Clean to only current snapshot data (VACUUM)
|
||||
├─► Compress binary SQLite with zstd
|
||||
├─► Encrypt with age
|
||||
├─► Upload db.zst.age to S3
|
||||
└─► Upload manifest.json.zst to S3
|
||||
├─► Upload db.zst.age to storage
|
||||
└─► Upload manifest.json.zst to storage
|
||||
```
|
||||
|
||||
## Deduplication Strategy
|
||||
@@ -368,8 +367,8 @@ bucket/
|
||||
│
|
||||
└── metadata/
|
||||
└── {snapshot-id}/
|
||||
├── db.zst.age # Encrypted database dump
|
||||
└── manifest.json.zst # Blob list (for verification)
|
||||
├── db.zst.age # Encrypted binary SQLite database
|
||||
└── manifest.json.zst # Blob list (for pruning/verification)
|
||||
```
|
||||
|
||||
## Thread Safety
|
||||
|
||||
41
Makefile
41
Makefile
@@ -1,7 +1,7 @@
|
||||
.PHONY: test fmt lint fmt-check check build clean all docker hooks
|
||||
.PHONY: all check test lint fmt fmt-check build clean deps test-coverage test-integration local install release release-snapshot docker hooks
|
||||
|
||||
# Version number
|
||||
VERSION := 0.0.1
|
||||
VERSION := 1.0.0-rc.1
|
||||
|
||||
# Build variables
|
||||
GIT_REVISION := $(shell git rev-parse HEAD 2>/dev/null || echo "unknown")
|
||||
@@ -13,37 +13,45 @@ LDFLAGS := -X 'git.eeqj.de/sneak/vaultik/internal/globals.Version=$(VERSION)' \
|
||||
# Default target
|
||||
all: vaultik
|
||||
|
||||
# Run tests
|
||||
# Combined pre-commit/CI gate: lint, format check, then tests.
|
||||
check: lint fmt-check test
|
||||
|
||||
# Run tests only.
|
||||
test:
|
||||
go test -race -timeout 30s ./...
|
||||
|
||||
# Check if code is formatted (read-only)
|
||||
# Check if code is formatted (read-only).
|
||||
fmt-check:
|
||||
@test -z "$$(gofmt -l .)" || (echo "Files not formatted:" && gofmt -l . && exit 1)
|
||||
|
||||
# Format code
|
||||
# Format code.
|
||||
fmt:
|
||||
go fmt ./...
|
||||
|
||||
# Run linter
|
||||
# Run linter only.
|
||||
lint:
|
||||
golangci-lint run ./...
|
||||
|
||||
# Build binary
|
||||
# Build binary.
|
||||
vaultik: internal/*/*.go cmd/vaultik/*.go
|
||||
go build -ldflags "$(LDFLAGS)" -o $@ ./cmd/vaultik
|
||||
|
||||
# Clean build artifacts
|
||||
# Clean build artifacts.
|
||||
clean:
|
||||
rm -f vaultik
|
||||
go clean
|
||||
|
||||
# Run tests with coverage
|
||||
# Install dependencies.
|
||||
deps:
|
||||
go mod download
|
||||
go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest
|
||||
|
||||
# Run tests with coverage.
|
||||
test-coverage:
|
||||
go test -v -coverprofile=coverage.out ./...
|
||||
go tool cover -html=coverage.out -o coverage.html
|
||||
|
||||
# Run integration tests
|
||||
# Run integration tests.
|
||||
test-integration:
|
||||
go test -v -tags=integration ./...
|
||||
|
||||
@@ -54,14 +62,19 @@ local:
|
||||
install: vaultik
|
||||
cp ./vaultik $(HOME)/bin/
|
||||
|
||||
# Run all checks (formatting, linting, tests) without modifying files
|
||||
check: fmt-check lint test
|
||||
# Build and publish release artifacts (linux/darwin × amd64/arm64) via goreleaser.
|
||||
release:
|
||||
goreleaser release --clean
|
||||
|
||||
# Build Docker image
|
||||
# Dry-run a release build without publishing or tagging.
|
||||
release-snapshot:
|
||||
goreleaser release --clean --snapshot
|
||||
|
||||
# Build Docker image.
|
||||
docker:
|
||||
docker build -t vaultik .
|
||||
|
||||
# Install pre-commit hook
|
||||
# Install pre-commit hook.
|
||||
hooks:
|
||||
@printf '#!/bin/sh\nset -e\n' > .git/hooks/pre-commit
|
||||
@printf 'go mod tidy\ngo fmt ./...\ngit diff --exit-code -- go.mod go.sum || { echo "go mod tidy changed files; please stage and retry"; exit 1; }\n' >> .git/hooks/pre-commit
|
||||
|
||||
556
PROCESS.md
556
PROCESS.md
@@ -1,556 +0,0 @@
|
||||
# Vaultik Snapshot Creation Process
|
||||
|
||||
This document describes the lifecycle of objects during snapshot creation, with a focus on database transactions and foreign key constraints.
|
||||
|
||||
## Database Schema Overview
|
||||
|
||||
### Tables and Foreign Key Dependencies
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────────────┐
|
||||
│ FOREIGN KEY GRAPH │
|
||||
│ │
|
||||
│ snapshots ◄────── snapshot_files ────────► files │
|
||||
│ │ │ │
|
||||
│ └───────── snapshot_blobs ────────► blobs │ │
|
||||
│ │ │ │
|
||||
│ │ ├──► file_chunks ◄── chunks│
|
||||
│ │ │ ▲ │
|
||||
│ │ └──► chunk_files ────┘ │
|
||||
│ │ │
|
||||
│ └──► blob_chunks ─────────────┘│
|
||||
│ │
|
||||
│ uploads ───────► blobs.blob_hash │
|
||||
│ └──────────► snapshots.id │
|
||||
└─────────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### Critical Constraint: `chunks` Must Exist First
|
||||
|
||||
These tables reference `chunks.chunk_hash` **without CASCADE**:
|
||||
- `file_chunks.chunk_hash` → `chunks.chunk_hash`
|
||||
- `chunk_files.chunk_hash` → `chunks.chunk_hash`
|
||||
- `blob_chunks.chunk_hash` → `chunks.chunk_hash`
|
||||
|
||||
**Implication**: A chunk record MUST be committed to the database BEFORE any of these referencing records can be created.
|
||||
|
||||
### Order of Operations Required by Schema
|
||||
|
||||
```
|
||||
1. snapshots (created first, before scan)
|
||||
2. blobs (created when packer starts new blob)
|
||||
3. chunks (created during file processing)
|
||||
4. blob_chunks (created immediately after chunk added to packer)
|
||||
5. files (created after file fully chunked)
|
||||
6. file_chunks (created with file record)
|
||||
7. chunk_files (created with file record)
|
||||
8. snapshot_files (created with file record)
|
||||
9. snapshot_blobs (created after blob uploaded)
|
||||
10. uploads (created after blob uploaded)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Snapshot Creation Phases
|
||||
|
||||
### Phase 0: Initialization
|
||||
|
||||
**Actions:**
|
||||
1. Snapshot record created in database (Transaction T0)
|
||||
2. Known files loaded into memory from `files` table
|
||||
3. Known chunks loaded into memory from `chunks` table
|
||||
|
||||
**Transactions:**
|
||||
```
|
||||
T0: INSERT INTO snapshots (id, hostname, ...) VALUES (...)
|
||||
COMMIT
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Phase 1: Scan Directory
|
||||
|
||||
**Actions:**
|
||||
1. Walk filesystem directory tree
|
||||
2. For each file, compare against in-memory `knownFiles` map
|
||||
3. Classify files as: unchanged, new, or modified
|
||||
4. Collect unchanged file IDs for later association
|
||||
5. Collect new/modified files for processing
|
||||
|
||||
**Transactions:**
|
||||
```
|
||||
(None during scan - all in-memory)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Phase 1b: Associate Unchanged Files
|
||||
|
||||
**Actions:**
|
||||
1. For unchanged files, add entries to `snapshot_files` table
|
||||
2. Done in batches of 1000
|
||||
|
||||
**Transactions:**
|
||||
```
|
||||
For each batch of 1000 file IDs:
|
||||
T: BEGIN
|
||||
INSERT INTO snapshot_files (snapshot_id, file_id) VALUES (?, ?)
|
||||
... (up to 1000 inserts)
|
||||
COMMIT
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Phase 2: Process Files
|
||||
|
||||
For each file that needs processing:
|
||||
|
||||
#### Step 2a: Open and Chunk File
|
||||
|
||||
**Location:** `processFileStreaming()`
|
||||
|
||||
For each chunk produced by content-defined chunking:
|
||||
|
||||
##### Step 2a-1: Check Chunk Existence
|
||||
```go
|
||||
chunkExists := s.chunkExists(chunk.Hash) // In-memory lookup
|
||||
```
|
||||
|
||||
##### Step 2a-2: Create Chunk Record (if new)
|
||||
```go
|
||||
// TRANSACTION: Create chunk in database
|
||||
err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
dbChunk := &database.Chunk{ChunkHash: chunk.Hash, Size: chunk.Size}
|
||||
return s.repos.Chunks.Create(txCtx, tx, dbChunk)
|
||||
})
|
||||
// COMMIT immediately after WithTx returns
|
||||
|
||||
// Update in-memory cache
|
||||
s.addKnownChunk(chunk.Hash)
|
||||
```
|
||||
|
||||
**Transaction:**
|
||||
```
|
||||
T_chunk: BEGIN
|
||||
INSERT INTO chunks (chunk_hash, size) VALUES (?, ?)
|
||||
COMMIT
|
||||
```
|
||||
|
||||
##### Step 2a-3: Add Chunk to Packer
|
||||
|
||||
```go
|
||||
s.packer.AddChunk(&blob.ChunkRef{Hash: chunk.Hash, Data: chunk.Data})
|
||||
```
|
||||
|
||||
**Inside packer.AddChunk → addChunkToCurrentBlob():**
|
||||
|
||||
```go
|
||||
// TRANSACTION: Create blob_chunks record IMMEDIATELY
|
||||
if p.repos != nil {
|
||||
blobChunk := &database.BlobChunk{
|
||||
BlobID: p.currentBlob.id,
|
||||
ChunkHash: chunk.Hash,
|
||||
Offset: offset,
|
||||
Length: chunkSize,
|
||||
}
|
||||
err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
|
||||
return p.repos.BlobChunks.Create(ctx, tx, blobChunk)
|
||||
})
|
||||
// COMMIT immediately
|
||||
}
|
||||
```
|
||||
|
||||
**Transaction:**
|
||||
```
|
||||
T_blob_chunk: BEGIN
|
||||
INSERT INTO blob_chunks (blob_id, chunk_hash, offset, length) VALUES (?, ?, ?, ?)
|
||||
COMMIT
|
||||
```
|
||||
|
||||
**⚠️ CRITICAL DEPENDENCY**: This transaction requires `chunks.chunk_hash` to exist (FK constraint).
|
||||
The chunk MUST be committed in Step 2a-2 BEFORE this can succeed.
|
||||
|
||||
---
|
||||
|
||||
#### Step 2b: Blob Size Limit Handling
|
||||
|
||||
If adding a chunk would exceed blob size limit:
|
||||
|
||||
```go
|
||||
if err == blob.ErrBlobSizeLimitExceeded {
|
||||
if err := s.packer.FinalizeBlob(); err != nil { ... }
|
||||
// Retry adding the chunk
|
||||
if err := s.packer.AddChunk(...); err != nil { ... }
|
||||
}
|
||||
```
|
||||
|
||||
**FinalizeBlob() transactions:**
|
||||
```
|
||||
T_blob_finish: BEGIN
|
||||
UPDATE blobs SET blob_hash=?, uncompressed_size=?, compressed_size=?, finished_ts=? WHERE id=?
|
||||
COMMIT
|
||||
```
|
||||
|
||||
Then blob handler is called (handleBlobReady):
|
||||
```
|
||||
(Upload to S3 - no transaction)
|
||||
|
||||
T_blob_uploaded: BEGIN
|
||||
UPDATE blobs SET uploaded_ts=? WHERE id=?
|
||||
INSERT INTO snapshot_blobs (snapshot_id, blob_id, blob_hash) VALUES (?, ?, ?)
|
||||
INSERT INTO uploads (blob_hash, snapshot_id, uploaded_at, size, duration_ms) VALUES (?, ?, ?, ?, ?)
|
||||
COMMIT
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
#### Step 2c: Queue File for Batch Insertion
|
||||
|
||||
After all chunks for a file are processed:
|
||||
|
||||
```go
|
||||
// Build file data (in-memory, no DB)
|
||||
fileChunks := make([]database.FileChunk, len(chunks))
|
||||
chunkFiles := make([]database.ChunkFile, len(chunks))
|
||||
|
||||
// Queue for batch insertion
|
||||
return s.addPendingFile(ctx, pendingFileData{
|
||||
file: fileToProcess.File,
|
||||
fileChunks: fileChunks,
|
||||
chunkFiles: chunkFiles,
|
||||
})
|
||||
```
|
||||
|
||||
**No transaction yet** - just adds to `pendingFiles` slice.
|
||||
|
||||
If `len(pendingFiles) >= fileBatchSize (100)`, triggers `flushPendingFiles()`.
|
||||
|
||||
---
|
||||
|
||||
### Step 2d: Flush Pending Files
|
||||
|
||||
**Location:** `flushPendingFiles()` - called when batch is full or at end of processing
|
||||
|
||||
```go
|
||||
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
for _, data := range files {
|
||||
// 1. Create file record
|
||||
s.repos.Files.Create(txCtx, tx, data.file) // INSERT OR REPLACE
|
||||
|
||||
// 2. Delete old associations
|
||||
s.repos.FileChunks.DeleteByFileID(txCtx, tx, data.file.ID)
|
||||
s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, data.file.ID)
|
||||
|
||||
// 3. Create file_chunks records
|
||||
for _, fc := range data.fileChunks {
|
||||
s.repos.FileChunks.Create(txCtx, tx, &fc) // FK: chunks.chunk_hash
|
||||
}
|
||||
|
||||
// 4. Create chunk_files records
|
||||
for _, cf := range data.chunkFiles {
|
||||
s.repos.ChunkFiles.Create(txCtx, tx, &cf) // FK: chunks.chunk_hash
|
||||
}
|
||||
|
||||
// 5. Add file to snapshot
|
||||
s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, data.file.ID)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
// COMMIT (all or nothing for the batch)
|
||||
```
|
||||
|
||||
**Transaction:**
|
||||
```
|
||||
T_files_batch: BEGIN
|
||||
-- For each file in batch:
|
||||
INSERT OR REPLACE INTO files (...) VALUES (...)
|
||||
DELETE FROM file_chunks WHERE file_id = ?
|
||||
DELETE FROM chunk_files WHERE file_id = ?
|
||||
INSERT INTO file_chunks (file_id, idx, chunk_hash) VALUES (?, ?, ?) -- FK: chunks
|
||||
INSERT INTO chunk_files (chunk_hash, file_id, ...) VALUES (?, ?, ...) -- FK: chunks
|
||||
INSERT INTO snapshot_files (snapshot_id, file_id) VALUES (?, ?)
|
||||
-- Repeat for each file
|
||||
COMMIT
|
||||
```
|
||||
|
||||
**⚠️ CRITICAL DEPENDENCY**: `file_chunks` and `chunk_files` require `chunks.chunk_hash` to exist.
|
||||
|
||||
---
|
||||
|
||||
### Phase 2 End: Final Flush
|
||||
|
||||
```go
|
||||
// Flush any remaining pending files
|
||||
if err := s.flushAllPending(ctx); err != nil { ... }
|
||||
|
||||
// Final packer flush
|
||||
s.packer.Flush()
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## The Current Bug
|
||||
|
||||
### Problem
|
||||
|
||||
The current code attempts to batch file insertions, but `file_chunks` and `chunk_files` have foreign keys to `chunks.chunk_hash`. The batched file flush tries to insert these records, but if the chunks haven't been committed yet, the FK constraint fails.
|
||||
|
||||
### Why It's Happening
|
||||
|
||||
Looking at the sequence:
|
||||
|
||||
1. Process file A, chunk X
|
||||
2. Create chunk X in DB (Transaction commits)
|
||||
3. Add chunk X to packer
|
||||
4. Packer creates blob_chunks for chunk X (needs chunk X - OK, committed in step 2)
|
||||
5. Queue file A with chunk references
|
||||
6. Process file B, chunk Y
|
||||
7. Create chunk Y in DB (Transaction commits)
|
||||
8. ... etc ...
|
||||
9. At end: flushPendingFiles()
|
||||
10. Insert file_chunks for file A referencing chunk X (chunk X committed - should work)
|
||||
|
||||
The chunks ARE being created individually. But something is going wrong.
|
||||
|
||||
### Actual Issue
|
||||
|
||||
Wait - let me re-read the code. The issue is:
|
||||
|
||||
In `processFileStreaming`, when we queue file data:
|
||||
```go
|
||||
fileChunks[i] = database.FileChunk{
|
||||
FileID: fileToProcess.File.ID,
|
||||
Idx: ci.fileChunk.Idx,
|
||||
ChunkHash: ci.fileChunk.ChunkHash,
|
||||
}
|
||||
```
|
||||
|
||||
The `FileID` is set, but `fileToProcess.File.ID` might be empty at this point because the file record hasn't been created yet!
|
||||
|
||||
Looking at `checkFileInMemory`:
|
||||
```go
|
||||
// For new files:
|
||||
if !exists {
|
||||
return file, true // file.ID is empty string!
|
||||
}
|
||||
|
||||
// For existing files:
|
||||
file.ID = existingFile.ID // Reuse existing ID
|
||||
```
|
||||
|
||||
**For NEW files, `file.ID` is empty!**
|
||||
|
||||
Then in `flushPendingFiles`:
|
||||
```go
|
||||
s.repos.Files.Create(txCtx, tx, data.file) // This generates/uses the ID
|
||||
```
|
||||
|
||||
But `data.fileChunks` was built with the EMPTY ID!
|
||||
|
||||
### The Real Problem
|
||||
|
||||
For new files:
|
||||
1. `checkFileInMemory` creates file record with empty ID
|
||||
2. `processFileStreaming` queues file_chunks with empty `FileID`
|
||||
3. `flushPendingFiles` creates file (generates ID), but file_chunks still have empty `FileID`
|
||||
|
||||
Wait, but `Files.Create` should be INSERT OR REPLACE by path, and the file struct should get updated... Let me check.
|
||||
|
||||
Actually, looking more carefully at the code path - the file IS created first in the flush, but the `fileChunks` slice was already built with the old (possibly empty) ID. The ID isn't updated after the file is created.
|
||||
|
||||
Hmm, but looking at the current code:
|
||||
```go
|
||||
fileChunks[i] = database.FileChunk{
|
||||
FileID: fileToProcess.File.ID, // This uses the ID from the File struct
|
||||
```
|
||||
|
||||
And in `checkFileInMemory` for new files, we create a file struct but don't set the ID. However, looking at the database repository, `Files.Create` should be doing `INSERT OR REPLACE` and the ID should be pre-generated...
|
||||
|
||||
Let me check if IDs are being generated. Looking at the File struct usage, it seems like UUIDs should be generated somewhere...
|
||||
|
||||
Actually, looking at the test failures again:
|
||||
```
|
||||
creating file chunk: inserting file_chunk: constraint failed: FOREIGN KEY constraint failed (787)
|
||||
```
|
||||
|
||||
Error 787 is SQLite's foreign key constraint error. The failing FK is on `file_chunks.chunk_hash → chunks.chunk_hash`.
|
||||
|
||||
So the chunks ARE NOT in the database when we try to insert file_chunks. Let me trace through more carefully...
|
||||
|
||||
---
|
||||
|
||||
## Transaction Timing Issue
|
||||
|
||||
The problem is transaction visibility in SQLite.
|
||||
|
||||
Each `WithTx` creates a new transaction that commits at the end. But with batched file insertion:
|
||||
|
||||
1. Chunk transactions commit one at a time
|
||||
2. File batch transaction runs later
|
||||
|
||||
If chunks are being inserted but something goes wrong with transaction isolation, the file batch might not see them.
|
||||
|
||||
But actually SQLite in WAL mode should have SERIALIZABLE isolation by default, so committed transactions should be visible.
|
||||
|
||||
Let me check if the in-memory cache is masking a database problem...
|
||||
|
||||
Actually, wait. Let me re-check the current broken code more carefully. The issue might be simpler.
|
||||
|
||||
---
|
||||
|
||||
## Current Code Flow Analysis
|
||||
|
||||
Looking at `processFileStreaming` in the current broken state:
|
||||
|
||||
```go
|
||||
// For each chunk:
|
||||
if !chunkExists {
|
||||
err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
dbChunk := &database.Chunk{ChunkHash: chunk.Hash, Size: chunk.Size}
|
||||
return s.repos.Chunks.Create(txCtx, tx, dbChunk)
|
||||
})
|
||||
// ... check error ...
|
||||
s.addKnownChunk(chunk.Hash)
|
||||
}
|
||||
|
||||
// ... add to packer (creates blob_chunks) ...
|
||||
|
||||
// Collect chunk info for file
|
||||
chunks = append(chunks, chunkInfo{...})
|
||||
```
|
||||
|
||||
Then at end of function:
|
||||
```go
|
||||
// Queue file for batch insertion
|
||||
return s.addPendingFile(ctx, pendingFileData{
|
||||
file: fileToProcess.File,
|
||||
fileChunks: fileChunks,
|
||||
chunkFiles: chunkFiles,
|
||||
})
|
||||
```
|
||||
|
||||
At end of `processPhase`:
|
||||
```go
|
||||
if err := s.flushAllPending(ctx); err != nil { ... }
|
||||
```
|
||||
|
||||
The chunks are being created one-by-one with individual transactions. By the time `flushPendingFiles` runs, all chunk transactions should have committed.
|
||||
|
||||
Unless... there's a bug in how the chunks are being referenced. Let me check if the chunk_hash values are correct.
|
||||
|
||||
Or... maybe the test database is being recreated between operations somehow?
|
||||
|
||||
Actually, let me check the test setup. Maybe the issue is specific to the test environment.
|
||||
|
||||
---
|
||||
|
||||
## Summary of Object Lifecycle
|
||||
|
||||
| Object | When Created | Transaction | Dependencies |
|
||||
|--------|--------------|-------------|--------------|
|
||||
| snapshot | Before scan | Individual tx | None |
|
||||
| blob | When packer needs new blob | Individual tx | None |
|
||||
| chunk | During file chunking (each chunk) | Individual tx | None |
|
||||
| blob_chunks | Immediately after adding chunk to packer | Individual tx | chunks, blobs |
|
||||
| files | Batched at end of processing | Batch tx | None |
|
||||
| file_chunks | With file (batched) | Batch tx | files, chunks |
|
||||
| chunk_files | With file (batched) | Batch tx | files, chunks |
|
||||
| snapshot_files | With file (batched) | Batch tx | snapshots, files |
|
||||
| snapshot_blobs | After blob upload | Individual tx | snapshots, blobs |
|
||||
| uploads | After blob upload | Same tx as snapshot_blobs | blobs, snapshots |
|
||||
|
||||
---
|
||||
|
||||
## Root Cause Analysis
|
||||
|
||||
After detailed analysis, I believe the issue is one of the following:
|
||||
|
||||
### Hypothesis 1: File ID Not Set
|
||||
|
||||
Looking at `checkFileInMemory()` for NEW files:
|
||||
```go
|
||||
if !exists {
|
||||
return file, true // file.ID is empty string!
|
||||
}
|
||||
```
|
||||
|
||||
For new files, `file.ID` is empty. Then in `processFileStreaming`:
|
||||
```go
|
||||
fileChunks[i] = database.FileChunk{
|
||||
FileID: fileToProcess.File.ID, // Empty for new files!
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
The `FileID` in the built `fileChunks` slice is empty.
|
||||
|
||||
Then in `flushPendingFiles`:
|
||||
```go
|
||||
s.repos.Files.Create(txCtx, tx, data.file) // This generates the ID
|
||||
// But data.fileChunks still has empty FileID!
|
||||
for i := range data.fileChunks {
|
||||
s.repos.FileChunks.Create(...) // Uses empty FileID
|
||||
}
|
||||
```
|
||||
|
||||
**Solution**: Generate file IDs upfront in `checkFileInMemory()`:
|
||||
```go
|
||||
file := &database.File{
|
||||
ID: uuid.New().String(), // Generate ID immediately
|
||||
Path: path,
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
### Hypothesis 2: Transaction Isolation
|
||||
|
||||
SQLite with a single connection pool (`MaxOpenConns(1)`) should serialize all transactions. Committed data should be visible to subsequent transactions.
|
||||
|
||||
However, there might be a subtle issue with how `context.Background()` is used in the packer vs the scanner's context.
|
||||
|
||||
## Recommended Fix
|
||||
|
||||
**Step 1: Generate file IDs upfront**
|
||||
|
||||
In `checkFileInMemory()`, generate the UUID for new files immediately:
|
||||
```go
|
||||
file := &database.File{
|
||||
ID: uuid.New().String(), // Always generate ID
|
||||
Path: path,
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
This ensures `file.ID` is set when building `fileChunks` and `chunkFiles` slices.
|
||||
|
||||
**Step 2: Verify by reverting to per-file transactions**
|
||||
|
||||
If Step 1 doesn't fix it, revert to non-batched file insertion to isolate the issue:
|
||||
|
||||
```go
|
||||
// Instead of queuing:
|
||||
// return s.addPendingFile(ctx, pendingFileData{...})
|
||||
|
||||
// Do immediate insertion:
|
||||
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
// Create file
|
||||
s.repos.Files.Create(txCtx, tx, fileToProcess.File)
|
||||
// Delete old associations
|
||||
s.repos.FileChunks.DeleteByFileID(...)
|
||||
s.repos.ChunkFiles.DeleteByFileID(...)
|
||||
// Create new associations
|
||||
for _, fc := range fileChunks {
|
||||
s.repos.FileChunks.Create(...)
|
||||
}
|
||||
for _, cf := range chunkFiles {
|
||||
s.repos.ChunkFiles.Create(...)
|
||||
}
|
||||
// Add to snapshot
|
||||
s.repos.Snapshots.AddFileByID(...)
|
||||
return nil
|
||||
})
|
||||
```
|
||||
|
||||
**Step 3: If batching is still desired**
|
||||
|
||||
After confirming per-file transactions work, re-implement batching with the ID fix in place, and add debug logging to trace exactly which chunk_hash is failing and why.
|
||||
506
README.md
506
README.md
@@ -1,43 +1,35 @@
|
||||
# vaultik (ваултик)
|
||||
|
||||
WIP: pre-1.0, some functions may not be fully implemented yet
|
||||
|
||||
`vaultik` is an incremental backup daemon written in Go. It encrypts data
|
||||
`vaultik` is an incremental backup tool written in Go. It encrypts data
|
||||
using an `age` public key and uploads each encrypted blob directly to a
|
||||
remote S3-compatible object store. It requires no private keys, secrets, or
|
||||
credentials (other than those required to PUT to encrypted object storage,
|
||||
such as S3 API keys) stored on the backed-up system.
|
||||
|
||||
It includes table-stakes features such as:
|
||||
Features:
|
||||
|
||||
* modern encryption (the excellent `age`)
|
||||
* deduplication
|
||||
* incremental backups
|
||||
* modern multithreaded zstd compression with configurable levels
|
||||
* modern encryption ([age](https://age-encryption.org/), X25519 + XChaCha20-Poly1305)
|
||||
* content-defined chunking with deduplication (FastCDC)
|
||||
* incremental backups (only changed files are re-chunked)
|
||||
* multithreaded zstd compression at configurable levels
|
||||
* content-addressed immutable storage
|
||||
* local state tracking in standard SQLite database, enables write-only
|
||||
incremental backups to destination
|
||||
* local state tracking in SQLite (enables write-only incremental backups)
|
||||
* no mutable remote metadata
|
||||
* no plaintext file paths or metadata stored in remote
|
||||
* does not create huge numbers of small files (to keep S3 operation counts
|
||||
down) even if the source system has many small files
|
||||
* no plaintext file paths or metadata in remote storage
|
||||
* packs small files into large blobs (keeps S3 operation counts down)
|
||||
* backs up regular files, symlinks, empty directories, and file permissions
|
||||
* pluggable storage backends: S3, local filesystem, rclone (70+ providers)
|
||||
* pure Go (no CGO), cross-compiles to linux/darwin × amd64/arm64
|
||||
|
||||
## why
|
||||
|
||||
Existing backup software fails under one or more of these conditions:
|
||||
|
||||
* Requires secrets (passwords, private keys) on the source system, which
|
||||
compromises encrypted backups in the case of host system compromise
|
||||
* Depends on symmetric encryption unsuitable for zero-trust environments
|
||||
* Creates one-blob-per-file, which results in excessive S3 operation counts
|
||||
* is slow
|
||||
|
||||
Other backup tools like `restic`, `borg`, and `duplicity` are designed for
|
||||
environments where the source host can store secrets and has access to
|
||||
decryption keys. I don't want to store backup decryption keys on my hosts,
|
||||
only public keys for encryption.
|
||||
decryption keys. `vaultik` is for environments where you don't want to
|
||||
store backup decryption keys on your hosts — only public keys for
|
||||
encryption.
|
||||
|
||||
My requirements are:
|
||||
Requirements that no existing tool meets:
|
||||
|
||||
* open source
|
||||
* no passphrases or private keys on the source host
|
||||
@@ -46,40 +38,13 @@ My requirements are:
|
||||
* encrypted
|
||||
* s3 compatible without an intermediate step or tool
|
||||
|
||||
Surprisingly, no existing tool meets these requirements, so I wrote `vaultik`.
|
||||
## install
|
||||
|
||||
## design goals
|
||||
```sh
|
||||
go install git.eeqj.de/sneak/vaultik@latest
|
||||
```
|
||||
|
||||
1. Backups must require only a public key on the source host.
|
||||
1. No secrets or private keys may exist on the source system.
|
||||
1. Restore must be possible using **only** the backup bucket and a private key.
|
||||
1. Prune must be possible (requires private key, done on different hosts).
|
||||
1. All encryption uses [`age`](https://age-encryption.org/) (X25519, XChaCha20-Poly1305).
|
||||
1. Compression uses `zstd` at a configurable level.
|
||||
1. Files are chunked, and multiple chunks are packed into encrypted blobs
|
||||
to reduce object count for filesystems with many small files.
|
||||
1. All metadata (snapshots) is stored remotely as encrypted SQLite DBs.
|
||||
|
||||
## what
|
||||
|
||||
`vaultik` walks a set of configured directories and builds a
|
||||
content-addressable chunk map of changed files using deterministic chunking.
|
||||
Each chunk is streamed into a blob packer. Blobs are compressed with `zstd`,
|
||||
encrypted with `age`, and uploaded directly to remote storage under a
|
||||
content-addressed S3 path. At the end, a pruned snapshot-specific sqlite
|
||||
database of metadata is created, encrypted, and uploaded alongside the
|
||||
blobs.
|
||||
|
||||
No plaintext file contents ever hit disk. No private key or secret
|
||||
passphrase is needed or stored locally.
|
||||
|
||||
## how
|
||||
|
||||
1. **install**
|
||||
|
||||
```sh
|
||||
go install git.eeqj.de/sneak/vaultik@latest
|
||||
```
|
||||
## quick start
|
||||
|
||||
1. **generate keypair**
|
||||
|
||||
@@ -88,23 +53,21 @@ passphrase is needed or stored locally.
|
||||
grep 'public key:' agekey.txt
|
||||
```
|
||||
|
||||
1. **write config**
|
||||
2. **write config** (see `config.example.yml` for all options)
|
||||
|
||||
```yaml
|
||||
# Named snapshots - each snapshot can contain multiple paths
|
||||
snapshots:
|
||||
system:
|
||||
paths:
|
||||
- /etc
|
||||
- /var/lib
|
||||
exclude:
|
||||
- '*.cache' # Snapshot-specific exclusions
|
||||
- '*.cache'
|
||||
home:
|
||||
paths:
|
||||
- /home/user/documents
|
||||
- /home/user/photos
|
||||
|
||||
# Global exclusions (apply to all snapshots)
|
||||
exclude:
|
||||
- '*.log'
|
||||
- '*.tmp'
|
||||
@@ -112,32 +75,36 @@ passphrase is needed or stored locally.
|
||||
- 'node_modules'
|
||||
|
||||
age_recipients:
|
||||
- age1278m9q7dp3chsh2dcy82qk27v047zywyvtxwnj4cvt0z65jw6a7q5dqhfj
|
||||
- age1YOUR_PUBLIC_KEY_HERE
|
||||
|
||||
# Storage backend (pick one):
|
||||
storage_url: "s3://mybucket/backups?endpoint=s3.example.com®ion=us-east-1"
|
||||
# storage_url: "file:///mnt/backups"
|
||||
# storage_url: "rclone://myremote/path/to/backups"
|
||||
|
||||
# For s3:// URLs, credentials are still required:
|
||||
s3:
|
||||
endpoint: https://s3.example.com
|
||||
bucket: vaultik-data
|
||||
prefix: host1/
|
||||
access_key_id: ...
|
||||
secret_access_key: ...
|
||||
region: us-east-1
|
||||
backup_interval: 1h
|
||||
full_scan_interval: 24h
|
||||
min_time_between_run: 15m
|
||||
chunk_size: 10MB
|
||||
blob_size_limit: 1GB
|
||||
```
|
||||
|
||||
1. **run**
|
||||
3. **run**
|
||||
|
||||
```sh
|
||||
# Create all configured snapshots
|
||||
vaultik --config /etc/vaultik.yaml snapshot create
|
||||
# Back up all configured snapshots
|
||||
vaultik --config /etc/vaultik.yml snapshot create
|
||||
|
||||
# Create specific snapshots by name
|
||||
vaultik --config /etc/vaultik.yaml snapshot create home system
|
||||
# Back up specific snapshots by name
|
||||
vaultik --config /etc/vaultik.yml snapshot create home system
|
||||
|
||||
# Silent mode for cron
|
||||
vaultik --config /etc/vaultik.yaml snapshot create --cron
|
||||
vaultik --config /etc/vaultik.yml snapshot create --cron
|
||||
|
||||
# Back up and clean up old snapshots + orphan blobs in one shot
|
||||
vaultik --config /etc/vaultik.yml snapshot create --prune
|
||||
|
||||
# Daily cron: back up, keep last 4 weeks of snapshots
|
||||
vaultik --config /etc/vaultik.yml snapshot create --cron --prune --keep-newer-than 4w
|
||||
```
|
||||
|
||||
---
|
||||
@@ -147,253 +114,284 @@ passphrase is needed or stored locally.
|
||||
### commands
|
||||
|
||||
```sh
|
||||
vaultik [--config <path>] snapshot create [snapshot-names...] [--cron] [--daemon] [--prune]
|
||||
vaultik [--config <path>] snapshot create [snapshot-names...] [--cron] [--prune] [--keep-newer-than <duration>] [--skip-errors]
|
||||
vaultik [--config <path>] snapshot list [--json]
|
||||
vaultik [--config <path>] snapshot verify <snapshot-id> [--deep]
|
||||
vaultik [--config <path>] snapshot purge [--keep-latest | --older-than <duration>] [--force]
|
||||
vaultik [--config <path>] snapshot remove <snapshot-id> [--dry-run] [--force]
|
||||
vaultik [--config <path>] snapshot verify <snapshot-id> [--deep] [--json]
|
||||
vaultik [--config <path>] snapshot purge [--keep-latest | --older-than <duration>] [--snapshot <name>...] [--force]
|
||||
vaultik [--config <path>] snapshot remove <snapshot-id|--all> [--dry-run] [--force] [--remote] [--json]
|
||||
vaultik [--config <path>] snapshot prune
|
||||
vaultik [--config <path>] restore <snapshot-id> <target-dir> [paths...]
|
||||
vaultik [--config <path>] prune [--dry-run] [--force]
|
||||
vaultik [--config <path>] snapshot cleanup
|
||||
vaultik [--config <path>] restore <snapshot-id> <target-dir> [paths...] [--verify]
|
||||
vaultik [--config <path>] prune [--force] [--json]
|
||||
vaultik [--config <path>] info
|
||||
vaultik [--config <path>] remote info [--json]
|
||||
vaultik [--config <path>] store info
|
||||
vaultik [--config <path>] database purge [--force]
|
||||
vaultik version
|
||||
```
|
||||
|
||||
### environment
|
||||
### global flags
|
||||
|
||||
* `VAULTIK_AGE_SECRET_KEY`: Required for `restore` and deep `verify`. Contains the age private key for decryption.
|
||||
* `VAULTIK_CONFIG`: Optional path to config file.
|
||||
* `--config <path>`: Path to config file (default: `$VAULTIK_CONFIG` or `/etc/vaultik/config.yml`)
|
||||
* `--verbose`, `-v`: Enable verbose output
|
||||
* `--debug`: Enable debug output
|
||||
* `--quiet`, `-q`: Suppress non-error output
|
||||
|
||||
### environment variables
|
||||
|
||||
* `VAULTIK_AGE_SECRET_KEY`: Age private key for decryption (required for `restore` and `verify --deep`)
|
||||
* `VAULTIK_CONFIG`: Path to config file (overridden by `--config`)
|
||||
* `VAULTIK_INDEX_PATH`: Override local SQLite index path
|
||||
|
||||
### command details
|
||||
|
||||
**snapshot create**: Perform incremental backup of configured snapshots
|
||||
* Config is located at `/etc/vaultik/config.yml` by default
|
||||
**snapshot create**: Perform incremental backup of configured snapshots.
|
||||
* Optional snapshot names argument to create specific snapshots (default: all)
|
||||
* `--cron`: Silent unless error (for crontab)
|
||||
* `--daemon`: Run continuously with inotify monitoring and periodic scans
|
||||
* `--prune`: Delete old snapshots and orphaned blobs after backup
|
||||
* `--prune`: After backup, drop older snapshots of each backed-up name and
|
||||
remove orphaned blobs from remote storage. By default keeps only the latest
|
||||
snapshot per name; use `--keep-newer-than` for a rolling window.
|
||||
* `--keep-newer-than <duration>`: With `--prune`, keep snapshots newer than
|
||||
this duration instead of only the latest (e.g. `4w`, `30d`, `6mo`, `1y`)
|
||||
* `--skip-errors`: Skip file read errors (log them loudly but continue)
|
||||
|
||||
**snapshot list**: List all snapshots with their timestamps and sizes
|
||||
**snapshot list**: List all snapshots with their timestamps and sizes.
|
||||
* `--json`: Output in JSON format
|
||||
|
||||
**snapshot verify**: Verify snapshot integrity
|
||||
* `--deep`: Download and verify blob contents (not just existence)
|
||||
**snapshot verify**: Verify snapshot integrity.
|
||||
* Default (shallow): checks that all blobs referenced in the manifest exist in storage
|
||||
* `--deep`: Downloads and decrypts each blob, verifies chunk hashes against the
|
||||
encrypted metadata database
|
||||
* `--json`: Output results as JSON
|
||||
|
||||
**snapshot purge**: Remove old snapshots based on criteria
|
||||
* `--keep-latest`: Keep only the most recent snapshot
|
||||
* `--older-than`: Remove snapshots older than duration (e.g., 30d, 6mo, 1y)
|
||||
**snapshot purge**: Remove old snapshots based on criteria. Retention is
|
||||
per-snapshot-name (`--keep-latest` keeps the latest of each name, not the
|
||||
latest globally).
|
||||
* `--keep-latest`: Keep only the most recent snapshot of each name
|
||||
* `--older-than <duration>`: Remove snapshots older than duration (e.g. `30d`, `6m`, `1y`)
|
||||
* `--snapshot <name>`: Restrict to specific snapshot names (repeat for multiple)
|
||||
* `--force`: Skip confirmation prompt
|
||||
|
||||
**snapshot remove**: Remove a specific snapshot
|
||||
**snapshot remove**: Remove a specific snapshot from the local database.
|
||||
* `--remote`: Also remove snapshot metadata from remote storage
|
||||
* `--all`: Remove all snapshots (requires `--force`)
|
||||
* `--dry-run`: Show what would be deleted without deleting
|
||||
* `--force`: Skip confirmation prompt
|
||||
* `--json`: Output result as JSON
|
||||
|
||||
**snapshot prune**: Clean orphaned data from local database
|
||||
**snapshot prune**: Clean orphaned data from the local database (files,
|
||||
chunks, blobs not referenced by any snapshot).
|
||||
|
||||
**restore**: Restore snapshot to target directory
|
||||
* Requires `VAULTIK_AGE_SECRET_KEY` environment variable with age private key
|
||||
**snapshot cleanup**: Remove stale local snapshot records that have no
|
||||
corresponding metadata in remote storage. These are typically left behind
|
||||
by incomplete or interrupted backups. Does not touch remote storage.
|
||||
|
||||
**restore**: Restore files from a backup snapshot.
|
||||
* Requires `VAULTIK_AGE_SECRET_KEY` environment variable
|
||||
* Optional path arguments to restore specific files/directories (default: all)
|
||||
* Downloads and decrypts metadata, fetches required blobs, reconstructs files
|
||||
* Preserves file permissions, timestamps, and ownership (ownership requires root)
|
||||
* Handles symlinks and directories
|
||||
* Preserves file permissions, timestamps, ownership (ownership requires root),
|
||||
symlinks, and empty directories
|
||||
* `--verify`: After restoring, verify every file's chunk hashes match
|
||||
|
||||
**prune**: Remove unreferenced blobs from remote storage
|
||||
* Scans all snapshots for referenced blobs
|
||||
* Deletes orphaned blobs
|
||||
**prune**: Remove unreferenced blobs from remote storage.
|
||||
* Scans all snapshot manifests for referenced blobs, deletes any blob not referenced
|
||||
* `--force`: Skip confirmation prompt
|
||||
* `--json`: Output stats as JSON
|
||||
|
||||
**info**: Display system and configuration information
|
||||
**info**: Display system configuration, storage settings, encryption
|
||||
recipients, and local database statistics.
|
||||
|
||||
**store info**: Display S3 bucket configuration and storage statistics
|
||||
**remote info**: Show detailed remote storage information including per-snapshot
|
||||
metadata sizes, blob counts, and orphaned blob detection.
|
||||
* `--json`: Output as JSON
|
||||
|
||||
**store info**: Display storage backend type and statistics.
|
||||
|
||||
**database purge**: Delete the local SQLite state database entirely. Remote
|
||||
storage is unaffected; the next backup will do a full scan and re-deduplicate
|
||||
against existing remote blobs.
|
||||
* `--force`: Skip confirmation prompt
|
||||
|
||||
---
|
||||
|
||||
## storage backends
|
||||
|
||||
vaultik supports three storage backends, selected via the `storage_url` config field:
|
||||
|
||||
**S3** (`s3://bucket/prefix?endpoint=host®ion=us-east-1`): Any S3-compatible
|
||||
object store. Credentials are read from `s3.access_key_id` and
|
||||
`s3.secret_access_key` in the config file.
|
||||
|
||||
**Local filesystem** (`file:///path/to/backup`): Stores blobs and metadata on
|
||||
a local or mounted filesystem. Useful for testing or backing up to a NAS.
|
||||
|
||||
**Rclone** (`rclone://remote/path`): Uses rclone's 70+ supported cloud
|
||||
providers. Requires rclone to be configured separately (`rclone config`).
|
||||
|
||||
Legacy S3 configuration via `s3.*` fields (endpoint, bucket, prefix, etc.) is
|
||||
still supported for backward compatibility. `storage_url` takes precedence if
|
||||
both are set.
|
||||
|
||||
---
|
||||
|
||||
## architecture
|
||||
|
||||
### s3 bucket layout
|
||||
### remote storage layout
|
||||
|
||||
```
|
||||
s3://<bucket>/<prefix>/
|
||||
<bucket>/<prefix>/
|
||||
├── blobs/
|
||||
│ └── <aa>/<bb>/<full_blob_hash>
|
||||
└── metadata/
|
||||
├── <snapshot_id>/
|
||||
│ ├── db.zst.age
|
||||
│ └── manifest.json.zst
|
||||
└── <snapshot_id>/
|
||||
├── db.zst.age # Encrypted binary SQLite database
|
||||
└── manifest.json.zst # Unencrypted blob list (for pruning)
|
||||
```
|
||||
|
||||
* `blobs/<aa>/<bb>/...`: Two-level directory sharding using first 4 hex chars of blob hash
|
||||
* `metadata/<snapshot_id>/db.zst.age`: Encrypted, compressed SQLite database
|
||||
* `metadata/<snapshot_id>/manifest.json.zst`: Unencrypted blob list for pruning
|
||||
* Blobs are two-level directory sharded using the first 4 hex chars of the blob hash
|
||||
* `db.zst.age` is a binary SQLite database (zstd compressed, age encrypted)
|
||||
containing all file metadata, chunk mappings, and relationships for the snapshot
|
||||
* `manifest.json.zst` is an unencrypted compressed JSON blob list, enabling
|
||||
pruning without the private key
|
||||
|
||||
### blob manifest format
|
||||
|
||||
The `manifest.json.zst` file is unencrypted (compressed JSON) to enable pruning without decryption:
|
||||
|
||||
```json
|
||||
{
|
||||
"snapshot_id": "hostname_snapshotname_2025-01-01T12:00:00Z",
|
||||
"blob_hashes": [
|
||||
"aa1234567890abcdef...",
|
||||
"bb2345678901bcdef0..."
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
Snapshot IDs follow the format `<hostname>_<snapshot-name>_<timestamp>` (e.g., `server1_home_2025-01-01T12:00:00Z`).
|
||||
|
||||
### local sqlite schema
|
||||
|
||||
```sql
|
||||
CREATE TABLE files (
|
||||
id TEXT PRIMARY KEY,
|
||||
path TEXT NOT NULL UNIQUE,
|
||||
mtime INTEGER NOT NULL,
|
||||
size INTEGER NOT NULL,
|
||||
mode INTEGER NOT NULL,
|
||||
uid INTEGER NOT NULL,
|
||||
gid INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE file_chunks (
|
||||
file_id TEXT NOT NULL,
|
||||
idx INTEGER NOT NULL,
|
||||
chunk_hash TEXT NOT NULL,
|
||||
PRIMARY KEY (file_id, idx),
|
||||
FOREIGN KEY (file_id) REFERENCES files(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE chunks (
|
||||
chunk_hash TEXT PRIMARY KEY,
|
||||
size INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE blobs (
|
||||
id TEXT PRIMARY KEY,
|
||||
blob_hash TEXT NOT NULL UNIQUE,
|
||||
uncompressed INTEGER NOT NULL,
|
||||
compressed INTEGER NOT NULL,
|
||||
uploaded_at INTEGER
|
||||
);
|
||||
|
||||
CREATE TABLE blob_chunks (
|
||||
blob_hash TEXT NOT NULL,
|
||||
chunk_hash TEXT NOT NULL,
|
||||
offset INTEGER NOT NULL,
|
||||
length INTEGER NOT NULL,
|
||||
PRIMARY KEY (blob_hash, chunk_hash)
|
||||
);
|
||||
|
||||
CREATE TABLE chunk_files (
|
||||
chunk_hash TEXT NOT NULL,
|
||||
file_id TEXT NOT NULL,
|
||||
file_offset INTEGER NOT NULL,
|
||||
length INTEGER NOT NULL,
|
||||
PRIMARY KEY (chunk_hash, file_id)
|
||||
);
|
||||
|
||||
CREATE TABLE snapshots (
|
||||
id TEXT PRIMARY KEY,
|
||||
hostname TEXT NOT NULL,
|
||||
vaultik_version TEXT NOT NULL,
|
||||
started_at INTEGER NOT NULL,
|
||||
completed_at INTEGER,
|
||||
file_count INTEGER NOT NULL,
|
||||
chunk_count INTEGER NOT NULL,
|
||||
blob_count INTEGER NOT NULL,
|
||||
total_size INTEGER NOT NULL,
|
||||
blob_size INTEGER NOT NULL,
|
||||
compression_ratio REAL NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE snapshot_files (
|
||||
snapshot_id TEXT NOT NULL,
|
||||
file_id TEXT NOT NULL,
|
||||
PRIMARY KEY (snapshot_id, file_id)
|
||||
);
|
||||
|
||||
CREATE TABLE snapshot_blobs (
|
||||
snapshot_id TEXT NOT NULL,
|
||||
blob_id TEXT NOT NULL,
|
||||
blob_hash TEXT NOT NULL,
|
||||
PRIMARY KEY (snapshot_id, blob_id)
|
||||
);
|
||||
```
|
||||
Snapshot IDs follow the format `<hostname>_<snapshot-name>_<RFC3339-timestamp>`
|
||||
(e.g. `server1_home_2025-06-01T12:00:00Z`).
|
||||
|
||||
### data flow
|
||||
|
||||
#### backup
|
||||
**backup:**
|
||||
|
||||
1. Load config, open local SQLite index
|
||||
1. Walk source directories, check mtime/size against index
|
||||
1. For changed/new files: chunk using content-defined chunking
|
||||
1. For each chunk: hash, check if already uploaded, add to blob packer
|
||||
1. When blob reaches threshold: compress, encrypt, upload to S3
|
||||
1. Build snapshot metadata, compress, encrypt, upload
|
||||
1. Create blob manifest (unencrypted) for pruning support
|
||||
1. Open local SQLite index, load known files and chunks into memory
|
||||
2. Walk source directories, compare mtime/size/mode against index
|
||||
3. For changed/new files: chunk using content-defined chunking (FastCDC)
|
||||
4. For symlinks and directories: record metadata (no chunking)
|
||||
5. For each chunk: hash, check dedup, add to blob packer
|
||||
6. When blob reaches size threshold: compress (zstd), encrypt (age), upload
|
||||
7. Build snapshot metadata database, compress, encrypt, upload
|
||||
8. Create unencrypted blob manifest for pruning support
|
||||
|
||||
#### restore
|
||||
**restore:**
|
||||
|
||||
1. Download `metadata/<snapshot_id>/db.zst.age`
|
||||
1. Decrypt and decompress SQLite database
|
||||
1. Query files table (optionally filtered by paths)
|
||||
1. For each file, get ordered chunk list from file_chunks
|
||||
1. Download required blobs, decrypt, decompress
|
||||
1. Extract chunks and reconstruct files
|
||||
1. Restore permissions, mtime, uid/gid
|
||||
1. Download and decrypt `metadata/<snapshot_id>/db.zst.age`
|
||||
2. Open the binary SQLite database
|
||||
3. Query files (optionally filtered by paths)
|
||||
4. Download and decrypt required blobs
|
||||
5. Extract chunks, reconstruct files
|
||||
6. Restore permissions, timestamps, ownership, symlinks
|
||||
|
||||
#### prune
|
||||
**prune:**
|
||||
|
||||
1. List all snapshot manifests
|
||||
1. Build set of all referenced blob hashes
|
||||
1. List all blobs in storage
|
||||
1. Delete any blob not in referenced set
|
||||
2. Build set of all referenced blob hashes
|
||||
3. List all blobs in storage
|
||||
4. Delete any blob not in the referenced set
|
||||
|
||||
### chunking
|
||||
### chunking and deduplication
|
||||
|
||||
* Content-defined chunking using FastCDC algorithm
|
||||
* Content-defined chunking using the FastCDC algorithm
|
||||
* Average chunk size: configurable (default 10MB)
|
||||
* Deduplication at chunk level
|
||||
* Multiple chunks packed into blobs for efficiency
|
||||
* Deduplication at file level (unchanged files skipped) and chunk level
|
||||
(identical chunks across files stored once)
|
||||
* Multiple chunks packed into blobs to reduce object count
|
||||
|
||||
### encryption
|
||||
|
||||
* Asymmetric encryption using age (X25519 + XChaCha20-Poly1305)
|
||||
* Only public key needed on source host
|
||||
* Each blob encrypted independently
|
||||
* Metadata databases also encrypted
|
||||
* Only the public key is needed on the source host
|
||||
* Each blob and each metadata database is encrypted independently
|
||||
* Multiple recipients supported (encrypt to multiple keys)
|
||||
|
||||
### compression
|
||||
|
||||
* zstd compression at configurable level
|
||||
* Applied before encryption
|
||||
* Blob-level compression for efficiency
|
||||
* zstd compression at configurable level (1-19, default 3)
|
||||
* Applied before encryption at the blob level
|
||||
|
||||
---
|
||||
|
||||
## does not
|
||||
## configuration reference
|
||||
|
||||
* Store any secrets on the backed-up machine
|
||||
* Require mutable remote metadata
|
||||
* Use tarballs, restic, rsync, or ssh
|
||||
* Require a symmetric passphrase or password
|
||||
* Trust the source system with anything
|
||||
See `config.example.yml` for a complete annotated example. Key fields:
|
||||
|
||||
## does
|
||||
| Field | Default | Description |
|
||||
|-------|---------|-------------|
|
||||
| `age_recipients` | (required) | Age public keys for encryption |
|
||||
| `snapshots` | (required) | Named snapshot definitions with paths and excludes |
|
||||
| `storage_url` | | Storage backend URL (`s3://`, `file://`, `rclone://`) |
|
||||
| `s3.*` | | Legacy S3 configuration (endpoint, bucket, credentials) |
|
||||
| `exclude` | | Global exclude patterns (applied to all snapshots) |
|
||||
| `chunk_size` | `10MB` | Average chunk size for content-defined chunking |
|
||||
| `blob_size_limit` | `10GB` | Maximum blob size before splitting |
|
||||
| `compression_level` | `3` | zstd compression level (1-19) |
|
||||
| `hostname` | system hostname | Hostname used in snapshot IDs |
|
||||
| `index_path` | `~/.local/share/.../index.sqlite` | Local SQLite index path |
|
||||
|
||||
* Incremental deduplicated backup
|
||||
* Blob-packed chunk encryption
|
||||
* Content-addressed immutable blobs
|
||||
* Public-key encryption only
|
||||
* SQLite-based local and snapshot metadata
|
||||
* Fully stream-processed storage
|
||||
---
|
||||
|
||||
## limitations
|
||||
|
||||
* **No extended attributes (xattrs).** ACLs, macOS Finder metadata,
|
||||
quarantine flags, SELinux labels, and other extended attributes are not
|
||||
backed up or restored.
|
||||
* **No hard link detection.** Two hard links to the same inode are backed
|
||||
up as independent files. Content deduplication means the data is stored
|
||||
once, but the hard link relationship is lost on restore.
|
||||
* **No sparse file support.** Sparse files are fully materialized during
|
||||
backup. A 100 GB sparse VM disk that is mostly zeros will consume the
|
||||
full (compressed) size in storage.
|
||||
* **No bandwidth limiting.** Uploads and downloads use whatever bandwidth
|
||||
is available. There is no `--bwlimit` flag yet.
|
||||
* **No parallel blob downloads during restore.** Blobs are fetched
|
||||
sequentially. Restore speed is bound by single-stream throughput.
|
||||
* **Device nodes, named pipes, and sockets are silently skipped.** Only
|
||||
regular files, directories, and symlinks are backed up.
|
||||
* **No database migrations.** If the local SQLite schema changes between
|
||||
versions, delete the local database (`vaultik database purge`) and run
|
||||
a full backup. Remote storage is unaffected.
|
||||
* **Files that change during backup may be inconsistent.** There is no
|
||||
filesystem snapshot or freeze. If a file is modified between the scan
|
||||
and chunk phases, the backed-up copy may reflect a partial write.
|
||||
* **Ownership restoration requires root.** File uid/gid are recorded
|
||||
and restored, but `chown` requires elevated privileges. Without root,
|
||||
files are restored with the current user's ownership.
|
||||
|
||||
---
|
||||
|
||||
## roadmap
|
||||
|
||||
Items for future releases:
|
||||
|
||||
* Error-condition tests (network failures, disk full, corrupted/missing blobs)
|
||||
* Parallel blob downloads during restore
|
||||
* Bandwidth limiting (`--bwlimit`)
|
||||
* Security audit of encryption implementation
|
||||
* Man pages and richer `--help` examples
|
||||
|
||||
---
|
||||
|
||||
## requirements
|
||||
|
||||
* Go 1.24 or later
|
||||
* S3-compatible object storage
|
||||
* Sufficient disk space for local index (typically <1GB)
|
||||
* Go 1.26 or later
|
||||
* S3-compatible object storage (or local filesystem, or rclone remote)
|
||||
|
||||
## development workflow
|
||||
|
||||
All changes follow this workflow. No exceptions.
|
||||
|
||||
1. Create a feature branch off `main`.
|
||||
2. Write tests.
|
||||
3. Write the implementation.
|
||||
4. Fix implementation errors until it compiles and tests pass.
|
||||
5. Fix linting errors (`make lint`).
|
||||
6. Update documentation and README as required by the change.
|
||||
7. Format code (`make fmt`).
|
||||
8. Run `make check` (lint + fmt-check + test). Fix any issues. Repeat until clean.
|
||||
9. Commit on the branch.
|
||||
10. Merge to `main`.
|
||||
11. Push.
|
||||
|
||||
Do not commit directly to `main`. Do not skip steps.
|
||||
|
||||
Repository policies for AI agents are in [`AGENTS.md`](AGENTS.md).
|
||||
|
||||
## license
|
||||
|
||||
|
||||
128
TODO.md
128
TODO.md
@@ -1,128 +0,0 @@
|
||||
# Vaultik 1.0 TODO
|
||||
|
||||
Linear list of tasks to complete before 1.0 release.
|
||||
|
||||
## Rclone Storage Backend (Complete)
|
||||
|
||||
Add rclone as a storage backend via Go library import, allowing vaultik to use any of rclone's 70+ supported cloud storage providers.
|
||||
|
||||
**Configuration:**
|
||||
```yaml
|
||||
storage_url: "rclone://myremote/path/to/backups"
|
||||
```
|
||||
User must have rclone configured separately (via `rclone config`).
|
||||
|
||||
**Implementation Steps:**
|
||||
1. [x] Add rclone dependency to go.mod
|
||||
2. [x] Create `internal/storage/rclone.go` implementing `Storer` interface
|
||||
- `NewRcloneStorer(remote, path)` - init with `configfile.Install()` and `fs.NewFs()`
|
||||
- `Put` / `PutWithProgress` - use `operations.Rcat()`
|
||||
- `Get` - use `fs.NewObject()` then `obj.Open()`
|
||||
- `Stat` - use `fs.NewObject()` for size/metadata
|
||||
- `Delete` - use `obj.Remove()`
|
||||
- `List` / `ListStream` - use `operations.ListFn()`
|
||||
- `Info` - return remote name
|
||||
3. [x] Update `internal/storage/url.go` - parse `rclone://remote/path` URLs
|
||||
4. [x] Update `internal/storage/module.go` - add rclone case to `storerFromURL()`
|
||||
5. [x] Test with real rclone remote
|
||||
|
||||
**Error Mapping:**
|
||||
- `fs.ErrorObjectNotFound` → `ErrNotFound`
|
||||
- `fs.ErrorDirNotFound` → `ErrNotFound`
|
||||
- `fs.ErrorNotFoundInConfigFile` → `ErrRemoteNotFound` (new)
|
||||
|
||||
---
|
||||
|
||||
## CLI Polish (Priority)
|
||||
|
||||
1. Improve error messages throughout
|
||||
- Ensure all errors include actionable context
|
||||
- Add suggestions for common issues (e.g., "did you set VAULTIK_AGE_SECRET_KEY?")
|
||||
|
||||
## Security (Priority)
|
||||
|
||||
1. Audit encryption implementation
|
||||
- Verify age encryption is used correctly
|
||||
- Ensure no plaintext leaks in logs or errors
|
||||
- Verify blob hashes are computed correctly
|
||||
|
||||
1. Secure memory handling for secrets
|
||||
- Clear S3 credentials from memory after client init
|
||||
- Document that age_secret_key is env-var only (already implemented)
|
||||
|
||||
## Testing
|
||||
|
||||
1. Write integration tests for restore command
|
||||
|
||||
1. Write end-to-end integration test
|
||||
- Create backup
|
||||
- Verify backup
|
||||
- Restore backup
|
||||
- Compare restored files to originals
|
||||
|
||||
1. Add tests for edge cases
|
||||
- Empty directories
|
||||
- Symlinks
|
||||
- Special characters in filenames
|
||||
- Very large files (multi-GB)
|
||||
- Many small files (100k+)
|
||||
|
||||
1. Add tests for error conditions
|
||||
- Network failures during upload
|
||||
- Disk full during restore
|
||||
- Corrupted blobs
|
||||
- Missing blobs
|
||||
|
||||
## Performance
|
||||
|
||||
1. Profile and optimize restore performance
|
||||
- Parallel blob downloads
|
||||
- Streaming decompression/decryption
|
||||
- Efficient chunk reassembly
|
||||
|
||||
1. Add bandwidth limiting option
|
||||
- `--bwlimit` flag for upload/download speed limiting
|
||||
|
||||
## Documentation
|
||||
|
||||
1. Add man page or --help improvements
|
||||
- Detailed help for each command
|
||||
- Examples in help output
|
||||
|
||||
## Final Polish
|
||||
|
||||
1. Ensure version is set correctly in releases
|
||||
|
||||
1. Create release process
|
||||
- Binary releases for supported platforms
|
||||
- Checksums for binaries
|
||||
- Release notes template
|
||||
|
||||
1. Final code review
|
||||
- Remove debug statements
|
||||
- Ensure consistent code style
|
||||
|
||||
1. Tag and release v1.0.0
|
||||
|
||||
---
|
||||
|
||||
## Post-1.0 (Daemon Mode)
|
||||
|
||||
1. Implement inotify file watcher for Linux
|
||||
- Watch source directories for changes
|
||||
- Track dirty paths in memory
|
||||
|
||||
1. Implement FSEvents watcher for macOS
|
||||
- Watch source directories for changes
|
||||
- Track dirty paths in memory
|
||||
|
||||
1. Implement backup scheduler in daemon mode
|
||||
- Respect backup_interval config
|
||||
- Trigger backup when dirty paths exist and interval elapsed
|
||||
- Implement full_scan_interval for periodic full scans
|
||||
|
||||
1. Add proper signal handling for daemon
|
||||
- Graceful shutdown on SIGTERM/SIGINT
|
||||
- Complete in-progress backup before exit
|
||||
|
||||
1. Write tests for daemon mode
|
||||
@@ -291,21 +291,6 @@ storage_url: "rclone://las1stor1//srv/pool.2024.04/backups/heraklion"
|
||||
# # Default: 5MB
|
||||
# #part_size: 5MB
|
||||
|
||||
# How often to run backups in daemon mode
|
||||
# Format: 1h, 30m, 24h, etc
|
||||
# Default: 1h
|
||||
#backup_interval: 1h
|
||||
|
||||
# How often to do a full filesystem scan in daemon mode
|
||||
# Between full scans, inotify is used to detect changes
|
||||
# Default: 24h
|
||||
#full_scan_interval: 24h
|
||||
|
||||
# Minimum time between backup runs in daemon mode
|
||||
# Prevents backups from running too frequently
|
||||
# Default: 15m
|
||||
#min_time_between_run: 15m
|
||||
|
||||
# Path to local SQLite index database
|
||||
# This database tracks file state for incremental backups
|
||||
# Default: /var/lib/vaultik/index.sqlite
|
||||
|
||||
@@ -5,8 +5,14 @@
|
||||
Vaultik uses a local SQLite database to track file metadata, chunk mappings, and blob associations during the backup process. This database serves as an index for incremental backups and enables efficient deduplication.
|
||||
|
||||
**Important Notes:**
|
||||
- **No Migration Support**: Vaultik does not support database schema migrations. If the schema changes, the local database must be deleted and recreated by performing a full backup.
|
||||
- **Version Compatibility**: In rare cases, you may need to use the same version of Vaultik to restore a backup as was used to create it. This ensures compatibility with the metadata format stored in S3.
|
||||
- **No Migration Support (pre-1.0)**: Vaultik does not support database schema
|
||||
migrations. The local index is treated as disposable — if the schema changes,
|
||||
delete the local SQLite database (`vaultik database purge`) and run a full
|
||||
backup. The remote storage is unaffected; the new index will re-deduplicate
|
||||
against existing remote blobs.
|
||||
- **Version Compatibility**: In rare cases, you may need to use the same version
|
||||
of Vaultik to restore a backup as was used to create it. This ensures
|
||||
compatibility with the metadata format stored in S3.
|
||||
|
||||
## Database Tables
|
||||
|
||||
@@ -17,7 +23,6 @@ Stores metadata about files in the filesystem being backed up.
|
||||
- `id` (TEXT PRIMARY KEY) - UUID for the file record
|
||||
- `path` (TEXT NOT NULL UNIQUE) - Absolute file path
|
||||
- `mtime` (INTEGER NOT NULL) - Modification time as Unix timestamp
|
||||
- `ctime` (INTEGER NOT NULL) - Change time as Unix timestamp
|
||||
- `size` (INTEGER NOT NULL) - File size in bytes
|
||||
- `mode` (INTEGER NOT NULL) - Unix file permissions and type
|
||||
- `uid` (INTEGER NOT NULL) - User ID of file owner
|
||||
|
||||
@@ -43,18 +43,19 @@ Blobs contain the actual file data from backups and must be encrypted for securi
|
||||
Each snapshot has its own subdirectory named with the snapshot ID.
|
||||
|
||||
### Snapshot ID Format
|
||||
- **Format**: `<hostname>-<YYYYMMDD>-<HHMMSSZ>`
|
||||
- **Example**: `laptop-20240115-143052Z`
|
||||
- **Format**: `<hostname>_<snapshot-name>_<RFC3339>` (or `<hostname>_<RFC3339>` if no
|
||||
name was specified)
|
||||
- **Example**: `laptop_home_2024-01-15T14:30:52Z`
|
||||
- **Components**:
|
||||
- Hostname (may contain hyphens)
|
||||
- Date in YYYYMMDD format
|
||||
- Time in HHMMSSZ format (Z indicates UTC)
|
||||
- Short hostname (everything before the first dot is stripped from the FQDN)
|
||||
- Snapshot name from the configured `snapshots:` map (optional)
|
||||
- RFC3339 UTC timestamp
|
||||
|
||||
### Files in Each Snapshot Directory
|
||||
|
||||
#### `db.zst.age` - Encrypted Database Dump
|
||||
- **What it contains**: Complete SQLite database dump for this snapshot
|
||||
- **Format**: SQL dump → Zstandard compressed → Age encrypted
|
||||
#### `db.zst.age` - Encrypted Database
|
||||
- **What it contains**: Pruned binary SQLite database for this snapshot
|
||||
- **Format**: Binary SQLite → Zstandard compressed → Age encrypted
|
||||
- **Encryption**: Encrypted with Age
|
||||
- **Purpose**: Contains full file metadata, chunk mappings, and all relationships
|
||||
- **Why encrypted**: Contains sensitive metadata like file paths, permissions, and ownership
|
||||
@@ -67,7 +68,7 @@ Each snapshot has its own subdirectory named with the snapshot ID.
|
||||
- **Structure**:
|
||||
```json
|
||||
{
|
||||
"snapshot_id": "laptop-20240115-143052Z",
|
||||
"snapshot_id": "laptop_home_2024-01-15T14:30:52Z",
|
||||
"timestamp": "2024-01-15T14:30:52Z",
|
||||
"blob_count": 42,
|
||||
"blobs": [
|
||||
|
||||
3
go.mod
3
go.mod
@@ -17,13 +17,13 @@ require (
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/johannesboyne/gofakes3 v0.0.0-20250603205740-ed9094be7668
|
||||
github.com/klauspost/compress v1.18.1
|
||||
github.com/mattn/go-sqlite3 v1.14.29
|
||||
github.com/rclone/rclone v1.72.1
|
||||
github.com/schollz/progressbar/v3 v3.19.0
|
||||
github.com/spf13/afero v1.15.0
|
||||
github.com/spf13/cobra v1.10.1
|
||||
github.com/stretchr/testify v1.11.1
|
||||
go.uber.org/fx v1.24.0
|
||||
golang.org/x/sync v0.18.0
|
||||
golang.org/x/term v0.37.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
modernc.org/sqlite v1.38.0
|
||||
@@ -266,7 +266,6 @@ require (
|
||||
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect
|
||||
golang.org/x/net v0.47.0 // indirect
|
||||
golang.org/x/oauth2 v0.33.0 // indirect
|
||||
golang.org/x/sync v0.18.0 // indirect
|
||||
golang.org/x/sys v0.38.0 // indirect
|
||||
golang.org/x/text v0.31.0 // indirect
|
||||
golang.org/x/time v0.14.0 // indirect
|
||||
|
||||
2
go.sum
2
go.sum
@@ -593,8 +593,6 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
|
||||
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
|
||||
github.com/mattn/go-runewidth v0.0.19 h1:v++JhqYnZuu5jSKrk9RbgF5v4CGUjqRfBm05byFGLdw=
|
||||
github.com/mattn/go-runewidth v0.0.19/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs=
|
||||
github.com/mattn/go-sqlite3 v1.14.29 h1:1O6nRLJKvsi1H2Sj0Hzdfojwt8GiGKm+LOfLaBFaouQ=
|
||||
github.com/mattn/go-sqlite3 v1.14.29/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||
github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
|
||||
github.com/miekg/dns v1.1.41 h1:WMszZWJG0XmzbK9FEmzH2TVcqYzFesusSIB41b8KHxY=
|
||||
|
||||
@@ -361,101 +361,23 @@ func (p *Packer) finalizeCurrentBlob() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close blobgen writer to flush all data
|
||||
if err := p.currentBlob.writer.Close(); err != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("closing blobgen writer: %w", err)
|
||||
}
|
||||
|
||||
// Sync file to ensure all data is written
|
||||
if err := p.currentBlob.tempFile.Sync(); err != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("syncing temp file: %w", err)
|
||||
}
|
||||
|
||||
// Get the final size (encrypted if applicable)
|
||||
finalSize, err := p.currentBlob.tempFile.Seek(0, io.SeekCurrent)
|
||||
blobHash, finalSize, err := p.closeBlobWriter()
|
||||
if err != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("getting file size: %w", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Reset to beginning for reading
|
||||
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("seeking to start: %w", err)
|
||||
}
|
||||
chunkRefs := p.buildChunkRefs()
|
||||
|
||||
// Get hash from blobgen writer (of final encrypted data)
|
||||
finalHash := p.currentBlob.writer.Sum256()
|
||||
blobHash := hex.EncodeToString(finalHash)
|
||||
|
||||
// Create chunk references with offsets
|
||||
chunkRefs := make([]*BlobChunkRef, 0, len(p.currentBlob.chunks))
|
||||
|
||||
for _, chunk := range p.currentBlob.chunks {
|
||||
chunkRefs = append(chunkRefs, &BlobChunkRef{
|
||||
ChunkHash: chunk.Hash,
|
||||
Offset: chunk.Offset,
|
||||
Length: chunk.Size,
|
||||
})
|
||||
}
|
||||
|
||||
// Get pending chunks (will be inserted to DB and reported to handler)
|
||||
chunksToInsert := p.pendingChunks
|
||||
p.pendingChunks = nil // Clear pending list
|
||||
p.pendingChunks = nil
|
||||
|
||||
// Insert pending chunks, blob_chunks, and update blob in a single transaction
|
||||
if p.repos != nil {
|
||||
blobIDTyped, parseErr := types.ParseBlobID(p.currentBlob.id)
|
||||
if parseErr != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("parsing blob ID: %w", parseErr)
|
||||
}
|
||||
err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
|
||||
// First insert all pending chunks (required for blob_chunks FK)
|
||||
for _, chunk := range chunksToInsert {
|
||||
dbChunk := &database.Chunk{
|
||||
ChunkHash: types.ChunkHash(chunk.Hash),
|
||||
Size: chunk.Size,
|
||||
}
|
||||
if err := p.repos.Chunks.Create(ctx, tx, dbChunk); err != nil {
|
||||
return fmt.Errorf("creating chunk: %w", err)
|
||||
}
|
||||
if err := p.commitBlobToDatabase(blobHash, finalSize, chunksToInsert); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Insert all blob_chunk records in batch
|
||||
for _, chunk := range p.currentBlob.chunks {
|
||||
blobChunk := &database.BlobChunk{
|
||||
BlobID: blobIDTyped,
|
||||
ChunkHash: types.ChunkHash(chunk.Hash),
|
||||
Offset: chunk.Offset,
|
||||
Length: chunk.Size,
|
||||
}
|
||||
if err := p.repos.BlobChunks.Create(ctx, tx, blobChunk); err != nil {
|
||||
return fmt.Errorf("creating blob_chunk: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Update blob record with final hash and sizes
|
||||
return p.repos.Blobs.UpdateFinished(ctx, tx, p.currentBlob.id, blobHash,
|
||||
p.currentBlob.size, finalSize)
|
||||
})
|
||||
if err != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("finalizing blob transaction: %w", err)
|
||||
}
|
||||
|
||||
log.Debug("Committed blob transaction",
|
||||
"chunks_inserted", len(chunksToInsert),
|
||||
"blob_chunks_inserted", len(p.currentBlob.chunks))
|
||||
}
|
||||
|
||||
// Create finished blob
|
||||
finished := &FinishedBlob{
|
||||
ID: p.currentBlob.id,
|
||||
Hash: blobHash,
|
||||
Data: nil, // We don't load data into memory anymore
|
||||
Chunks: chunkRefs,
|
||||
CreatedTS: p.currentBlob.startTime,
|
||||
Uncompressed: p.currentBlob.size,
|
||||
@@ -464,28 +386,105 @@ func (p *Packer) finalizeCurrentBlob() error {
|
||||
|
||||
compressionRatio := float64(finished.Compressed) / float64(finished.Uncompressed)
|
||||
log.Info("Finalized blob (compressed and encrypted)",
|
||||
"hash", blobHash,
|
||||
"chunks", len(chunkRefs),
|
||||
"uncompressed", finished.Uncompressed,
|
||||
"compressed", finished.Compressed,
|
||||
"hash", blobHash, "chunks", len(chunkRefs),
|
||||
"uncompressed", finished.Uncompressed, "compressed", finished.Compressed,
|
||||
"ratio", fmt.Sprintf("%.2f", compressionRatio),
|
||||
"duration", time.Since(p.currentBlob.startTime))
|
||||
|
||||
// Collect inserted chunk hashes for the scanner to track
|
||||
var insertedChunkHashes []string
|
||||
for _, chunk := range chunksToInsert {
|
||||
insertedChunkHashes = append(insertedChunkHashes, chunk.Hash)
|
||||
}
|
||||
|
||||
// Call blob handler if set
|
||||
return p.deliverFinishedBlob(finished, insertedChunkHashes)
|
||||
}
|
||||
|
||||
// closeBlobWriter closes the writer, syncs to disk, and returns the blob hash and final size
|
||||
func (p *Packer) closeBlobWriter() (string, int64, error) {
|
||||
if err := p.currentBlob.writer.Close(); err != nil {
|
||||
p.cleanupTempFile()
|
||||
return "", 0, fmt.Errorf("closing blobgen writer: %w", err)
|
||||
}
|
||||
if err := p.currentBlob.tempFile.Sync(); err != nil {
|
||||
p.cleanupTempFile()
|
||||
return "", 0, fmt.Errorf("syncing temp file: %w", err)
|
||||
}
|
||||
|
||||
finalSize, err := p.currentBlob.tempFile.Seek(0, io.SeekCurrent)
|
||||
if err != nil {
|
||||
p.cleanupTempFile()
|
||||
return "", 0, fmt.Errorf("getting file size: %w", err)
|
||||
}
|
||||
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
|
||||
p.cleanupTempFile()
|
||||
return "", 0, fmt.Errorf("seeking to start: %w", err)
|
||||
}
|
||||
|
||||
finalHash := p.currentBlob.writer.Sum256()
|
||||
return hex.EncodeToString(finalHash), finalSize, nil
|
||||
}
|
||||
|
||||
// buildChunkRefs creates BlobChunkRef entries from the current blob's chunks
|
||||
func (p *Packer) buildChunkRefs() []*BlobChunkRef {
|
||||
refs := make([]*BlobChunkRef, 0, len(p.currentBlob.chunks))
|
||||
for _, chunk := range p.currentBlob.chunks {
|
||||
refs = append(refs, &BlobChunkRef{
|
||||
ChunkHash: chunk.Hash, Offset: chunk.Offset, Length: chunk.Size,
|
||||
})
|
||||
}
|
||||
return refs
|
||||
}
|
||||
|
||||
// commitBlobToDatabase inserts pending chunks, blob_chunks, and updates the blob record
|
||||
func (p *Packer) commitBlobToDatabase(blobHash string, finalSize int64, chunksToInsert []PendingChunk) error {
|
||||
if p.repos == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
blobIDTyped, parseErr := types.ParseBlobID(p.currentBlob.id)
|
||||
if parseErr != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("parsing blob ID: %w", parseErr)
|
||||
}
|
||||
|
||||
err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
|
||||
for _, chunk := range chunksToInsert {
|
||||
dbChunk := &database.Chunk{ChunkHash: types.ChunkHash(chunk.Hash), Size: chunk.Size}
|
||||
if err := p.repos.Chunks.Create(ctx, tx, dbChunk); err != nil {
|
||||
return fmt.Errorf("creating chunk: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, chunk := range p.currentBlob.chunks {
|
||||
blobChunk := &database.BlobChunk{
|
||||
BlobID: blobIDTyped, ChunkHash: types.ChunkHash(chunk.Hash),
|
||||
Offset: chunk.Offset, Length: chunk.Size,
|
||||
}
|
||||
if err := p.repos.BlobChunks.Create(ctx, tx, blobChunk); err != nil {
|
||||
return fmt.Errorf("creating blob_chunk: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return p.repos.Blobs.UpdateFinished(ctx, tx, p.currentBlob.id, blobHash, p.currentBlob.size, finalSize)
|
||||
})
|
||||
if err != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("finalizing blob transaction: %w", err)
|
||||
}
|
||||
|
||||
log.Debug("Committed blob transaction",
|
||||
"chunks_inserted", len(chunksToInsert), "blob_chunks_inserted", len(p.currentBlob.chunks))
|
||||
return nil
|
||||
}
|
||||
|
||||
// deliverFinishedBlob passes the blob to the handler or stores it internally
|
||||
func (p *Packer) deliverFinishedBlob(finished *FinishedBlob, insertedChunkHashes []string) error {
|
||||
if p.blobHandler != nil {
|
||||
// Reset file position for handler
|
||||
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("seeking for handler: %w", err)
|
||||
}
|
||||
|
||||
// Create a blob reader that includes the data stream
|
||||
blobWithReader := &BlobWithReader{
|
||||
FinishedBlob: finished,
|
||||
Reader: p.currentBlob.tempFile,
|
||||
@@ -497,11 +496,12 @@ func (p *Packer) finalizeCurrentBlob() error {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("blob handler failed: %w", err)
|
||||
}
|
||||
// Note: blob handler is responsible for closing/cleaning up temp file
|
||||
p.currentBlob = nil
|
||||
} else {
|
||||
log.Debug("No blob handler callback configured", "blob_hash", blobHash[:8]+"...")
|
||||
// No handler, need to read data for legacy behavior
|
||||
return nil
|
||||
}
|
||||
|
||||
// No handler - read data for legacy behavior
|
||||
log.Debug("No blob handler callback configured", "blob_hash", finished.Hash[:8]+"...")
|
||||
if _, err := p.currentBlob.tempFile.Seek(0, io.SeekStart); err != nil {
|
||||
p.cleanupTempFile()
|
||||
return fmt.Errorf("seeking to read data: %w", err)
|
||||
@@ -513,14 +513,9 @@ func (p *Packer) finalizeCurrentBlob() error {
|
||||
return fmt.Errorf("reading blob data: %w", err)
|
||||
}
|
||||
finished.Data = data
|
||||
|
||||
p.finishedBlobs = append(p.finishedBlobs, finished)
|
||||
|
||||
// Cleanup
|
||||
p.cleanupTempFile()
|
||||
p.currentBlob = nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ func TestCLIEntry(t *testing.T) {
|
||||
}
|
||||
|
||||
// Verify all subcommands are registered
|
||||
expectedCommands := []string{"snapshot", "store", "restore", "prune", "verify", "info", "version"}
|
||||
expectedCommands := []string{"snapshot", "store", "restore", "prune", "info", "version", "remote", "database"}
|
||||
for _, expected := range expectedCommands {
|
||||
found := false
|
||||
for _, cmd := range cmd.Commands() {
|
||||
@@ -38,7 +38,7 @@ func TestCLIEntry(t *testing.T) {
|
||||
t.Errorf("Failed to find snapshot command: %v", err)
|
||||
} else {
|
||||
// Check snapshot subcommands
|
||||
expectedSubCommands := []string{"create", "list", "purge", "verify"}
|
||||
expectedSubCommands := []string{"create", "list", "purge", "verify", "cleanup"}
|
||||
for _, expected := range expectedSubCommands {
|
||||
found := false
|
||||
for _, subcmd := range snapshotCmd.Commands() {
|
||||
|
||||
@@ -1,100 +0,0 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||
"git.eeqj.de/sneak/vaultik/internal/vaultik"
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/fx"
|
||||
)
|
||||
|
||||
// PurgeOptions contains options for the purge command
|
||||
type PurgeOptions struct {
|
||||
KeepLatest bool
|
||||
OlderThan string
|
||||
Force bool
|
||||
}
|
||||
|
||||
// NewPurgeCommand creates the purge command
|
||||
func NewPurgeCommand() *cobra.Command {
|
||||
opts := &PurgeOptions{}
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "purge",
|
||||
Short: "Purge old snapshots",
|
||||
Long: `Removes snapshots based on age or count criteria.
|
||||
|
||||
This command allows you to:
|
||||
- Keep only the latest snapshot (--keep-latest)
|
||||
- Remove snapshots older than a specific duration (--older-than)
|
||||
|
||||
Config is located at /etc/vaultik/config.yml by default, but can be overridden by
|
||||
specifying a path using --config or by setting VAULTIK_CONFIG to a path.`,
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
// Validate flags
|
||||
if !opts.KeepLatest && opts.OlderThan == "" {
|
||||
return fmt.Errorf("must specify either --keep-latest or --older-than")
|
||||
}
|
||||
if opts.KeepLatest && opts.OlderThan != "" {
|
||||
return fmt.Errorf("cannot specify both --keep-latest and --older-than")
|
||||
}
|
||||
|
||||
// Use unified config resolution
|
||||
configPath, err := ResolveConfigPath()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Use the app framework like other commands
|
||||
rootFlags := GetRootFlags()
|
||||
return RunWithApp(cmd.Context(), AppOptions{
|
||||
ConfigPath: configPath,
|
||||
LogOptions: log.LogOptions{
|
||||
Verbose: rootFlags.Verbose,
|
||||
Debug: rootFlags.Debug,
|
||||
Quiet: rootFlags.Quiet,
|
||||
},
|
||||
Modules: []fx.Option{},
|
||||
Invokes: []fx.Option{
|
||||
fx.Invoke(func(v *vaultik.Vaultik, lc fx.Lifecycle) {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
// Start the purge operation in a goroutine
|
||||
go func() {
|
||||
// Run the purge operation
|
||||
if err := v.PurgeSnapshots(opts.KeepLatest, opts.OlderThan, opts.Force); err != nil {
|
||||
if err != context.Canceled {
|
||||
log.Error("Purge operation failed", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown the app when purge completes
|
||||
if err := v.Shutdowner.Shutdown(); err != nil {
|
||||
log.Error("Failed to shutdown", "error", err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
},
|
||||
OnStop: func(ctx context.Context) error {
|
||||
log.Debug("Stopping purge operation")
|
||||
v.Cancel()
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}),
|
||||
},
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
cmd.Flags().BoolVar(&opts.KeepLatest, "keep-latest", false, "Keep only the latest snapshot")
|
||||
cmd.Flags().StringVar(&opts.OlderThan, "older-than", "", "Remove snapshots older than duration (e.g. 30d, 6m, 1y)")
|
||||
cmd.Flags().BoolVar(&opts.Force, "force", false, "Skip confirmation prompts")
|
||||
|
||||
return cmd
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
"git.eeqj.de/sneak/vaultik/internal/config"
|
||||
"git.eeqj.de/sneak/vaultik/internal/globals"
|
||||
@@ -57,6 +58,17 @@ Examples:
|
||||
vaultik restore --verify myhost_docs_2025-01-01T12:00:00Z /restore`,
|
||||
Args: cobra.MinimumNArgs(2),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
return runRestore(cmd, args, opts)
|
||||
},
|
||||
}
|
||||
|
||||
cmd.Flags().BoolVar(&opts.Verify, "verify", false, "Verify restored files by checking chunk hashes")
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
// runRestore parses arguments and runs the restore operation through the app framework
|
||||
func runRestore(cmd *cobra.Command, args []string, opts *RestoreOptions) error {
|
||||
snapshotID := args[0]
|
||||
opts.TargetDir = args[1]
|
||||
if len(args) > 2 {
|
||||
@@ -78,7 +90,14 @@ Examples:
|
||||
Debug: rootFlags.Debug,
|
||||
Quiet: rootFlags.Quiet,
|
||||
},
|
||||
Modules: []fx.Option{
|
||||
Modules: buildRestoreModules(),
|
||||
Invokes: buildRestoreInvokes(snapshotID, opts),
|
||||
})
|
||||
}
|
||||
|
||||
// buildRestoreModules returns the fx.Options for dependency injection in restore
|
||||
func buildRestoreModules() []fx.Option {
|
||||
return []fx.Option{
|
||||
fx.Provide(fx.Annotate(
|
||||
func(g *globals.Globals, cfg *config.Config,
|
||||
storer storage.Storer, v *vaultik.Vaultik, shutdowner fx.Shutdowner) *RestoreApp {
|
||||
@@ -91,8 +110,12 @@ Examples:
|
||||
}
|
||||
},
|
||||
)),
|
||||
},
|
||||
Invokes: []fx.Option{
|
||||
}
|
||||
}
|
||||
|
||||
// buildRestoreInvokes returns the fx.Options that wire up the restore lifecycle
|
||||
func buildRestoreInvokes(snapshotID string, opts *RestoreOptions) []fx.Option {
|
||||
return []fx.Option{
|
||||
fx.Invoke(func(app *RestoreApp, lc fx.Lifecycle) {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
@@ -108,6 +131,7 @@ Examples:
|
||||
if err := app.Vaultik.Restore(restoreOpts); err != nil {
|
||||
if err != context.Canceled {
|
||||
log.Error("Restore operation failed", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -125,12 +149,5 @@ Examples:
|
||||
},
|
||||
})
|
||||
}),
|
||||
},
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
cmd.Flags().BoolVar(&opts.Verify, "verify", false, "Verify restored files by checking chunk hashes")
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ func NewRootCommand() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "vaultik",
|
||||
Short: "Secure incremental backup tool with asymmetric encryption",
|
||||
Long: `vaultik is a secure incremental backup daemon that encrypts data using age
|
||||
Long: `vaultik is a secure incremental backup tool that encrypts data using age
|
||||
public keys and uploads to S3-compatible storage. No private keys are needed
|
||||
on the source system.`,
|
||||
SilenceUsage: true,
|
||||
@@ -41,7 +41,6 @@ on the source system.`,
|
||||
cmd.AddCommand(
|
||||
NewRestoreCommand(),
|
||||
NewPruneCommand(),
|
||||
NewVerifyCommand(),
|
||||
NewStoreCommand(),
|
||||
NewSnapshotCommand(),
|
||||
NewInfoCommand(),
|
||||
@@ -80,5 +79,5 @@ func ResolveConfigPath() (string, error) {
|
||||
return defaultPath, nil
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("no config file specified, VAULTIK_CONFIG not set, and %s not found", defaultPath)
|
||||
return "", fmt.Errorf("no config file found; specify one with --config, set VAULTIK_CONFIG, or create %s", defaultPath)
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package cli
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||
@@ -26,6 +27,7 @@ func NewSnapshotCommand() *cobra.Command {
|
||||
cmd.AddCommand(newSnapshotVerifyCommand())
|
||||
cmd.AddCommand(newSnapshotRemoveCommand())
|
||||
cmd.AddCommand(newSnapshotPruneCommand())
|
||||
cmd.AddCommand(newSnapshotCleanupCommand())
|
||||
|
||||
return cmd
|
||||
}
|
||||
@@ -71,10 +73,13 @@ specifying a path using --config or by setting VAULTIK_CONFIG to a path.`,
|
||||
OnStart: func(ctx context.Context) error {
|
||||
// Start the snapshot creation in a goroutine
|
||||
go func() {
|
||||
// Run the snapshot creation
|
||||
if opts.Cron {
|
||||
v.Stdout = io.Discard
|
||||
}
|
||||
if err := v.CreateSnapshot(opts); err != nil {
|
||||
if err != context.Canceled {
|
||||
log.Error("Snapshot creation failed", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,9 +103,9 @@ specifying a path using --config or by setting VAULTIK_CONFIG to a path.`,
|
||||
},
|
||||
}
|
||||
|
||||
cmd.Flags().BoolVar(&opts.Daemon, "daemon", false, "Run in daemon mode with inotify monitoring")
|
||||
cmd.Flags().BoolVar(&opts.Cron, "cron", false, "Run in cron mode (silent unless error)")
|
||||
cmd.Flags().BoolVar(&opts.Prune, "prune", false, "Delete all previous snapshots and unreferenced blobs after backup")
|
||||
cmd.Flags().BoolVar(&opts.Prune, "prune", false, "After backup, drop older snapshots of the same name and remove orphaned blobs")
|
||||
cmd.Flags().StringVar(&opts.KeepNewerThan, "keep-newer-than", "", "With --prune: keep snapshots newer than this duration (e.g. 4w, 30d, 6mo) instead of only the latest")
|
||||
cmd.Flags().BoolVar(&opts.SkipErrors, "skip-errors", false, "Skip file read errors (log them loudly but continue)")
|
||||
|
||||
return cmd
|
||||
@@ -167,21 +172,23 @@ func newSnapshotListCommand() *cobra.Command {
|
||||
|
||||
// newSnapshotPurgeCommand creates the 'snapshot purge' subcommand
|
||||
func newSnapshotPurgeCommand() *cobra.Command {
|
||||
var keepLatest bool
|
||||
var olderThan string
|
||||
var force bool
|
||||
opts := &vaultik.SnapshotPurgeOptions{}
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "purge",
|
||||
Short: "Purge old snapshots",
|
||||
Long: "Removes snapshots based on age or count criteria",
|
||||
Long: `Removes snapshots based on age or count criteria.
|
||||
|
||||
Retention is per-snapshot-name: --keep-latest keeps the latest of each
|
||||
configured snapshot name, not the latest globally. Use --snapshot to
|
||||
restrict the operation to specific snapshot names.`,
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
// Validate flags
|
||||
if !keepLatest && olderThan == "" {
|
||||
if !opts.KeepLatest && opts.OlderThan == "" {
|
||||
return fmt.Errorf("must specify either --keep-latest or --older-than")
|
||||
}
|
||||
if keepLatest && olderThan != "" {
|
||||
if opts.KeepLatest && opts.OlderThan != "" {
|
||||
return fmt.Errorf("cannot specify both --keep-latest and --older-than")
|
||||
}
|
||||
|
||||
@@ -205,7 +212,7 @@ func newSnapshotPurgeCommand() *cobra.Command {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
go func() {
|
||||
if err := v.PurgeSnapshots(keepLatest, olderThan, force); err != nil {
|
||||
if err := v.PurgeSnapshotsWithOptions(opts); err != nil {
|
||||
if err != context.Canceled {
|
||||
log.Error("Failed to purge snapshots", "error", err)
|
||||
os.Exit(1)
|
||||
@@ -228,9 +235,10 @@ func newSnapshotPurgeCommand() *cobra.Command {
|
||||
},
|
||||
}
|
||||
|
||||
cmd.Flags().BoolVar(&keepLatest, "keep-latest", false, "Keep only the latest snapshot")
|
||||
cmd.Flags().StringVar(&olderThan, "older-than", "", "Remove snapshots older than duration (e.g., 30d, 6m, 1y)")
|
||||
cmd.Flags().BoolVar(&force, "force", false, "Skip confirmation prompt")
|
||||
cmd.Flags().BoolVar(&opts.KeepLatest, "keep-latest", false, "Keep only the latest snapshot of each name")
|
||||
cmd.Flags().StringVar(&opts.OlderThan, "older-than", "", "Remove snapshots older than duration (e.g., 30d, 6m, 1y)")
|
||||
cmd.Flags().BoolVar(&opts.Force, "force", false, "Skip confirmation prompt")
|
||||
cmd.Flags().StringArrayVar(&opts.Names, "snapshot", nil, "Restrict to snapshots with these names (repeat for multiple)")
|
||||
|
||||
return cmd
|
||||
}
|
||||
@@ -276,13 +284,7 @@ func newSnapshotVerifyCommand() *cobra.Command {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
go func() {
|
||||
var err error
|
||||
if opts.Deep {
|
||||
err = v.RunDeepVerify(snapshotID, opts)
|
||||
} else {
|
||||
err = v.VerifySnapshotWithOptions(snapshotID, opts)
|
||||
}
|
||||
if err != nil {
|
||||
if err := v.VerifySnapshotWithOptions(snapshotID, opts); err != nil {
|
||||
if err != context.Canceled {
|
||||
if !opts.JSON {
|
||||
log.Error("Verification failed", "error", err)
|
||||
@@ -465,3 +467,60 @@ accumulate from incomplete backups or deleted snapshots.`,
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
// newSnapshotCleanupCommand creates the 'snapshot cleanup' subcommand
|
||||
func newSnapshotCleanupCommand() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "cleanup",
|
||||
Short: "Remove stale local snapshot records not found in remote storage",
|
||||
Long: `Removes local database records for snapshots whose metadata no longer
|
||||
exists in remote storage. These are typically left behind by incomplete
|
||||
or interrupted backups.
|
||||
|
||||
This command does not delete anything from remote storage.`,
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
configPath, err := ResolveConfigPath()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rootFlags := GetRootFlags()
|
||||
return RunWithApp(cmd.Context(), AppOptions{
|
||||
ConfigPath: configPath,
|
||||
LogOptions: log.LogOptions{
|
||||
Verbose: rootFlags.Verbose,
|
||||
Debug: rootFlags.Debug,
|
||||
Quiet: rootFlags.Quiet,
|
||||
},
|
||||
Modules: []fx.Option{},
|
||||
Invokes: []fx.Option{
|
||||
fx.Invoke(func(v *vaultik.Vaultik, lc fx.Lifecycle) {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
go func() {
|
||||
if err := v.CleanupLocalSnapshots(); err != nil {
|
||||
if err != context.Canceled {
|
||||
log.Error("Cleanup failed", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
if err := v.Shutdowner.Shutdown(); err != nil {
|
||||
log.Error("Failed to shutdown", "error", err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
},
|
||||
OnStop: func(ctx context.Context) error {
|
||||
v.Cancel()
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}),
|
||||
},
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
@@ -1,98 +0,0 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||
"git.eeqj.de/sneak/vaultik/internal/vaultik"
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/fx"
|
||||
)
|
||||
|
||||
// NewVerifyCommand creates the verify command
|
||||
func NewVerifyCommand() *cobra.Command {
|
||||
opts := &vaultik.VerifyOptions{}
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "verify <snapshot-id>",
|
||||
Short: "Verify snapshot integrity",
|
||||
Long: `Verifies that all blobs referenced in a snapshot exist and optionally verifies their contents.
|
||||
|
||||
Shallow verification (default):
|
||||
- Downloads and decompresses manifest
|
||||
- Checks existence of all blobs in S3
|
||||
- Reports missing blobs
|
||||
|
||||
Deep verification (--deep):
|
||||
- Downloads and decrypts database
|
||||
- Verifies blob lists match between manifest and database
|
||||
- Downloads, decrypts, and decompresses each blob
|
||||
- Verifies SHA256 hash of each chunk matches database
|
||||
- Ensures chunks are ordered correctly
|
||||
|
||||
The command will fail immediately on any verification error and exit with non-zero status.`,
|
||||
Args: cobra.ExactArgs(1),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
snapshotID := args[0]
|
||||
|
||||
// Use unified config resolution
|
||||
configPath, err := ResolveConfigPath()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Use the app framework for all verification
|
||||
rootFlags := GetRootFlags()
|
||||
return RunWithApp(cmd.Context(), AppOptions{
|
||||
ConfigPath: configPath,
|
||||
LogOptions: log.LogOptions{
|
||||
Verbose: rootFlags.Verbose,
|
||||
Debug: rootFlags.Debug,
|
||||
Quiet: rootFlags.Quiet || opts.JSON, // Suppress log output in JSON mode
|
||||
},
|
||||
Modules: []fx.Option{},
|
||||
Invokes: []fx.Option{
|
||||
fx.Invoke(func(v *vaultik.Vaultik, lc fx.Lifecycle) {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
// Run the verify operation directly
|
||||
go func() {
|
||||
var err error
|
||||
if opts.Deep {
|
||||
err = v.RunDeepVerify(snapshotID, opts)
|
||||
} else {
|
||||
err = v.VerifySnapshotWithOptions(snapshotID, opts)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if err != context.Canceled {
|
||||
if !opts.JSON {
|
||||
log.Error("Verification failed", "error", err)
|
||||
}
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
if err := v.Shutdowner.Shutdown(); err != nil {
|
||||
log.Error("Failed to shutdown", "error", err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
},
|
||||
OnStop: func(ctx context.Context) error {
|
||||
log.Debug("Stopping verify operation")
|
||||
v.Cancel()
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}),
|
||||
},
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
cmd.Flags().BoolVar(&opts.Deep, "deep", false, "Perform deep verification by downloading and verifying all blob contents")
|
||||
cmd.Flags().BoolVar(&opts.JSON, "json", false, "Output verification results as JSON")
|
||||
|
||||
return cmd
|
||||
}
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"filippo.io/age"
|
||||
"git.eeqj.de/sneak/smartconfig"
|
||||
@@ -85,14 +84,11 @@ func (c *Config) SnapshotNames() []string {
|
||||
type Config struct {
|
||||
AgeRecipients []string `yaml:"age_recipients"`
|
||||
AgeSecretKey string `yaml:"age_secret_key"`
|
||||
BackupInterval time.Duration `yaml:"backup_interval"`
|
||||
BlobSizeLimit Size `yaml:"blob_size_limit"`
|
||||
ChunkSize Size `yaml:"chunk_size"`
|
||||
Exclude []string `yaml:"exclude"` // Global excludes applied to all snapshots
|
||||
FullScanInterval time.Duration `yaml:"full_scan_interval"`
|
||||
Hostname string `yaml:"hostname"`
|
||||
IndexPath string `yaml:"index_path"`
|
||||
MinTimeBetweenRun time.Duration `yaml:"min_time_between_run"`
|
||||
S3 S3Config `yaml:"s3"`
|
||||
Snapshots map[string]SnapshotConfig `yaml:"snapshots"`
|
||||
CompressionLevel int `yaml:"compression_level"`
|
||||
@@ -157,9 +153,6 @@ func Load(path string) (*Config, error) {
|
||||
// Set defaults
|
||||
BlobSizeLimit: Size(10 * 1024 * 1024 * 1024), // 10GB
|
||||
ChunkSize: Size(10 * 1024 * 1024), // 10MB
|
||||
BackupInterval: 1 * time.Hour,
|
||||
FullScanInterval: 24 * time.Hour,
|
||||
MinTimeBetweenRun: 15 * time.Minute,
|
||||
IndexPath: filepath.Join(xdg.DataHome, appName, "index.sqlite"),
|
||||
CompressionLevel: 3,
|
||||
}
|
||||
@@ -243,11 +236,11 @@ func Load(path string) (*Config, error) {
|
||||
// Returns an error describing the first validation failure encountered.
|
||||
func (c *Config) Validate() error {
|
||||
if len(c.AgeRecipients) == 0 {
|
||||
return fmt.Errorf("at least one age_recipient is required")
|
||||
return fmt.Errorf("at least one age_recipient is required (generate with: age-keygen)")
|
||||
}
|
||||
|
||||
if len(c.Snapshots) == 0 {
|
||||
return fmt.Errorf("at least one snapshot must be configured")
|
||||
return fmt.Errorf("at least one snapshot must be configured (see config.example.yml)")
|
||||
}
|
||||
|
||||
for name, snap := range c.Snapshots {
|
||||
@@ -306,7 +299,7 @@ func (c *Config) validateStorage() error {
|
||||
|
||||
// Legacy S3 configuration
|
||||
if c.S3.Endpoint == "" {
|
||||
return fmt.Errorf("s3.endpoint is required (or set storage_url)")
|
||||
return fmt.Errorf("storage not configured; set storage_url or provide s3.endpoint + s3.bucket + credentials")
|
||||
}
|
||||
|
||||
if c.S3.Bucket == "" {
|
||||
|
||||
@@ -29,7 +29,6 @@ func TestCascadeDeleteDebug(t *testing.T) {
|
||||
file := &File{
|
||||
Path: "/cascade-test.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
|
||||
@@ -22,7 +22,6 @@ func TestChunkFileRepository(t *testing.T) {
|
||||
file1 := &File{
|
||||
Path: "/file1.txt",
|
||||
MTime: testTime,
|
||||
CTime: testTime,
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -37,7 +36,6 @@ func TestChunkFileRepository(t *testing.T) {
|
||||
file2 := &File{
|
||||
Path: "/file2.txt",
|
||||
MTime: testTime,
|
||||
CTime: testTime,
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -138,9 +136,9 @@ func TestChunkFileRepositoryComplexDeduplication(t *testing.T) {
|
||||
|
||||
// Create test files
|
||||
testTime := time.Now().Truncate(time.Second)
|
||||
file1 := &File{Path: "/file1.txt", MTime: testTime, CTime: testTime, Size: 3072, Mode: 0644, UID: 1000, GID: 1000}
|
||||
file2 := &File{Path: "/file2.txt", MTime: testTime, CTime: testTime, Size: 3072, Mode: 0644, UID: 1000, GID: 1000}
|
||||
file3 := &File{Path: "/file3.txt", MTime: testTime, CTime: testTime, Size: 2048, Mode: 0644, UID: 1000, GID: 1000}
|
||||
file1 := &File{Path: "/file1.txt", MTime: testTime, Size: 3072, Mode: 0644, UID: 1000, GID: 1000}
|
||||
file2 := &File{Path: "/file2.txt", MTime: testTime, Size: 3072, Mode: 0644, UID: 1000, GID: 1000}
|
||||
file3 := &File{Path: "/file3.txt", MTime: testTime, Size: 2048, Mode: 0644, UID: 1000, GID: 1000}
|
||||
|
||||
if err := fileRepo.Create(ctx, nil, file1); err != nil {
|
||||
t.Fatalf("failed to create file1: %v", err)
|
||||
|
||||
@@ -6,24 +6,32 @@
|
||||
// multiple source files. Blobs are content-addressed, meaning their filename
|
||||
// is derived from their SHA256 hash after compression and encryption.
|
||||
//
|
||||
// The database does not support migrations. If the schema changes, delete
|
||||
// the local database and perform a full backup to recreate it.
|
||||
// Schema is managed via numbered SQL migrations embedded in the schema/
|
||||
// directory. Migration 000.sql bootstraps the schema_migrations tracking
|
||||
// table; subsequent migrations (001, 002, …) are applied in order.
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
_ "embed"
|
||||
"embed"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
//go:embed schema.sql
|
||||
var schemaSQL string
|
||||
//go:embed schema/*.sql
|
||||
var schemaFS embed.FS
|
||||
|
||||
// bootstrapVersion is the migration that creates the schema_migrations
|
||||
// table itself. It is applied before the normal migration loop.
|
||||
const bootstrapVersion = 0
|
||||
|
||||
// DB represents the Vaultik local index database connection.
|
||||
// It uses SQLite to track file metadata, content-defined chunks, and blob associations.
|
||||
@@ -35,6 +43,46 @@ type DB struct {
|
||||
path string
|
||||
}
|
||||
|
||||
// ParseMigrationVersion extracts the numeric version prefix from a migration
|
||||
// filename. Filenames must follow the pattern "<version>.sql" or
|
||||
// "<version>_<description>.sql", where version is a zero-padded numeric
|
||||
// string (e.g. "001", "002"). Returns the version as an integer and an
|
||||
// error if the filename does not match the expected pattern.
|
||||
func ParseMigrationVersion(filename string) (int, error) {
|
||||
name := strings.TrimSuffix(filename, filepath.Ext(filename))
|
||||
if name == "" {
|
||||
return 0, fmt.Errorf("invalid migration filename %q: empty name", filename)
|
||||
}
|
||||
|
||||
// Split on underscore to separate version from description.
|
||||
// If there's no underscore, the entire stem is the version.
|
||||
versionStr := name
|
||||
if idx := strings.IndexByte(name, '_'); idx >= 0 {
|
||||
versionStr = name[:idx]
|
||||
}
|
||||
|
||||
if versionStr == "" {
|
||||
return 0, fmt.Errorf("invalid migration filename %q: empty version prefix", filename)
|
||||
}
|
||||
|
||||
// Validate the version is purely numeric.
|
||||
for _, ch := range versionStr {
|
||||
if ch < '0' || ch > '9' {
|
||||
return 0, fmt.Errorf(
|
||||
"invalid migration filename %q: version %q contains non-numeric character %q",
|
||||
filename, versionStr, string(ch),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
version, err := strconv.Atoi(versionStr)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("invalid migration filename %q: %w", filename, err)
|
||||
}
|
||||
|
||||
return version, nil
|
||||
}
|
||||
|
||||
// New creates a new database connection at the specified path.
|
||||
// It creates the schema if needed and configures SQLite with WAL mode for
|
||||
// better concurrency. SQLite handles crash recovery automatically when
|
||||
@@ -72,9 +120,9 @@ func New(ctx context.Context, path string) (*DB, error) {
|
||||
}
|
||||
|
||||
db := &DB{conn: conn, path: path}
|
||||
if err := db.createSchema(ctx); err != nil {
|
||||
if err := applyMigrations(ctx, conn); err != nil {
|
||||
_ = conn.Close()
|
||||
return nil, fmt.Errorf("creating schema: %w", err)
|
||||
return nil, fmt.Errorf("applying migrations: %w", err)
|
||||
}
|
||||
return db, nil
|
||||
}
|
||||
@@ -125,9 +173,9 @@ func New(ctx context.Context, path string) (*DB, error) {
|
||||
}
|
||||
|
||||
db := &DB{conn: conn, path: path}
|
||||
if err := db.createSchema(ctx); err != nil {
|
||||
if err := applyMigrations(ctx, conn); err != nil {
|
||||
_ = conn.Close()
|
||||
return nil, fmt.Errorf("creating schema: %w", err)
|
||||
return nil, fmt.Errorf("applying migrations: %w", err)
|
||||
}
|
||||
|
||||
log.Debug("Database connection established successfully", "path", path)
|
||||
@@ -198,9 +246,120 @@ func (db *DB) QueryRowWithLog(
|
||||
return db.conn.QueryRowContext(ctx, query, args...)
|
||||
}
|
||||
|
||||
func (db *DB) createSchema(ctx context.Context) error {
|
||||
_, err := db.conn.ExecContext(ctx, schemaSQL)
|
||||
// collectMigrations reads the embedded schema directory and returns
|
||||
// migration filenames sorted lexicographically.
|
||||
func collectMigrations() ([]string, error) {
|
||||
entries, err := schemaFS.ReadDir("schema")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read schema directory: %w", err)
|
||||
}
|
||||
|
||||
var migrations []string
|
||||
|
||||
for _, entry := range entries {
|
||||
if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".sql") {
|
||||
migrations = append(migrations, entry.Name())
|
||||
}
|
||||
}
|
||||
|
||||
sort.Strings(migrations)
|
||||
|
||||
return migrations, nil
|
||||
}
|
||||
|
||||
// bootstrapMigrationsTable ensures the schema_migrations table exists
|
||||
// by applying 000.sql if the table is missing.
|
||||
func bootstrapMigrationsTable(ctx context.Context, db *sql.DB) error {
|
||||
var tableExists int
|
||||
|
||||
err := db.QueryRowContext(ctx,
|
||||
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_migrations'",
|
||||
).Scan(&tableExists)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check for migrations table: %w", err)
|
||||
}
|
||||
|
||||
if tableExists > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
content, err := schemaFS.ReadFile("schema/000.sql")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read bootstrap migration 000.sql: %w", err)
|
||||
}
|
||||
|
||||
log.Info("applying bootstrap migration", "version", bootstrapVersion)
|
||||
|
||||
_, err = db.ExecContext(ctx, string(content))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to apply bootstrap migration: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// applyMigrations applies all pending migrations to db. It first bootstraps
|
||||
// the schema_migrations table via 000.sql, then iterates through remaining
|
||||
// migration files in order.
|
||||
func applyMigrations(ctx context.Context, db *sql.DB) error {
|
||||
if err := bootstrapMigrationsTable(ctx, db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
migrations, err := collectMigrations()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, migration := range migrations {
|
||||
version, parseErr := ParseMigrationVersion(migration)
|
||||
if parseErr != nil {
|
||||
return parseErr
|
||||
}
|
||||
|
||||
// Check if already applied.
|
||||
var count int
|
||||
|
||||
err := db.QueryRowContext(ctx,
|
||||
"SELECT COUNT(*) FROM schema_migrations WHERE version = ?",
|
||||
version,
|
||||
).Scan(&count)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check migration status: %w", err)
|
||||
}
|
||||
|
||||
if count > 0 {
|
||||
log.Debug("migration already applied", "version", version)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// Read and apply migration.
|
||||
content, readErr := schemaFS.ReadFile(filepath.Join("schema", migration))
|
||||
if readErr != nil {
|
||||
return fmt.Errorf("failed to read migration %s: %w", migration, readErr)
|
||||
}
|
||||
|
||||
log.Info("applying migration", "version", version)
|
||||
|
||||
_, execErr := db.ExecContext(ctx, string(content))
|
||||
if execErr != nil {
|
||||
return fmt.Errorf("failed to apply migration %s: %w", migration, execErr)
|
||||
}
|
||||
|
||||
// Record migration as applied.
|
||||
_, recErr := db.ExecContext(ctx,
|
||||
"INSERT INTO schema_migrations (version) VALUES (?)",
|
||||
version,
|
||||
)
|
||||
if recErr != nil {
|
||||
return fmt.Errorf("failed to record migration %s: %w", migration, recErr)
|
||||
}
|
||||
|
||||
log.Info("migration applied successfully", "version", version)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewTestDB creates an in-memory SQLite database for testing purposes.
|
||||
|
||||
@@ -2,6 +2,7 @@ package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
@@ -26,9 +27,10 @@ func TestDatabase(t *testing.T) {
|
||||
t.Fatal("database connection is nil")
|
||||
}
|
||||
|
||||
// Test schema creation (already done in New)
|
||||
// Test schema creation (already done in New via migrations)
|
||||
// Verify tables exist
|
||||
tables := []string{
|
||||
"schema_migrations",
|
||||
"files", "file_chunks", "chunks", "blobs",
|
||||
"blob_chunks", "chunk_files", "snapshots",
|
||||
}
|
||||
@@ -99,3 +101,139 @@ func TestDatabaseConcurrentAccess(t *testing.T) {
|
||||
t.Errorf("expected 10 chunks, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseMigrationVersion(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
filename string
|
||||
wantVer int
|
||||
wantError bool
|
||||
}{
|
||||
{name: "valid 000.sql", filename: "000.sql", wantVer: 0, wantError: false},
|
||||
{name: "valid 001.sql", filename: "001.sql", wantVer: 1, wantError: false},
|
||||
{name: "valid 099.sql", filename: "099.sql", wantVer: 99, wantError: false},
|
||||
{name: "valid with description", filename: "001_initial_schema.sql", wantVer: 1, wantError: false},
|
||||
{name: "valid large version", filename: "123_big_migration.sql", wantVer: 123, wantError: false},
|
||||
{name: "invalid alpha version", filename: "abc.sql", wantVer: 0, wantError: true},
|
||||
{name: "invalid mixed chars", filename: "12a.sql", wantVer: 0, wantError: true},
|
||||
{name: "invalid no extension", filename: "schema.sql", wantVer: 0, wantError: true},
|
||||
{name: "empty string", filename: "", wantVer: 0, wantError: true},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got, err := ParseMigrationVersion(tc.filename)
|
||||
if tc.wantError {
|
||||
if err == nil {
|
||||
t.Errorf("ParseMigrationVersion(%q) = %d, nil; want error", tc.filename, got)
|
||||
}
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("ParseMigrationVersion(%q) unexpected error: %v", tc.filename, err)
|
||||
return
|
||||
}
|
||||
if got != tc.wantVer {
|
||||
t.Errorf("ParseMigrationVersion(%q) = %d; want %d", tc.filename, got, tc.wantVer)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestApplyMigrations_Idempotent(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
conn, err := sql.Open("sqlite", ":memory:?_foreign_keys=ON")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to open database: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := conn.Close(); err != nil {
|
||||
t.Errorf("failed to close database: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
conn.SetMaxOpenConns(1)
|
||||
conn.SetMaxIdleConns(1)
|
||||
|
||||
// First run: apply all migrations.
|
||||
if err := applyMigrations(ctx, conn); err != nil {
|
||||
t.Fatalf("first applyMigrations failed: %v", err)
|
||||
}
|
||||
|
||||
// Count rows in schema_migrations after first run.
|
||||
var countBefore int
|
||||
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM schema_migrations").Scan(&countBefore); err != nil {
|
||||
t.Fatalf("failed to count schema_migrations after first run: %v", err)
|
||||
}
|
||||
|
||||
// Second run: must be a no-op.
|
||||
if err := applyMigrations(ctx, conn); err != nil {
|
||||
t.Fatalf("second applyMigrations failed: %v", err)
|
||||
}
|
||||
|
||||
// Count rows in schema_migrations after second run — must be unchanged.
|
||||
var countAfter int
|
||||
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM schema_migrations").Scan(&countAfter); err != nil {
|
||||
t.Fatalf("failed to count schema_migrations after second run: %v", err)
|
||||
}
|
||||
|
||||
if countBefore != countAfter {
|
||||
t.Errorf("schema_migrations row count changed: before=%d, after=%d", countBefore, countAfter)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBootstrapMigrationsTable_FreshDatabase(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
conn, err := sql.Open("sqlite", ":memory:?_foreign_keys=ON")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to open database: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := conn.Close(); err != nil {
|
||||
t.Errorf("failed to close database: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
conn.SetMaxOpenConns(1)
|
||||
conn.SetMaxIdleConns(1)
|
||||
|
||||
// Verify schema_migrations does NOT exist yet.
|
||||
var tableBefore int
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_migrations'",
|
||||
).Scan(&tableBefore); err != nil {
|
||||
t.Fatalf("failed to check for table before bootstrap: %v", err)
|
||||
}
|
||||
if tableBefore != 0 {
|
||||
t.Fatal("schema_migrations table should not exist before bootstrap")
|
||||
}
|
||||
|
||||
// Run bootstrap.
|
||||
if err := bootstrapMigrationsTable(ctx, conn); err != nil {
|
||||
t.Fatalf("bootstrapMigrationsTable failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify schema_migrations now exists.
|
||||
var tableAfter int
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_migrations'",
|
||||
).Scan(&tableAfter); err != nil {
|
||||
t.Fatalf("failed to check for table after bootstrap: %v", err)
|
||||
}
|
||||
if tableAfter != 1 {
|
||||
t.Fatalf("schema_migrations table should exist after bootstrap, got count=%d", tableAfter)
|
||||
}
|
||||
|
||||
// Verify version 0 row exists.
|
||||
var version int
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
"SELECT version FROM schema_migrations WHERE version = 0",
|
||||
).Scan(&version); err != nil {
|
||||
t.Fatalf("version 0 row not found in schema_migrations: %v", err)
|
||||
}
|
||||
if version != 0 {
|
||||
t.Errorf("expected version 0, got %d", version)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,6 @@ func TestFileChunkRepository(t *testing.T) {
|
||||
file := &File{
|
||||
Path: "/test/file.txt",
|
||||
MTime: testTime,
|
||||
CTime: testTime,
|
||||
Size: 3072,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -135,7 +134,6 @@ func TestFileChunkRepositoryMultipleFiles(t *testing.T) {
|
||||
file := &File{
|
||||
Path: types.FilePath(path),
|
||||
MTime: testTime,
|
||||
CTime: testTime,
|
||||
Size: 2048,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
|
||||
@@ -25,12 +25,11 @@ func (r *FileRepository) Create(ctx context.Context, tx *sql.Tx, file *File) err
|
||||
}
|
||||
|
||||
query := `
|
||||
INSERT INTO files (id, path, source_path, mtime, ctime, size, mode, uid, gid, link_target)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
INSERT INTO files (id, path, source_path, mtime, size, mode, uid, gid, link_target)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(path) DO UPDATE SET
|
||||
source_path = excluded.source_path,
|
||||
mtime = excluded.mtime,
|
||||
ctime = excluded.ctime,
|
||||
size = excluded.size,
|
||||
mode = excluded.mode,
|
||||
uid = excluded.uid,
|
||||
@@ -42,10 +41,10 @@ func (r *FileRepository) Create(ctx context.Context, tx *sql.Tx, file *File) err
|
||||
var idStr string
|
||||
var err error
|
||||
if tx != nil {
|
||||
LogSQL("Execute", query, file.ID.String(), file.Path.String(), file.SourcePath.String(), file.MTime.Unix(), file.CTime.Unix(), file.Size, file.Mode, file.UID, file.GID, file.LinkTarget.String())
|
||||
err = tx.QueryRowContext(ctx, query, file.ID.String(), file.Path.String(), file.SourcePath.String(), file.MTime.Unix(), file.CTime.Unix(), file.Size, file.Mode, file.UID, file.GID, file.LinkTarget.String()).Scan(&idStr)
|
||||
LogSQL("Execute", query, file.ID.String(), file.Path.String(), file.SourcePath.String(), file.MTime.Unix(), file.Size, file.Mode, file.UID, file.GID, file.LinkTarget.String())
|
||||
err = tx.QueryRowContext(ctx, query, file.ID.String(), file.Path.String(), file.SourcePath.String(), file.MTime.Unix(), file.Size, file.Mode, file.UID, file.GID, file.LinkTarget.String()).Scan(&idStr)
|
||||
} else {
|
||||
err = r.db.QueryRowWithLog(ctx, query, file.ID.String(), file.Path.String(), file.SourcePath.String(), file.MTime.Unix(), file.CTime.Unix(), file.Size, file.Mode, file.UID, file.GID, file.LinkTarget.String()).Scan(&idStr)
|
||||
err = r.db.QueryRowWithLog(ctx, query, file.ID.String(), file.Path.String(), file.SourcePath.String(), file.MTime.Unix(), file.Size, file.Mode, file.UID, file.GID, file.LinkTarget.String()).Scan(&idStr)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
@@ -63,7 +62,7 @@ func (r *FileRepository) Create(ctx context.Context, tx *sql.Tx, file *File) err
|
||||
|
||||
func (r *FileRepository) GetByPath(ctx context.Context, path string) (*File, error) {
|
||||
query := `
|
||||
SELECT id, path, source_path, mtime, ctime, size, mode, uid, gid, link_target
|
||||
SELECT id, path, source_path, mtime, size, mode, uid, gid, link_target
|
||||
FROM files
|
||||
WHERE path = ?
|
||||
`
|
||||
@@ -82,7 +81,7 @@ func (r *FileRepository) GetByPath(ctx context.Context, path string) (*File, err
|
||||
// GetByID retrieves a file by its UUID
|
||||
func (r *FileRepository) GetByID(ctx context.Context, id types.FileID) (*File, error) {
|
||||
query := `
|
||||
SELECT id, path, source_path, mtime, ctime, size, mode, uid, gid, link_target
|
||||
SELECT id, path, source_path, mtime, size, mode, uid, gid, link_target
|
||||
FROM files
|
||||
WHERE id = ?
|
||||
`
|
||||
@@ -100,7 +99,7 @@ func (r *FileRepository) GetByID(ctx context.Context, id types.FileID) (*File, e
|
||||
|
||||
func (r *FileRepository) GetByPathTx(ctx context.Context, tx *sql.Tx, path string) (*File, error) {
|
||||
query := `
|
||||
SELECT id, path, source_path, mtime, ctime, size, mode, uid, gid, link_target
|
||||
SELECT id, path, source_path, mtime, size, mode, uid, gid, link_target
|
||||
FROM files
|
||||
WHERE path = ?
|
||||
`
|
||||
@@ -123,7 +122,7 @@ func (r *FileRepository) GetByPathTx(ctx context.Context, tx *sql.Tx, path strin
|
||||
func (r *FileRepository) scanFile(row *sql.Row) (*File, error) {
|
||||
var file File
|
||||
var idStr, pathStr, sourcePathStr string
|
||||
var mtimeUnix, ctimeUnix int64
|
||||
var mtimeUnix int64
|
||||
var linkTarget sql.NullString
|
||||
|
||||
err := row.Scan(
|
||||
@@ -131,7 +130,6 @@ func (r *FileRepository) scanFile(row *sql.Row) (*File, error) {
|
||||
&pathStr,
|
||||
&sourcePathStr,
|
||||
&mtimeUnix,
|
||||
&ctimeUnix,
|
||||
&file.Size,
|
||||
&file.Mode,
|
||||
&file.UID,
|
||||
@@ -149,7 +147,6 @@ func (r *FileRepository) scanFile(row *sql.Row) (*File, error) {
|
||||
file.Path = types.FilePath(pathStr)
|
||||
file.SourcePath = types.SourcePath(sourcePathStr)
|
||||
file.MTime = time.Unix(mtimeUnix, 0).UTC()
|
||||
file.CTime = time.Unix(ctimeUnix, 0).UTC()
|
||||
if linkTarget.Valid {
|
||||
file.LinkTarget = types.FilePath(linkTarget.String)
|
||||
}
|
||||
@@ -161,7 +158,7 @@ func (r *FileRepository) scanFile(row *sql.Row) (*File, error) {
|
||||
func (r *FileRepository) scanFileRows(rows *sql.Rows) (*File, error) {
|
||||
var file File
|
||||
var idStr, pathStr, sourcePathStr string
|
||||
var mtimeUnix, ctimeUnix int64
|
||||
var mtimeUnix int64
|
||||
var linkTarget sql.NullString
|
||||
|
||||
err := rows.Scan(
|
||||
@@ -169,7 +166,6 @@ func (r *FileRepository) scanFileRows(rows *sql.Rows) (*File, error) {
|
||||
&pathStr,
|
||||
&sourcePathStr,
|
||||
&mtimeUnix,
|
||||
&ctimeUnix,
|
||||
&file.Size,
|
||||
&file.Mode,
|
||||
&file.UID,
|
||||
@@ -187,7 +183,6 @@ func (r *FileRepository) scanFileRows(rows *sql.Rows) (*File, error) {
|
||||
file.Path = types.FilePath(pathStr)
|
||||
file.SourcePath = types.SourcePath(sourcePathStr)
|
||||
file.MTime = time.Unix(mtimeUnix, 0).UTC()
|
||||
file.CTime = time.Unix(ctimeUnix, 0).UTC()
|
||||
if linkTarget.Valid {
|
||||
file.LinkTarget = types.FilePath(linkTarget.String)
|
||||
}
|
||||
@@ -197,7 +192,7 @@ func (r *FileRepository) scanFileRows(rows *sql.Rows) (*File, error) {
|
||||
|
||||
func (r *FileRepository) ListModifiedSince(ctx context.Context, since time.Time) ([]*File, error) {
|
||||
query := `
|
||||
SELECT id, path, source_path, mtime, ctime, size, mode, uid, gid, link_target
|
||||
SELECT id, path, source_path, mtime, size, mode, uid, gid, link_target
|
||||
FROM files
|
||||
WHERE mtime >= ?
|
||||
ORDER BY path
|
||||
@@ -258,7 +253,7 @@ func (r *FileRepository) DeleteByID(ctx context.Context, tx *sql.Tx, id types.Fi
|
||||
|
||||
func (r *FileRepository) ListByPrefix(ctx context.Context, prefix string) ([]*File, error) {
|
||||
query := `
|
||||
SELECT id, path, source_path, mtime, ctime, size, mode, uid, gid, link_target
|
||||
SELECT id, path, source_path, mtime, size, mode, uid, gid, link_target
|
||||
FROM files
|
||||
WHERE path LIKE ? || '%'
|
||||
ORDER BY path
|
||||
@@ -285,7 +280,7 @@ func (r *FileRepository) ListByPrefix(ctx context.Context, prefix string) ([]*Fi
|
||||
// ListAll returns all files in the database
|
||||
func (r *FileRepository) ListAll(ctx context.Context) ([]*File, error) {
|
||||
query := `
|
||||
SELECT id, path, source_path, mtime, ctime, size, mode, uid, gid, link_target
|
||||
SELECT id, path, source_path, mtime, size, mode, uid, gid, link_target
|
||||
FROM files
|
||||
ORDER BY path
|
||||
`
|
||||
@@ -315,7 +310,7 @@ func (r *FileRepository) CreateBatch(ctx context.Context, tx *sql.Tx, files []*F
|
||||
return nil
|
||||
}
|
||||
|
||||
// Each File has 10 values, so batch at 100 to be safe with SQLite's variable limit
|
||||
// Each File has 9 values, so batch at 100 to be safe with SQLite's variable limit
|
||||
const batchSize = 100
|
||||
|
||||
for i := 0; i < len(files); i += batchSize {
|
||||
@@ -325,19 +320,18 @@ func (r *FileRepository) CreateBatch(ctx context.Context, tx *sql.Tx, files []*F
|
||||
}
|
||||
batch := files[i:end]
|
||||
|
||||
query := `INSERT INTO files (id, path, source_path, mtime, ctime, size, mode, uid, gid, link_target) VALUES `
|
||||
args := make([]interface{}, 0, len(batch)*10)
|
||||
query := `INSERT INTO files (id, path, source_path, mtime, size, mode, uid, gid, link_target) VALUES `
|
||||
args := make([]interface{}, 0, len(batch)*9)
|
||||
for j, f := range batch {
|
||||
if j > 0 {
|
||||
query += ", "
|
||||
}
|
||||
query += "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
|
||||
args = append(args, f.ID.String(), f.Path.String(), f.SourcePath.String(), f.MTime.Unix(), f.CTime.Unix(), f.Size, f.Mode, f.UID, f.GID, f.LinkTarget.String())
|
||||
query += "(?, ?, ?, ?, ?, ?, ?, ?, ?)"
|
||||
args = append(args, f.ID.String(), f.Path.String(), f.SourcePath.String(), f.MTime.Unix(), f.Size, f.Mode, f.UID, f.GID, f.LinkTarget.String())
|
||||
}
|
||||
query += ` ON CONFLICT(path) DO UPDATE SET
|
||||
source_path = excluded.source_path,
|
||||
mtime = excluded.mtime,
|
||||
ctime = excluded.ctime,
|
||||
size = excluded.size,
|
||||
mode = excluded.mode,
|
||||
uid = excluded.uid,
|
||||
|
||||
@@ -39,7 +39,6 @@ func TestFileRepository(t *testing.T) {
|
||||
file := &File{
|
||||
Path: "/test/file.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -124,7 +123,6 @@ func TestFileRepositorySymlink(t *testing.T) {
|
||||
symlink := &File{
|
||||
Path: "/test/link",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 0,
|
||||
Mode: uint32(0777 | os.ModeSymlink),
|
||||
UID: 1000,
|
||||
@@ -161,7 +159,6 @@ func TestFileRepositoryTransaction(t *testing.T) {
|
||||
file := &File{
|
||||
Path: "/test/tx_file.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
|
||||
@@ -17,7 +17,6 @@ type File struct {
|
||||
Path types.FilePath // Absolute path of the file
|
||||
SourcePath types.SourcePath // The source directory this file came from (for restore path stripping)
|
||||
MTime time.Time
|
||||
CTime time.Time
|
||||
Size int64
|
||||
Mode uint32
|
||||
UID uint32
|
||||
|
||||
@@ -23,7 +23,6 @@ func TestRepositoriesTransaction(t *testing.T) {
|
||||
file := &File{
|
||||
Path: "/test/tx_file.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -146,7 +145,6 @@ func TestRepositoriesTransactionRollback(t *testing.T) {
|
||||
file := &File{
|
||||
Path: "/test/rollback_file.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -202,7 +200,6 @@ func TestRepositoriesReadTransaction(t *testing.T) {
|
||||
file := &File{
|
||||
Path: "/test/read_file.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -226,7 +223,6 @@ func TestRepositoriesReadTransaction(t *testing.T) {
|
||||
_ = repos.Files.Create(ctx, tx, &File{
|
||||
Path: "/test/should_fail.txt",
|
||||
MTime: time.Now(),
|
||||
CTime: time.Now(),
|
||||
Size: 0,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
|
||||
@@ -23,7 +23,6 @@ func TestFileRepositoryUUIDGeneration(t *testing.T) {
|
||||
{
|
||||
Path: "/file1.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -32,7 +31,6 @@ func TestFileRepositoryUUIDGeneration(t *testing.T) {
|
||||
{
|
||||
Path: "/file2.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 2048,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -72,7 +70,6 @@ func TestFileRepositoryGetByID(t *testing.T) {
|
||||
file := &File{
|
||||
Path: "/test.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -120,7 +117,6 @@ func TestOrphanedFileCleanup(t *testing.T) {
|
||||
file1 := &File{
|
||||
Path: "/orphaned.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -129,7 +125,6 @@ func TestOrphanedFileCleanup(t *testing.T) {
|
||||
file2 := &File{
|
||||
Path: "/referenced.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 2048,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -218,7 +213,6 @@ func TestOrphanedChunkCleanup(t *testing.T) {
|
||||
file := &File{
|
||||
Path: "/test.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -348,7 +342,6 @@ func TestFileChunkRepositoryWithUUIDs(t *testing.T) {
|
||||
file := &File{
|
||||
Path: "/test.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 3072,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -419,7 +412,6 @@ func TestChunkFileRepositoryWithUUIDs(t *testing.T) {
|
||||
file1 := &File{
|
||||
Path: "/file1.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -428,7 +420,6 @@ func TestChunkFileRepositoryWithUUIDs(t *testing.T) {
|
||||
file2 := &File{
|
||||
Path: "/file2.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -586,7 +577,6 @@ func TestComplexOrphanedDataScenario(t *testing.T) {
|
||||
files[i] = &File{
|
||||
Path: types.FilePath(fmt.Sprintf("/file%d.txt", i)),
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -678,7 +668,6 @@ func TestCascadeDelete(t *testing.T) {
|
||||
file := &File{
|
||||
Path: "/cascade-test.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -750,7 +739,6 @@ func TestTransactionIsolation(t *testing.T) {
|
||||
file := &File{
|
||||
Path: "/tx-test.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -812,7 +800,6 @@ func TestConcurrentOrphanedCleanup(t *testing.T) {
|
||||
file := &File{
|
||||
Path: types.FilePath(fmt.Sprintf("/concurrent-%d.txt", i)),
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
|
||||
@@ -18,7 +18,6 @@ func TestOrphanedFileCleanupDebug(t *testing.T) {
|
||||
file1 := &File{
|
||||
Path: "/orphaned.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -27,7 +26,6 @@ func TestOrphanedFileCleanupDebug(t *testing.T) {
|
||||
file2 := &File{
|
||||
Path: "/referenced.txt",
|
||||
MTime: time.Now().Truncate(time.Second),
|
||||
CTime: time.Now().Truncate(time.Second),
|
||||
Size: 2048,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
|
||||
@@ -29,7 +29,6 @@ func TestFileRepositoryEdgeCases(t *testing.T) {
|
||||
file: &File{
|
||||
Path: "",
|
||||
MTime: time.Now(),
|
||||
CTime: time.Now(),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -42,7 +41,6 @@ func TestFileRepositoryEdgeCases(t *testing.T) {
|
||||
file: &File{
|
||||
Path: types.FilePath("/" + strings.Repeat("a", 4096)),
|
||||
MTime: time.Now(),
|
||||
CTime: time.Now(),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -55,7 +53,6 @@ func TestFileRepositoryEdgeCases(t *testing.T) {
|
||||
file: &File{
|
||||
Path: "/test/file with spaces and 特殊文字.txt",
|
||||
MTime: time.Now(),
|
||||
CTime: time.Now(),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -68,7 +65,6 @@ func TestFileRepositoryEdgeCases(t *testing.T) {
|
||||
file: &File{
|
||||
Path: "/empty.txt",
|
||||
MTime: time.Now(),
|
||||
CTime: time.Now(),
|
||||
Size: 0,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -81,7 +77,6 @@ func TestFileRepositoryEdgeCases(t *testing.T) {
|
||||
file: &File{
|
||||
Path: "/link",
|
||||
MTime: time.Now(),
|
||||
CTime: time.Now(),
|
||||
Size: 0,
|
||||
Mode: 0777 | 0120000, // symlink mode
|
||||
UID: 1000,
|
||||
@@ -123,7 +118,6 @@ func TestDuplicateHandling(t *testing.T) {
|
||||
file1 := &File{
|
||||
Path: "/duplicate.txt",
|
||||
MTime: time.Now(),
|
||||
CTime: time.Now(),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -132,7 +126,6 @@ func TestDuplicateHandling(t *testing.T) {
|
||||
file2 := &File{
|
||||
Path: "/duplicate.txt", // Same path
|
||||
MTime: time.Now().Add(time.Hour),
|
||||
CTime: time.Now().Add(time.Hour),
|
||||
Size: 2048,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -192,7 +185,6 @@ func TestDuplicateHandling(t *testing.T) {
|
||||
file := &File{
|
||||
Path: "/test-dup-fc.txt",
|
||||
MTime: time.Now(),
|
||||
CTime: time.Now(),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -244,7 +236,6 @@ func TestNullHandling(t *testing.T) {
|
||||
file := &File{
|
||||
Path: "/regular.txt",
|
||||
MTime: time.Now(),
|
||||
CTime: time.Now(),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -349,7 +340,6 @@ func TestLargeDatasets(t *testing.T) {
|
||||
file := &File{
|
||||
Path: types.FilePath(fmt.Sprintf("/large/file%05d.txt", i)),
|
||||
MTime: time.Now(),
|
||||
CTime: time.Now(),
|
||||
Size: int64(i * 1024),
|
||||
Mode: 0644,
|
||||
UID: uint32(1000 + (i % 10)),
|
||||
@@ -474,7 +464,6 @@ func TestQueryInjection(t *testing.T) {
|
||||
file := &File{
|
||||
Path: types.FilePath(injection),
|
||||
MTime: time.Now(),
|
||||
CTime: time.Now(),
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
@@ -513,7 +502,6 @@ func TestTimezoneHandling(t *testing.T) {
|
||||
file := &File{
|
||||
Path: "/timezone-test.txt",
|
||||
MTime: nyTime,
|
||||
CTime: nyTime,
|
||||
Size: 1024,
|
||||
Mode: 0644,
|
||||
UID: 1000,
|
||||
|
||||
9
internal/database/schema/000.sql
Normal file
9
internal/database/schema/000.sql
Normal file
@@ -0,0 +1,9 @@
|
||||
-- Migration 000: Schema migrations tracking table
|
||||
-- Applied as a bootstrap step before the normal migration loop.
|
||||
|
||||
CREATE TABLE IF NOT EXISTS schema_migrations (
|
||||
version INTEGER PRIMARY KEY,
|
||||
applied_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
INSERT OR IGNORE INTO schema_migrations (version) VALUES (0);
|
||||
@@ -1,6 +1,5 @@
|
||||
-- Vaultik Database Schema
|
||||
-- Note: This database does not support migrations. If the schema changes,
|
||||
-- delete the local database and perform a full backup to recreate it.
|
||||
-- Migration 001: Initial Vaultik schema
|
||||
-- All core tables for tracking files, chunks, blobs, snapshots, and uploads.
|
||||
|
||||
-- Files table: stores metadata about files in the filesystem
|
||||
CREATE TABLE IF NOT EXISTS files (
|
||||
@@ -8,7 +7,6 @@ CREATE TABLE IF NOT EXISTS files (
|
||||
path TEXT NOT NULL UNIQUE,
|
||||
source_path TEXT NOT NULL DEFAULT '', -- The source directory this file came from (for restore path stripping)
|
||||
mtime INTEGER NOT NULL,
|
||||
ctime INTEGER NOT NULL,
|
||||
size INTEGER NOT NULL,
|
||||
mode INTEGER NOT NULL,
|
||||
uid INTEGER NOT NULL,
|
||||
@@ -103,7 +101,7 @@ CREATE TABLE IF NOT EXISTS snapshot_files (
|
||||
file_id TEXT NOT NULL,
|
||||
PRIMARY KEY (snapshot_id, file_id),
|
||||
FOREIGN KEY (snapshot_id) REFERENCES snapshots(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY (file_id) REFERENCES files(id)
|
||||
FOREIGN KEY (file_id) REFERENCES files(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
-- Index for efficient file lookups (used in orphan detection)
|
||||
@@ -116,7 +114,7 @@ CREATE TABLE IF NOT EXISTS snapshot_blobs (
|
||||
blob_hash TEXT NOT NULL,
|
||||
PRIMARY KEY (snapshot_id, blob_id),
|
||||
FOREIGN KEY (snapshot_id) REFERENCES snapshots(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY (blob_id) REFERENCES blobs(id)
|
||||
FOREIGN KEY (blob_id) REFERENCES blobs(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
-- Index for efficient blob lookups (used in orphan detection)
|
||||
@@ -130,7 +128,7 @@ CREATE TABLE IF NOT EXISTS uploads (
|
||||
size INTEGER NOT NULL,
|
||||
duration_ms INTEGER NOT NULL,
|
||||
FOREIGN KEY (blob_hash) REFERENCES blobs(blob_hash),
|
||||
FOREIGN KEY (snapshot_id) REFERENCES snapshots(id)
|
||||
FOREIGN KEY (snapshot_id) REFERENCES snapshots(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
-- Index for efficient snapshot lookups
|
||||
@@ -1,11 +0,0 @@
|
||||
-- Track blob upload metrics
|
||||
CREATE TABLE IF NOT EXISTS uploads (
|
||||
blob_hash TEXT PRIMARY KEY,
|
||||
uploaded_at TIMESTAMP NOT NULL,
|
||||
size INTEGER NOT NULL,
|
||||
duration_ms INTEGER NOT NULL,
|
||||
FOREIGN KEY (blob_hash) REFERENCES blobs(blob_hash)
|
||||
);
|
||||
|
||||
CREATE INDEX idx_uploads_uploaded_at ON uploads(uploaded_at);
|
||||
CREATE INDEX idx_uploads_duration ON uploads(duration_ms);
|
||||
@@ -63,10 +63,3 @@ type Chunk struct {
|
||||
Offset int64
|
||||
Length int64
|
||||
}
|
||||
|
||||
// DirtyPath represents a path marked for backup by inotify
|
||||
type DirtyPath struct {
|
||||
Path string
|
||||
MarkedAt time.Time
|
||||
EventType string // "create", "modify", "delete"
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package s3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"sync/atomic"
|
||||
|
||||
@@ -10,6 +11,7 @@ import (
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
|
||||
"github.com/aws/smithy-go/logging"
|
||||
)
|
||||
|
||||
@@ -203,10 +205,13 @@ func (c *Client) HeadObject(ctx context.Context, key string) (bool, error) {
|
||||
Key: aws.String(fullKey),
|
||||
})
|
||||
if err != nil {
|
||||
// Check if it's a not found error
|
||||
// TODO: Add proper error type checking
|
||||
var notFound *s3types.NotFound
|
||||
var noSuchKey *s3types.NoSuchKey
|
||||
if errors.As(err, ¬Found) || errors.As(err, &noSuchKey) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -345,7 +345,6 @@ func (b *BackupEngine) Backup(ctx context.Context, fsys fs.FS, root string) (str
|
||||
Size: info.Size(),
|
||||
Mode: uint32(info.Mode()),
|
||||
MTime: info.ModTime(),
|
||||
CTime: info.ModTime(), // Use mtime as ctime for test
|
||||
UID: 1000, // Default UID for test
|
||||
GID: 1000, // Default GID for test
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -110,15 +110,15 @@ func TestScannerSimpleDirectory(t *testing.T) {
|
||||
t.Errorf("expected at least 97 bytes scanned, got %d", result.BytesScanned)
|
||||
}
|
||||
|
||||
// Verify files in database - only regular files are stored
|
||||
// Verify files in database - includes regular files and directories
|
||||
files, err := repos.Files.ListByPrefix(ctx, "/source")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to list files: %v", err)
|
||||
}
|
||||
|
||||
// We should have 6 files (directories are not stored)
|
||||
if len(files) != 6 {
|
||||
t.Errorf("expected 6 files in database, got %d", len(files))
|
||||
// 6 regular files + 3 directories (/source, /source/subdir, /source/subdir2)
|
||||
if len(files) != 9 {
|
||||
t.Errorf("expected 9 entries in database (6 files + 3 dirs), got %d", len(files))
|
||||
}
|
||||
|
||||
// Verify specific file
|
||||
|
||||
@@ -227,12 +227,39 @@ func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath st
|
||||
}
|
||||
}()
|
||||
|
||||
// Steps 1-5: Copy, clean, vacuum, compress, and read the database
|
||||
finalData, tempDBPath, err := sm.prepareExportDB(ctx, dbPath, snapshotID, tempDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Step 6: Generate blob manifest (before closing temp DB)
|
||||
blobManifest, err := sm.generateBlobManifest(ctx, tempDBPath, snapshotID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("generating blob manifest: %w", err)
|
||||
}
|
||||
|
||||
// Step 7: Upload to S3 in snapshot subdirectory
|
||||
if err := sm.uploadSnapshotArtifacts(ctx, snapshotID, finalData, blobManifest); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info("Uploaded snapshot metadata",
|
||||
"snapshot_id", snapshotID,
|
||||
"db_size", len(finalData),
|
||||
"manifest_size", len(blobManifest))
|
||||
return nil
|
||||
}
|
||||
|
||||
// prepareExportDB copies, cleans, vacuums, and compresses the snapshot database for export.
|
||||
// Returns the compressed data and the path to the temporary database (needed for manifest generation).
|
||||
func (sm *SnapshotManager) prepareExportDB(ctx context.Context, dbPath, snapshotID, tempDir string) ([]byte, string, error) {
|
||||
// Step 1: Copy database to temp file
|
||||
// The main database should be closed at this point
|
||||
tempDBPath := filepath.Join(tempDir, "snapshot.db")
|
||||
log.Debug("Copying database to temporary location", "source", dbPath, "destination", tempDBPath)
|
||||
if err := sm.copyFile(dbPath, tempDBPath); err != nil {
|
||||
return fmt.Errorf("copying database: %w", err)
|
||||
return nil, "", fmt.Errorf("copying database: %w", err)
|
||||
}
|
||||
log.Debug("Database copy complete", "size", sm.getFileSize(tempDBPath))
|
||||
|
||||
@@ -240,7 +267,7 @@ func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath st
|
||||
log.Debug("Cleaning temporary database", "snapshot_id", snapshotID)
|
||||
stats, err := sm.cleanSnapshotDB(ctx, tempDBPath, snapshotID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cleaning snapshot database: %w", err)
|
||||
return nil, "", fmt.Errorf("cleaning snapshot database: %w", err)
|
||||
}
|
||||
log.Info("Temporary database cleanup complete",
|
||||
"db_path", tempDBPath,
|
||||
@@ -255,14 +282,14 @@ func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath st
|
||||
// Step 3: VACUUM the database to remove deleted data and compact
|
||||
// This is critical for security - ensures no stale/deleted data is uploaded
|
||||
if err := sm.vacuumDatabase(tempDBPath); err != nil {
|
||||
return fmt.Errorf("vacuuming database: %w", err)
|
||||
return nil, "", fmt.Errorf("vacuuming database: %w", err)
|
||||
}
|
||||
log.Debug("Database vacuumed", "size", humanize.Bytes(uint64(sm.getFileSize(tempDBPath))))
|
||||
|
||||
// Step 4: Compress and encrypt the binary database file
|
||||
compressedPath := filepath.Join(tempDir, "db.zst.age")
|
||||
if err := sm.compressFile(tempDBPath, compressedPath); err != nil {
|
||||
return fmt.Errorf("compressing database: %w", err)
|
||||
return nil, "", fmt.Errorf("compressing database: %w", err)
|
||||
}
|
||||
log.Debug("Compression complete",
|
||||
"original_size", humanize.Bytes(uint64(sm.getFileSize(tempDBPath))),
|
||||
@@ -271,49 +298,43 @@ func (sm *SnapshotManager) ExportSnapshotMetadata(ctx context.Context, dbPath st
|
||||
// Step 5: Read compressed and encrypted data for upload
|
||||
finalData, err := afero.ReadFile(sm.fs, compressedPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("reading compressed dump: %w", err)
|
||||
return nil, "", fmt.Errorf("reading compressed dump: %w", err)
|
||||
}
|
||||
|
||||
// Step 6: Generate blob manifest (before closing temp DB)
|
||||
blobManifest, err := sm.generateBlobManifest(ctx, tempDBPath, snapshotID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("generating blob manifest: %w", err)
|
||||
}
|
||||
return finalData, tempDBPath, nil
|
||||
}
|
||||
|
||||
// Step 7: Upload to S3 in snapshot subdirectory
|
||||
// uploadSnapshotArtifacts uploads the database backup and blob manifest to S3
|
||||
func (sm *SnapshotManager) uploadSnapshotArtifacts(ctx context.Context, snapshotID string, dbData, manifestData []byte) error {
|
||||
// Upload database backup (compressed and encrypted)
|
||||
dbKey := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID)
|
||||
|
||||
dbUploadStart := time.Now()
|
||||
if err := sm.storage.Put(ctx, dbKey, bytes.NewReader(finalData)); err != nil {
|
||||
if err := sm.storage.Put(ctx, dbKey, bytes.NewReader(dbData)); err != nil {
|
||||
return fmt.Errorf("uploading snapshot database: %w", err)
|
||||
}
|
||||
dbUploadDuration := time.Since(dbUploadStart)
|
||||
dbUploadSpeed := float64(len(finalData)) * 8 / dbUploadDuration.Seconds() // bits per second
|
||||
dbUploadSpeed := float64(len(dbData)) * 8 / dbUploadDuration.Seconds() // bits per second
|
||||
log.Info("Uploaded snapshot database",
|
||||
"path", dbKey,
|
||||
"size", humanize.Bytes(uint64(len(finalData))),
|
||||
"size", humanize.Bytes(uint64(len(dbData))),
|
||||
"duration", dbUploadDuration,
|
||||
"speed", humanize.SI(dbUploadSpeed, "bps"))
|
||||
|
||||
// Upload blob manifest (compressed only, not encrypted)
|
||||
manifestKey := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
||||
manifestUploadStart := time.Now()
|
||||
if err := sm.storage.Put(ctx, manifestKey, bytes.NewReader(blobManifest)); err != nil {
|
||||
if err := sm.storage.Put(ctx, manifestKey, bytes.NewReader(manifestData)); err != nil {
|
||||
return fmt.Errorf("uploading blob manifest: %w", err)
|
||||
}
|
||||
manifestUploadDuration := time.Since(manifestUploadStart)
|
||||
manifestUploadSpeed := float64(len(blobManifest)) * 8 / manifestUploadDuration.Seconds() // bits per second
|
||||
manifestUploadSpeed := float64(len(manifestData)) * 8 / manifestUploadDuration.Seconds() // bits per second
|
||||
log.Info("Uploaded blob manifest",
|
||||
"path", manifestKey,
|
||||
"size", humanize.Bytes(uint64(len(blobManifest))),
|
||||
"size", humanize.Bytes(uint64(len(manifestData))),
|
||||
"duration", manifestUploadDuration,
|
||||
"speed", humanize.SI(manifestUploadSpeed, "bps"))
|
||||
|
||||
log.Info("Uploaded snapshot metadata",
|
||||
"snapshot_id", snapshotID,
|
||||
"db_size", len(finalData),
|
||||
"manifest_size", len(blobManifest))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
93
internal/vaultik/blob_fetch.go
Normal file
93
internal/vaultik/blob_fetch.go
Normal file
@@ -0,0 +1,93 @@
|
||||
package vaultik
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"filippo.io/age"
|
||||
"git.eeqj.de/sneak/vaultik/internal/blobgen"
|
||||
)
|
||||
|
||||
// hashVerifyReader wraps a blobgen.Reader and verifies the double-SHA-256 hash
|
||||
// of decrypted plaintext when Close is called. It reuses the hash that
|
||||
// blobgen.Reader already computes internally via its TeeReader, avoiding
|
||||
// redundant SHA-256 computation.
|
||||
type hashVerifyReader struct {
|
||||
reader *blobgen.Reader // underlying decrypted blob reader (has internal hasher)
|
||||
fetcher io.ReadCloser // raw fetched stream (closed on Close)
|
||||
blobHash string // expected double-SHA-256 hex
|
||||
done bool // EOF reached
|
||||
}
|
||||
|
||||
func (h *hashVerifyReader) Read(p []byte) (int, error) {
|
||||
n, err := h.reader.Read(p)
|
||||
if err == io.EOF {
|
||||
h.done = true
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Close verifies the hash (if the stream was fully read) and closes underlying readers.
|
||||
func (h *hashVerifyReader) Close() error {
|
||||
readerErr := h.reader.Close()
|
||||
fetcherErr := h.fetcher.Close()
|
||||
|
||||
if h.done {
|
||||
firstHash := h.reader.Sum256()
|
||||
secondHasher := sha256.New()
|
||||
secondHasher.Write(firstHash)
|
||||
actualHashHex := hex.EncodeToString(secondHasher.Sum(nil))
|
||||
if actualHashHex != h.blobHash {
|
||||
return fmt.Errorf("blob hash mismatch: expected %s, got %s", h.blobHash[:16], actualHashHex[:16])
|
||||
}
|
||||
}
|
||||
|
||||
if readerErr != nil {
|
||||
return readerErr
|
||||
}
|
||||
return fetcherErr
|
||||
}
|
||||
|
||||
// FetchAndDecryptBlob downloads a blob, decrypts and decompresses it, and
|
||||
// returns a streaming reader that computes the double-SHA-256 hash on the fly.
|
||||
// The hash is verified when the returned reader is closed (after fully reading).
|
||||
// This avoids buffering the entire blob in memory.
|
||||
func (v *Vaultik) FetchAndDecryptBlob(ctx context.Context, blobHash string, expectedSize int64, identity age.Identity) (io.ReadCloser, error) {
|
||||
rc, _, err := v.FetchBlob(ctx, blobHash, expectedSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reader, err := blobgen.NewReader(rc, identity)
|
||||
if err != nil {
|
||||
_ = rc.Close()
|
||||
return nil, fmt.Errorf("creating blob reader: %w", err)
|
||||
}
|
||||
|
||||
return &hashVerifyReader{
|
||||
reader: reader,
|
||||
fetcher: rc,
|
||||
blobHash: blobHash,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// FetchBlob downloads a blob and returns a reader for the encrypted data.
|
||||
func (v *Vaultik) FetchBlob(ctx context.Context, blobHash string, expectedSize int64) (io.ReadCloser, int64, error) {
|
||||
blobPath := fmt.Sprintf("blobs/%s/%s/%s", blobHash[:2], blobHash[2:4], blobHash)
|
||||
|
||||
rc, err := v.Storage.Get(ctx, blobPath)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("downloading blob %s: %w", blobHash[:16], err)
|
||||
}
|
||||
|
||||
info, err := v.Storage.Stat(ctx, blobPath)
|
||||
if err != nil {
|
||||
_ = rc.Close()
|
||||
return nil, 0, fmt.Errorf("stat blob %s: %w", blobHash[:16], err)
|
||||
}
|
||||
|
||||
return rc, info.Size, nil
|
||||
}
|
||||
100
internal/vaultik/blob_fetch_hash_test.go
Normal file
100
internal/vaultik/blob_fetch_hash_test.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package vaultik_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"filippo.io/age"
|
||||
"git.eeqj.de/sneak/vaultik/internal/blobgen"
|
||||
"git.eeqj.de/sneak/vaultik/internal/vaultik"
|
||||
)
|
||||
|
||||
// TestFetchAndDecryptBlobVerifiesHash verifies that FetchAndDecryptBlob checks
|
||||
// the double-SHA-256 hash of the decrypted plaintext against the expected blob hash.
|
||||
func TestFetchAndDecryptBlobVerifiesHash(t *testing.T) {
|
||||
identity, err := age.GenerateX25519Identity()
|
||||
if err != nil {
|
||||
t.Fatalf("generating identity: %v", err)
|
||||
}
|
||||
|
||||
// Create test data and encrypt it using blobgen.Writer
|
||||
plaintext := []byte("hello world test data for blob hash verification")
|
||||
var encBuf bytes.Buffer
|
||||
writer, err := blobgen.NewWriter(&encBuf, 1, []string{identity.Recipient().String()})
|
||||
if err != nil {
|
||||
t.Fatalf("creating blobgen writer: %v", err)
|
||||
}
|
||||
if _, err := writer.Write(plaintext); err != nil {
|
||||
t.Fatalf("writing plaintext: %v", err)
|
||||
}
|
||||
if err := writer.Close(); err != nil {
|
||||
t.Fatalf("closing writer: %v", err)
|
||||
}
|
||||
encryptedData := encBuf.Bytes()
|
||||
|
||||
// Compute correct double-SHA-256 hash of the plaintext (matches blobgen.Writer.Sum256)
|
||||
firstHash := sha256.Sum256(plaintext)
|
||||
secondHash := sha256.Sum256(firstHash[:])
|
||||
correctHash := hex.EncodeToString(secondHash[:])
|
||||
|
||||
// Verify our hash matches what blobgen.Writer produces
|
||||
writerHash := hex.EncodeToString(writer.Sum256())
|
||||
if correctHash != writerHash {
|
||||
t.Fatalf("hash computation mismatch: manual=%s, writer=%s", correctHash, writerHash)
|
||||
}
|
||||
|
||||
// Set up mock storage with the blob at the correct path
|
||||
mockStorage := NewMockStorer()
|
||||
blobPath := "blobs/" + correctHash[:2] + "/" + correctHash[2:4] + "/" + correctHash
|
||||
mockStorage.mu.Lock()
|
||||
mockStorage.data[blobPath] = encryptedData
|
||||
mockStorage.mu.Unlock()
|
||||
|
||||
tv := vaultik.NewForTesting(mockStorage)
|
||||
ctx := context.Background()
|
||||
|
||||
t.Run("correct hash succeeds", func(t *testing.T) {
|
||||
rc, err := tv.FetchAndDecryptBlob(ctx, correctHash, int64(len(encryptedData)), identity)
|
||||
if err != nil {
|
||||
t.Fatalf("expected success, got error: %v", err)
|
||||
}
|
||||
data, err := io.ReadAll(rc)
|
||||
if err != nil {
|
||||
t.Fatalf("reading stream: %v", err)
|
||||
}
|
||||
if err := rc.Close(); err != nil {
|
||||
t.Fatalf("close (hash verification) failed: %v", err)
|
||||
}
|
||||
if !bytes.Equal(data, plaintext) {
|
||||
t.Fatalf("decrypted data mismatch: got %q, want %q", data, plaintext)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("wrong hash fails", func(t *testing.T) {
|
||||
// Use a fake hash that doesn't match the actual plaintext
|
||||
fakeHash := strings.Repeat("ab", 32) // 64 hex chars
|
||||
fakePath := "blobs/" + fakeHash[:2] + "/" + fakeHash[2:4] + "/" + fakeHash
|
||||
mockStorage.mu.Lock()
|
||||
mockStorage.data[fakePath] = encryptedData
|
||||
mockStorage.mu.Unlock()
|
||||
|
||||
rc, err := tv.FetchAndDecryptBlob(ctx, fakeHash, int64(len(encryptedData)), identity)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error opening stream: %v", err)
|
||||
}
|
||||
// Read all data — hash is verified on Close
|
||||
_, _ = io.ReadAll(rc)
|
||||
err = rc.Close()
|
||||
if err == nil {
|
||||
t.Fatal("expected error for mismatched hash, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "hash mismatch") {
|
||||
t.Fatalf("expected hash mismatch error, got: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -1,55 +0,0 @@
|
||||
package vaultik
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"filippo.io/age"
|
||||
"git.eeqj.de/sneak/vaultik/internal/blobgen"
|
||||
)
|
||||
|
||||
// FetchAndDecryptBlobResult holds the result of fetching and decrypting a blob.
|
||||
type FetchAndDecryptBlobResult struct {
|
||||
Data []byte
|
||||
}
|
||||
|
||||
// FetchAndDecryptBlob downloads a blob, decrypts it, and returns the plaintext data.
|
||||
func (v *Vaultik) FetchAndDecryptBlob(ctx context.Context, blobHash string, expectedSize int64, identity age.Identity) (*FetchAndDecryptBlobResult, error) {
|
||||
rc, _, err := v.FetchBlob(ctx, blobHash, expectedSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() { _ = rc.Close() }()
|
||||
|
||||
reader, err := blobgen.NewReader(rc, identity)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating blob reader: %w", err)
|
||||
}
|
||||
defer func() { _ = reader.Close() }()
|
||||
|
||||
data, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("reading blob data: %w", err)
|
||||
}
|
||||
|
||||
return &FetchAndDecryptBlobResult{Data: data}, nil
|
||||
}
|
||||
|
||||
// FetchBlob downloads a blob and returns a reader for the encrypted data.
|
||||
func (v *Vaultik) FetchBlob(ctx context.Context, blobHash string, expectedSize int64) (io.ReadCloser, int64, error) {
|
||||
blobPath := fmt.Sprintf("blobs/%s/%s/%s", blobHash[:2], blobHash[2:4], blobHash)
|
||||
|
||||
rc, err := v.Storage.Get(ctx, blobPath)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("downloading blob %s: %w", blobHash[:16], err)
|
||||
}
|
||||
|
||||
info, err := v.Storage.Stat(ctx, blobPath)
|
||||
if err != nil {
|
||||
_ = rc.Close()
|
||||
return nil, 0, fmt.Errorf("stat blob %s: %w", blobHash[:16], err)
|
||||
}
|
||||
|
||||
return rc, info.Size, nil
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package vaultik
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -79,18 +80,55 @@ func parseSnapshotTimestamp(snapshotID string) (time.Time, error) {
|
||||
return timestamp.UTC(), nil
|
||||
}
|
||||
|
||||
// parseDuration parses a duration string with support for days
|
||||
func parseDuration(s string) (time.Duration, error) {
|
||||
// Check for days suffix
|
||||
if strings.HasSuffix(s, "d") {
|
||||
daysStr := strings.TrimSuffix(s, "d")
|
||||
days, err := strconv.Atoi(daysStr)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("invalid days value: %w", err)
|
||||
// parseSnapshotName extracts the snapshot name from a snapshot ID.
|
||||
// Format: hostname_snapshotname_timestamp — the middle part(s) between hostname
|
||||
// and the RFC3339 timestamp are the snapshot name (may contain underscores).
|
||||
// Returns the snapshot name, or empty string if the ID is malformed.
|
||||
func parseSnapshotName(snapshotID string) string {
|
||||
parts := strings.Split(snapshotID, "_")
|
||||
if len(parts) < 3 {
|
||||
// Format: hostname_timestamp — no snapshot name
|
||||
return ""
|
||||
}
|
||||
return time.Duration(days) * 24 * time.Hour, nil
|
||||
// Format: hostname_name_timestamp — middle parts are the name.
|
||||
// The last part is the RFC3339 timestamp, the first part is the hostname,
|
||||
// everything in between is the snapshot name (which may itself contain underscores).
|
||||
return strings.Join(parts[1:len(parts)-1], "_")
|
||||
}
|
||||
|
||||
// parseDuration parses a duration string with support for human-friendly units:
|
||||
// d/day/days, w/week/weeks, mo/month/months, y/year/years, plus standard Go
|
||||
// duration units (h, m, s).
|
||||
func parseDuration(s string) (time.Duration, error) {
|
||||
if d, err := time.ParseDuration(s); err == nil {
|
||||
return d, nil
|
||||
}
|
||||
|
||||
// Otherwise use standard Go duration parsing
|
||||
return time.ParseDuration(s)
|
||||
re := regexp.MustCompile(`(\d+)\s*([a-zA-Z]+)`)
|
||||
matches := re.FindAllStringSubmatch(s, -1)
|
||||
if len(matches) == 0 {
|
||||
return 0, fmt.Errorf("invalid duration: %q", s)
|
||||
}
|
||||
|
||||
var total time.Duration
|
||||
for _, match := range matches {
|
||||
n, err := strconv.Atoi(match[1])
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("invalid number %q: %w", match[1], err)
|
||||
}
|
||||
unit := strings.ToLower(match[2])
|
||||
switch unit {
|
||||
case "d", "day", "days":
|
||||
total += time.Duration(n) * 24 * time.Hour
|
||||
case "w", "week", "weeks":
|
||||
total += time.Duration(n) * 7 * 24 * time.Hour
|
||||
case "mo", "month", "months":
|
||||
total += time.Duration(n) * 30 * 24 * time.Hour
|
||||
case "y", "year", "years":
|
||||
total += time.Duration(n) * 365 * 24 * time.Hour
|
||||
default:
|
||||
return 0, fmt.Errorf("unknown time unit %q", unit)
|
||||
}
|
||||
}
|
||||
return total, nil
|
||||
}
|
||||
|
||||
112
internal/vaultik/helpers_test.go
Normal file
112
internal/vaultik/helpers_test.go
Normal file
@@ -0,0 +1,112 @@
|
||||
package vaultik
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestParseSnapshotName(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
snapshotID string
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "standard format with name",
|
||||
snapshotID: "myhost_home_2026-01-12T14:41:15Z",
|
||||
want: "home",
|
||||
},
|
||||
{
|
||||
name: "standard format with different name",
|
||||
snapshotID: "server1_system_2026-02-15T09:30:00Z",
|
||||
want: "system",
|
||||
},
|
||||
{
|
||||
name: "name with underscores",
|
||||
snapshotID: "myhost_my_special_backup_2026-03-01T00:00:00Z",
|
||||
want: "my_special_backup",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := parseSnapshotName(tt.snapshotID)
|
||||
if got != tt.want {
|
||||
t.Errorf("parseSnapshotName(%q) = %q, want %q", tt.snapshotID, got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseDuration(t *testing.T) {
|
||||
tests := []struct {
|
||||
input string
|
||||
want time.Duration
|
||||
err bool
|
||||
}{
|
||||
{"30d", 30 * 24 * time.Hour, false},
|
||||
{"4w", 4 * 7 * 24 * time.Hour, false},
|
||||
{"6mo", 6 * 30 * 24 * time.Hour, false},
|
||||
{"1y", 365 * 24 * time.Hour, false},
|
||||
{"2w3d", 2*7*24*time.Hour + 3*24*time.Hour, false},
|
||||
{"1h", time.Hour, false},
|
||||
{"30s", 30 * time.Second, false},
|
||||
{"garbage", 0, true},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.input, func(t *testing.T) {
|
||||
got, err := parseDuration(tt.input)
|
||||
if tt.err {
|
||||
if err == nil {
|
||||
t.Fatalf("expected error for %q, got %v", tt.input, got)
|
||||
}
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error for %q: %v", tt.input, err)
|
||||
}
|
||||
if got != tt.want {
|
||||
t.Errorf("parseDuration(%q) = %v, want %v", tt.input, got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseSnapshotTimestamp(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
snapshotID string
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "valid with name",
|
||||
snapshotID: "myhost_home_2026-01-12T14:41:15Z",
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "valid without name",
|
||||
snapshotID: "myhost_2026-01-12T14:41:15Z",
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "invalid - single part",
|
||||
snapshotID: "nounderscore",
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "invalid - bad timestamp",
|
||||
snapshotID: "myhost_home_notadate",
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
_, err := parseSnapshotTimestamp(tt.snapshotID)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("parseSnapshotTimestamp(%q) error = %v, wantErr %v", tt.snapshotID, err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -66,18 +66,6 @@ func (v *Vaultik) ShowInfo() error {
|
||||
}
|
||||
v.printlnStdout()
|
||||
|
||||
// Daemon Settings (if applicable)
|
||||
if v.Config.BackupInterval > 0 || v.Config.MinTimeBetweenRun > 0 {
|
||||
v.printfStdout("=== Daemon Settings ===\n")
|
||||
if v.Config.BackupInterval > 0 {
|
||||
v.printfStdout("Backup Interval: %s\n", v.Config.BackupInterval)
|
||||
}
|
||||
if v.Config.MinTimeBetweenRun > 0 {
|
||||
v.printfStdout("Minimum Time: %s\n", v.Config.MinTimeBetweenRun)
|
||||
}
|
||||
v.printlnStdout()
|
||||
}
|
||||
|
||||
// Local Database
|
||||
v.printfStdout("=== Local Database ===\n")
|
||||
v.printfStdout("Index Path: %s\n", v.Config.IndexPath)
|
||||
@@ -149,9 +137,9 @@ type RemoteInfoResult struct {
|
||||
|
||||
// RemoteInfo displays information about remote storage
|
||||
func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
log.Info("Starting remote storage info gathering")
|
||||
result := &RemoteInfoResult{}
|
||||
|
||||
// Get storage info
|
||||
storageInfo := v.Storage.Info()
|
||||
result.StorageType = storageInfo.Type
|
||||
result.StorageLocation = storageInfo.Location
|
||||
@@ -161,23 +149,52 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
v.printfStdout("Type: %s\n", storageInfo.Type)
|
||||
v.printfStdout("Location: %s\n", storageInfo.Location)
|
||||
v.printlnStdout()
|
||||
}
|
||||
|
||||
// List all snapshot metadata
|
||||
if !jsonOutput {
|
||||
v.printfStdout("Scanning snapshot metadata...\n")
|
||||
}
|
||||
|
||||
snapshotMetadata, snapshotIDs, err := v.collectSnapshotMetadata()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !jsonOutput {
|
||||
v.printfStdout("Downloading %d manifest(s)...\n", len(snapshotIDs))
|
||||
}
|
||||
|
||||
referencedBlobs := v.collectReferencedBlobsFromManifests(snapshotIDs, snapshotMetadata)
|
||||
|
||||
v.populateRemoteInfoResult(result, snapshotMetadata, snapshotIDs, referencedBlobs)
|
||||
|
||||
if err := v.scanRemoteBlobStorage(result, referencedBlobs, jsonOutput); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info("Remote info complete",
|
||||
"snapshots", result.TotalMetadataCount,
|
||||
"total_blobs", result.TotalBlobCount,
|
||||
"referenced_blobs", result.ReferencedBlobCount,
|
||||
"orphaned_blobs", result.OrphanedBlobCount)
|
||||
|
||||
if jsonOutput {
|
||||
enc := json.NewEncoder(v.Stdout)
|
||||
enc.SetIndent("", " ")
|
||||
return enc.Encode(result)
|
||||
}
|
||||
|
||||
v.printRemoteInfoTable(result)
|
||||
return nil
|
||||
}
|
||||
|
||||
// collectSnapshotMetadata scans remote metadata and returns per-snapshot info and sorted IDs
|
||||
func (v *Vaultik) collectSnapshotMetadata() (map[string]*SnapshotMetadataInfo, []string, error) {
|
||||
snapshotMetadata := make(map[string]*SnapshotMetadataInfo)
|
||||
|
||||
// Collect metadata files
|
||||
metadataCh := v.Storage.ListStream(v.ctx, "metadata/")
|
||||
for obj := range metadataCh {
|
||||
if obj.Err != nil {
|
||||
return fmt.Errorf("listing metadata: %w", obj.Err)
|
||||
return nil, nil, fmt.Errorf("listing metadata: %w", obj.Err)
|
||||
}
|
||||
|
||||
// Parse key: metadata/<snapshot-id>/<filename>
|
||||
parts := strings.Split(obj.Key, "/")
|
||||
if len(parts) < 3 {
|
||||
continue
|
||||
@@ -185,14 +202,11 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
snapshotID := parts[1]
|
||||
|
||||
if _, exists := snapshotMetadata[snapshotID]; !exists {
|
||||
snapshotMetadata[snapshotID] = &SnapshotMetadataInfo{
|
||||
SnapshotID: snapshotID,
|
||||
}
|
||||
snapshotMetadata[snapshotID] = &SnapshotMetadataInfo{SnapshotID: snapshotID}
|
||||
}
|
||||
|
||||
info := snapshotMetadata[snapshotID]
|
||||
filename := parts[2]
|
||||
|
||||
if strings.HasPrefix(filename, "manifest") {
|
||||
info.ManifestSize = obj.Size
|
||||
} else if strings.HasPrefix(filename, "db") {
|
||||
@@ -201,19 +215,18 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
info.TotalSize = info.ManifestSize + info.DatabaseSize
|
||||
}
|
||||
|
||||
// Sort snapshots by ID for consistent output
|
||||
var snapshotIDs []string
|
||||
for id := range snapshotMetadata {
|
||||
snapshotIDs = append(snapshotIDs, id)
|
||||
}
|
||||
sort.Strings(snapshotIDs)
|
||||
|
||||
// Download and parse all manifests to get referenced blobs
|
||||
if !jsonOutput {
|
||||
v.printfStdout("Downloading %d manifest(s)...\n", len(snapshotIDs))
|
||||
}
|
||||
return snapshotMetadata, snapshotIDs, nil
|
||||
}
|
||||
|
||||
referencedBlobs := make(map[string]int64) // hash -> compressed size
|
||||
// collectReferencedBlobsFromManifests downloads manifests and returns referenced blob hashes with sizes
|
||||
func (v *Vaultik) collectReferencedBlobsFromManifests(snapshotIDs []string, snapshotMetadata map[string]*SnapshotMetadataInfo) map[string]int64 {
|
||||
referencedBlobs := make(map[string]int64)
|
||||
|
||||
for _, snapshotID := range snapshotIDs {
|
||||
manifestKey := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
||||
@@ -230,10 +243,8 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
continue
|
||||
}
|
||||
|
||||
// Record blob info from manifest
|
||||
info := snapshotMetadata[snapshotID]
|
||||
info.BlobCount = manifest.BlobCount
|
||||
|
||||
var blobsSize int64
|
||||
for _, blob := range manifest.Blobs {
|
||||
referencedBlobs[blob.Hash] = blob.CompressedSize
|
||||
@@ -242,7 +253,11 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
info.BlobsSize = blobsSize
|
||||
}
|
||||
|
||||
// Build result snapshots
|
||||
return referencedBlobs
|
||||
}
|
||||
|
||||
// populateRemoteInfoResult fills in the result's snapshot and referenced blob stats
|
||||
func (v *Vaultik) populateRemoteInfoResult(result *RemoteInfoResult, snapshotMetadata map[string]*SnapshotMetadataInfo, snapshotIDs []string, referencedBlobs map[string]int64) {
|
||||
var totalMetadataSize int64
|
||||
for _, id := range snapshotIDs {
|
||||
info := snapshotMetadata[id]
|
||||
@@ -252,26 +267,25 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
result.TotalMetadataSize = totalMetadataSize
|
||||
result.TotalMetadataCount = len(snapshotIDs)
|
||||
|
||||
// Calculate referenced blob stats
|
||||
for _, size := range referencedBlobs {
|
||||
result.ReferencedBlobCount++
|
||||
result.ReferencedBlobSize += size
|
||||
}
|
||||
}
|
||||
|
||||
// List all blobs on remote
|
||||
// scanRemoteBlobStorage lists all blobs on remote and computes orphan stats
|
||||
func (v *Vaultik) scanRemoteBlobStorage(result *RemoteInfoResult, referencedBlobs map[string]int64, jsonOutput bool) error {
|
||||
if !jsonOutput {
|
||||
v.printfStdout("Scanning blobs...\n")
|
||||
}
|
||||
|
||||
allBlobs := make(map[string]int64) // hash -> size from storage
|
||||
|
||||
blobCh := v.Storage.ListStream(v.ctx, "blobs/")
|
||||
allBlobs := make(map[string]int64)
|
||||
|
||||
for obj := range blobCh {
|
||||
if obj.Err != nil {
|
||||
return fmt.Errorf("listing blobs: %w", obj.Err)
|
||||
}
|
||||
|
||||
// Extract hash from key: blobs/xx/yy/hash
|
||||
parts := strings.Split(obj.Key, "/")
|
||||
if len(parts) < 4 {
|
||||
continue
|
||||
@@ -282,7 +296,6 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
result.TotalBlobSize += obj.Size
|
||||
}
|
||||
|
||||
// Calculate orphaned blobs
|
||||
for hash, size := range allBlobs {
|
||||
if _, referenced := referencedBlobs[hash]; !referenced {
|
||||
result.OrphanedBlobCount++
|
||||
@@ -290,14 +303,11 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Output results
|
||||
if jsonOutput {
|
||||
enc := json.NewEncoder(v.Stdout)
|
||||
enc.SetIndent("", " ")
|
||||
return enc.Encode(result)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Human-readable output
|
||||
// printRemoteInfoTable renders the human-readable remote info output
|
||||
func (v *Vaultik) printRemoteInfoTable(result *RemoteInfoResult) {
|
||||
v.printfStdout("\n=== Snapshot Metadata ===\n")
|
||||
if len(result.Snapshots) == 0 {
|
||||
v.printfStdout("No snapshots found\n")
|
||||
@@ -320,20 +330,15 @@ func (v *Vaultik) RemoteInfo(jsonOutput bool) error {
|
||||
|
||||
v.printfStdout("\n=== Blob Storage ===\n")
|
||||
v.printfStdout("Total blobs on remote: %s (%s)\n",
|
||||
humanize.Comma(int64(result.TotalBlobCount)),
|
||||
humanize.Bytes(uint64(result.TotalBlobSize)))
|
||||
humanize.Comma(int64(result.TotalBlobCount)), humanize.Bytes(uint64(result.TotalBlobSize)))
|
||||
v.printfStdout("Referenced by snapshots: %s (%s)\n",
|
||||
humanize.Comma(int64(result.ReferencedBlobCount)),
|
||||
humanize.Bytes(uint64(result.ReferencedBlobSize)))
|
||||
humanize.Comma(int64(result.ReferencedBlobCount)), humanize.Bytes(uint64(result.ReferencedBlobSize)))
|
||||
v.printfStdout("Orphaned (unreferenced): %s (%s)\n",
|
||||
humanize.Comma(int64(result.OrphanedBlobCount)),
|
||||
humanize.Bytes(uint64(result.OrphanedBlobSize)))
|
||||
humanize.Comma(int64(result.OrphanedBlobCount)), humanize.Bytes(uint64(result.OrphanedBlobSize)))
|
||||
|
||||
if result.OrphanedBlobCount > 0 {
|
||||
v.printfStdout("\nRun 'vaultik prune --remote' to remove orphaned blobs.\n")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// truncateString truncates a string to maxLen, adding "..." if truncated
|
||||
|
||||
@@ -541,3 +541,174 @@ func TestBackupAndRestore(t *testing.T) {
|
||||
|
||||
t.Log("Backup and restore test completed successfully")
|
||||
}
|
||||
|
||||
// TestEndToEndFileStorage exercises the full backup → restore loop against the
|
||||
// real `file://` storage backend (FileStorer) on a real OS filesystem. This is
|
||||
// the closest local approximation of a production backup: encrypted blobs get
|
||||
// written to disk, the metadata SQLite database is exported through the same
|
||||
// blobgen pipeline as a real backup, and restoration reads them back through
|
||||
// the public Vaultik.Restore entrypoint. It is the canonical end-to-end smoke
|
||||
// test for 1.0.
|
||||
func TestEndToEndFileStorage(t *testing.T) {
|
||||
log.Initialize(log.Config{})
|
||||
|
||||
// Real OS filesystem (SQLite + FileStorer both need it).
|
||||
fs := afero.NewOsFs()
|
||||
tempDir, err := os.MkdirTemp("", "vaultik-e2e-")
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = os.RemoveAll(tempDir) }()
|
||||
|
||||
dataDir := filepath.Join(tempDir, "source")
|
||||
storeDir := filepath.Join(tempDir, "remote")
|
||||
restoreDir := filepath.Join(tempDir, "restored")
|
||||
dbPath := filepath.Join(tempDir, "index.sqlite")
|
||||
|
||||
// Write a representative mix of file sizes:
|
||||
// - empty file
|
||||
// - tiny text file
|
||||
// - file just under chunk boundary
|
||||
// - file forcing multiple chunks
|
||||
// - nested subdirectories
|
||||
chunkSize := int64(64 * 1024)
|
||||
maxBlobSize := int64(512 * 1024)
|
||||
|
||||
testFiles := map[string][]byte{
|
||||
filepath.Join(dataDir, "empty.txt"): {},
|
||||
filepath.Join(dataDir, "small.txt"): []byte("hello vaultik"),
|
||||
filepath.Join(dataDir, "subdir", "medium.bin"): bytesPattern("medium-", int(chunkSize/2)),
|
||||
filepath.Join(dataDir, "subdir", "large.bin"): bytesPattern("large-", int(chunkSize*4)),
|
||||
filepath.Join(dataDir, "deep", "nest", "leaf.txt"): []byte("leaf"),
|
||||
}
|
||||
|
||||
for path, content := range testFiles {
|
||||
require.NoError(t, fs.MkdirAll(filepath.Dir(path), 0o755))
|
||||
require.NoError(t, afero.WriteFile(fs, path, content, 0o644))
|
||||
}
|
||||
|
||||
// Create a file with non-default permissions.
|
||||
restrictedPath := filepath.Join(dataDir, "restricted.txt")
|
||||
require.NoError(t, afero.WriteFile(fs, restrictedPath, []byte("secret"), 0o600))
|
||||
testFiles[restrictedPath] = []byte("secret")
|
||||
|
||||
// Create an empty directory (should survive round-trip).
|
||||
emptyDir := filepath.Join(dataDir, "emptydir")
|
||||
require.NoError(t, fs.MkdirAll(emptyDir, 0o755))
|
||||
|
||||
// Create a symlink.
|
||||
symlinkPath := filepath.Join(dataDir, "link-to-small")
|
||||
require.NoError(t, os.Symlink("small.txt", symlinkPath))
|
||||
|
||||
// FileStorer is the real-world local-disk backend.
|
||||
storer, err := storage.NewFileStorer(storeDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
agePublicKey := "age1ezrjmfpwsc95svdg0y54mums3zevgzu0x0ecq2f7tp8a05gl0sjq9q9wjg"
|
||||
ageSecretKey := "AGE-SECRET-KEY-19CR5YSFW59HM4TLD6GXVEDMZFTVVF7PPHKUT68TXSFPK7APHXA2QS2NJA5"
|
||||
|
||||
cfg := &config.Config{
|
||||
AgeRecipients: []string{agePublicKey},
|
||||
AgeSecretKey: ageSecretKey,
|
||||
CompressionLevel: 3,
|
||||
Hostname: "test-host",
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
db, err := database.New(ctx, dbPath)
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = db.Close() }()
|
||||
|
||||
repos := database.NewRepositories(db)
|
||||
|
||||
sm := snapshot.NewSnapshotManager(snapshot.SnapshotManagerParams{
|
||||
Repos: repos,
|
||||
Storage: storer,
|
||||
Config: cfg,
|
||||
})
|
||||
sm.SetFilesystem(fs)
|
||||
|
||||
scanner := snapshot.NewScanner(snapshot.ScannerConfig{
|
||||
FS: fs,
|
||||
Storage: storer,
|
||||
ChunkSize: chunkSize,
|
||||
MaxBlobSize: maxBlobSize,
|
||||
CompressionLevel: cfg.CompressionLevel,
|
||||
AgeRecipients: cfg.AgeRecipients,
|
||||
Repositories: repos,
|
||||
})
|
||||
|
||||
snapshotID, err := sm.CreateSnapshotWithName(ctx, cfg.Hostname, "e2e", "test-version", "test-git")
|
||||
require.NoError(t, err)
|
||||
|
||||
scanResult, err := scanner.Scan(ctx, dataDir, snapshotID)
|
||||
require.NoError(t, err)
|
||||
require.Greater(t, scanResult.FilesScanned, 0)
|
||||
require.Greater(t, scanResult.BlobsCreated, 0)
|
||||
|
||||
require.NoError(t, sm.CompleteSnapshot(ctx, snapshotID))
|
||||
require.NoError(t, sm.ExportSnapshotMetadata(ctx, dbPath, snapshotID))
|
||||
|
||||
// Verify the backup actually landed on disk under blobs/ and metadata/.
|
||||
blobInfo, err := os.Stat(filepath.Join(storeDir, "blobs"))
|
||||
require.NoError(t, err)
|
||||
require.True(t, blobInfo.IsDir())
|
||||
metaInfo, err := os.Stat(filepath.Join(storeDir, "metadata", snapshotID))
|
||||
require.NoError(t, err)
|
||||
require.True(t, metaInfo.IsDir())
|
||||
|
||||
// Tear down the source DB before restore — restore must work using only
|
||||
// the remote bytes plus the secret key, with no help from the local index.
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
restoreVaultik := &vaultik.Vaultik{
|
||||
Config: cfg,
|
||||
Storage: storer,
|
||||
Fs: fs,
|
||||
Stdout: io.Discard,
|
||||
Stderr: io.Discard,
|
||||
}
|
||||
restoreVaultik.SetContext(ctx)
|
||||
|
||||
require.NoError(t, restoreVaultik.Restore(&vaultik.RestoreOptions{
|
||||
SnapshotID: snapshotID,
|
||||
TargetDir: restoreDir,
|
||||
Verify: true,
|
||||
}))
|
||||
|
||||
// Byte-equality compare every original against its restored copy.
|
||||
for origPath, expected := range testFiles {
|
||||
restoredPath := filepath.Join(restoreDir, origPath)
|
||||
got, err := afero.ReadFile(fs, restoredPath)
|
||||
require.NoError(t, err, "restored file missing: %s", restoredPath)
|
||||
require.Equalf(t, expected, got, "byte-equality failed for %s", origPath)
|
||||
}
|
||||
|
||||
// Verify the restricted file kept its permissions.
|
||||
restoredRestricted := filepath.Join(restoreDir, restrictedPath)
|
||||
rInfo, err := os.Stat(restoredRestricted)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, os.FileMode(0o600), rInfo.Mode().Perm(),
|
||||
"restricted file should preserve 0600 permissions")
|
||||
|
||||
// Verify the empty directory was restored.
|
||||
restoredEmptyDir := filepath.Join(restoreDir, emptyDir)
|
||||
dInfo, err := os.Stat(restoredEmptyDir)
|
||||
require.NoError(t, err, "empty directory should be restored")
|
||||
assert.True(t, dInfo.IsDir(), "emptydir should be a directory")
|
||||
|
||||
// Verify the symlink was restored with the correct target.
|
||||
restoredSymlink := filepath.Join(restoreDir, symlinkPath)
|
||||
target, err := os.Readlink(restoredSymlink)
|
||||
require.NoError(t, err, "symlink should be restored")
|
||||
assert.Equal(t, "small.txt", target, "symlink target should be preserved")
|
||||
}
|
||||
|
||||
// bytesPattern returns a deterministic byte slice of length n with a tag prefix,
|
||||
// useful for forcing chunker behavior with reproducible content.
|
||||
func bytesPattern(tag string, n int) []byte {
|
||||
out := make([]byte, n)
|
||||
for i := range out {
|
||||
out[i] = byte(tag[i%len(tag)] ^ byte(i&0xff))
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
@@ -27,95 +27,19 @@ type PruneBlobsResult struct {
|
||||
func (v *Vaultik) PruneBlobs(opts *PruneOptions) error {
|
||||
log.Info("Starting prune operation")
|
||||
|
||||
// Get all remote snapshots and their manifests
|
||||
allBlobsReferenced := make(map[string]bool)
|
||||
manifestCount := 0
|
||||
|
||||
// List all snapshots in storage
|
||||
log.Info("Listing remote snapshots")
|
||||
objectCh := v.Storage.ListStream(v.ctx, "metadata/")
|
||||
|
||||
var snapshotIDs []string
|
||||
for object := range objectCh {
|
||||
if object.Err != nil {
|
||||
return fmt.Errorf("listing remote snapshots: %w", object.Err)
|
||||
}
|
||||
|
||||
// Extract snapshot ID from paths like metadata/hostname-20240115-143052Z/
|
||||
parts := strings.Split(object.Key, "/")
|
||||
if len(parts) >= 2 && parts[0] == "metadata" && parts[1] != "" {
|
||||
// Check if this is a directory by looking for trailing slash
|
||||
if strings.HasSuffix(object.Key, "/") || strings.Contains(object.Key, "/manifest.json.zst") {
|
||||
snapshotID := parts[1]
|
||||
// Only add unique snapshot IDs
|
||||
found := false
|
||||
for _, id := range snapshotIDs {
|
||||
if id == snapshotID {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
snapshotIDs = append(snapshotIDs, snapshotID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("Found manifests in remote storage", "count", len(snapshotIDs))
|
||||
|
||||
// Download and parse each manifest to get referenced blobs
|
||||
for _, snapshotID := range snapshotIDs {
|
||||
log.Debug("Processing manifest", "snapshot_id", snapshotID)
|
||||
|
||||
manifest, err := v.downloadManifest(snapshotID)
|
||||
allBlobsReferenced, err := v.collectReferencedBlobs()
|
||||
if err != nil {
|
||||
log.Error("Failed to download manifest", "snapshot_id", snapshotID, "error", err)
|
||||
continue
|
||||
return err
|
||||
}
|
||||
|
||||
// Add all blobs from this manifest to our referenced set
|
||||
for _, blob := range manifest.Blobs {
|
||||
allBlobsReferenced[blob.Hash] = true
|
||||
}
|
||||
manifestCount++
|
||||
allBlobs, err := v.listAllRemoteBlobs()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info("Processed manifests", "count", manifestCount, "unique_blobs_referenced", len(allBlobsReferenced))
|
||||
unreferencedBlobs, totalSize := v.findUnreferencedBlobs(allBlobs, allBlobsReferenced)
|
||||
|
||||
// List all blobs in storage
|
||||
log.Info("Listing all blobs in storage")
|
||||
allBlobs := make(map[string]int64) // hash -> size
|
||||
blobObjectCh := v.Storage.ListStream(v.ctx, "blobs/")
|
||||
|
||||
for object := range blobObjectCh {
|
||||
if object.Err != nil {
|
||||
return fmt.Errorf("listing blobs: %w", object.Err)
|
||||
}
|
||||
|
||||
// Extract hash from path like blobs/ab/cd/abcdef123456...
|
||||
parts := strings.Split(object.Key, "/")
|
||||
if len(parts) == 4 && parts[0] == "blobs" {
|
||||
hash := parts[3]
|
||||
allBlobs[hash] = object.Size
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("Found blobs in storage", "count", len(allBlobs))
|
||||
|
||||
// Find unreferenced blobs
|
||||
var unreferencedBlobs []string
|
||||
var totalSize int64
|
||||
for hash, size := range allBlobs {
|
||||
if !allBlobsReferenced[hash] {
|
||||
unreferencedBlobs = append(unreferencedBlobs, hash)
|
||||
totalSize += size
|
||||
}
|
||||
}
|
||||
|
||||
result := &PruneBlobsResult{
|
||||
BlobsFound: len(unreferencedBlobs),
|
||||
}
|
||||
result := &PruneBlobsResult{BlobsFound: len(unreferencedBlobs)}
|
||||
|
||||
if len(unreferencedBlobs) == 0 {
|
||||
log.Info("No unreferenced blobs found")
|
||||
@@ -126,18 +50,15 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Show what will be deleted
|
||||
log.Info("Found unreferenced blobs", "count", len(unreferencedBlobs), "total_size", humanize.Bytes(uint64(totalSize)))
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Found %d unreferenced blob(s) totaling %s\n", len(unreferencedBlobs), humanize.Bytes(uint64(totalSize)))
|
||||
}
|
||||
|
||||
// Confirm unless --force is used (skip in JSON mode - require --force)
|
||||
if !opts.Force && !opts.JSON {
|
||||
v.printfStdout("\nDelete %d unreferenced blob(s)? [y/N] ", len(unreferencedBlobs))
|
||||
var confirm string
|
||||
if _, err := v.scanStdin(&confirm); err != nil {
|
||||
// Treat EOF or error as "no"
|
||||
v.printlnStdout("Cancelled")
|
||||
return nil
|
||||
}
|
||||
@@ -147,10 +68,109 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Delete unreferenced blobs
|
||||
v.deleteUnreferencedBlobs(unreferencedBlobs, allBlobs, result)
|
||||
|
||||
if opts.JSON {
|
||||
return v.outputPruneBlobsJSON(result)
|
||||
}
|
||||
|
||||
v.printfStdout("\nDeleted %d blob(s) totaling %s\n", result.BlobsDeleted, humanize.Bytes(uint64(result.BytesFreed)))
|
||||
if result.BlobsFailed > 0 {
|
||||
v.printfStdout("Failed to delete %d blob(s)\n", result.BlobsFailed)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// collectReferencedBlobs downloads all manifests and returns the set of referenced blob hashes
|
||||
func (v *Vaultik) collectReferencedBlobs() (map[string]bool, error) {
|
||||
log.Info("Listing remote snapshots")
|
||||
snapshotIDs, err := v.listUniqueSnapshotIDs()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("listing snapshot IDs: %w", err)
|
||||
}
|
||||
log.Info("Found manifests in remote storage", "count", len(snapshotIDs))
|
||||
|
||||
allBlobsReferenced := make(map[string]bool)
|
||||
manifestCount := 0
|
||||
|
||||
for _, snapshotID := range snapshotIDs {
|
||||
log.Debug("Processing manifest", "snapshot_id", snapshotID)
|
||||
manifest, err := v.downloadManifest(snapshotID)
|
||||
if err != nil {
|
||||
log.Error("Failed to download manifest", "snapshot_id", snapshotID, "error", err)
|
||||
continue
|
||||
}
|
||||
for _, blob := range manifest.Blobs {
|
||||
allBlobsReferenced[blob.Hash] = true
|
||||
}
|
||||
manifestCount++
|
||||
}
|
||||
|
||||
log.Info("Processed manifests", "count", manifestCount, "unique_blobs_referenced", len(allBlobsReferenced))
|
||||
return allBlobsReferenced, nil
|
||||
}
|
||||
|
||||
// listUniqueSnapshotIDs returns deduplicated snapshot IDs from remote metadata
|
||||
func (v *Vaultik) listUniqueSnapshotIDs() ([]string, error) {
|
||||
objectCh := v.Storage.ListStream(v.ctx, "metadata/")
|
||||
seen := make(map[string]bool)
|
||||
var snapshotIDs []string
|
||||
|
||||
for object := range objectCh {
|
||||
if object.Err != nil {
|
||||
return nil, fmt.Errorf("listing metadata objects: %w", object.Err)
|
||||
}
|
||||
parts := strings.Split(object.Key, "/")
|
||||
if len(parts) >= 2 && parts[0] == "metadata" && parts[1] != "" {
|
||||
if strings.HasSuffix(object.Key, "/") || strings.Contains(object.Key, "/manifest.json.zst") {
|
||||
snapshotID := parts[1]
|
||||
if !seen[snapshotID] {
|
||||
seen[snapshotID] = true
|
||||
snapshotIDs = append(snapshotIDs, snapshotID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return snapshotIDs, nil
|
||||
}
|
||||
|
||||
// listAllRemoteBlobs returns a map of all blob hashes to their sizes in remote storage
|
||||
func (v *Vaultik) listAllRemoteBlobs() (map[string]int64, error) {
|
||||
log.Info("Listing all blobs in storage")
|
||||
allBlobs := make(map[string]int64)
|
||||
blobObjectCh := v.Storage.ListStream(v.ctx, "blobs/")
|
||||
|
||||
for object := range blobObjectCh {
|
||||
if object.Err != nil {
|
||||
return nil, fmt.Errorf("listing blobs: %w", object.Err)
|
||||
}
|
||||
parts := strings.Split(object.Key, "/")
|
||||
if len(parts) == 4 && parts[0] == "blobs" {
|
||||
allBlobs[parts[3]] = object.Size
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("Found blobs in storage", "count", len(allBlobs))
|
||||
return allBlobs, nil
|
||||
}
|
||||
|
||||
// findUnreferencedBlobs returns blob hashes not referenced by any manifest and their total size
|
||||
func (v *Vaultik) findUnreferencedBlobs(allBlobs map[string]int64, referenced map[string]bool) ([]string, int64) {
|
||||
var unreferenced []string
|
||||
var totalSize int64
|
||||
for hash, size := range allBlobs {
|
||||
if !referenced[hash] {
|
||||
unreferenced = append(unreferenced, hash)
|
||||
totalSize += size
|
||||
}
|
||||
}
|
||||
return unreferenced, totalSize
|
||||
}
|
||||
|
||||
// deleteUnreferencedBlobs deletes the given blobs from storage and populates the result
|
||||
func (v *Vaultik) deleteUnreferencedBlobs(unreferencedBlobs []string, allBlobs map[string]int64, result *PruneBlobsResult) {
|
||||
log.Info("Deleting unreferenced blobs")
|
||||
deletedCount := 0
|
||||
deletedSize := int64(0)
|
||||
|
||||
for i, hash := range unreferencedBlobs {
|
||||
blobPath := fmt.Sprintf("blobs/%s/%s/%s", hash[:2], hash[2:4], hash)
|
||||
@@ -160,10 +180,9 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error {
|
||||
continue
|
||||
}
|
||||
|
||||
deletedCount++
|
||||
deletedSize += allBlobs[hash]
|
||||
result.BlobsDeleted++
|
||||
result.BytesFreed += allBlobs[hash]
|
||||
|
||||
// Progress update every 100 blobs
|
||||
if (i+1)%100 == 0 || i == len(unreferencedBlobs)-1 {
|
||||
log.Info("Deletion progress",
|
||||
"deleted", i+1,
|
||||
@@ -173,26 +192,13 @@ func (v *Vaultik) PruneBlobs(opts *PruneOptions) error {
|
||||
}
|
||||
}
|
||||
|
||||
result.BlobsDeleted = deletedCount
|
||||
result.BlobsFailed = len(unreferencedBlobs) - deletedCount
|
||||
result.BytesFreed = deletedSize
|
||||
result.BlobsFailed = len(unreferencedBlobs) - result.BlobsDeleted
|
||||
|
||||
log.Info("Prune complete",
|
||||
"deleted_count", deletedCount,
|
||||
"deleted_size", humanize.Bytes(uint64(deletedSize)),
|
||||
"failed", len(unreferencedBlobs)-deletedCount,
|
||||
"deleted_count", result.BlobsDeleted,
|
||||
"deleted_size", humanize.Bytes(uint64(result.BytesFreed)),
|
||||
"failed", result.BlobsFailed,
|
||||
)
|
||||
|
||||
if opts.JSON {
|
||||
return v.outputPruneBlobsJSON(result)
|
||||
}
|
||||
|
||||
v.printfStdout("\nDeleted %d blob(s) totaling %s\n", deletedCount, humanize.Bytes(uint64(deletedSize)))
|
||||
if deletedCount < len(unreferencedBlobs) {
|
||||
v.printfStdout("Failed to delete %d blob(s)\n", len(unreferencedBlobs)-deletedCount)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// outputPruneBlobsJSON outputs the prune result as JSON
|
||||
|
||||
256
internal/vaultik/purge_per_name_test.go
Normal file
256
internal/vaultik/purge_per_name_test.go
Normal file
@@ -0,0 +1,256 @@
|
||||
package vaultik_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.eeqj.de/sneak/vaultik/internal/database"
|
||||
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||
"git.eeqj.de/sneak/vaultik/internal/types"
|
||||
"git.eeqj.de/sneak/vaultik/internal/vaultik"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// setupPurgeTest creates a Vaultik instance with an in-memory database and mock
|
||||
// storage pre-populated with the given snapshot IDs. Each snapshot is marked as
|
||||
// completed. Remote metadata stubs are created so syncWithRemote keeps them.
|
||||
func setupPurgeTest(t *testing.T, snapshotIDs []string) *vaultik.Vaultik {
|
||||
t.Helper()
|
||||
log.Initialize(log.Config{})
|
||||
|
||||
ctx := context.Background()
|
||||
db, err := database.New(ctx, ":memory:")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = db.Close() })
|
||||
|
||||
repos := database.NewRepositories(db)
|
||||
mockStorage := NewMockStorer()
|
||||
|
||||
// Insert each snapshot into the DB and create remote metadata stubs.
|
||||
// Use timestamps parsed from snapshot IDs for realistic ordering.
|
||||
for _, id := range snapshotIDs {
|
||||
// Parse timestamp from the snapshot ID
|
||||
parts := strings.Split(id, "_")
|
||||
timestampStr := parts[len(parts)-1]
|
||||
startedAt, err := time.Parse(time.RFC3339, timestampStr)
|
||||
require.NoError(t, err, "parsing timestamp from snapshot ID %q", id)
|
||||
|
||||
completedAt := startedAt.Add(5 * time.Minute)
|
||||
snap := &database.Snapshot{
|
||||
ID: types.SnapshotID(id),
|
||||
Hostname: "testhost",
|
||||
VaultikVersion: "test",
|
||||
StartedAt: startedAt,
|
||||
CompletedAt: &completedAt,
|
||||
}
|
||||
err = repos.WithTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
|
||||
return repos.Snapshots.Create(ctx, tx, snap)
|
||||
})
|
||||
require.NoError(t, err, "creating snapshot %s", id)
|
||||
|
||||
// Create remote metadata stub so syncWithRemote keeps it
|
||||
metadataKey := "metadata/" + id + "/manifest.json.zst"
|
||||
err = mockStorage.Put(ctx, metadataKey, strings.NewReader("stub"))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
stdout := &bytes.Buffer{}
|
||||
stderr := &bytes.Buffer{}
|
||||
stdin := &bytes.Buffer{}
|
||||
|
||||
v := &vaultik.Vaultik{
|
||||
Storage: mockStorage,
|
||||
Repositories: repos,
|
||||
DB: db,
|
||||
Stdout: stdout,
|
||||
Stderr: stderr,
|
||||
Stdin: stdin,
|
||||
}
|
||||
v.SetContext(ctx)
|
||||
|
||||
return v
|
||||
}
|
||||
|
||||
// listRemainingSnapshots returns IDs of all completed snapshots in the database.
|
||||
func listRemainingSnapshots(t *testing.T, v *vaultik.Vaultik) []string {
|
||||
t.Helper()
|
||||
ctx := context.Background()
|
||||
dbSnaps, err := v.Repositories.Snapshots.ListRecent(ctx, 10000)
|
||||
require.NoError(t, err)
|
||||
|
||||
var ids []string
|
||||
for _, s := range dbSnaps {
|
||||
if s.CompletedAt != nil {
|
||||
ids = append(ids, s.ID.String())
|
||||
}
|
||||
}
|
||||
return ids
|
||||
}
|
||||
|
||||
func TestPurgeKeepLatest_PerName(t *testing.T) {
|
||||
// Create snapshots for two different names: "home" and "system".
|
||||
// With per-name --keep-latest, the latest of each should be kept.
|
||||
snapshotIDs := []string{
|
||||
"testhost_system_2026-01-01T00:00:00Z",
|
||||
"testhost_home_2026-01-01T01:00:00Z",
|
||||
"testhost_system_2026-01-01T02:00:00Z",
|
||||
"testhost_home_2026-01-01T03:00:00Z",
|
||||
"testhost_system_2026-01-01T04:00:00Z",
|
||||
}
|
||||
|
||||
v := setupPurgeTest(t, snapshotIDs)
|
||||
|
||||
err := v.PurgeSnapshotsWithOptions(&vaultik.SnapshotPurgeOptions{
|
||||
KeepLatest: true,
|
||||
Force: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
remaining := listRemainingSnapshots(t, v)
|
||||
|
||||
// Should keep the latest of each name
|
||||
assert.Len(t, remaining, 2, "should keep exactly 2 snapshots (one per name)")
|
||||
assert.Contains(t, remaining, "testhost_system_2026-01-01T04:00:00Z", "should keep latest system")
|
||||
assert.Contains(t, remaining, "testhost_home_2026-01-01T03:00:00Z", "should keep latest home")
|
||||
}
|
||||
|
||||
func TestPurgeKeepLatest_SingleName(t *testing.T) {
|
||||
// All snapshots have the same name — keep-latest should keep exactly one.
|
||||
snapshotIDs := []string{
|
||||
"testhost_home_2026-01-01T00:00:00Z",
|
||||
"testhost_home_2026-01-01T01:00:00Z",
|
||||
"testhost_home_2026-01-01T02:00:00Z",
|
||||
}
|
||||
|
||||
v := setupPurgeTest(t, snapshotIDs)
|
||||
|
||||
err := v.PurgeSnapshotsWithOptions(&vaultik.SnapshotPurgeOptions{
|
||||
KeepLatest: true,
|
||||
Force: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
remaining := listRemainingSnapshots(t, v)
|
||||
assert.Len(t, remaining, 1)
|
||||
assert.Contains(t, remaining, "testhost_home_2026-01-01T02:00:00Z", "should keep the newest")
|
||||
}
|
||||
|
||||
func TestPurgeKeepLatest_WithNameFilter(t *testing.T) {
|
||||
// Use --name to filter purge to only "home" snapshots.
|
||||
// "system" snapshots should be untouched.
|
||||
snapshotIDs := []string{
|
||||
"testhost_system_2026-01-01T00:00:00Z",
|
||||
"testhost_home_2026-01-01T01:00:00Z",
|
||||
"testhost_system_2026-01-01T02:00:00Z",
|
||||
"testhost_home_2026-01-01T03:00:00Z",
|
||||
"testhost_home_2026-01-01T04:00:00Z",
|
||||
}
|
||||
|
||||
v := setupPurgeTest(t, snapshotIDs)
|
||||
|
||||
err := v.PurgeSnapshotsWithOptions(&vaultik.SnapshotPurgeOptions{
|
||||
KeepLatest: true,
|
||||
Force: true,
|
||||
Names: []string{"home"},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
remaining := listRemainingSnapshots(t, v)
|
||||
|
||||
// 2 system snapshots untouched + 1 latest home = 3
|
||||
assert.Len(t, remaining, 3)
|
||||
assert.Contains(t, remaining, "testhost_system_2026-01-01T00:00:00Z")
|
||||
assert.Contains(t, remaining, "testhost_system_2026-01-01T02:00:00Z")
|
||||
assert.Contains(t, remaining, "testhost_home_2026-01-01T04:00:00Z")
|
||||
}
|
||||
|
||||
func TestPurgeKeepLatest_NoSnapshots(t *testing.T) {
|
||||
v := setupPurgeTest(t, nil)
|
||||
|
||||
err := v.PurgeSnapshotsWithOptions(&vaultik.SnapshotPurgeOptions{
|
||||
KeepLatest: true,
|
||||
Force: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestPurgeKeepLatest_NameFilterNoMatch(t *testing.T) {
|
||||
snapshotIDs := []string{
|
||||
"testhost_system_2026-01-01T00:00:00Z",
|
||||
"testhost_system_2026-01-01T01:00:00Z",
|
||||
}
|
||||
|
||||
v := setupPurgeTest(t, snapshotIDs)
|
||||
|
||||
err := v.PurgeSnapshotsWithOptions(&vaultik.SnapshotPurgeOptions{
|
||||
KeepLatest: true,
|
||||
Force: true,
|
||||
Names: []string{"nonexistent"},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// All snapshots should remain — the name filter matched nothing
|
||||
remaining := listRemainingSnapshots(t, v)
|
||||
assert.Len(t, remaining, 2)
|
||||
}
|
||||
|
||||
func TestPurgeOlderThan_WithNameFilter(t *testing.T) {
|
||||
// Snapshots with different names and timestamps.
|
||||
// --older-than should apply only to the named subset when --name is used.
|
||||
snapshotIDs := []string{
|
||||
"testhost_system_2020-01-01T00:00:00Z",
|
||||
"testhost_home_2020-01-01T00:00:00Z",
|
||||
"testhost_system_2026-01-01T00:00:00Z",
|
||||
"testhost_home_2026-01-01T00:00:00Z",
|
||||
}
|
||||
|
||||
v := setupPurgeTest(t, snapshotIDs)
|
||||
|
||||
// Purge only "home" snapshots older than 365 days
|
||||
err := v.PurgeSnapshotsWithOptions(&vaultik.SnapshotPurgeOptions{
|
||||
OlderThan: "365d",
|
||||
Force: true,
|
||||
Names: []string{"home"},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
remaining := listRemainingSnapshots(t, v)
|
||||
|
||||
// Old system stays (not filtered by name), old home deleted, recent ones stay
|
||||
assert.Len(t, remaining, 3)
|
||||
assert.Contains(t, remaining, "testhost_system_2020-01-01T00:00:00Z")
|
||||
assert.Contains(t, remaining, "testhost_system_2026-01-01T00:00:00Z")
|
||||
assert.Contains(t, remaining, "testhost_home_2026-01-01T00:00:00Z")
|
||||
}
|
||||
|
||||
func TestPurgeKeepLatest_ThreeNames(t *testing.T) {
|
||||
// Three different snapshot names with multiple snapshots each.
|
||||
snapshotIDs := []string{
|
||||
"testhost_home_2026-01-01T00:00:00Z",
|
||||
"testhost_system_2026-01-01T01:00:00Z",
|
||||
"testhost_media_2026-01-01T02:00:00Z",
|
||||
"testhost_home_2026-01-01T03:00:00Z",
|
||||
"testhost_system_2026-01-01T04:00:00Z",
|
||||
"testhost_media_2026-01-01T05:00:00Z",
|
||||
"testhost_home_2026-01-01T06:00:00Z",
|
||||
}
|
||||
|
||||
v := setupPurgeTest(t, snapshotIDs)
|
||||
|
||||
err := v.PurgeSnapshotsWithOptions(&vaultik.SnapshotPurgeOptions{
|
||||
KeepLatest: true,
|
||||
Force: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
remaining := listRemainingSnapshots(t, v)
|
||||
assert.Len(t, remaining, 3, "should keep one per name")
|
||||
assert.Contains(t, remaining, "testhost_home_2026-01-01T06:00:00Z")
|
||||
assert.Contains(t, remaining, "testhost_system_2026-01-01T04:00:00Z")
|
||||
assert.Contains(t, remaining, "testhost_media_2026-01-01T05:00:00Z")
|
||||
}
|
||||
@@ -55,15 +55,9 @@ type RestoreResult struct {
|
||||
func (v *Vaultik) Restore(opts *RestoreOptions) error {
|
||||
startTime := time.Now()
|
||||
|
||||
// Check for age_secret_key
|
||||
if v.Config.AgeSecretKey == "" {
|
||||
return fmt.Errorf("decryption key required for restore\n\nSet the VAULTIK_AGE_SECRET_KEY environment variable to your age private key:\n export VAULTIK_AGE_SECRET_KEY='AGE-SECRET-KEY-...'")
|
||||
}
|
||||
|
||||
// Parse the age identity
|
||||
identity, err := age.ParseX25519Identity(v.Config.AgeSecretKey)
|
||||
identity, err := v.prepareRestoreIdentity()
|
||||
if err != nil {
|
||||
return fmt.Errorf("parsing age secret key: %w", err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info("Starting restore operation",
|
||||
@@ -115,10 +109,73 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
|
||||
}
|
||||
|
||||
// Step 5: Restore files
|
||||
result, err := v.restoreAllFiles(files, repos, opts, identity, chunkToBlobMap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
result.Duration = time.Since(startTime)
|
||||
|
||||
log.Info("Restore complete",
|
||||
"files_restored", result.FilesRestored,
|
||||
"bytes_restored", humanize.Bytes(uint64(result.BytesRestored)),
|
||||
"blobs_downloaded", result.BlobsDownloaded,
|
||||
"bytes_downloaded", humanize.Bytes(uint64(result.BytesDownloaded)),
|
||||
"duration", result.Duration,
|
||||
)
|
||||
|
||||
v.printfStdout("Restored %d files (%s) in %s\n",
|
||||
result.FilesRestored,
|
||||
humanize.Bytes(uint64(result.BytesRestored)),
|
||||
result.Duration.Round(time.Second),
|
||||
)
|
||||
|
||||
if result.FilesFailed > 0 {
|
||||
_, _ = fmt.Fprintf(v.Stdout, "\nWARNING: %d file(s) failed to restore:\n", result.FilesFailed)
|
||||
for _, path := range result.FailedFiles {
|
||||
_, _ = fmt.Fprintf(v.Stdout, " - %s\n", path)
|
||||
}
|
||||
}
|
||||
|
||||
// Run verification if requested
|
||||
if opts.Verify {
|
||||
if err := v.handleRestoreVerification(repos, files, opts, result); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if result.FilesFailed > 0 {
|
||||
return fmt.Errorf("%d file(s) failed to restore", result.FilesFailed)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// prepareRestoreIdentity validates that an age secret key is configured and parses it
|
||||
func (v *Vaultik) prepareRestoreIdentity() (age.Identity, error) {
|
||||
if v.Config.AgeSecretKey == "" {
|
||||
return nil, fmt.Errorf("decryption key required for restore\n\nSet the VAULTIK_AGE_SECRET_KEY environment variable to your age private key:\n export VAULTIK_AGE_SECRET_KEY='AGE-SECRET-KEY-...'")
|
||||
}
|
||||
|
||||
identity, err := age.ParseX25519Identity(v.Config.AgeSecretKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parsing age secret key: %w", err)
|
||||
}
|
||||
return identity, nil
|
||||
}
|
||||
|
||||
// restoreAllFiles iterates over files and restores each one, tracking progress and failures
|
||||
func (v *Vaultik) restoreAllFiles(
|
||||
files []*database.File,
|
||||
repos *database.Repositories,
|
||||
opts *RestoreOptions,
|
||||
identity age.Identity,
|
||||
chunkToBlobMap map[string]*database.BlobChunk,
|
||||
) (*RestoreResult, error) {
|
||||
result := &RestoreResult{}
|
||||
blobCache, err := newBlobDiskCache(4 * v.Config.BlobSizeLimit.Int64())
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating blob cache: %w", err)
|
||||
return nil, fmt.Errorf("creating blob cache: %w", err)
|
||||
}
|
||||
defer func() { _ = blobCache.Close() }()
|
||||
|
||||
@@ -133,7 +190,7 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
|
||||
|
||||
for i, file := range files {
|
||||
if v.ctx.Err() != nil {
|
||||
return v.ctx.Err()
|
||||
return nil, v.ctx.Err()
|
||||
}
|
||||
|
||||
if err := v.restoreFile(v.ctx, repos, file, opts.TargetDir, identity, chunkToBlobMap, blobCache, result); err != nil {
|
||||
@@ -165,31 +222,16 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
|
||||
_ = bar.Finish()
|
||||
}
|
||||
|
||||
result.Duration = time.Since(startTime)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
log.Info("Restore complete",
|
||||
"files_restored", result.FilesRestored,
|
||||
"bytes_restored", humanize.Bytes(uint64(result.BytesRestored)),
|
||||
"blobs_downloaded", result.BlobsDownloaded,
|
||||
"bytes_downloaded", humanize.Bytes(uint64(result.BytesDownloaded)),
|
||||
"duration", result.Duration,
|
||||
)
|
||||
|
||||
v.printfStdout("Restored %d files (%s) in %s\n",
|
||||
result.FilesRestored,
|
||||
humanize.Bytes(uint64(result.BytesRestored)),
|
||||
result.Duration.Round(time.Second),
|
||||
)
|
||||
|
||||
if result.FilesFailed > 0 {
|
||||
_, _ = fmt.Fprintf(v.Stdout, "\nWARNING: %d file(s) failed to restore:\n", result.FilesFailed)
|
||||
for _, path := range result.FailedFiles {
|
||||
_, _ = fmt.Fprintf(v.Stdout, " - %s\n", path)
|
||||
}
|
||||
}
|
||||
|
||||
// Run verification if requested
|
||||
if opts.Verify {
|
||||
// handleRestoreVerification runs post-restore verification if requested
|
||||
func (v *Vaultik) handleRestoreVerification(
|
||||
repos *database.Repositories,
|
||||
files []*database.File,
|
||||
opts *RestoreOptions,
|
||||
result *RestoreResult,
|
||||
) error {
|
||||
if err := v.verifyRestoredFiles(v.ctx, repos, files, opts.TargetDir, result); err != nil {
|
||||
return fmt.Errorf("verification failed: %w", err)
|
||||
}
|
||||
@@ -206,12 +248,6 @@ func (v *Vaultik) Restore(opts *RestoreOptions) error {
|
||||
result.FilesVerified,
|
||||
humanize.Bytes(uint64(result.BytesVerified)),
|
||||
)
|
||||
}
|
||||
|
||||
if result.FilesFailed > 0 {
|
||||
return fmt.Errorf("%d file(s) failed to restore", result.FilesFailed)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -522,11 +558,23 @@ func (v *Vaultik) restoreRegularFile(
|
||||
|
||||
// downloadBlob downloads and decrypts a blob
|
||||
func (v *Vaultik) downloadBlob(ctx context.Context, blobHash string, expectedSize int64, identity age.Identity) ([]byte, error) {
|
||||
result, err := v.FetchAndDecryptBlob(ctx, blobHash, expectedSize, identity)
|
||||
rc, err := v.FetchAndDecryptBlob(ctx, blobHash, expectedSize, identity)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result.Data, nil
|
||||
|
||||
data, err := io.ReadAll(rc)
|
||||
if err != nil {
|
||||
_ = rc.Close()
|
||||
return nil, fmt.Errorf("reading blob data: %w", err)
|
||||
}
|
||||
|
||||
// Close triggers hash verification
|
||||
if err := rc.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// verifyRestoredFiles verifies that all restored files match their expected chunk hashes
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -5,6 +5,7 @@ import (
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
@@ -13,7 +14,7 @@ import (
|
||||
"git.eeqj.de/sneak/vaultik/internal/snapshot"
|
||||
"github.com/dustin/go-humanize"
|
||||
"github.com/klauspost/compress/zstd"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
// VerifyOptions contains options for the verify command
|
||||
@@ -35,6 +36,19 @@ type VerifyResult struct {
|
||||
ErrorMessage string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// deepVerifyFailure records a failure in the result and returns it appropriately
|
||||
func (v *Vaultik) deepVerifyFailure(result *VerifyResult, opts *VerifyOptions, msg string, err error) error {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = msg
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("%s", msg)
|
||||
}
|
||||
|
||||
// RunDeepVerify executes deep verification operation
|
||||
func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
|
||||
result := &VerifyResult{
|
||||
@@ -42,89 +56,19 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
|
||||
Mode: "deep",
|
||||
}
|
||||
|
||||
// Check for decryption capability
|
||||
if !v.CanDecrypt() {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = "VAULTIK_AGE_SECRET_KEY environment variable not set - required for deep verification"
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
return fmt.Errorf("VAULTIK_AGE_SECRET_KEY environment variable not set - required for deep verification")
|
||||
msg := "VAULTIK_AGE_SECRET_KEY not set; required for deep verification"
|
||||
return v.deepVerifyFailure(result, opts, msg, fmt.Errorf("%s", msg))
|
||||
}
|
||||
|
||||
log.Info("Starting snapshot verification",
|
||||
"snapshot_id", snapshotID,
|
||||
"mode", "deep",
|
||||
)
|
||||
|
||||
log.Info("Starting snapshot verification", "snapshot_id", snapshotID, "mode", "deep")
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Deep verification of snapshot: %s\n\n", snapshotID)
|
||||
}
|
||||
|
||||
// Step 1: Download manifest
|
||||
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
||||
log.Info("Downloading manifest", "path", manifestPath)
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Downloading manifest...\n")
|
||||
}
|
||||
|
||||
manifestReader, err := v.Storage.Get(v.ctx, manifestPath)
|
||||
manifest, tempDB, dbBlobs, err := v.loadVerificationData(snapshotID, opts, result)
|
||||
if err != nil {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = fmt.Sprintf("failed to download manifest: %v", err)
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
return fmt.Errorf("failed to download manifest: %w", err)
|
||||
}
|
||||
defer func() { _ = manifestReader.Close() }()
|
||||
|
||||
// Decompress manifest
|
||||
manifest, err := snapshot.DecodeManifest(manifestReader)
|
||||
if err != nil {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = fmt.Sprintf("failed to decode manifest: %v", err)
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
return fmt.Errorf("failed to decode manifest: %w", err)
|
||||
}
|
||||
|
||||
log.Info("Manifest loaded",
|
||||
"manifest_blob_count", manifest.BlobCount,
|
||||
"manifest_total_size", humanize.Bytes(uint64(manifest.TotalCompressedSize)),
|
||||
)
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Manifest loaded: %d blobs (%s)\n", manifest.BlobCount, humanize.Bytes(uint64(manifest.TotalCompressedSize)))
|
||||
}
|
||||
|
||||
// Step 2: Download and decrypt database (authoritative source)
|
||||
dbPath := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID)
|
||||
log.Info("Downloading encrypted database", "path", dbPath)
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Downloading and decrypting database...\n")
|
||||
}
|
||||
|
||||
dbReader, err := v.Storage.Get(v.ctx, dbPath)
|
||||
if err != nil {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = fmt.Sprintf("failed to download database: %v", err)
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
return fmt.Errorf("failed to download database: %w", err)
|
||||
}
|
||||
defer func() { _ = dbReader.Close() }()
|
||||
|
||||
// Decrypt and decompress database
|
||||
tempDB, err := v.decryptAndLoadDatabase(dbReader, v.Config.AgeSecretKey)
|
||||
if err != nil {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = fmt.Sprintf("failed to decrypt database: %v", err)
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
return fmt.Errorf("failed to decrypt database: %w", err)
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if tempDB != nil {
|
||||
@@ -132,17 +76,6 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
|
||||
}
|
||||
}()
|
||||
|
||||
// Step 3: Get authoritative blob list from database
|
||||
dbBlobs, err := v.getBlobsFromDatabase(snapshotID, tempDB.DB)
|
||||
if err != nil {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = fmt.Sprintf("failed to get blobs from database: %v", err)
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
return fmt.Errorf("failed to get blobs from database: %w", err)
|
||||
}
|
||||
|
||||
result.BlobCount = len(dbBlobs)
|
||||
var totalSize int64
|
||||
for _, blob := range dbBlobs {
|
||||
@@ -150,54 +83,10 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
|
||||
}
|
||||
result.TotalSize = totalSize
|
||||
|
||||
log.Info("Database loaded",
|
||||
"db_blob_count", len(dbBlobs),
|
||||
"db_total_size", humanize.Bytes(uint64(totalSize)),
|
||||
)
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Database loaded: %d blobs (%s)\n", len(dbBlobs), humanize.Bytes(uint64(totalSize)))
|
||||
v.printfStdout("Verifying manifest against database...\n")
|
||||
}
|
||||
|
||||
// Step 4: Verify manifest matches database
|
||||
if err := v.verifyManifestAgainstDatabase(manifest, dbBlobs); err != nil {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = err.Error()
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
if err := v.runVerificationSteps(manifest, dbBlobs, tempDB, opts, result, totalSize); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Step 5: Verify all blobs exist in S3 (using database as source)
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Manifest verified.\n")
|
||||
v.printfStdout("Checking blob existence in remote storage...\n")
|
||||
}
|
||||
if err := v.verifyBlobExistenceFromDB(dbBlobs); err != nil {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = err.Error()
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Step 6: Deep verification - download and verify blob contents
|
||||
if !opts.JSON {
|
||||
v.printfStdout("All blobs exist.\n")
|
||||
v.printfStdout("Downloading and verifying blob contents (%d blobs, %s)...\n", len(dbBlobs), humanize.Bytes(uint64(totalSize)))
|
||||
}
|
||||
if err := v.performDeepVerificationFromDB(dbBlobs, tempDB.DB, opts); err != nil {
|
||||
result.Status = "failed"
|
||||
result.ErrorMessage = err.Error()
|
||||
if opts.JSON {
|
||||
return v.outputVerifyJSON(result)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Success
|
||||
result.Status = "ok"
|
||||
result.Verified = len(dbBlobs)
|
||||
|
||||
@@ -206,11 +95,7 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
|
||||
}
|
||||
|
||||
log.Info("✓ Verification completed successfully",
|
||||
"snapshot_id", snapshotID,
|
||||
"mode", "deep",
|
||||
"blobs_verified", len(dbBlobs),
|
||||
)
|
||||
|
||||
"snapshot_id", snapshotID, "mode", "deep", "blobs_verified", len(dbBlobs))
|
||||
v.printfStdout("\n✓ Verification completed successfully\n")
|
||||
v.printfStdout(" Snapshot: %s\n", snapshotID)
|
||||
v.printfStdout(" Blobs verified: %d\n", len(dbBlobs))
|
||||
@@ -219,6 +104,106 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadVerificationData downloads manifest, database, and blob list for verification
|
||||
func (v *Vaultik) loadVerificationData(snapshotID string, opts *VerifyOptions, result *VerifyResult) (*snapshot.Manifest, *tempDB, []snapshot.BlobInfo, error) {
|
||||
// Download manifest
|
||||
manifestPath := fmt.Sprintf("metadata/%s/manifest.json.zst", snapshotID)
|
||||
log.Info("Downloading manifest", "path", manifestPath)
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Downloading manifest...\n")
|
||||
}
|
||||
manifestReader, err := v.Storage.Get(v.ctx, manifestPath)
|
||||
if err != nil {
|
||||
return nil, nil, nil, v.deepVerifyFailure(result, opts,
|
||||
fmt.Sprintf("failed to download manifest: %v", err),
|
||||
fmt.Errorf("failed to download manifest: %w", err))
|
||||
}
|
||||
defer func() { _ = manifestReader.Close() }()
|
||||
|
||||
manifest, err := snapshot.DecodeManifest(manifestReader)
|
||||
if err != nil {
|
||||
return nil, nil, nil, v.deepVerifyFailure(result, opts,
|
||||
fmt.Sprintf("failed to decode manifest: %v", err),
|
||||
fmt.Errorf("failed to decode manifest: %w", err))
|
||||
}
|
||||
|
||||
log.Info("Manifest loaded",
|
||||
"manifest_blob_count", manifest.BlobCount,
|
||||
"manifest_total_size", humanize.Bytes(uint64(manifest.TotalCompressedSize)))
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Manifest loaded: %d blobs (%s)\n", manifest.BlobCount, humanize.Bytes(uint64(manifest.TotalCompressedSize)))
|
||||
v.printfStdout("Downloading and decrypting database...\n")
|
||||
}
|
||||
|
||||
// Download and decrypt database
|
||||
dbPath := fmt.Sprintf("metadata/%s/db.zst.age", snapshotID)
|
||||
log.Info("Downloading encrypted database", "path", dbPath)
|
||||
dbReader, err := v.Storage.Get(v.ctx, dbPath)
|
||||
if err != nil {
|
||||
return nil, nil, nil, v.deepVerifyFailure(result, opts,
|
||||
fmt.Sprintf("failed to download database: %v", err),
|
||||
fmt.Errorf("failed to download database: %w", err))
|
||||
}
|
||||
defer func() { _ = dbReader.Close() }()
|
||||
|
||||
tdb, err := v.decryptAndLoadDatabase(dbReader, v.Config.AgeSecretKey)
|
||||
if err != nil {
|
||||
return nil, nil, nil, v.deepVerifyFailure(result, opts,
|
||||
fmt.Sprintf("failed to decrypt database: %v", err),
|
||||
fmt.Errorf("failed to decrypt database: %w", err))
|
||||
}
|
||||
|
||||
dbBlobs, err := v.getBlobsFromDatabase(snapshotID, tdb.DB)
|
||||
if err != nil {
|
||||
_ = tdb.Close()
|
||||
return nil, nil, nil, v.deepVerifyFailure(result, opts,
|
||||
fmt.Sprintf("failed to get blobs from database: %v", err),
|
||||
fmt.Errorf("failed to get blobs from database: %w", err))
|
||||
}
|
||||
|
||||
var dbTotalSize int64
|
||||
for _, b := range dbBlobs {
|
||||
dbTotalSize += b.CompressedSize
|
||||
}
|
||||
|
||||
log.Info("Database loaded",
|
||||
"db_blob_count", len(dbBlobs),
|
||||
"db_total_size", humanize.Bytes(uint64(dbTotalSize)))
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Database loaded: %d blobs (%s)\n", len(dbBlobs), humanize.Bytes(uint64(dbTotalSize)))
|
||||
}
|
||||
|
||||
return manifest, tdb, dbBlobs, nil
|
||||
}
|
||||
|
||||
// runVerificationSteps executes manifest verification, blob existence check, and deep content verification
|
||||
func (v *Vaultik) runVerificationSteps(manifest *snapshot.Manifest, dbBlobs []snapshot.BlobInfo, tdb *tempDB, opts *VerifyOptions, result *VerifyResult, totalSize int64) error {
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Verifying manifest against database...\n")
|
||||
}
|
||||
if err := v.verifyManifestAgainstDatabase(manifest, dbBlobs); err != nil {
|
||||
return v.deepVerifyFailure(result, opts, err.Error(), err)
|
||||
}
|
||||
|
||||
if !opts.JSON {
|
||||
v.printfStdout("Manifest verified.\n")
|
||||
v.printfStdout("Checking blob existence in remote storage...\n")
|
||||
}
|
||||
if err := v.verifyBlobExistenceFromDB(dbBlobs); err != nil {
|
||||
return v.deepVerifyFailure(result, opts, err.Error(), err)
|
||||
}
|
||||
|
||||
if !opts.JSON {
|
||||
v.printfStdout("All blobs exist.\n")
|
||||
v.printfStdout("Downloading and verifying blob contents (%d blobs, %s)...\n", len(dbBlobs), humanize.Bytes(uint64(totalSize)))
|
||||
}
|
||||
if err := v.performDeepVerificationFromDB(dbBlobs, tdb.DB, opts); err != nil {
|
||||
return v.deepVerifyFailure(result, opts, err.Error(), err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// tempDB wraps sql.DB with cleanup
|
||||
type tempDB struct {
|
||||
*sql.DB
|
||||
@@ -272,7 +257,7 @@ func (v *Vaultik) decryptAndLoadDatabase(reader io.ReadCloser, secretKey string)
|
||||
log.Info("Database decompressed", "size", humanize.Bytes(uint64(written)))
|
||||
|
||||
// Open the database
|
||||
db, err := sql.Open("sqlite3", tempPath)
|
||||
db, err := sql.Open("sqlite", tempPath)
|
||||
if err != nil {
|
||||
_ = os.Remove(tempPath)
|
||||
return nil, fmt.Errorf("failed to open database: %w", err)
|
||||
@@ -316,7 +301,27 @@ func (v *Vaultik) verifyBlob(blobInfo snapshot.BlobInfo, db *sql.DB) error {
|
||||
}
|
||||
defer decompressor.Close()
|
||||
|
||||
// Query blob chunks from database to get offsets and lengths
|
||||
chunkCount, err := v.verifyBlobChunks(db, blobInfo.Hash, decompressor)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := v.verifyBlobFinalIntegrity(decompressor, blobHasher, blobInfo.Hash); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info("Blob verified",
|
||||
"hash", blobInfo.Hash[:16]+"...",
|
||||
"chunks", chunkCount,
|
||||
"size", humanize.Bytes(uint64(blobInfo.CompressedSize)),
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// verifyBlobChunks queries blob chunks from the database and verifies each chunk's hash
|
||||
// against the decompressed blob stream
|
||||
func (v *Vaultik) verifyBlobChunks(db *sql.DB, blobHash string, decompressor io.Reader) (int, error) {
|
||||
query := `
|
||||
SELECT bc.chunk_hash, bc.offset, bc.length
|
||||
FROM blob_chunks bc
|
||||
@@ -324,9 +329,9 @@ func (v *Vaultik) verifyBlob(blobInfo snapshot.BlobInfo, db *sql.DB) error {
|
||||
WHERE b.blob_hash = ?
|
||||
ORDER BY bc.offset
|
||||
`
|
||||
rows, err := db.QueryContext(v.ctx, query, blobInfo.Hash)
|
||||
rows, err := db.QueryContext(v.ctx, query, blobHash)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to query blob chunks: %w", err)
|
||||
return 0, fmt.Errorf("failed to query blob chunks: %w", err)
|
||||
}
|
||||
defer func() { _ = rows.Close() }()
|
||||
|
||||
@@ -339,12 +344,12 @@ func (v *Vaultik) verifyBlob(blobInfo snapshot.BlobInfo, db *sql.DB) error {
|
||||
var chunkHash string
|
||||
var offset, length int64
|
||||
if err := rows.Scan(&chunkHash, &offset, &length); err != nil {
|
||||
return fmt.Errorf("failed to scan chunk row: %w", err)
|
||||
return 0, fmt.Errorf("failed to scan chunk row: %w", err)
|
||||
}
|
||||
|
||||
// Verify chunk ordering
|
||||
if offset <= lastOffset {
|
||||
return fmt.Errorf("chunks out of order: offset %d after %d", offset, lastOffset)
|
||||
return 0, fmt.Errorf("chunks out of order: offset %d after %d", offset, lastOffset)
|
||||
}
|
||||
lastOffset = offset
|
||||
|
||||
@@ -353,7 +358,7 @@ func (v *Vaultik) verifyBlob(blobInfo snapshot.BlobInfo, db *sql.DB) error {
|
||||
// Skip to the correct offset
|
||||
skipBytes := offset - totalRead
|
||||
if _, err := io.CopyN(io.Discard, decompressor, skipBytes); err != nil {
|
||||
return fmt.Errorf("failed to skip to offset %d: %w", offset, err)
|
||||
return 0, fmt.Errorf("failed to skip to offset %d: %w", offset, err)
|
||||
}
|
||||
totalRead = offset
|
||||
}
|
||||
@@ -361,7 +366,7 @@ func (v *Vaultik) verifyBlob(blobInfo snapshot.BlobInfo, db *sql.DB) error {
|
||||
// Read chunk data
|
||||
chunkData := make([]byte, length)
|
||||
if _, err := io.ReadFull(decompressor, chunkData); err != nil {
|
||||
return fmt.Errorf("failed to read chunk at offset %d: %w", offset, err)
|
||||
return 0, fmt.Errorf("failed to read chunk at offset %d: %w", offset, err)
|
||||
}
|
||||
totalRead += length
|
||||
|
||||
@@ -371,7 +376,7 @@ func (v *Vaultik) verifyBlob(blobInfo snapshot.BlobInfo, db *sql.DB) error {
|
||||
calculatedHash := hex.EncodeToString(hasher.Sum(nil))
|
||||
|
||||
if calculatedHash != chunkHash {
|
||||
return fmt.Errorf("chunk hash mismatch at offset %d: calculated %s, expected %s",
|
||||
return 0, fmt.Errorf("chunk hash mismatch at offset %d: calculated %s, expected %s",
|
||||
offset, calculatedHash, chunkHash)
|
||||
}
|
||||
|
||||
@@ -379,9 +384,15 @@ func (v *Vaultik) verifyBlob(blobInfo snapshot.BlobInfo, db *sql.DB) error {
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return fmt.Errorf("error iterating blob chunks: %w", err)
|
||||
return 0, fmt.Errorf("error iterating blob chunks: %w", err)
|
||||
}
|
||||
|
||||
return chunkCount, nil
|
||||
}
|
||||
|
||||
// verifyBlobFinalIntegrity checks that no trailing data exists in the decompressed stream
|
||||
// and that the encrypted blob hash matches the expected value
|
||||
func (v *Vaultik) verifyBlobFinalIntegrity(decompressor io.Reader, blobHasher hash.Hash, expectedHash string) error {
|
||||
// Verify no remaining data in blob - if chunk list is accurate, blob should be fully consumed
|
||||
remaining, err := io.Copy(io.Discard, decompressor)
|
||||
if err != nil {
|
||||
@@ -393,17 +404,11 @@ func (v *Vaultik) verifyBlob(blobInfo snapshot.BlobInfo, db *sql.DB) error {
|
||||
|
||||
// Verify blob hash matches the encrypted data we downloaded
|
||||
calculatedBlobHash := hex.EncodeToString(blobHasher.Sum(nil))
|
||||
if calculatedBlobHash != blobInfo.Hash {
|
||||
if calculatedBlobHash != expectedHash {
|
||||
return fmt.Errorf("blob hash mismatch: calculated %s, expected %s",
|
||||
calculatedBlobHash, blobInfo.Hash)
|
||||
calculatedBlobHash, expectedHash)
|
||||
}
|
||||
|
||||
log.Info("Blob verified",
|
||||
"hash", blobInfo.Hash[:16]+"...",
|
||||
"chunks", chunkCount,
|
||||
"size", humanize.Bytes(uint64(blobInfo.CompressedSize)),
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -20,9 +20,6 @@ s3:
|
||||
region: us-east-1
|
||||
use_ssl: true
|
||||
part_size: 5242880 # 5MB
|
||||
backup_interval: 1h
|
||||
full_scan_interval: 24h
|
||||
min_time_between_run: 15m
|
||||
index_path: /tmp/vaultik-test.sqlite
|
||||
chunk_size: 10MB
|
||||
blob_size_limit: 10GB
|
||||
|
||||
@@ -17,9 +17,6 @@ s3:
|
||||
region: us-east-1
|
||||
use_ssl: false
|
||||
part_size: 5242880 # 5MB
|
||||
backup_interval: 1h
|
||||
full_scan_interval: 24h
|
||||
min_time_between_run: 15m
|
||||
index_path: /tmp/vaultik-integration-test.sqlite
|
||||
chunk_size: 10MB
|
||||
blob_size_limit: 10GB
|
||||
|
||||
Reference in New Issue
Block a user