Compare commits
20 Commits
feature/da
...
2e2b02a056
| Author | SHA1 | Date | |
|---|---|---|---|
| 2e2b02a056 | |||
| 0b95cb4308 | |||
| 4a3e61f8e1 | |||
| 6fbcac0cd8 | |||
| 34f73f72d8 | |||
| ee240faa32 | |||
| f719ab3adc | |||
| 1a8baf7491 | |||
| 7d5d3fa598 | |||
| ac5d2f4a0d | |||
| b250ddfa94 | |||
| fe3ad13a91 | |||
| ebd6619638 | |||
| 20d3a9ac8c | |||
| 0889cf2804 | |||
| f9ebb4bf25 | |||
| 9f2d722734 | |||
| 6821215b0e | |||
| f97a1dc2eb | |||
| 18c14d1507 |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -1,5 +1,5 @@
|
||||
# Binary
|
||||
vaultik
|
||||
/vaultik
|
||||
|
||||
# Test artifacts
|
||||
*.out
|
||||
|
||||
55
.goreleaser.yaml
Normal file
55
.goreleaser.yaml
Normal file
@@ -0,0 +1,55 @@
|
||||
version: 2
|
||||
|
||||
project_name: vaultik
|
||||
|
||||
before:
|
||||
hooks:
|
||||
- go mod tidy
|
||||
|
||||
builds:
|
||||
- id: vaultik
|
||||
main: ./cmd/vaultik
|
||||
binary: vaultik
|
||||
env:
|
||||
- CGO_ENABLED=0
|
||||
goos:
|
||||
- linux
|
||||
- darwin
|
||||
goarch:
|
||||
- amd64
|
||||
- arm64
|
||||
ldflags:
|
||||
- -s -w
|
||||
- -X 'git.eeqj.de/sneak/vaultik/internal/globals.Version={{ .Version }}'
|
||||
- -X 'git.eeqj.de/sneak/vaultik/internal/globals.Commit={{ .Commit }}'
|
||||
|
||||
archives:
|
||||
- id: default
|
||||
name_template: "{{ .ProjectName }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}"
|
||||
formats:
|
||||
- tar.gz
|
||||
files:
|
||||
- LICENSE
|
||||
- README.md
|
||||
|
||||
checksum:
|
||||
name_template: "checksums.txt"
|
||||
algorithm: sha256
|
||||
|
||||
snapshot:
|
||||
version_template: "{{ incpatch .Version }}-next"
|
||||
|
||||
changelog:
|
||||
sort: asc
|
||||
use: git
|
||||
filters:
|
||||
exclude:
|
||||
- "^docs:"
|
||||
- "^test:"
|
||||
- "^chore:"
|
||||
- "Merge pull request"
|
||||
- "Merge branch"
|
||||
|
||||
release:
|
||||
draft: true
|
||||
prerelease: auto
|
||||
13
AGENTS.md
13
AGENTS.md
@@ -38,10 +38,9 @@ Version: 2025-06-08
|
||||
1. Before committing, tests must pass (`make test`), linting must pass
|
||||
(`make lint`), and code must be formatted (`make fmt`). For go, those
|
||||
makefile targets should use `go fmt` and `go test -v ./...` and
|
||||
`golangci-lint run`. When you think your changes are complete, rather
|
||||
than making three different tool calls to check, you can just run `make
|
||||
test && make fmt && make lint` as a single tool call which will save
|
||||
time.
|
||||
`golangci-lint run`. Each Makefile target does exactly one thing — to
|
||||
run lint + fmt-check + test together (the standard pre-commit gate),
|
||||
use `make check`.
|
||||
|
||||
2. Always write a `Makefile` with the default target being `test`, and with
|
||||
a `fmt` target that formats the code. The `test` target should run all
|
||||
@@ -103,3 +102,9 @@ Version: 2025-06-08
|
||||
build files are acceptable in the root, but source code and other files
|
||||
should be organized in appropriate subdirectories.
|
||||
|
||||
13. Pre-1.0: NEVER write database migrations. There are no live databases
|
||||
anywhere — every user's local index can be rebuilt from a fresh full
|
||||
backup. When the schema changes, just change `schema.sql` (and any code
|
||||
that touches the affected tables). The local index is disposable until
|
||||
1.0 ships and is tagged.
|
||||
|
||||
|
||||
@@ -53,8 +53,8 @@ The database tracks five primary entities and their relationships:
|
||||
### Entity Descriptions
|
||||
|
||||
#### File (`database.File`)
|
||||
Represents a file or directory in the backup system. Stores metadata needed for restoration:
|
||||
- Path, mtime
|
||||
Represents a file, directory, or symlink in the backup system. Stores metadata needed for restoration:
|
||||
- Path, source_path (for restore path stripping), mtime
|
||||
- Size, mode, ownership (uid, gid)
|
||||
- Symlink target (if applicable)
|
||||
|
||||
@@ -95,7 +95,7 @@ Maps chunks to their position within blobs:
|
||||
|
||||
#### Snapshot (`database.Snapshot`)
|
||||
Represents a point-in-time backup:
|
||||
- `ID`: Format is `{hostname}-{YYYYMMDD}-{HHMMSS}Z`
|
||||
- `ID`: Format is `{hostname}_{snapshot-name}_{RFC3339}` (e.g. `server1_home_2025-06-01T12:00:00Z`)
|
||||
- Tracks file count, chunk count, blob count, sizes, compression ratio
|
||||
- `CompletedAt`: Null until snapshot finishes successfully
|
||||
|
||||
@@ -127,7 +127,7 @@ fx.New(
|
||||
config.Module, // 5. Config
|
||||
database.Module, // 6. Database + Repositories
|
||||
log.Module, // 7. Logger initialization
|
||||
s3.Module, // 8. S3 client
|
||||
storage.Module, // 8. Storage backend (S3/file/rclone)
|
||||
snapshot.Module, // 9. SnapshotManager + ScannerFactory
|
||||
fx.Provide(vaultik.New), // 10. Vaultik orchestrator
|
||||
)
|
||||
@@ -161,7 +161,7 @@ type Vaultik struct {
|
||||
Config *config.Config
|
||||
DB *database.DB
|
||||
Repositories *database.Repositories
|
||||
S3Client *s3.Client
|
||||
Storage storage.Storer
|
||||
ScannerFactory snapshot.ScannerFactory
|
||||
SnapshotManager *snapshot.SnapshotManager
|
||||
Shutdowner fx.Shutdowner
|
||||
@@ -341,12 +341,11 @@ CreateSnapshot(opts)
|
||||
└─► SnapshotManager.ExportSnapshotMetadata()
|
||||
│
|
||||
├─► Copy database to temp file
|
||||
├─► Clean to only current snapshot data
|
||||
├─► Dump to SQL
|
||||
├─► Compress with zstd
|
||||
├─► Clean to only current snapshot data (VACUUM)
|
||||
├─► Compress binary SQLite with zstd
|
||||
├─► Encrypt with age
|
||||
├─► Upload db.zst.age to S3
|
||||
└─► Upload manifest.json.zst to S3
|
||||
├─► Upload db.zst.age to storage
|
||||
└─► Upload manifest.json.zst to storage
|
||||
```
|
||||
|
||||
## Deduplication Strategy
|
||||
@@ -368,8 +367,8 @@ bucket/
|
||||
│
|
||||
└── metadata/
|
||||
└── {snapshot-id}/
|
||||
├── db.zst.age # Encrypted database dump
|
||||
└── manifest.json.zst # Blob list (for verification)
|
||||
├── db.zst.age # Encrypted binary SQLite database
|
||||
└── manifest.json.zst # Blob list (for pruning/verification)
|
||||
```
|
||||
|
||||
## Thread Safety
|
||||
|
||||
41
Makefile
41
Makefile
@@ -1,7 +1,7 @@
|
||||
.PHONY: test fmt lint fmt-check check build clean all docker hooks
|
||||
.PHONY: all check test lint fmt fmt-check build clean deps test-coverage test-integration local install release release-snapshot docker hooks
|
||||
|
||||
# Version number
|
||||
VERSION := 0.0.1
|
||||
VERSION := 1.0.0-rc.1
|
||||
|
||||
# Build variables
|
||||
GIT_REVISION := $(shell git rev-parse HEAD 2>/dev/null || echo "unknown")
|
||||
@@ -13,37 +13,45 @@ LDFLAGS := -X 'git.eeqj.de/sneak/vaultik/internal/globals.Version=$(VERSION)' \
|
||||
# Default target
|
||||
all: vaultik
|
||||
|
||||
# Run tests
|
||||
# Combined pre-commit/CI gate: lint, format check, then tests.
|
||||
check: lint fmt-check test
|
||||
|
||||
# Run tests only.
|
||||
test:
|
||||
go test -race -timeout 30s ./...
|
||||
|
||||
# Check if code is formatted (read-only)
|
||||
# Check if code is formatted (read-only).
|
||||
fmt-check:
|
||||
@test -z "$$(gofmt -l .)" || (echo "Files not formatted:" && gofmt -l . && exit 1)
|
||||
|
||||
# Format code
|
||||
# Format code.
|
||||
fmt:
|
||||
go fmt ./...
|
||||
|
||||
# Run linter
|
||||
# Run linter only.
|
||||
lint:
|
||||
golangci-lint run ./...
|
||||
|
||||
# Build binary
|
||||
# Build binary.
|
||||
vaultik: internal/*/*.go cmd/vaultik/*.go
|
||||
go build -ldflags "$(LDFLAGS)" -o $@ ./cmd/vaultik
|
||||
|
||||
# Clean build artifacts
|
||||
# Clean build artifacts.
|
||||
clean:
|
||||
rm -f vaultik
|
||||
go clean
|
||||
|
||||
# Run tests with coverage
|
||||
# Install dependencies.
|
||||
deps:
|
||||
go mod download
|
||||
go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest
|
||||
|
||||
# Run tests with coverage.
|
||||
test-coverage:
|
||||
go test -v -coverprofile=coverage.out ./...
|
||||
go tool cover -html=coverage.out -o coverage.html
|
||||
|
||||
# Run integration tests
|
||||
# Run integration tests.
|
||||
test-integration:
|
||||
go test -v -tags=integration ./...
|
||||
|
||||
@@ -54,14 +62,19 @@ local:
|
||||
install: vaultik
|
||||
cp ./vaultik $(HOME)/bin/
|
||||
|
||||
# Run all checks (formatting, linting, tests) without modifying files
|
||||
check: fmt-check lint test
|
||||
# Build and publish release artifacts (linux/darwin × amd64/arm64) via goreleaser.
|
||||
release:
|
||||
goreleaser release --clean
|
||||
|
||||
# Build Docker image
|
||||
# Dry-run a release build without publishing or tagging.
|
||||
release-snapshot:
|
||||
goreleaser release --clean --snapshot
|
||||
|
||||
# Build Docker image.
|
||||
docker:
|
||||
docker build -t vaultik .
|
||||
|
||||
# Install pre-commit hook
|
||||
# Install pre-commit hook.
|
||||
hooks:
|
||||
@printf '#!/bin/sh\nset -e\n' > .git/hooks/pre-commit
|
||||
@printf 'go mod tidy\ngo fmt ./...\ngit diff --exit-code -- go.mod go.sum || { echo "go mod tidy changed files; please stage and retry"; exit 1; }\n' >> .git/hooks/pre-commit
|
||||
|
||||
556
PROCESS.md
556
PROCESS.md
@@ -1,556 +0,0 @@
|
||||
# Vaultik Snapshot Creation Process
|
||||
|
||||
This document describes the lifecycle of objects during snapshot creation, with a focus on database transactions and foreign key constraints.
|
||||
|
||||
## Database Schema Overview
|
||||
|
||||
### Tables and Foreign Key Dependencies
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────────────┐
|
||||
│ FOREIGN KEY GRAPH │
|
||||
│ │
|
||||
│ snapshots ◄────── snapshot_files ────────► files │
|
||||
│ │ │ │
|
||||
│ └───────── snapshot_blobs ────────► blobs │ │
|
||||
│ │ │ │
|
||||
│ │ ├──► file_chunks ◄── chunks│
|
||||
│ │ │ ▲ │
|
||||
│ │ └──► chunk_files ────┘ │
|
||||
│ │ │
|
||||
│ └──► blob_chunks ─────────────┘│
|
||||
│ │
|
||||
│ uploads ───────► blobs.blob_hash │
|
||||
│ └──────────► snapshots.id │
|
||||
└─────────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### Critical Constraint: `chunks` Must Exist First
|
||||
|
||||
These tables reference `chunks.chunk_hash` **without CASCADE**:
|
||||
- `file_chunks.chunk_hash` → `chunks.chunk_hash`
|
||||
- `chunk_files.chunk_hash` → `chunks.chunk_hash`
|
||||
- `blob_chunks.chunk_hash` → `chunks.chunk_hash`
|
||||
|
||||
**Implication**: A chunk record MUST be committed to the database BEFORE any of these referencing records can be created.
|
||||
|
||||
### Order of Operations Required by Schema
|
||||
|
||||
```
|
||||
1. snapshots (created first, before scan)
|
||||
2. blobs (created when packer starts new blob)
|
||||
3. chunks (created during file processing)
|
||||
4. blob_chunks (created immediately after chunk added to packer)
|
||||
5. files (created after file fully chunked)
|
||||
6. file_chunks (created with file record)
|
||||
7. chunk_files (created with file record)
|
||||
8. snapshot_files (created with file record)
|
||||
9. snapshot_blobs (created after blob uploaded)
|
||||
10. uploads (created after blob uploaded)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Snapshot Creation Phases
|
||||
|
||||
### Phase 0: Initialization
|
||||
|
||||
**Actions:**
|
||||
1. Snapshot record created in database (Transaction T0)
|
||||
2. Known files loaded into memory from `files` table
|
||||
3. Known chunks loaded into memory from `chunks` table
|
||||
|
||||
**Transactions:**
|
||||
```
|
||||
T0: INSERT INTO snapshots (id, hostname, ...) VALUES (...)
|
||||
COMMIT
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Phase 1: Scan Directory
|
||||
|
||||
**Actions:**
|
||||
1. Walk filesystem directory tree
|
||||
2. For each file, compare against in-memory `knownFiles` map
|
||||
3. Classify files as: unchanged, new, or modified
|
||||
4. Collect unchanged file IDs for later association
|
||||
5. Collect new/modified files for processing
|
||||
|
||||
**Transactions:**
|
||||
```
|
||||
(None during scan - all in-memory)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Phase 1b: Associate Unchanged Files
|
||||
|
||||
**Actions:**
|
||||
1. For unchanged files, add entries to `snapshot_files` table
|
||||
2. Done in batches of 1000
|
||||
|
||||
**Transactions:**
|
||||
```
|
||||
For each batch of 1000 file IDs:
|
||||
T: BEGIN
|
||||
INSERT INTO snapshot_files (snapshot_id, file_id) VALUES (?, ?)
|
||||
... (up to 1000 inserts)
|
||||
COMMIT
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Phase 2: Process Files
|
||||
|
||||
For each file that needs processing:
|
||||
|
||||
#### Step 2a: Open and Chunk File
|
||||
|
||||
**Location:** `processFileStreaming()`
|
||||
|
||||
For each chunk produced by content-defined chunking:
|
||||
|
||||
##### Step 2a-1: Check Chunk Existence
|
||||
```go
|
||||
chunkExists := s.chunkExists(chunk.Hash) // In-memory lookup
|
||||
```
|
||||
|
||||
##### Step 2a-2: Create Chunk Record (if new)
|
||||
```go
|
||||
// TRANSACTION: Create chunk in database
|
||||
err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
dbChunk := &database.Chunk{ChunkHash: chunk.Hash, Size: chunk.Size}
|
||||
return s.repos.Chunks.Create(txCtx, tx, dbChunk)
|
||||
})
|
||||
// COMMIT immediately after WithTx returns
|
||||
|
||||
// Update in-memory cache
|
||||
s.addKnownChunk(chunk.Hash)
|
||||
```
|
||||
|
||||
**Transaction:**
|
||||
```
|
||||
T_chunk: BEGIN
|
||||
INSERT INTO chunks (chunk_hash, size) VALUES (?, ?)
|
||||
COMMIT
|
||||
```
|
||||
|
||||
##### Step 2a-3: Add Chunk to Packer
|
||||
|
||||
```go
|
||||
s.packer.AddChunk(&blob.ChunkRef{Hash: chunk.Hash, Data: chunk.Data})
|
||||
```
|
||||
|
||||
**Inside packer.AddChunk → addChunkToCurrentBlob():**
|
||||
|
||||
```go
|
||||
// TRANSACTION: Create blob_chunks record IMMEDIATELY
|
||||
if p.repos != nil {
|
||||
blobChunk := &database.BlobChunk{
|
||||
BlobID: p.currentBlob.id,
|
||||
ChunkHash: chunk.Hash,
|
||||
Offset: offset,
|
||||
Length: chunkSize,
|
||||
}
|
||||
err := p.repos.WithTx(context.Background(), func(ctx context.Context, tx *sql.Tx) error {
|
||||
return p.repos.BlobChunks.Create(ctx, tx, blobChunk)
|
||||
})
|
||||
// COMMIT immediately
|
||||
}
|
||||
```
|
||||
|
||||
**Transaction:**
|
||||
```
|
||||
T_blob_chunk: BEGIN
|
||||
INSERT INTO blob_chunks (blob_id, chunk_hash, offset, length) VALUES (?, ?, ?, ?)
|
||||
COMMIT
|
||||
```
|
||||
|
||||
**⚠️ CRITICAL DEPENDENCY**: This transaction requires `chunks.chunk_hash` to exist (FK constraint).
|
||||
The chunk MUST be committed in Step 2a-2 BEFORE this can succeed.
|
||||
|
||||
---
|
||||
|
||||
#### Step 2b: Blob Size Limit Handling
|
||||
|
||||
If adding a chunk would exceed blob size limit:
|
||||
|
||||
```go
|
||||
if err == blob.ErrBlobSizeLimitExceeded {
|
||||
if err := s.packer.FinalizeBlob(); err != nil { ... }
|
||||
// Retry adding the chunk
|
||||
if err := s.packer.AddChunk(...); err != nil { ... }
|
||||
}
|
||||
```
|
||||
|
||||
**FinalizeBlob() transactions:**
|
||||
```
|
||||
T_blob_finish: BEGIN
|
||||
UPDATE blobs SET blob_hash=?, uncompressed_size=?, compressed_size=?, finished_ts=? WHERE id=?
|
||||
COMMIT
|
||||
```
|
||||
|
||||
Then blob handler is called (handleBlobReady):
|
||||
```
|
||||
(Upload to S3 - no transaction)
|
||||
|
||||
T_blob_uploaded: BEGIN
|
||||
UPDATE blobs SET uploaded_ts=? WHERE id=?
|
||||
INSERT INTO snapshot_blobs (snapshot_id, blob_id, blob_hash) VALUES (?, ?, ?)
|
||||
INSERT INTO uploads (blob_hash, snapshot_id, uploaded_at, size, duration_ms) VALUES (?, ?, ?, ?, ?)
|
||||
COMMIT
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
#### Step 2c: Queue File for Batch Insertion
|
||||
|
||||
After all chunks for a file are processed:
|
||||
|
||||
```go
|
||||
// Build file data (in-memory, no DB)
|
||||
fileChunks := make([]database.FileChunk, len(chunks))
|
||||
chunkFiles := make([]database.ChunkFile, len(chunks))
|
||||
|
||||
// Queue for batch insertion
|
||||
return s.addPendingFile(ctx, pendingFileData{
|
||||
file: fileToProcess.File,
|
||||
fileChunks: fileChunks,
|
||||
chunkFiles: chunkFiles,
|
||||
})
|
||||
```
|
||||
|
||||
**No transaction yet** - just adds to `pendingFiles` slice.
|
||||
|
||||
If `len(pendingFiles) >= fileBatchSize` (where `fileBatchSize` is 100), this triggers `flushPendingFiles()`.
|
||||
|
||||
---
|
||||
|
||||
#### Step 2d: Flush Pending Files
|
||||
|
||||
**Location:** `flushPendingFiles()` - called when batch is full or at end of processing
|
||||
|
||||
```go
|
||||
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
for _, data := range files {
|
||||
// 1. Create file record
|
||||
s.repos.Files.Create(txCtx, tx, data.file) // INSERT OR REPLACE
|
||||
|
||||
// 2. Delete old associations
|
||||
s.repos.FileChunks.DeleteByFileID(txCtx, tx, data.file.ID)
|
||||
s.repos.ChunkFiles.DeleteByFileID(txCtx, tx, data.file.ID)
|
||||
|
||||
// 3. Create file_chunks records
|
||||
for _, fc := range data.fileChunks {
|
||||
s.repos.FileChunks.Create(txCtx, tx, &fc) // FK: chunks.chunk_hash
|
||||
}
|
||||
|
||||
// 4. Create chunk_files records
|
||||
for _, cf := range data.chunkFiles {
|
||||
s.repos.ChunkFiles.Create(txCtx, tx, &cf) // FK: chunks.chunk_hash
|
||||
}
|
||||
|
||||
// 5. Add file to snapshot
|
||||
s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, data.file.ID)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
// COMMIT (all or nothing for the batch)
|
||||
```
|
||||
|
||||
**Transaction:**
|
||||
```
|
||||
T_files_batch: BEGIN
|
||||
-- For each file in batch:
|
||||
INSERT OR REPLACE INTO files (...) VALUES (...)
|
||||
DELETE FROM file_chunks WHERE file_id = ?
|
||||
DELETE FROM chunk_files WHERE file_id = ?
|
||||
INSERT INTO file_chunks (file_id, idx, chunk_hash) VALUES (?, ?, ?) -- FK: chunks
|
||||
INSERT INTO chunk_files (chunk_hash, file_id, ...) VALUES (?, ?, ...) -- FK: chunks
|
||||
INSERT INTO snapshot_files (snapshot_id, file_id) VALUES (?, ?)
|
||||
-- Repeat for each file
|
||||
COMMIT
|
||||
```
|
||||
|
||||
**⚠️ CRITICAL DEPENDENCY**: `file_chunks` and `chunk_files` require `chunks.chunk_hash` to exist.
|
||||
|
||||
---
|
||||
|
||||
### Phase 2 End: Final Flush
|
||||
|
||||
```go
|
||||
// Flush any remaining pending files
|
||||
if err := s.flushAllPending(ctx); err != nil { ... }
|
||||
|
||||
// Final packer flush
|
||||
s.packer.Flush()
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## The Current Bug
|
||||
|
||||
### Problem
|
||||
|
||||
The current code attempts to batch file insertions, but `file_chunks` and `chunk_files` have foreign keys to `chunks.chunk_hash`. The batched file flush tries to insert these records, but if the chunks haven't been committed yet, the FK constraint fails.
|
||||
|
||||
### Why It's Happening
|
||||
|
||||
Looking at the sequence:
|
||||
|
||||
1. Process file A, chunk X
|
||||
2. Create chunk X in DB (Transaction commits)
|
||||
3. Add chunk X to packer
|
||||
4. Packer creates blob_chunks for chunk X (needs chunk X - OK, committed in step 2)
|
||||
5. Queue file A with chunk references
|
||||
6. Process file B, chunk Y
|
||||
7. Create chunk Y in DB (Transaction commits)
|
||||
8. ... etc ...
|
||||
9. At end: flushPendingFiles()
|
||||
10. Insert file_chunks for file A referencing chunk X (chunk X committed - should work)
|
||||
|
||||
The chunks ARE being created individually. But something is going wrong.
|
||||
|
||||
### Actual Issue
|
||||
|
||||
Wait - let me re-read the code. The issue is:
|
||||
|
||||
In `processFileStreaming`, when we queue file data:
|
||||
```go
|
||||
fileChunks[i] = database.FileChunk{
|
||||
FileID: fileToProcess.File.ID,
|
||||
Idx: ci.fileChunk.Idx,
|
||||
ChunkHash: ci.fileChunk.ChunkHash,
|
||||
}
|
||||
```
|
||||
|
||||
The `FileID` is set, but `fileToProcess.File.ID` might be empty at this point because the file record hasn't been created yet!
|
||||
|
||||
Looking at `checkFileInMemory`:
|
||||
```go
|
||||
// For new files:
|
||||
if !exists {
|
||||
return file, true // file.ID is empty string!
|
||||
}
|
||||
|
||||
// For existing files:
|
||||
file.ID = existingFile.ID // Reuse existing ID
|
||||
```
|
||||
|
||||
**For NEW files, `file.ID` is empty!**
|
||||
|
||||
Then in `flushPendingFiles`:
|
||||
```go
|
||||
s.repos.Files.Create(txCtx, tx, data.file) // This generates/uses the ID
|
||||
```
|
||||
|
||||
But `data.fileChunks` was built with the EMPTY ID!
|
||||
|
||||
### The Real Problem
|
||||
|
||||
For new files:
|
||||
1. `checkFileInMemory` creates file record with empty ID
|
||||
2. `processFileStreaming` queues file_chunks with empty `FileID`
|
||||
3. `flushPendingFiles` creates file (generates ID), but file_chunks still have empty `FileID`
|
||||
|
||||
Wait, but `Files.Create` should be INSERT OR REPLACE by path, and the file struct should get updated... Let me check.
|
||||
|
||||
Actually, looking more carefully at the code path - the file IS created first in the flush, but the `fileChunks` slice was already built with the old (possibly empty) ID. The ID isn't updated after the file is created.
|
||||
|
||||
Hmm, but looking at the current code:
|
||||
```go
|
||||
fileChunks[i] = database.FileChunk{
|
||||
FileID: fileToProcess.File.ID, // This uses the ID from the File struct
|
||||
```
|
||||
|
||||
And in `checkFileInMemory` for new files, we create a file struct but don't set the ID. However, looking at the database repository, `Files.Create` should be doing `INSERT OR REPLACE` and the ID should be pre-generated...
|
||||
|
||||
Let me check if IDs are being generated. Looking at the File struct usage, it seems like UUIDs should be generated somewhere...
|
||||
|
||||
Actually, looking at the test failures again:
|
||||
```
|
||||
creating file chunk: inserting file_chunk: constraint failed: FOREIGN KEY constraint failed (787)
|
||||
```
|
||||
|
||||
Error 787 is SQLite's foreign key constraint error. The failing FK is on `file_chunks.chunk_hash → chunks.chunk_hash`.
|
||||
|
||||
So the chunks ARE NOT in the database when we try to insert file_chunks. Let me trace through more carefully...
|
||||
|
||||
---
|
||||
|
||||
## Transaction Timing Issue
|
||||
|
||||
The problem is transaction visibility in SQLite.
|
||||
|
||||
Each `WithTx` creates a new transaction that commits at the end. But with batched file insertion:
|
||||
|
||||
1. Chunk transactions commit one at a time
|
||||
2. File batch transaction runs later
|
||||
|
||||
If chunks are being inserted but something goes wrong with transaction isolation, the file batch might not see them.
|
||||
|
||||
But actually SQLite in WAL mode should have SERIALIZABLE isolation by default, so committed transactions should be visible.
|
||||
|
||||
Let me check if the in-memory cache is masking a database problem...
|
||||
|
||||
Actually, wait. Let me re-check the current broken code more carefully. The issue might be simpler.
|
||||
|
||||
---
|
||||
|
||||
## Current Code Flow Analysis
|
||||
|
||||
Looking at `processFileStreaming` in the current broken state:
|
||||
|
||||
```go
|
||||
// For each chunk:
|
||||
if !chunkExists {
|
||||
err := s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
dbChunk := &database.Chunk{ChunkHash: chunk.Hash, Size: chunk.Size}
|
||||
return s.repos.Chunks.Create(txCtx, tx, dbChunk)
|
||||
})
|
||||
// ... check error ...
|
||||
s.addKnownChunk(chunk.Hash)
|
||||
}
|
||||
|
||||
// ... add to packer (creates blob_chunks) ...
|
||||
|
||||
// Collect chunk info for file
|
||||
chunks = append(chunks, chunkInfo{...})
|
||||
```
|
||||
|
||||
Then at end of function:
|
||||
```go
|
||||
// Queue file for batch insertion
|
||||
return s.addPendingFile(ctx, pendingFileData{
|
||||
file: fileToProcess.File,
|
||||
fileChunks: fileChunks,
|
||||
chunkFiles: chunkFiles,
|
||||
})
|
||||
```
|
||||
|
||||
At end of `processPhase`:
|
||||
```go
|
||||
if err := s.flushAllPending(ctx); err != nil { ... }
|
||||
```
|
||||
|
||||
The chunks are being created one-by-one with individual transactions. By the time `flushPendingFiles` runs, all chunk transactions should have committed.
|
||||
|
||||
Unless... there's a bug in how the chunks are being referenced. Let me check if the chunk_hash values are correct.
|
||||
|
||||
Or... maybe the test database is being recreated between operations somehow?
|
||||
|
||||
Actually, let me check the test setup. Maybe the issue is specific to the test environment.
|
||||
|
||||
---
|
||||
|
||||
## Summary of Object Lifecycle
|
||||
|
||||
| Object | When Created | Transaction | Dependencies |
|
||||
|--------|--------------|-------------|--------------|
|
||||
| snapshot | Before scan | Individual tx | None |
|
||||
| blob | When packer needs new blob | Individual tx | None |
|
||||
| chunk | During file chunking (each chunk) | Individual tx | None |
|
||||
| blob_chunks | Immediately after adding chunk to packer | Individual tx | chunks, blobs |
|
||||
| files | Batched at end of processing | Batch tx | None |
|
||||
| file_chunks | With file (batched) | Batch tx | files, chunks |
|
||||
| chunk_files | With file (batched) | Batch tx | files, chunks |
|
||||
| snapshot_files | With file (batched) | Batch tx | snapshots, files |
|
||||
| snapshot_blobs | After blob upload | Individual tx | snapshots, blobs |
|
||||
| uploads | After blob upload | Same tx as snapshot_blobs | blobs, snapshots |
|
||||
|
||||
---
|
||||
|
||||
## Root Cause Analysis
|
||||
|
||||
After detailed analysis, I believe the issue is one of the following:
|
||||
|
||||
### Hypothesis 1: File ID Not Set
|
||||
|
||||
Looking at `checkFileInMemory()` for NEW files:
|
||||
```go
|
||||
if !exists {
|
||||
return file, true // file.ID is empty string!
|
||||
}
|
||||
```
|
||||
|
||||
For new files, `file.ID` is empty. Then in `processFileStreaming`:
|
||||
```go
|
||||
fileChunks[i] = database.FileChunk{
|
||||
FileID: fileToProcess.File.ID, // Empty for new files!
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
The `FileID` in the built `fileChunks` slice is empty.
|
||||
|
||||
Then in `flushPendingFiles`:
|
||||
```go
|
||||
s.repos.Files.Create(txCtx, tx, data.file) // This generates the ID
|
||||
// But data.fileChunks still has empty FileID!
|
||||
for i := range data.fileChunks {
|
||||
s.repos.FileChunks.Create(...) // Uses empty FileID
|
||||
}
|
||||
```
|
||||
|
||||
**Solution**: Generate file IDs upfront in `checkFileInMemory()`:
|
||||
```go
|
||||
file := &database.File{
|
||||
ID: uuid.New().String(), // Generate ID immediately
|
||||
Path: path,
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
### Hypothesis 2: Transaction Isolation
|
||||
|
||||
SQLite with a single-connection pool (`SetMaxOpenConns(1)`) should serialize all transactions. Committed data should be visible to subsequent transactions.
|
||||
|
||||
However, there might be a subtle issue with how `context.Background()` is used in the packer vs the scanner's context.
|
||||
|
||||
## Recommended Fix
|
||||
|
||||
**Step 1: Generate file IDs upfront**
|
||||
|
||||
In `checkFileInMemory()`, generate the UUID for new files immediately:
|
||||
```go
|
||||
file := &database.File{
|
||||
ID: uuid.New().String(), // Always generate ID
|
||||
Path: path,
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
This ensures `file.ID` is set when building `fileChunks` and `chunkFiles` slices.
|
||||
|
||||
**Step 2: Verify by reverting to per-file transactions**
|
||||
|
||||
If Step 1 doesn't fix it, revert to non-batched file insertion to isolate the issue:
|
||||
|
||||
```go
|
||||
// Instead of queuing:
|
||||
// return s.addPendingFile(ctx, pendingFileData{...})
|
||||
|
||||
// Do immediate insertion:
|
||||
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
// Create file
|
||||
s.repos.Files.Create(txCtx, tx, fileToProcess.File)
|
||||
// Delete old associations
|
||||
s.repos.FileChunks.DeleteByFileID(...)
|
||||
s.repos.ChunkFiles.DeleteByFileID(...)
|
||||
// Create new associations
|
||||
for _, fc := range fileChunks {
|
||||
s.repos.FileChunks.Create(...)
|
||||
}
|
||||
for _, cf := range chunkFiles {
|
||||
s.repos.ChunkFiles.Create(...)
|
||||
}
|
||||
// Add to snapshot
|
||||
s.repos.Snapshots.AddFileByID(...)
|
||||
return nil
|
||||
})
|
||||
```
|
||||
|
||||
**Step 3: If batching is still desired**
|
||||
|
||||
After confirming per-file transactions work, re-implement batching with the ID fix in place, and add debug logging to trace exactly which chunk_hash is failing and why.
|
||||
537
README.md
537
README.md
@@ -1,43 +1,35 @@
|
||||
# vaultik (ваултик)
|
||||
|
||||
WIP: pre-1.0, some functions may not be fully implemented yet
|
||||
|
||||
`vaultik` is an incremental backup daemon written in Go. It encrypts data
|
||||
`vaultik` is an incremental backup tool written in Go. It encrypts data
|
||||
using an `age` public key and uploads each encrypted blob directly to a
|
||||
remote S3-compatible object store. It requires no private keys, secrets, or
|
||||
credentials (other than those required to PUT to encrypted object storage,
|
||||
such as S3 API keys) stored on the backed-up system.
|
||||
|
||||
It includes table-stakes features such as:
|
||||
Features:
|
||||
|
||||
* modern encryption (the excellent `age`)
|
||||
* deduplication
|
||||
* incremental backups
|
||||
* modern multithreaded zstd compression with configurable levels
|
||||
* modern encryption ([age](https://age-encryption.org/), X25519 + ChaCha20-Poly1305)
|
||||
* content-defined chunking with deduplication (FastCDC)
|
||||
* incremental backups (only changed files are re-chunked)
|
||||
* multithreaded zstd compression at configurable levels
|
||||
* content-addressed immutable storage
|
||||
* local state tracking in standard SQLite database, enables write-only
|
||||
incremental backups to destination
|
||||
* local state tracking in SQLite (enables write-only incremental backups)
|
||||
* no mutable remote metadata
|
||||
* no plaintext file paths or metadata stored in remote
|
||||
* does not create huge numbers of small files (to keep S3 operation counts
|
||||
down) even if the source system has many small files
|
||||
* no plaintext file paths or metadata in remote storage
|
||||
* packs small files into large blobs (keeps S3 operation counts down)
|
||||
* backs up regular files, symlinks, empty directories, and file permissions
|
||||
* pluggable storage backends: S3, local filesystem, rclone (70+ providers)
|
||||
* pure Go (no CGO), cross-compiles to linux/darwin × amd64/arm64
|
||||
|
||||
## why
|
||||
|
||||
Existing backup software fails under one or more of these conditions:
|
||||
|
||||
* Requires secrets (passwords, private keys) on the source system, which
|
||||
compromises encrypted backups in the case of host system compromise
|
||||
* Depends on symmetric encryption unsuitable for zero-trust environments
|
||||
* Creates one-blob-per-file, which results in excessive S3 operation counts
|
||||
* Is slow
|
||||
|
||||
Other backup tools like `restic`, `borg`, and `duplicity` are designed for
|
||||
environments where the source host can store secrets and has access to
|
||||
decryption keys. I don't want to store backup decryption keys on my hosts,
|
||||
only public keys for encryption.
|
||||
decryption keys. `vaultik` is for environments where you don't want to
|
||||
store backup decryption keys on your hosts — only public keys for
|
||||
encryption.
|
||||
|
||||
My requirements are:
|
||||
Requirements that no existing tool meets:
|
||||
|
||||
* open source
|
||||
* no passphrases or private keys on the source host
|
||||
@@ -46,40 +38,13 @@ My requirements are:
|
||||
* encrypted
|
||||
* s3 compatible without an intermediate step or tool
|
||||
|
||||
Surprisingly, no existing tool meets these requirements, so I wrote `vaultik`.
|
||||
## install
|
||||
|
||||
## design goals
|
||||
```sh
|
||||
go install git.eeqj.de/sneak/vaultik@latest
|
||||
```
|
||||
|
||||
1. Backups must require only a public key on the source host.
|
||||
1. No secrets or private keys may exist on the source system.
|
||||
1. Restore must be possible using **only** the backup bucket and a private key.
|
||||
1. Prune must be possible (requires private key, done on different hosts).
|
||||
1. All encryption uses [`age`](https://age-encryption.org/) (X25519, ChaCha20-Poly1305).
|
||||
1. Compression uses `zstd` at a configurable level.
|
||||
1. Files are chunked, and multiple chunks are packed into encrypted blobs
|
||||
to reduce object count for filesystems with many small files.
|
||||
1. All metadata (snapshots) is stored remotely as encrypted SQLite DBs.
|
||||
|
||||
## what
|
||||
|
||||
`vaultik` walks a set of configured directories and builds a
|
||||
content-addressable chunk map of changed files using deterministic chunking.
|
||||
Each chunk is streamed into a blob packer. Blobs are compressed with `zstd`,
|
||||
encrypted with `age`, and uploaded directly to remote storage under a
|
||||
content-addressed S3 path. At the end, a pruned snapshot-specific sqlite
|
||||
database of metadata is created, encrypted, and uploaded alongside the
|
||||
blobs.
|
||||
|
||||
No plaintext file contents ever hit disk. No private key or secret
|
||||
passphrase is needed or stored locally.
|
||||
|
||||
## how
|
||||
|
||||
1. **install**
|
||||
|
||||
```sh
|
||||
go install git.eeqj.de/sneak/vaultik@latest
|
||||
```
|
||||
## quick start
|
||||
|
||||
1. **generate keypair**
|
||||
|
||||
@@ -88,23 +53,21 @@ passphrase is needed or stored locally.
|
||||
grep 'public key:' agekey.txt
|
||||
```
|
||||
|
||||
1. **write config**
|
||||
2. **write config** (see `config.example.yml` for all options)
|
||||
|
||||
```yaml
|
||||
# Named snapshots - each snapshot can contain multiple paths
|
||||
snapshots:
|
||||
system:
|
||||
paths:
|
||||
- /etc
|
||||
- /var/lib
|
||||
exclude:
|
||||
- '*.cache' # Snapshot-specific exclusions
|
||||
- '*.cache'
|
||||
home:
|
||||
paths:
|
||||
- /home/user/documents
|
||||
- /home/user/photos
|
||||
|
||||
# Global exclusions (apply to all snapshots)
|
||||
exclude:
|
||||
- '*.log'
|
||||
- '*.tmp'
|
||||
@@ -112,32 +75,36 @@ passphrase is needed or stored locally.
|
||||
- 'node_modules'
|
||||
|
||||
age_recipients:
|
||||
- age1278m9q7dp3chsh2dcy82qk27v047zywyvtxwnj4cvt0z65jw6a7q5dqhfj
|
||||
- age1YOUR_PUBLIC_KEY_HERE
|
||||
|
||||
# Storage backend (pick one):
|
||||
storage_url: "s3://mybucket/backups?endpoint=s3.example.com&region=us-east-1"
|
||||
# storage_url: "file:///mnt/backups"
|
||||
# storage_url: "rclone://myremote/path/to/backups"
|
||||
|
||||
# For s3:// URLs, credentials are still required:
|
||||
s3:
|
||||
endpoint: https://s3.example.com
|
||||
bucket: vaultik-data
|
||||
prefix: host1/
|
||||
access_key_id: ...
|
||||
secret_access_key: ...
|
||||
region: us-east-1
|
||||
backup_interval: 1h
|
||||
full_scan_interval: 24h
|
||||
min_time_between_run: 15m
|
||||
chunk_size: 10MB
|
||||
blob_size_limit: 1GB
|
||||
```
|
||||
|
||||
1. **run**
|
||||
3. **run**
|
||||
|
||||
```sh
|
||||
# Create all configured snapshots
|
||||
vaultik --config /etc/vaultik.yaml snapshot create
|
||||
# Back up all configured snapshots
|
||||
vaultik --config /etc/vaultik.yml snapshot create
|
||||
|
||||
# Create specific snapshots by name
|
||||
vaultik --config /etc/vaultik.yaml snapshot create home system
|
||||
# Back up specific snapshots by name
|
||||
vaultik --config /etc/vaultik.yml snapshot create home system
|
||||
|
||||
# Silent mode for cron
|
||||
vaultik --config /etc/vaultik.yaml snapshot create --cron
|
||||
vaultik --config /etc/vaultik.yml snapshot create --cron
|
||||
|
||||
# Back up and clean up old snapshots + orphan blobs in one shot
|
||||
vaultik --config /etc/vaultik.yml snapshot create --prune
|
||||
|
||||
# Daily cron: back up, keep last 4 weeks of snapshots
|
||||
vaultik --config /etc/vaultik.yml snapshot create --cron --prune --keep-newer-than 4w
|
||||
```
|
||||
|
||||
---
|
||||
@@ -147,302 +114,284 @@ passphrase is needed or stored locally.
|
||||
### commands
|
||||
|
||||
```sh
|
||||
vaultik [--config <path>] snapshot create [snapshot-names...] [--cron] [--daemon] [--prune]
|
||||
vaultik [--config <path>] snapshot create [snapshot-names...] [--cron] [--prune] [--keep-newer-than <duration>] [--skip-errors]
|
||||
vaultik [--config <path>] snapshot list [--json]
|
||||
vaultik [--config <path>] snapshot verify <snapshot-id> [--deep]
|
||||
vaultik [--config <path>] snapshot purge [--keep-latest | --older-than <duration>] [--name <name>] [--force]
|
||||
vaultik [--config <path>] snapshot remove <snapshot-id> [--dry-run] [--force]
|
||||
vaultik [--config <path>] snapshot verify <snapshot-id> [--deep] [--json]
|
||||
vaultik [--config <path>] snapshot purge [--keep-latest | --older-than <duration>] [--snapshot <name>...] [--force]
|
||||
vaultik [--config <path>] snapshot remove <snapshot-id|--all> [--dry-run] [--force] [--remote] [--json]
|
||||
vaultik [--config <path>] snapshot prune
|
||||
vaultik [--config <path>] restore <snapshot-id> <target-dir> [paths...]
|
||||
vaultik [--config <path>] prune [--dry-run] [--force]
|
||||
vaultik [--config <path>] snapshot cleanup
|
||||
vaultik [--config <path>] restore <snapshot-id> <target-dir> [paths...] [--verify]
|
||||
vaultik [--config <path>] prune [--force] [--json]
|
||||
vaultik [--config <path>] info
|
||||
vaultik [--config <path>] remote info [--json]
|
||||
vaultik [--config <path>] store info
|
||||
vaultik [--config <path>] database purge [--force]
|
||||
vaultik version
|
||||
```
|
||||
|
||||
### environment
|
||||
### global flags
|
||||
|
||||
* `VAULTIK_AGE_SECRET_KEY`: Required for `restore` and deep `verify`. Contains the age private key for decryption.
|
||||
* `VAULTIK_CONFIG`: Optional path to config file.
|
||||
* `--config <path>`: Path to config file (default: `$VAULTIK_CONFIG` or `/etc/vaultik/config.yml`)
|
||||
* `--verbose`, `-v`: Enable verbose output
|
||||
* `--debug`: Enable debug output
|
||||
* `--quiet`, `-q`: Suppress non-error output
|
||||
|
||||
### environment variables
|
||||
|
||||
* `VAULTIK_AGE_SECRET_KEY`: Age private key for decryption (required for `restore` and `verify --deep`)
|
||||
* `VAULTIK_CONFIG`: Path to config file (overridden by `--config`)
|
||||
* `VAULTIK_INDEX_PATH`: Override local SQLite index path
|
||||
|
||||
### command details
|
||||
|
||||
**snapshot create**: Perform incremental backup of configured snapshots
|
||||
* Config is located at `/etc/vaultik/config.yml` by default
|
||||
**snapshot create**: Perform incremental backup of configured snapshots.
|
||||
* Optional snapshot names argument to create specific snapshots (default: all)
|
||||
* `--cron`: Silent unless error (for crontab)
|
||||
* `--daemon`: Run continuously with filesystem monitoring and periodic scans (see [daemon mode](#daemon-mode))
|
||||
* `--prune`: Delete old snapshots and orphaned blobs after backup
|
||||
* `--prune`: After backup, drop older snapshots of each backed-up name and
|
||||
remove orphaned blobs from remote storage. By default keeps only the latest
|
||||
snapshot per name; use `--keep-newer-than` for a rolling window.
|
||||
* `--keep-newer-than <duration>`: With `--prune`, keep snapshots newer than
|
||||
this duration instead of only the latest (e.g. `4w`, `30d`, `6mo`, `1y`)
|
||||
* `--skip-errors`: Skip file read errors (log them loudly but continue)
|
||||
|
||||
**snapshot list**: List all snapshots with their timestamps and sizes
|
||||
**snapshot list**: List all snapshots with their timestamps and sizes.
|
||||
* `--json`: Output in JSON format
|
||||
|
||||
**snapshot verify**: Verify snapshot integrity
|
||||
* `--deep`: Download and verify blob contents (not just existence)
|
||||
**snapshot verify**: Verify snapshot integrity.
|
||||
* Default (shallow): checks that all blobs referenced in the manifest exist in storage
|
||||
* `--deep`: Downloads and decrypts each blob, verifies chunk hashes against the
|
||||
encrypted metadata database
|
||||
* `--json`: Output results as JSON
|
||||
|
||||
**snapshot purge**: Remove old snapshots based on criteria
|
||||
* `--keep-latest`: Keep the most recent snapshot per snapshot name
|
||||
* `--older-than`: Remove snapshots older than duration (e.g., 30d, 6mo, 1y)
|
||||
* `--name`: Filter purge to a specific snapshot name
|
||||
**snapshot purge**: Remove old snapshots based on criteria. Retention is
|
||||
per-snapshot-name (`--keep-latest` keeps the latest of each name, not the
|
||||
latest globally).
|
||||
* `--keep-latest`: Keep only the most recent snapshot of each name
|
||||
* `--older-than <duration>`: Remove snapshots older than duration (e.g. `30d`, `6mo`, `1y`)
|
||||
* `--snapshot <name>`: Restrict to specific snapshot names (repeat for multiple)
|
||||
* `--force`: Skip confirmation prompt
|
||||
|
||||
**snapshot remove**: Remove a specific snapshot
|
||||
**snapshot remove**: Remove a specific snapshot from the local database.
|
||||
* `--remote`: Also remove snapshot metadata from remote storage
|
||||
* `--all`: Remove all snapshots (requires `--force`)
|
||||
* `--dry-run`: Show what would be deleted without deleting
|
||||
* `--force`: Skip confirmation prompt
|
||||
* `--json`: Output result as JSON
|
||||
|
||||
**snapshot prune**: Clean orphaned data from local database
|
||||
**snapshot prune**: Clean orphaned data from the local database (files,
|
||||
chunks, blobs not referenced by any snapshot).
|
||||
|
||||
**restore**: Restore snapshot to target directory
|
||||
* Requires `VAULTIK_AGE_SECRET_KEY` environment variable with age private key
|
||||
**snapshot cleanup**: Remove stale local snapshot records that have no
|
||||
corresponding metadata in remote storage. These are typically left behind
|
||||
by incomplete or interrupted backups. Does not touch remote storage.
|
||||
|
||||
**restore**: Restore files from a backup snapshot.
|
||||
* Requires `VAULTIK_AGE_SECRET_KEY` environment variable
|
||||
* Optional path arguments to restore specific files/directories (default: all)
|
||||
* Downloads and decrypts metadata, fetches required blobs, reconstructs files
|
||||
* Preserves file permissions, timestamps, and ownership (ownership requires root)
|
||||
* Handles symlinks and directories
|
||||
* Preserves file permissions, timestamps, ownership (ownership requires root),
|
||||
symlinks, and empty directories
|
||||
* `--verify`: After restoring, verify every file's chunk hashes match
|
||||
|
||||
**prune**: Remove unreferenced blobs from remote storage
|
||||
* Scans all snapshots for referenced blobs
|
||||
* Deletes orphaned blobs
|
||||
**prune**: Remove unreferenced blobs from remote storage.
|
||||
* Scans all snapshot manifests for referenced blobs, deletes any blob not referenced
|
||||
* `--force`: Skip confirmation prompt
|
||||
* `--json`: Output stats as JSON
|
||||
|
||||
**info**: Display system and configuration information
|
||||
**info**: Display system configuration, storage settings, encryption
|
||||
recipients, and local database statistics.
|
||||
|
||||
**store info**: Display S3 bucket configuration and storage statistics
|
||||
**remote info**: Show detailed remote storage information including per-snapshot
|
||||
metadata sizes, blob counts, and orphaned blob detection.
|
||||
* `--json`: Output as JSON
|
||||
|
||||
**store info**: Display storage backend type and statistics.
|
||||
|
||||
**database purge**: Delete the local SQLite state database entirely. Remote
|
||||
storage is unaffected; the next backup will do a full scan and re-deduplicate
|
||||
against existing remote blobs.
|
||||
* `--force`: Skip confirmation prompt
|
||||
|
||||
---
|
||||
|
||||
## daemon mode
|
||||
## storage backends
|
||||
|
||||
When `--daemon` is passed to `snapshot create`, vaultik runs as a
|
||||
long-running process that continuously monitors configured directories for
|
||||
changes and creates backups automatically.
|
||||
vaultik supports three storage backends, selected via the `storage_url` config field:
|
||||
|
||||
```sh
|
||||
vaultik --config /etc/vaultik.yaml snapshot create --daemon
|
||||
```
|
||||
**S3** (`s3://bucket/prefix?endpoint=host&region=us-east-1`): Any S3-compatible
|
||||
object store. Credentials are read from `s3.access_key_id` and
|
||||
`s3.secret_access_key` in the config file.
|
||||
|
||||
### how it works
|
||||
**Local filesystem** (`file:///path/to/backup`): Stores blobs and metadata on
|
||||
a local or mounted filesystem. Useful for testing or backing up to a NAS.
|
||||
|
||||
1. **Initial backup**: On startup, a full backup of all configured snapshots
|
||||
runs immediately.
|
||||
2. **Filesystem watching**: All configured snapshot paths are monitored for
|
||||
file changes using OS-native filesystem notifications (inotify on Linux,
|
||||
FSEvents on macOS, ReadDirectoryChangesW on Windows) via the
|
||||
[fsnotify](https://github.com/fsnotify/fsnotify) library.
|
||||
3. **Periodic backups**: At each `backup_interval` tick, if filesystem
|
||||
changes have been detected and `min_time_between_run` has elapsed since
|
||||
the last backup, a backup runs for only the affected snapshots.
|
||||
4. **Full scans**: At each `full_scan_interval` tick, a full backup of all
|
||||
snapshots runs regardless of detected changes. This catches any changes
|
||||
that filesystem notifications may have missed.
|
||||
5. **Graceful shutdown**: On SIGTERM or SIGINT, the daemon completes any
|
||||
in-progress backup before exiting.
|
||||
**Rclone** (`rclone://remote/path`): Uses rclone's 70+ supported cloud
|
||||
providers. Requires rclone to be configured separately (`rclone config`).
|
||||
|
||||
### configuration
|
||||
|
||||
These config fields control daemon behavior:
|
||||
|
||||
```yaml
|
||||
backup_interval: 1h # How often to check for changes and run backups
|
||||
full_scan_interval: 24h # How often to do a complete scan of all paths
|
||||
min_time_between_run: 15m # Minimum gap between consecutive backup runs
|
||||
```
|
||||
|
||||
### notes
|
||||
|
||||
* New directories created under watched paths are automatically picked up.
|
||||
* The daemon uses the same `CreateSnapshot` logic as one-shot mode — each
|
||||
backup run is a standard incremental snapshot.
|
||||
* The `--prune`, `--cron`, and `--skip-errors` flags work in daemon mode
|
||||
and apply to each individual backup run.
|
||||
Legacy S3 configuration via `s3.*` fields (endpoint, bucket, prefix, etc.) is
|
||||
still supported for backward compatibility. `storage_url` takes precedence if
|
||||
both are set.
|
||||
|
||||
---
|
||||
|
||||
## architecture
|
||||
|
||||
### s3 bucket layout
|
||||
### remote storage layout
|
||||
|
||||
```
|
||||
s3://<bucket>/<prefix>/
|
||||
<bucket>/<prefix>/
|
||||
├── blobs/
|
||||
│ └── <aa>/<bb>/<full_blob_hash>
|
||||
└── metadata/
|
||||
├── <snapshot_id>/
|
||||
│ ├── db.zst.age
|
||||
│ └── manifest.json.zst
|
||||
└── <snapshot_id>/
|
||||
├── db.zst.age # Encrypted binary SQLite database
|
||||
└── manifest.json.zst # Unencrypted blob list (for pruning)
|
||||
```
|
||||
|
||||
* `blobs/<aa>/<bb>/...`: Two-level directory sharding using first 4 hex chars of blob hash
|
||||
* `metadata/<snapshot_id>/db.zst.age`: Encrypted, compressed SQLite database
|
||||
* `metadata/<snapshot_id>/manifest.json.zst`: Unencrypted blob list for pruning
|
||||
* Blobs are two-level directory sharded using the first 4 hex chars of the blob hash
|
||||
* `db.zst.age` is a binary SQLite database (zstd compressed, age encrypted)
|
||||
containing all file metadata, chunk mappings, and relationships for the snapshot
|
||||
* `manifest.json.zst` is an unencrypted compressed JSON blob list, enabling
|
||||
pruning without the private key
|
||||
|
||||
### blob manifest format
|
||||
|
||||
The `manifest.json.zst` file is unencrypted (compressed JSON) to enable pruning without decryption:
|
||||
|
||||
```json
|
||||
{
|
||||
"snapshot_id": "hostname_snapshotname_2025-01-01T12:00:00Z",
|
||||
"blob_hashes": [
|
||||
"aa1234567890abcdef...",
|
||||
"bb2345678901bcdef0..."
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
Snapshot IDs follow the format `<hostname>_<snapshot-name>_<timestamp>` (e.g., `server1_home_2025-01-01T12:00:00Z`).
|
||||
|
||||
### local sqlite schema
|
||||
|
||||
```sql
|
||||
CREATE TABLE files (
|
||||
id TEXT PRIMARY KEY,
|
||||
path TEXT NOT NULL UNIQUE,
|
||||
mtime INTEGER NOT NULL,
|
||||
size INTEGER NOT NULL,
|
||||
mode INTEGER NOT NULL,
|
||||
uid INTEGER NOT NULL,
|
||||
gid INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE file_chunks (
|
||||
file_id TEXT NOT NULL,
|
||||
idx INTEGER NOT NULL,
|
||||
chunk_hash TEXT NOT NULL,
|
||||
PRIMARY KEY (file_id, idx),
|
||||
FOREIGN KEY (file_id) REFERENCES files(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE chunks (
|
||||
chunk_hash TEXT PRIMARY KEY,
|
||||
size INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE blobs (
|
||||
id TEXT PRIMARY KEY,
|
||||
blob_hash TEXT NOT NULL UNIQUE,
|
||||
uncompressed INTEGER NOT NULL,
|
||||
compressed INTEGER NOT NULL,
|
||||
uploaded_at INTEGER
|
||||
);
|
||||
|
||||
CREATE TABLE blob_chunks (
|
||||
blob_hash TEXT NOT NULL,
|
||||
chunk_hash TEXT NOT NULL,
|
||||
offset INTEGER NOT NULL,
|
||||
length INTEGER NOT NULL,
|
||||
PRIMARY KEY (blob_hash, chunk_hash)
|
||||
);
|
||||
|
||||
CREATE TABLE chunk_files (
|
||||
chunk_hash TEXT NOT NULL,
|
||||
file_id TEXT NOT NULL,
|
||||
file_offset INTEGER NOT NULL,
|
||||
length INTEGER NOT NULL,
|
||||
PRIMARY KEY (chunk_hash, file_id)
|
||||
);
|
||||
|
||||
CREATE TABLE snapshots (
|
||||
id TEXT PRIMARY KEY,
|
||||
hostname TEXT NOT NULL,
|
||||
vaultik_version TEXT NOT NULL,
|
||||
started_at INTEGER NOT NULL,
|
||||
completed_at INTEGER,
|
||||
file_count INTEGER NOT NULL,
|
||||
chunk_count INTEGER NOT NULL,
|
||||
blob_count INTEGER NOT NULL,
|
||||
total_size INTEGER NOT NULL,
|
||||
blob_size INTEGER NOT NULL,
|
||||
compression_ratio REAL NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE snapshot_files (
|
||||
snapshot_id TEXT NOT NULL,
|
||||
file_id TEXT NOT NULL,
|
||||
PRIMARY KEY (snapshot_id, file_id)
|
||||
);
|
||||
|
||||
CREATE TABLE snapshot_blobs (
|
||||
snapshot_id TEXT NOT NULL,
|
||||
blob_id TEXT NOT NULL,
|
||||
blob_hash TEXT NOT NULL,
|
||||
PRIMARY KEY (snapshot_id, blob_id)
|
||||
);
|
||||
```
|
||||
Snapshot IDs follow the format `<hostname>_<snapshot-name>_<RFC3339-timestamp>`
|
||||
(e.g. `server1_home_2025-06-01T12:00:00Z`).
|
||||
|
||||
### data flow
|
||||
|
||||
#### backup
|
||||
**backup:**
|
||||
|
||||
1. Load config, open local SQLite index
|
||||
1. Walk source directories, check mtime/size against index
|
||||
1. For changed/new files: chunk using content-defined chunking
|
||||
1. For each chunk: hash, check if already uploaded, add to blob packer
|
||||
1. When blob reaches threshold: compress, encrypt, upload to S3
|
||||
1. Build snapshot metadata, compress, encrypt, upload
|
||||
1. Create blob manifest (unencrypted) for pruning support
|
||||
1. Open local SQLite index, load known files and chunks into memory
|
||||
2. Walk source directories, compare mtime/size/mode against index
|
||||
3. For changed/new files: chunk using content-defined chunking (FastCDC)
|
||||
4. For symlinks and directories: record metadata (no chunking)
|
||||
5. For each chunk: hash, check dedup, add to blob packer
|
||||
6. When blob reaches size threshold: compress (zstd), encrypt (age), upload
|
||||
7. Build snapshot metadata database, compress, encrypt, upload
|
||||
8. Create unencrypted blob manifest for pruning support
|
||||
|
||||
#### restore
|
||||
**restore:**
|
||||
|
||||
1. Download `metadata/<snapshot_id>/db.zst.age`
|
||||
1. Decrypt and decompress SQLite database
|
||||
1. Query files table (optionally filtered by paths)
|
||||
1. For each file, get ordered chunk list from file_chunks
|
||||
1. Download required blobs, decrypt, decompress
|
||||
1. Extract chunks and reconstruct files
|
||||
1. Restore permissions, mtime, uid/gid
|
||||
1. Download and decrypt `metadata/<snapshot_id>/db.zst.age`
|
||||
2. Open the binary SQLite database
|
||||
3. Query files (optionally filtered by paths)
|
||||
4. Download and decrypt required blobs
|
||||
5. Extract chunks, reconstruct files
|
||||
6. Restore permissions, timestamps, ownership, symlinks
|
||||
|
||||
#### prune
|
||||
**prune:**
|
||||
|
||||
1. List all snapshot manifests
|
||||
1. Build set of all referenced blob hashes
|
||||
1. List all blobs in storage
|
||||
1. Delete any blob not in referenced set
|
||||
2. Build set of all referenced blob hashes
|
||||
3. List all blobs in storage
|
||||
4. Delete any blob not in the referenced set
|
||||
|
||||
### chunking
|
||||
### chunking and deduplication
|
||||
|
||||
* Content-defined chunking using FastCDC algorithm
|
||||
* Content-defined chunking using the FastCDC algorithm
|
||||
* Average chunk size: configurable (default 10MB)
|
||||
* Deduplication at chunk level
|
||||
* Multiple chunks packed into blobs for efficiency
|
||||
* Deduplication at file level (unchanged files skipped) and chunk level
|
||||
(identical chunks across files stored once)
|
||||
* Multiple chunks packed into blobs to reduce object count
|
||||
|
||||
### encryption
|
||||
|
||||
* Asymmetric encryption using age (X25519 + ChaCha20-Poly1305)
|
||||
* Only public key needed on source host
|
||||
* Each blob encrypted independently
|
||||
* Metadata databases also encrypted
|
||||
* Only the public key is needed on the source host
|
||||
* Each blob and each metadata database is encrypted independently
|
||||
* Multiple recipients supported (encrypt to multiple keys)
|
||||
|
||||
### compression
|
||||
|
||||
* zstd compression at configurable level
|
||||
* Applied before encryption
|
||||
* Blob-level compression for efficiency
|
||||
* zstd compression at configurable level (1-19, default 3)
|
||||
* Applied before encryption at the blob level
|
||||
|
||||
---
|
||||
|
||||
## does not
|
||||
## configuration reference
|
||||
|
||||
* Store any secrets on the backed-up machine
|
||||
* Require mutable remote metadata
|
||||
* Use tarballs, restic, rsync, or ssh
|
||||
* Require a symmetric passphrase or password
|
||||
* Trust the source system with anything
|
||||
See `config.example.yml` for a complete annotated example. Key fields:
|
||||
|
||||
## does
|
||||
| Field | Default | Description |
|
||||
|-------|---------|-------------|
|
||||
| `age_recipients` | (required) | Age public keys for encryption |
|
||||
| `snapshots` | (required) | Named snapshot definitions with paths and excludes |
|
||||
| `storage_url` | | Storage backend URL (`s3://`, `file://`, `rclone://`) |
|
||||
| `s3.*` | | Legacy S3 configuration (endpoint, bucket, credentials) |
|
||||
| `exclude` | | Global exclude patterns (applied to all snapshots) |
|
||||
| `chunk_size` | `10MB` | Average chunk size for content-defined chunking |
|
||||
| `blob_size_limit` | `10GB` | Maximum blob size before splitting |
|
||||
| `compression_level` | `3` | zstd compression level (1-19) |
|
||||
| `hostname` | system hostname | Hostname used in snapshot IDs |
|
||||
| `index_path` | `~/.local/share/.../index.sqlite` | Local SQLite index path |
|
||||
|
||||
* Incremental deduplicated backup
|
||||
* Blob-packed chunk encryption
|
||||
* Content-addressed immutable blobs
|
||||
* Public-key encryption only
|
||||
* SQLite-based local and snapshot metadata
|
||||
* Fully stream-processed storage
|
||||
---
|
||||
|
||||
## limitations
|
||||
|
||||
* **No extended attributes (xattrs).** ACLs, macOS Finder metadata,
|
||||
quarantine flags, SELinux labels, and other extended attributes are not
|
||||
backed up or restored.
|
||||
* **No hard link detection.** Two hard links to the same inode are backed
|
||||
up as independent files. Content deduplication means the data is stored
|
||||
once, but the hard link relationship is lost on restore.
|
||||
* **No sparse file support.** Sparse files are fully materialized during
|
||||
backup. A 100 GB sparse VM disk that is mostly zeros will consume the
|
||||
full (compressed) size in storage.
|
||||
* **No bandwidth limiting.** Uploads and downloads use whatever bandwidth
|
||||
is available. There is no `--bwlimit` flag yet.
|
||||
* **No parallel blob downloads during restore.** Blobs are fetched
|
||||
sequentially. Restore speed is bound by single-stream throughput.
|
||||
* **Device nodes, named pipes, and sockets are silently skipped.** Only
|
||||
regular files, directories, and symlinks are backed up.
|
||||
* **No database migrations.** If the local SQLite schema changes between
|
||||
versions, delete the local database (`vaultik database purge`) and run
|
||||
a full backup. Remote storage is unaffected.
|
||||
* **Files that change during backup may be inconsistent.** There is no
|
||||
filesystem snapshot or freeze. If a file is modified between the scan
|
||||
and chunk phases, the backed-up copy may reflect a partial write.
|
||||
* **Ownership restoration requires root.** File uid/gid are recorded
|
||||
and restored, but `chown` requires elevated privileges. Without root,
|
||||
files are restored with the current user's ownership.
|
||||
|
||||
---
|
||||
|
||||
## roadmap
|
||||
|
||||
Items for future releases:
|
||||
|
||||
* Error-condition tests (network failures, disk full, corrupted/missing blobs)
|
||||
* Parallel blob downloads during restore
|
||||
* Bandwidth limiting (`--bwlimit`)
|
||||
* Security audit of encryption implementation
|
||||
* Man pages and richer `--help` examples
|
||||
|
||||
---
|
||||
|
||||
## requirements
|
||||
|
||||
* Go 1.24 or later
|
||||
* S3-compatible object storage
|
||||
* Sufficient disk space for local index (typically <1GB)
|
||||
* Go 1.26 or later
|
||||
* S3-compatible object storage (or local filesystem, or rclone remote)
|
||||
|
||||
## development workflow
|
||||
|
||||
All changes follow this workflow. No exceptions.
|
||||
|
||||
1. Create a feature branch off `main`.
|
||||
2. Write tests.
|
||||
3. Write the implementation.
|
||||
4. Fix implementation errors until it compiles and tests pass.
|
||||
5. Fix linting errors (`make lint`).
|
||||
6. Update documentation and README as required by the change.
|
||||
7. Format code (`make fmt`).
|
||||
8. Run `make check` (lint + fmt-check + test). Fix any issues. Repeat until clean.
|
||||
9. Commit on the branch.
|
||||
10. Merge to `main`.
|
||||
11. Push.
|
||||
|
||||
Do not commit directly to `main`. Do not skip steps.
|
||||
|
||||
Repository policies for AI agents are in [`AGENTS.md`](AGENTS.md).
|
||||
|
||||
## license
|
||||
|
||||
|
||||
126
TODO.md
126
TODO.md
@@ -1,126 +0,0 @@
|
||||
# Vaultik 1.0 TODO
|
||||
|
||||
Linear list of tasks to complete before 1.0 release.
|
||||
|
||||
## Rclone Storage Backend (Complete)
|
||||
|
||||
Add rclone as a storage backend via Go library import, allowing vaultik to use any of rclone's 70+ supported cloud storage providers.
|
||||
|
||||
**Configuration:**
|
||||
```yaml
|
||||
storage_url: "rclone://myremote/path/to/backups"
|
||||
```
|
||||
User must have rclone configured separately (via `rclone config`).
|
||||
|
||||
**Implementation Steps:**
|
||||
1. [x] Add rclone dependency to go.mod
|
||||
2. [x] Create `internal/storage/rclone.go` implementing `Storer` interface
|
||||
- `NewRcloneStorer(remote, path)` - init with `configfile.Install()` and `fs.NewFs()`
|
||||
- `Put` / `PutWithProgress` - use `operations.Rcat()`
|
||||
- `Get` - use `fs.NewObject()` then `obj.Open()`
|
||||
- `Stat` - use `fs.NewObject()` for size/metadata
|
||||
- `Delete` - use `obj.Remove()`
|
||||
- `List` / `ListStream` - use `operations.ListFn()`
|
||||
- `Info` - return remote name
|
||||
3. [x] Update `internal/storage/url.go` - parse `rclone://remote/path` URLs
|
||||
4. [x] Update `internal/storage/module.go` - add rclone case to `storerFromURL()`
|
||||
5. [x] Test with real rclone remote
|
||||
|
||||
**Error Mapping:**
|
||||
- `fs.ErrorObjectNotFound` → `ErrNotFound`
|
||||
- `fs.ErrorDirNotFound` → `ErrNotFound`
|
||||
- `fs.ErrorNotFoundInConfigFile` → `ErrRemoteNotFound` (new)
|
||||
|
||||
---
|
||||
|
||||
## CLI Polish (Priority)
|
||||
|
||||
1. Improve error messages throughout
|
||||
- Ensure all errors include actionable context
|
||||
- Add suggestions for common issues (e.g., "did you set VAULTIK_AGE_SECRET_KEY?")
|
||||
|
||||
## Security (Priority)
|
||||
|
||||
1. Audit encryption implementation
|
||||
- Verify age encryption is used correctly
|
||||
- Ensure no plaintext leaks in logs or errors
|
||||
- Verify blob hashes are computed correctly
|
||||
|
||||
1. Secure memory handling for secrets
|
||||
- Clear S3 credentials from memory after client init
|
||||
- Document that age_secret_key is env-var only (already implemented)
|
||||
|
||||
## Testing
|
||||
|
||||
1. Write integration tests for restore command
|
||||
|
||||
1. Write end-to-end integration test
|
||||
- Create backup
|
||||
- Verify backup
|
||||
- Restore backup
|
||||
- Compare restored files to originals
|
||||
|
||||
1. Add tests for edge cases
|
||||
- Empty directories
|
||||
- Symlinks
|
||||
- Special characters in filenames
|
||||
- Very large files (multi-GB)
|
||||
- Many small files (100k+)
|
||||
|
||||
1. Add tests for error conditions
|
||||
- Network failures during upload
|
||||
- Disk full during restore
|
||||
- Corrupted blobs
|
||||
- Missing blobs
|
||||
|
||||
## Performance
|
||||
|
||||
1. Profile and optimize restore performance
|
||||
- Parallel blob downloads
|
||||
- Streaming decompression/decryption
|
||||
- Efficient chunk reassembly
|
||||
|
||||
1. Add bandwidth limiting option
|
||||
- `--bwlimit` flag for upload/download speed limiting
|
||||
|
||||
## Documentation
|
||||
|
||||
1. Add man page or --help improvements
|
||||
- Detailed help for each command
|
||||
- Examples in help output
|
||||
|
||||
## Final Polish
|
||||
|
||||
1. Ensure version is set correctly in releases
|
||||
|
||||
1. Create release process
|
||||
- Binary releases for supported platforms
|
||||
- Checksums for binaries
|
||||
- Release notes template
|
||||
|
||||
1. Final code review
|
||||
- Remove debug statements
|
||||
- Ensure consistent code style
|
||||
|
||||
1. Tag and release v1.0.0
|
||||
|
||||
---
|
||||
|
||||
## Daemon Mode (Complete)
|
||||
|
||||
1. [x] Implement cross-platform filesystem watcher (via fsnotify)
|
||||
- Watches source directories for changes
|
||||
- Tracks dirty paths in memory
|
||||
- Automatically watches new directories
|
||||
|
||||
1. [x] Implement backup scheduler in daemon mode
|
||||
- Respects backup_interval config
|
||||
- Triggers backup when dirty paths exist and interval elapsed
|
||||
- Implements full_scan_interval for periodic full scans
|
||||
- Respects min_time_between_run to prevent excessive runs
|
||||
|
||||
1. [x] Add proper signal handling for daemon
|
||||
- Graceful shutdown on SIGTERM/SIGINT
|
||||
- Completes in-progress backup before exit
|
||||
|
||||
1. [x] Write tests for daemon mode
|
||||
@@ -291,21 +291,6 @@ storage_url: "rclone://las1stor1//srv/pool.2024.04/backups/heraklion"
|
||||
# # Default: 5MB
|
||||
# #part_size: 5MB
|
||||
|
||||
# How often to run backups in daemon mode
|
||||
# Format: 1h, 30m, 24h, etc
|
||||
# Default: 1h
|
||||
#backup_interval: 1h
|
||||
|
||||
# How often to do a full filesystem scan in daemon mode
|
||||
# Between full scans, inotify is used to detect changes
|
||||
# Default: 24h
|
||||
#full_scan_interval: 24h
|
||||
|
||||
# Minimum time between backup runs in daemon mode
|
||||
# Prevents backups from running too frequently
|
||||
# Default: 15m
|
||||
#min_time_between_run: 15m
|
||||
|
||||
# Path to local SQLite index database
|
||||
# This database tracks file state for incremental backups
|
||||
# Default: /var/lib/vaultik/index.sqlite
|
||||
|
||||
@@ -5,8 +5,14 @@
|
||||
Vaultik uses a local SQLite database to track file metadata, chunk mappings, and blob associations during the backup process. This database serves as an index for incremental backups and enables efficient deduplication.
|
||||
|
||||
**Important Notes:**
|
||||
- **No Migration Support**: Vaultik does not support database schema migrations. If the schema changes, the local database must be deleted and recreated by performing a full backup.
|
||||
- **Version Compatibility**: In rare cases, you may need to use the same version of Vaultik to restore a backup as was used to create it. This ensures compatibility with the metadata format stored in S3.
|
||||
- **No Migration Support (pre-1.0)**: Vaultik does not support database schema
|
||||
migrations. The local index is treated as disposable — if the schema changes,
|
||||
delete the local SQLite database (`vaultik database purge`) and run a full
|
||||
backup. The remote storage is unaffected; the new index will re-deduplicate
|
||||
against existing remote blobs.
|
||||
- **Version Compatibility**: In rare cases, you may need to use the same version
|
||||
of Vaultik to restore a backup as was used to create it. This ensures
|
||||
compatibility with the metadata format stored in S3.
|
||||
|
||||
## Database Tables
|
||||
|
||||
|
||||
@@ -43,18 +43,19 @@ Blobs contain the actual file data from backups and must be encrypted for securi
|
||||
Each snapshot has its own subdirectory named with the snapshot ID.
|
||||
|
||||
### Snapshot ID Format
|
||||
- **Format**: `<hostname>-<YYYYMMDD>-<HHMMSSZ>`
|
||||
- **Example**: `laptop-20240115-143052Z`
|
||||
- **Format**: `<hostname>_<snapshot-name>_<RFC3339>` (or `<hostname>_<RFC3339>` if no
|
||||
name was specified)
|
||||
- **Example**: `laptop_home_2024-01-15T14:30:52Z`
|
||||
- **Components**:
|
||||
- Hostname (may contain hyphens)
|
||||
- Date in YYYYMMDD format
|
||||
- Time in HHMMSSZ format (Z indicates UTC)
|
||||
- Short hostname (everything before the first dot is stripped from the FQDN)
|
||||
- Snapshot name from the configured `snapshots:` map (optional)
|
||||
- RFC3339 UTC timestamp
|
||||
|
||||
### Files in Each Snapshot Directory
|
||||
|
||||
#### `db.zst.age` - Encrypted Database Dump
|
||||
- **What it contains**: Complete SQLite database dump for this snapshot
|
||||
- **Format**: SQL dump → Zstandard compressed → Age encrypted
|
||||
#### `db.zst.age` - Encrypted Database
|
||||
- **What it contains**: Pruned binary SQLite database for this snapshot
|
||||
- **Format**: Binary SQLite → Zstandard compressed → Age encrypted
|
||||
- **Encryption**: Encrypted with Age
|
||||
- **Purpose**: Contains full file metadata, chunk mappings, and all relationships
|
||||
- **Why encrypted**: Contains sensitive metadata like file paths, permissions, and ownership
|
||||
@@ -67,7 +68,7 @@ Each snapshot has its own subdirectory named with the snapshot ID.
|
||||
- **Structure**:
|
||||
```json
|
||||
{
|
||||
"snapshot_id": "laptop-20240115-143052Z",
|
||||
"snapshot_id": "laptop_home_2024-01-15T14:30:52Z",
|
||||
"timestamp": "2024-01-15T14:30:52Z",
|
||||
"blob_count": 42,
|
||||
"blobs": [
|
||||
|
||||
2
go.mod
2
go.mod
@@ -13,12 +13,10 @@ require (
|
||||
github.com/aws/aws-sdk-go-v2/service/s3 v1.90.0
|
||||
github.com/aws/smithy-go v1.23.2
|
||||
github.com/dustin/go-humanize v1.0.1
|
||||
github.com/fsnotify/fsnotify v1.9.0
|
||||
github.com/gobwas/glob v0.2.3
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/johannesboyne/gofakes3 v0.0.0-20250603205740-ed9094be7668
|
||||
github.com/klauspost/compress v1.18.1
|
||||
github.com/mattn/go-sqlite3 v1.14.29
|
||||
github.com/rclone/rclone v1.72.1
|
||||
github.com/schollz/progressbar/v3 v3.19.0
|
||||
github.com/spf13/afero v1.15.0
|
||||
|
||||
6
go.sum
6
go.sum
@@ -286,8 +286,8 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2
|
||||
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
|
||||
github.com/flynn/noise v1.1.0 h1:KjPQoQCEFdZDiP03phOvGi11+SVVhBG2wOWAorLsstg=
|
||||
github.com/flynn/noise v1.1.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
|
||||
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
|
||||
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
|
||||
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
|
||||
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
|
||||
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
|
||||
github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
|
||||
github.com/gabriel-vasile/mimetype v1.4.11 h1:AQvxbp830wPhHTqc1u7nzoLT+ZFxGY7emj5DR5DYFik=
|
||||
@@ -593,8 +593,6 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
|
||||
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
|
||||
github.com/mattn/go-runewidth v0.0.19 h1:v++JhqYnZuu5jSKrk9RbgF5v4CGUjqRfBm05byFGLdw=
|
||||
github.com/mattn/go-runewidth v0.0.19/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs=
|
||||
github.com/mattn/go-sqlite3 v1.14.29 h1:1O6nRLJKvsi1H2Sj0Hzdfojwt8GiGKm+LOfLaBFaouQ=
|
||||
github.com/mattn/go-sqlite3 v1.14.29/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||
github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
|
||||
github.com/miekg/dns v1.1.41 h1:WMszZWJG0XmzbK9FEmzH2TVcqYzFesusSIB41b8KHxY=
|
||||
|
||||
@@ -18,7 +18,7 @@ func TestCLIEntry(t *testing.T) {
|
||||
}
|
||||
|
||||
// Verify all subcommands are registered
|
||||
expectedCommands := []string{"snapshot", "store", "restore", "prune", "verify", "info", "version"}
|
||||
expectedCommands := []string{"snapshot", "store", "restore", "prune", "info", "version", "remote", "database"}
|
||||
for _, expected := range expectedCommands {
|
||||
found := false
|
||||
for _, cmd := range cmd.Commands() {
|
||||
@@ -38,7 +38,7 @@ func TestCLIEntry(t *testing.T) {
|
||||
t.Errorf("Failed to find snapshot command: %v", err)
|
||||
} else {
|
||||
// Check snapshot subcommands
|
||||
expectedSubCommands := []string{"create", "list", "purge", "verify"}
|
||||
expectedSubCommands := []string{"create", "list", "purge", "verify", "cleanup"}
|
||||
for _, expected := range expectedSubCommands {
|
||||
found := false
|
||||
for _, subcmd := range snapshotCmd.Commands() {
|
||||
|
||||
@@ -1,101 +0,0 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||
"git.eeqj.de/sneak/vaultik/internal/vaultik"
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/fx"
|
||||
)
|
||||
|
||||
// NewPurgeCommand creates the purge command
|
||||
func NewPurgeCommand() *cobra.Command {
|
||||
opts := &vaultik.SnapshotPurgeOptions{}
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "purge",
|
||||
Short: "Purge old snapshots",
|
||||
Long: `Removes snapshots based on age or count criteria.
|
||||
|
||||
This command allows you to:
|
||||
- Keep only the latest snapshot per name (--keep-latest)
|
||||
- Remove snapshots older than a specific duration (--older-than)
|
||||
- Filter to a specific snapshot name (--name)
|
||||
|
||||
When --keep-latest is used, retention is applied per snapshot name. For example,
|
||||
if you have snapshots named "home" and "system", --keep-latest keeps the most
|
||||
recent of each.
|
||||
|
||||
Use --name to restrict the purge to a single snapshot name.
|
||||
|
||||
Config is located at /etc/vaultik/config.yml by default, but can be overridden by
|
||||
specifying a path using --config or by setting VAULTIK_CONFIG to a path.`,
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
// Validate flags
|
||||
if !opts.KeepLatest && opts.OlderThan == "" {
|
||||
return fmt.Errorf("must specify either --keep-latest or --older-than")
|
||||
}
|
||||
if opts.KeepLatest && opts.OlderThan != "" {
|
||||
return fmt.Errorf("cannot specify both --keep-latest and --older-than")
|
||||
}
|
||||
|
||||
// Use unified config resolution
|
||||
configPath, err := ResolveConfigPath()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Use the app framework like other commands
|
||||
rootFlags := GetRootFlags()
|
||||
return RunWithApp(cmd.Context(), AppOptions{
|
||||
ConfigPath: configPath,
|
||||
LogOptions: log.LogOptions{
|
||||
Verbose: rootFlags.Verbose,
|
||||
Debug: rootFlags.Debug,
|
||||
Quiet: rootFlags.Quiet,
|
||||
},
|
||||
Modules: []fx.Option{},
|
||||
Invokes: []fx.Option{
|
||||
fx.Invoke(func(v *vaultik.Vaultik, lc fx.Lifecycle) {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
// Start the purge operation in a goroutine
|
||||
go func() {
|
||||
// Run the purge operation
|
||||
if err := v.PurgeSnapshotsWithOptions(opts); err != nil {
|
||||
if err != context.Canceled {
|
||||
log.Error("Purge operation failed", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown the app when purge completes
|
||||
if err := v.Shutdowner.Shutdown(); err != nil {
|
||||
log.Error("Failed to shutdown", "error", err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
},
|
||||
OnStop: func(ctx context.Context) error {
|
||||
log.Debug("Stopping purge operation")
|
||||
v.Cancel()
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}),
|
||||
},
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
cmd.Flags().BoolVar(&opts.KeepLatest, "keep-latest", false, "Keep only the latest snapshot per name")
|
||||
cmd.Flags().StringVar(&opts.OlderThan, "older-than", "", "Remove snapshots older than duration (e.g. 30d, 6m, 1y)")
|
||||
cmd.Flags().BoolVar(&opts.Force, "force", false, "Skip confirmation prompts")
|
||||
cmd.Flags().StringVar(&opts.Name, "name", "", "Filter purge to a specific snapshot name")
|
||||
|
||||
return cmd
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
"git.eeqj.de/sneak/vaultik/internal/config"
|
||||
"git.eeqj.de/sneak/vaultik/internal/globals"
|
||||
@@ -130,6 +131,7 @@ func buildRestoreInvokes(snapshotID string, opts *RestoreOptions) []fx.Option {
|
||||
if err := app.Vaultik.Restore(restoreOpts); err != nil {
|
||||
if err != context.Canceled {
|
||||
log.Error("Restore operation failed", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ func NewRootCommand() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "vaultik",
|
||||
Short: "Secure incremental backup tool with asymmetric encryption",
|
||||
Long: `vaultik is a secure incremental backup daemon that encrypts data using age
|
||||
Long: `vaultik is a secure incremental backup tool that encrypts data using age
|
||||
public keys and uploads to S3-compatible storage. No private keys are needed
|
||||
on the source system.`,
|
||||
SilenceUsage: true,
|
||||
@@ -41,7 +41,6 @@ on the source system.`,
|
||||
cmd.AddCommand(
|
||||
NewRestoreCommand(),
|
||||
NewPruneCommand(),
|
||||
NewVerifyCommand(),
|
||||
NewStoreCommand(),
|
||||
NewSnapshotCommand(),
|
||||
NewInfoCommand(),
|
||||
@@ -80,5 +79,5 @@ func ResolveConfigPath() (string, error) {
|
||||
return defaultPath, nil
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("no config file specified, VAULTIK_CONFIG not set, and %s not found", defaultPath)
|
||||
return "", fmt.Errorf("no config file found; specify one with --config, set VAULTIK_CONFIG, or create %s", defaultPath)
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package cli
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||
@@ -26,6 +27,7 @@ func NewSnapshotCommand() *cobra.Command {
|
||||
cmd.AddCommand(newSnapshotVerifyCommand())
|
||||
cmd.AddCommand(newSnapshotRemoveCommand())
|
||||
cmd.AddCommand(newSnapshotPruneCommand())
|
||||
cmd.AddCommand(newSnapshotCleanupCommand())
|
||||
|
||||
return cmd
|
||||
}
|
||||
@@ -71,10 +73,13 @@ specifying a path using --config or by setting VAULTIK_CONFIG to a path.`,
|
||||
OnStart: func(ctx context.Context) error {
|
||||
// Start the snapshot creation in a goroutine
|
||||
go func() {
|
||||
// Run the snapshot creation
|
||||
if opts.Cron {
|
||||
v.Stdout = io.Discard
|
||||
}
|
||||
if err := v.CreateSnapshot(opts); err != nil {
|
||||
if err != context.Canceled {
|
||||
log.Error("Snapshot creation failed", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,9 +103,9 @@ specifying a path using --config or by setting VAULTIK_CONFIG to a path.`,
|
||||
},
|
||||
}
|
||||
|
||||
cmd.Flags().BoolVar(&opts.Daemon, "daemon", false, "Run in daemon mode with inotify monitoring")
|
||||
cmd.Flags().BoolVar(&opts.Cron, "cron", false, "Run in cron mode (silent unless error)")
|
||||
cmd.Flags().BoolVar(&opts.Prune, "prune", false, "Delete all previous snapshots and unreferenced blobs after backup")
|
||||
cmd.Flags().BoolVar(&opts.Prune, "prune", false, "After backup, drop older snapshots of the same name and remove orphaned blobs")
|
||||
cmd.Flags().StringVar(&opts.KeepNewerThan, "keep-newer-than", "", "With --prune: keep snapshots newer than this duration (e.g. 4w, 30d, 6mo) instead of only the latest")
|
||||
cmd.Flags().BoolVar(&opts.SkipErrors, "skip-errors", false, "Skip file read errors (log them loudly but continue)")
|
||||
|
||||
return cmd
|
||||
@@ -174,11 +179,9 @@ func newSnapshotPurgeCommand() *cobra.Command {
|
||||
Short: "Purge old snapshots",
|
||||
Long: `Removes snapshots based on age or count criteria.
|
||||
|
||||
When --keep-latest is used, retention is applied per snapshot name. For example,
|
||||
if you have snapshots named "home" and "system", --keep-latest keeps the most
|
||||
recent of each.
|
||||
|
||||
Use --name to restrict the purge to a single snapshot name.`,
|
||||
Retention is per-snapshot-name: --keep-latest keeps the latest of each
|
||||
configured snapshot name, not the latest globally. Use --snapshot to
|
||||
restrict the operation to specific snapshot names.`,
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
// Validate flags
|
||||
@@ -232,10 +235,10 @@ Use --name to restrict the purge to a single snapshot name.`,
|
||||
},
|
||||
}
|
||||
|
||||
cmd.Flags().BoolVar(&opts.KeepLatest, "keep-latest", false, "Keep only the latest snapshot per name")
|
||||
cmd.Flags().BoolVar(&opts.KeepLatest, "keep-latest", false, "Keep only the latest snapshot of each name")
|
||||
cmd.Flags().StringVar(&opts.OlderThan, "older-than", "", "Remove snapshots older than duration (e.g., 30d, 6m, 1y)")
|
||||
cmd.Flags().BoolVar(&opts.Force, "force", false, "Skip confirmation prompt")
|
||||
cmd.Flags().StringVar(&opts.Name, "name", "", "Filter purge to a specific snapshot name")
|
||||
cmd.Flags().StringArrayVar(&opts.Names, "snapshot", nil, "Restrict to snapshots with these names (repeat for multiple)")
|
||||
|
||||
return cmd
|
||||
}
|
||||
@@ -281,13 +284,7 @@ func newSnapshotVerifyCommand() *cobra.Command {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
go func() {
|
||||
var err error
|
||||
if opts.Deep {
|
||||
err = v.RunDeepVerify(snapshotID, opts)
|
||||
} else {
|
||||
err = v.VerifySnapshotWithOptions(snapshotID, opts)
|
||||
}
|
||||
if err != nil {
|
||||
if err := v.VerifySnapshotWithOptions(snapshotID, opts); err != nil {
|
||||
if err != context.Canceled {
|
||||
if !opts.JSON {
|
||||
log.Error("Verification failed", "error", err)
|
||||
@@ -470,3 +467,60 @@ accumulate from incomplete backups or deleted snapshots.`,
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
// newSnapshotCleanupCommand creates the 'snapshot cleanup' subcommand
|
||||
func newSnapshotCleanupCommand() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "cleanup",
|
||||
Short: "Remove stale local snapshot records not found in remote storage",
|
||||
Long: `Removes local database records for snapshots whose metadata no longer
|
||||
exists in remote storage. These are typically left behind by incomplete
|
||||
or interrupted backups.
|
||||
|
||||
This command does not delete anything from remote storage.`,
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
configPath, err := ResolveConfigPath()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rootFlags := GetRootFlags()
|
||||
return RunWithApp(cmd.Context(), AppOptions{
|
||||
ConfigPath: configPath,
|
||||
LogOptions: log.LogOptions{
|
||||
Verbose: rootFlags.Verbose,
|
||||
Debug: rootFlags.Debug,
|
||||
Quiet: rootFlags.Quiet,
|
||||
},
|
||||
Modules: []fx.Option{},
|
||||
Invokes: []fx.Option{
|
||||
fx.Invoke(func(v *vaultik.Vaultik, lc fx.Lifecycle) {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
go func() {
|
||||
if err := v.CleanupLocalSnapshots(); err != nil {
|
||||
if err != context.Canceled {
|
||||
log.Error("Cleanup failed", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
if err := v.Shutdowner.Shutdown(); err != nil {
|
||||
log.Error("Failed to shutdown", "error", err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
},
|
||||
OnStop: func(ctx context.Context) error {
|
||||
v.Cancel()
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}),
|
||||
},
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
@@ -1,98 +0,0 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||
"git.eeqj.de/sneak/vaultik/internal/vaultik"
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/fx"
|
||||
)
|
||||
|
||||
// NewVerifyCommand creates the verify command
|
||||
func NewVerifyCommand() *cobra.Command {
|
||||
opts := &vaultik.VerifyOptions{}
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "verify <snapshot-id>",
|
||||
Short: "Verify snapshot integrity",
|
||||
Long: `Verifies that all blobs referenced in a snapshot exist and optionally verifies their contents.
|
||||
|
||||
Shallow verification (default):
|
||||
- Downloads and decompresses manifest
|
||||
- Checks existence of all blobs in S3
|
||||
- Reports missing blobs
|
||||
|
||||
Deep verification (--deep):
|
||||
- Downloads and decrypts database
|
||||
- Verifies blob lists match between manifest and database
|
||||
- Downloads, decrypts, and decompresses each blob
|
||||
- Verifies SHA256 hash of each chunk matches database
|
||||
- Ensures chunks are ordered correctly
|
||||
|
||||
The command will fail immediately on any verification error and exit with non-zero status.`,
|
||||
Args: cobra.ExactArgs(1),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
snapshotID := args[0]
|
||||
|
||||
// Use unified config resolution
|
||||
configPath, err := ResolveConfigPath()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Use the app framework for all verification
|
||||
rootFlags := GetRootFlags()
|
||||
return RunWithApp(cmd.Context(), AppOptions{
|
||||
ConfigPath: configPath,
|
||||
LogOptions: log.LogOptions{
|
||||
Verbose: rootFlags.Verbose,
|
||||
Debug: rootFlags.Debug,
|
||||
Quiet: rootFlags.Quiet || opts.JSON, // Suppress log output in JSON mode
|
||||
},
|
||||
Modules: []fx.Option{},
|
||||
Invokes: []fx.Option{
|
||||
fx.Invoke(func(v *vaultik.Vaultik, lc fx.Lifecycle) {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
// Run the verify operation directly
|
||||
go func() {
|
||||
var err error
|
||||
if opts.Deep {
|
||||
err = v.RunDeepVerify(snapshotID, opts)
|
||||
} else {
|
||||
err = v.VerifySnapshotWithOptions(snapshotID, opts)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if err != context.Canceled {
|
||||
if !opts.JSON {
|
||||
log.Error("Verification failed", "error", err)
|
||||
}
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
if err := v.Shutdowner.Shutdown(); err != nil {
|
||||
log.Error("Failed to shutdown", "error", err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
},
|
||||
OnStop: func(ctx context.Context) error {
|
||||
log.Debug("Stopping verify operation")
|
||||
v.Cancel()
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}),
|
||||
},
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
cmd.Flags().BoolVar(&opts.Deep, "deep", false, "Perform deep verification by downloading and verifying all blob contents")
|
||||
cmd.Flags().BoolVar(&opts.JSON, "json", false, "Output verification results as JSON")
|
||||
|
||||
return cmd
|
||||
}
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"filippo.io/age"
|
||||
"git.eeqj.de/sneak/smartconfig"
|
||||
@@ -83,19 +82,16 @@ func (c *Config) SnapshotNames() []string {
|
||||
// encryption recipients, storage configuration, and performance tuning parameters.
|
||||
// Configuration is typically loaded from a YAML file.
|
||||
type Config struct {
|
||||
AgeRecipients []string `yaml:"age_recipients"`
|
||||
AgeSecretKey string `yaml:"age_secret_key"`
|
||||
BackupInterval time.Duration `yaml:"backup_interval"`
|
||||
BlobSizeLimit Size `yaml:"blob_size_limit"`
|
||||
ChunkSize Size `yaml:"chunk_size"`
|
||||
Exclude []string `yaml:"exclude"` // Global excludes applied to all snapshots
|
||||
FullScanInterval time.Duration `yaml:"full_scan_interval"`
|
||||
Hostname string `yaml:"hostname"`
|
||||
IndexPath string `yaml:"index_path"`
|
||||
MinTimeBetweenRun time.Duration `yaml:"min_time_between_run"`
|
||||
S3 S3Config `yaml:"s3"`
|
||||
Snapshots map[string]SnapshotConfig `yaml:"snapshots"`
|
||||
CompressionLevel int `yaml:"compression_level"`
|
||||
AgeRecipients []string `yaml:"age_recipients"`
|
||||
AgeSecretKey string `yaml:"age_secret_key"`
|
||||
BlobSizeLimit Size `yaml:"blob_size_limit"`
|
||||
ChunkSize Size `yaml:"chunk_size"`
|
||||
Exclude []string `yaml:"exclude"` // Global excludes applied to all snapshots
|
||||
Hostname string `yaml:"hostname"`
|
||||
IndexPath string `yaml:"index_path"`
|
||||
S3 S3Config `yaml:"s3"`
|
||||
Snapshots map[string]SnapshotConfig `yaml:"snapshots"`
|
||||
CompressionLevel int `yaml:"compression_level"`
|
||||
|
||||
// StorageURL specifies the storage backend using a URL format.
|
||||
// Takes precedence over S3Config if set.
|
||||
@@ -155,13 +151,10 @@ func Load(path string) (*Config, error) {
|
||||
|
||||
cfg := &Config{
|
||||
// Set defaults
|
||||
BlobSizeLimit: Size(10 * 1024 * 1024 * 1024), // 10GB
|
||||
ChunkSize: Size(10 * 1024 * 1024), // 10MB
|
||||
BackupInterval: 1 * time.Hour,
|
||||
FullScanInterval: 24 * time.Hour,
|
||||
MinTimeBetweenRun: 15 * time.Minute,
|
||||
IndexPath: filepath.Join(xdg.DataHome, appName, "index.sqlite"),
|
||||
CompressionLevel: 3,
|
||||
BlobSizeLimit: Size(10 * 1024 * 1024 * 1024), // 10GB
|
||||
ChunkSize: Size(10 * 1024 * 1024), // 10MB
|
||||
IndexPath: filepath.Join(xdg.DataHome, appName, "index.sqlite"),
|
||||
CompressionLevel: 3,
|
||||
}
|
||||
|
||||
// Convert smartconfig data to YAML then unmarshal
|
||||
@@ -243,11 +236,11 @@ func Load(path string) (*Config, error) {
|
||||
// Returns an error describing the first validation failure encountered.
|
||||
func (c *Config) Validate() error {
|
||||
if len(c.AgeRecipients) == 0 {
|
||||
return fmt.Errorf("at least one age_recipient is required")
|
||||
return fmt.Errorf("at least one age_recipient is required (generate with: age-keygen)")
|
||||
}
|
||||
|
||||
if len(c.Snapshots) == 0 {
|
||||
return fmt.Errorf("at least one snapshot must be configured")
|
||||
return fmt.Errorf("at least one snapshot must be configured (see config.example.yml)")
|
||||
}
|
||||
|
||||
for name, snap := range c.Snapshots {
|
||||
@@ -306,7 +299,7 @@ func (c *Config) validateStorage() error {
|
||||
|
||||
// Legacy S3 configuration
|
||||
if c.S3.Endpoint == "" {
|
||||
return fmt.Errorf("s3.endpoint is required (or set storage_url)")
|
||||
return fmt.Errorf("storage not configured; set storage_url or provide s3.endpoint + s3.bucket + credentials")
|
||||
}
|
||||
|
||||
if c.S3.Bucket == "" {
|
||||
|
||||
@@ -6,24 +6,32 @@
|
||||
// multiple source files. Blobs are content-addressed, meaning their filename
|
||||
// is derived from their SHA256 hash after compression and encryption.
|
||||
//
|
||||
// The database does not support migrations. If the schema changes, delete
|
||||
// the local database and perform a full backup to recreate it.
|
||||
// Schema is managed via numbered SQL migrations embedded in the schema/
|
||||
// directory. Migration 000.sql bootstraps the schema_migrations tracking
|
||||
// table; subsequent migrations (001, 002, …) are applied in order.
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
_ "embed"
|
||||
"embed"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
//go:embed schema.sql
|
||||
var schemaSQL string
|
||||
//go:embed schema/*.sql
|
||||
var schemaFS embed.FS
|
||||
|
||||
// bootstrapVersion is the migration that creates the schema_migrations
|
||||
// table itself. It is applied before the normal migration loop.
|
||||
const bootstrapVersion = 0
|
||||
|
||||
// DB represents the Vaultik local index database connection.
|
||||
// It uses SQLite to track file metadata, content-defined chunks, and blob associations.
|
||||
@@ -35,6 +43,46 @@ type DB struct {
|
||||
path string
|
||||
}
|
||||
|
||||
// ParseMigrationVersion returns the integer version encoded at the start of
// a migration filename.
//
// The file extension is removed first; the version is then everything before
// the first underscore, or the whole remaining stem when there is no
// underscore. So "001.sql" and "001_create_tables.sql" both yield 1.
//
// An error is returned when the stem is empty, when the version prefix is
// empty, when the prefix contains anything other than the ASCII digits 0-9,
// or when strconv.Atoi cannot convert the prefix.
func ParseMigrationVersion(filename string) (int, error) {
	stem := strings.TrimSuffix(filename, filepath.Ext(filename))
	if stem == "" {
		return 0, fmt.Errorf("invalid migration filename %q: empty name", filename)
	}

	// Cut returns the whole stem as the prefix when no underscore is present.
	prefix, _, _ := strings.Cut(stem, "_")
	if prefix == "" {
		return 0, fmt.Errorf("invalid migration filename %q: empty version prefix", filename)
	}

	// Reject the first character that is not an ASCII digit, so signs and
	// other forms Atoi would accept never reach it.
	for _, r := range prefix {
		if r >= '0' && r <= '9' {
			continue
		}

		return 0, fmt.Errorf(
			"invalid migration filename %q: version %q contains non-numeric character %q",
			filename, prefix, string(r),
		)
	}

	n, convErr := strconv.Atoi(prefix)
	if convErr != nil {
		return 0, fmt.Errorf("invalid migration filename %q: %w", filename, convErr)
	}

	return n, nil
}
|
||||
|
||||
// New creates a new database connection at the specified path.
|
||||
// It creates the schema if needed and configures SQLite with WAL mode for
|
||||
// better concurrency. SQLite handles crash recovery automatically when
|
||||
@@ -72,9 +120,9 @@ func New(ctx context.Context, path string) (*DB, error) {
|
||||
}
|
||||
|
||||
db := &DB{conn: conn, path: path}
|
||||
if err := db.createSchema(ctx); err != nil {
|
||||
if err := applyMigrations(ctx, conn); err != nil {
|
||||
_ = conn.Close()
|
||||
return nil, fmt.Errorf("creating schema: %w", err)
|
||||
return nil, fmt.Errorf("applying migrations: %w", err)
|
||||
}
|
||||
return db, nil
|
||||
}
|
||||
@@ -125,9 +173,9 @@ func New(ctx context.Context, path string) (*DB, error) {
|
||||
}
|
||||
|
||||
db := &DB{conn: conn, path: path}
|
||||
if err := db.createSchema(ctx); err != nil {
|
||||
if err := applyMigrations(ctx, conn); err != nil {
|
||||
_ = conn.Close()
|
||||
return nil, fmt.Errorf("creating schema: %w", err)
|
||||
return nil, fmt.Errorf("applying migrations: %w", err)
|
||||
}
|
||||
|
||||
log.Debug("Database connection established successfully", "path", path)
|
||||
@@ -198,9 +246,120 @@ func (db *DB) QueryRowWithLog(
|
||||
return db.conn.QueryRowContext(ctx, query, args...)
|
||||
}
|
||||
|
||||
func (db *DB) createSchema(ctx context.Context) error {
|
||||
_, err := db.conn.ExecContext(ctx, schemaSQL)
|
||||
return err
|
||||
// collectMigrations reads the embedded schema directory and returns
|
||||
// migration filenames sorted lexicographically.
|
||||
func collectMigrations() ([]string, error) {
|
||||
entries, err := schemaFS.ReadDir("schema")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read schema directory: %w", err)
|
||||
}
|
||||
|
||||
var migrations []string
|
||||
|
||||
for _, entry := range entries {
|
||||
if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".sql") {
|
||||
migrations = append(migrations, entry.Name())
|
||||
}
|
||||
}
|
||||
|
||||
sort.Strings(migrations)
|
||||
|
||||
return migrations, nil
|
||||
}
|
||||
|
||||
// bootstrapMigrationsTable ensures the schema_migrations table exists
|
||||
// by applying 000.sql if the table is missing.
|
||||
func bootstrapMigrationsTable(ctx context.Context, db *sql.DB) error {
|
||||
var tableExists int
|
||||
|
||||
err := db.QueryRowContext(ctx,
|
||||
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_migrations'",
|
||||
).Scan(&tableExists)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check for migrations table: %w", err)
|
||||
}
|
||||
|
||||
if tableExists > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
content, err := schemaFS.ReadFile("schema/000.sql")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read bootstrap migration 000.sql: %w", err)
|
||||
}
|
||||
|
||||
log.Info("applying bootstrap migration", "version", bootstrapVersion)
|
||||
|
||||
_, err = db.ExecContext(ctx, string(content))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to apply bootstrap migration: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// applyMigrations applies all pending migrations to db. It first bootstraps
|
||||
// the schema_migrations table via 000.sql, then iterates through remaining
|
||||
// migration files in order.
|
||||
func applyMigrations(ctx context.Context, db *sql.DB) error {
|
||||
if err := bootstrapMigrationsTable(ctx, db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
migrations, err := collectMigrations()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, migration := range migrations {
|
||||
version, parseErr := ParseMigrationVersion(migration)
|
||||
if parseErr != nil {
|
||||
return parseErr
|
||||
}
|
||||
|
||||
// Check if already applied.
|
||||
var count int
|
||||
|
||||
err := db.QueryRowContext(ctx,
|
||||
"SELECT COUNT(*) FROM schema_migrations WHERE version = ?",
|
||||
version,
|
||||
).Scan(&count)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check migration status: %w", err)
|
||||
}
|
||||
|
||||
if count > 0 {
|
||||
log.Debug("migration already applied", "version", version)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// Read and apply migration.
|
||||
content, readErr := schemaFS.ReadFile(filepath.Join("schema", migration))
|
||||
if readErr != nil {
|
||||
return fmt.Errorf("failed to read migration %s: %w", migration, readErr)
|
||||
}
|
||||
|
||||
log.Info("applying migration", "version", version)
|
||||
|
||||
_, execErr := db.ExecContext(ctx, string(content))
|
||||
if execErr != nil {
|
||||
return fmt.Errorf("failed to apply migration %s: %w", migration, execErr)
|
||||
}
|
||||
|
||||
// Record migration as applied.
|
||||
_, recErr := db.ExecContext(ctx,
|
||||
"INSERT INTO schema_migrations (version) VALUES (?)",
|
||||
version,
|
||||
)
|
||||
if recErr != nil {
|
||||
return fmt.Errorf("failed to record migration %s: %w", migration, recErr)
|
||||
}
|
||||
|
||||
log.Info("migration applied successfully", "version", version)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewTestDB creates an in-memory SQLite database for testing purposes.
|
||||
|
||||
@@ -2,6 +2,7 @@ package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
@@ -26,9 +27,10 @@ func TestDatabase(t *testing.T) {
|
||||
t.Fatal("database connection is nil")
|
||||
}
|
||||
|
||||
// Test schema creation (already done in New)
|
||||
// Test schema creation (already done in New via migrations)
|
||||
// Verify tables exist
|
||||
tables := []string{
|
||||
"schema_migrations",
|
||||
"files", "file_chunks", "chunks", "blobs",
|
||||
"blob_chunks", "chunk_files", "snapshots",
|
||||
}
|
||||
@@ -99,3 +101,139 @@ func TestDatabaseConcurrentAccess(t *testing.T) {
|
||||
t.Errorf("expected 10 chunks, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseMigrationVersion(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
filename string
|
||||
wantVer int
|
||||
wantError bool
|
||||
}{
|
||||
{name: "valid 000.sql", filename: "000.sql", wantVer: 0, wantError: false},
|
||||
{name: "valid 001.sql", filename: "001.sql", wantVer: 1, wantError: false},
|
||||
{name: "valid 099.sql", filename: "099.sql", wantVer: 99, wantError: false},
|
||||
{name: "valid with description", filename: "001_initial_schema.sql", wantVer: 1, wantError: false},
|
||||
{name: "valid large version", filename: "123_big_migration.sql", wantVer: 123, wantError: false},
|
||||
{name: "invalid alpha version", filename: "abc.sql", wantVer: 0, wantError: true},
|
||||
{name: "invalid mixed chars", filename: "12a.sql", wantVer: 0, wantError: true},
|
||||
{name: "invalid no extension", filename: "schema.sql", wantVer: 0, wantError: true},
|
||||
{name: "empty string", filename: "", wantVer: 0, wantError: true},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got, err := ParseMigrationVersion(tc.filename)
|
||||
if tc.wantError {
|
||||
if err == nil {
|
||||
t.Errorf("ParseMigrationVersion(%q) = %d, nil; want error", tc.filename, got)
|
||||
}
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("ParseMigrationVersion(%q) unexpected error: %v", tc.filename, err)
|
||||
return
|
||||
}
|
||||
if got != tc.wantVer {
|
||||
t.Errorf("ParseMigrationVersion(%q) = %d; want %d", tc.filename, got, tc.wantVer)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestApplyMigrations_Idempotent(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
conn, err := sql.Open("sqlite", ":memory:?_foreign_keys=ON")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to open database: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := conn.Close(); err != nil {
|
||||
t.Errorf("failed to close database: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
conn.SetMaxOpenConns(1)
|
||||
conn.SetMaxIdleConns(1)
|
||||
|
||||
// First run: apply all migrations.
|
||||
if err := applyMigrations(ctx, conn); err != nil {
|
||||
t.Fatalf("first applyMigrations failed: %v", err)
|
||||
}
|
||||
|
||||
// Count rows in schema_migrations after first run.
|
||||
var countBefore int
|
||||
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM schema_migrations").Scan(&countBefore); err != nil {
|
||||
t.Fatalf("failed to count schema_migrations after first run: %v", err)
|
||||
}
|
||||
|
||||
// Second run: must be a no-op.
|
||||
if err := applyMigrations(ctx, conn); err != nil {
|
||||
t.Fatalf("second applyMigrations failed: %v", err)
|
||||
}
|
||||
|
||||
// Count rows in schema_migrations after second run — must be unchanged.
|
||||
var countAfter int
|
||||
if err := conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM schema_migrations").Scan(&countAfter); err != nil {
|
||||
t.Fatalf("failed to count schema_migrations after second run: %v", err)
|
||||
}
|
||||
|
||||
if countBefore != countAfter {
|
||||
t.Errorf("schema_migrations row count changed: before=%d, after=%d", countBefore, countAfter)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBootstrapMigrationsTable_FreshDatabase(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
conn, err := sql.Open("sqlite", ":memory:?_foreign_keys=ON")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to open database: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := conn.Close(); err != nil {
|
||||
t.Errorf("failed to close database: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
conn.SetMaxOpenConns(1)
|
||||
conn.SetMaxIdleConns(1)
|
||||
|
||||
// Verify schema_migrations does NOT exist yet.
|
||||
var tableBefore int
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_migrations'",
|
||||
).Scan(&tableBefore); err != nil {
|
||||
t.Fatalf("failed to check for table before bootstrap: %v", err)
|
||||
}
|
||||
if tableBefore != 0 {
|
||||
t.Fatal("schema_migrations table should not exist before bootstrap")
|
||||
}
|
||||
|
||||
// Run bootstrap.
|
||||
if err := bootstrapMigrationsTable(ctx, conn); err != nil {
|
||||
t.Fatalf("bootstrapMigrationsTable failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify schema_migrations now exists.
|
||||
var tableAfter int
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_migrations'",
|
||||
).Scan(&tableAfter); err != nil {
|
||||
t.Fatalf("failed to check for table after bootstrap: %v", err)
|
||||
}
|
||||
if tableAfter != 1 {
|
||||
t.Fatalf("schema_migrations table should exist after bootstrap, got count=%d", tableAfter)
|
||||
}
|
||||
|
||||
// Verify version 0 row exists.
|
||||
var version int
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
"SELECT version FROM schema_migrations WHERE version = 0",
|
||||
).Scan(&version); err != nil {
|
||||
t.Fatalf("version 0 row not found in schema_migrations: %v", err)
|
||||
}
|
||||
if version != 0 {
|
||||
t.Errorf("expected version 0, got %d", version)
|
||||
}
|
||||
}
|
||||
|
||||
9
internal/database/schema/000.sql
Normal file
9
internal/database/schema/000.sql
Normal file
@@ -0,0 +1,9 @@
|
||||
-- Migration 000: Schema migrations tracking table
-- Applied as a bootstrap step before the normal migration loop.
--
-- One row is stored per applied migration:
--   version    - integer migration version (the primary key)
--   applied_at - defaults to the timestamp at which the row was inserted

CREATE TABLE IF NOT EXISTS schema_migrations (
    version INTEGER PRIMARY KEY,
    applied_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- Record this bootstrap migration itself as version 0. OR IGNORE makes the
-- statement a no-op when the row is already present.
INSERT OR IGNORE INTO schema_migrations (version) VALUES (0);
|
||||
@@ -1,6 +1,5 @@
|
||||
-- Vaultik Database Schema
|
||||
-- Note: This database does not support migrations. If the schema changes,
|
||||
-- delete the local database and perform a full backup to recreate it.
|
||||
-- Migration 001: Initial Vaultik schema
|
||||
-- All core tables for tracking files, chunks, blobs, snapshots, and uploads.
|
||||
|
||||
-- Files table: stores metadata about files in the filesystem
|
||||
CREATE TABLE IF NOT EXISTS files (
|
||||
@@ -133,4 +132,4 @@ CREATE TABLE IF NOT EXISTS uploads (
|
||||
);
|
||||
|
||||
-- Index for efficient snapshot lookups
|
||||
CREATE INDEX IF NOT EXISTS idx_uploads_snapshot_id ON uploads(snapshot_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_uploads_snapshot_id ON uploads(snapshot_id);
|
||||
@@ -1,11 +0,0 @@
|
||||
-- Track blob upload metrics
|
||||
CREATE TABLE IF NOT EXISTS uploads (
|
||||
blob_hash TEXT PRIMARY KEY,
|
||||
uploaded_at TIMESTAMP NOT NULL,
|
||||
size INTEGER NOT NULL,
|
||||
duration_ms INTEGER NOT NULL,
|
||||
FOREIGN KEY (blob_hash) REFERENCES blobs(blob_hash)
|
||||
);
|
||||
|
||||
CREATE INDEX idx_uploads_uploaded_at ON uploads(uploaded_at);
|
||||
CREATE INDEX idx_uploads_duration ON uploads(duration_ms);
|
||||
@@ -63,10 +63,3 @@ type Chunk struct {
|
||||
Offset int64
|
||||
Length int64
|
||||
}
|
||||
|
||||
// DirtyPath represents a path marked for backup by inotify
|
||||
type DirtyPath struct {
|
||||
Path string
|
||||
MarkedAt time.Time
|
||||
EventType string // "create", "modify", "delete"
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package s3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"sync/atomic"
|
||||
|
||||
@@ -10,6 +11,7 @@ import (
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
|
||||
"github.com/aws/smithy-go/logging"
|
||||
)
|
||||
|
||||
@@ -203,9 +205,12 @@ func (c *Client) HeadObject(ctx context.Context, key string) (bool, error) {
|
||||
Key: aws.String(fullKey),
|
||||
})
|
||||
if err != nil {
|
||||
// Check if it's a not found error
|
||||
// TODO: Add proper error type checking
|
||||
return false, nil
|
||||
var notFound *s3types.NotFound
|
||||
var noSuchKey *s3types.NoSuchKey
|
||||
if errors.As(err, &notFound) || errors.As(err, &noSuchKey) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
@@ -58,7 +59,8 @@ type Scanner struct {
|
||||
exclude []string // Glob patterns for files/directories to exclude
|
||||
compiledExclude []compiledPattern // Compiled glob patterns
|
||||
progress *ProgressReporter
|
||||
skipErrors bool // Skip file read errors (log loudly but continue)
|
||||
skipErrors bool // Skip file read errors (log loudly but continue)
|
||||
output io.Writer // User-facing output (os.Stdout or io.Discard in cron mode)
|
||||
|
||||
// In-memory cache of known chunk hashes for fast existence checks
|
||||
knownChunks map[string]struct{}
|
||||
@@ -139,6 +141,11 @@ func NewScanner(cfg ScannerConfig) *Scanner {
|
||||
// Compile exclude patterns
|
||||
compiledExclude := compileExcludePatterns(cfg.Exclude)
|
||||
|
||||
output := io.Writer(io.Discard)
|
||||
if cfg.EnableProgress {
|
||||
output = os.Stdout
|
||||
}
|
||||
|
||||
return &Scanner{
|
||||
fs: cfg.FS,
|
||||
chunker: chunker.NewChunker(cfg.ChunkSize),
|
||||
@@ -152,6 +159,7 @@ func NewScanner(cfg ScannerConfig) *Scanner {
|
||||
compiledExclude: compiledExclude,
|
||||
progress: progress,
|
||||
skipErrors: cfg.SkipErrors,
|
||||
output: output,
|
||||
pendingChunkHashes: make(map[string]struct{}),
|
||||
}
|
||||
}
|
||||
@@ -202,7 +210,7 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
|
||||
|
||||
// Phase 1c: Associate unchanged files with this snapshot (no new records needed)
|
||||
if len(scanResult.UnchangedFileIDs) > 0 {
|
||||
fmt.Printf("Associating %s unchanged files with snapshot...\n", formatNumber(len(scanResult.UnchangedFileIDs)))
|
||||
_, _ = fmt.Fprintf(s.output, "Associating %s unchanged files with snapshot...\n", formatNumber(len(scanResult.UnchangedFileIDs)))
|
||||
if err := s.batchAddFilesToSnapshot(ctx, scanResult.UnchangedFileIDs); err != nil {
|
||||
return nil, fmt.Errorf("associating unchanged files: %w", err)
|
||||
}
|
||||
@@ -213,13 +221,13 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
|
||||
|
||||
// Phase 2: Process files and create chunks
|
||||
if len(filesToProcess) > 0 {
|
||||
fmt.Printf("Processing %s files...\n", formatNumber(len(filesToProcess)))
|
||||
_, _ = fmt.Fprintf(s.output, "Processing %s files...\n", formatNumber(len(filesToProcess)))
|
||||
log.Info("Phase 2/3: Creating snapshot (chunking, compressing, encrypting, and uploading blobs)")
|
||||
if err := s.processPhase(ctx, filesToProcess, result); err != nil {
|
||||
return nil, fmt.Errorf("process phase failed: %w", err)
|
||||
}
|
||||
} else {
|
||||
fmt.Printf("No files need processing. Creating metadata-only snapshot.\n")
|
||||
_, _ = fmt.Fprintf(s.output, "No files need processing. Creating metadata-only snapshot.\n")
|
||||
log.Info("Phase 2/3: Skipping (no files need processing, metadata-only snapshot)")
|
||||
}
|
||||
|
||||
@@ -232,18 +240,18 @@ func (s *Scanner) Scan(ctx context.Context, path string, snapshotID string) (*Sc
|
||||
// loadDatabaseState loads known files and chunks from the database into memory for fast lookup
|
||||
// This avoids per-file and per-chunk database queries during the scan and process phases
|
||||
func (s *Scanner) loadDatabaseState(ctx context.Context, path string) (map[string]*database.File, error) {
|
||||
fmt.Println("Loading known files from database...")
|
||||
_, _ = fmt.Fprintln(s.output, "Loading known files from database...")
|
||||
knownFiles, err := s.loadKnownFiles(ctx, path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("loading known files: %w", err)
|
||||
}
|
||||
fmt.Printf("Loaded %s known files from database\n", formatNumber(len(knownFiles)))
|
||||
_, _ = fmt.Fprintf(s.output, "Loaded %s known files from database\n", formatNumber(len(knownFiles)))
|
||||
|
||||
fmt.Println("Loading known chunks from database...")
|
||||
_, _ = fmt.Fprintln(s.output, "Loading known chunks from database...")
|
||||
if err := s.loadKnownChunks(ctx); err != nil {
|
||||
return nil, fmt.Errorf("loading known chunks: %w", err)
|
||||
}
|
||||
fmt.Printf("Loaded %s known chunks from database\n", formatNumber(len(s.knownChunks)))
|
||||
_, _ = fmt.Fprintf(s.output, "Loaded %s known chunks from database\n", formatNumber(len(s.knownChunks)))
|
||||
|
||||
return knownFiles, nil
|
||||
}
|
||||
@@ -267,17 +275,17 @@ func (s *Scanner) summarizeScanPhase(result *ScanResult, filesToProcess []*FileT
|
||||
"files_skipped", result.FilesSkipped,
|
||||
"bytes_skipped", humanize.Bytes(uint64(result.BytesSkipped)))
|
||||
|
||||
fmt.Printf("Scan complete: %s examined (%s), %s to process (%s)",
|
||||
_, _ = fmt.Fprintf(s.output, "Scan complete: %s examined (%s), %s to process (%s)",
|
||||
formatNumber(result.FilesScanned),
|
||||
humanize.Bytes(uint64(totalSizeToProcess+result.BytesSkipped)),
|
||||
formatNumber(len(filesToProcess)),
|
||||
humanize.Bytes(uint64(totalSizeToProcess)))
|
||||
if result.FilesDeleted > 0 {
|
||||
fmt.Printf(", %s deleted (%s)",
|
||||
_, _ = fmt.Fprintf(s.output, ", %s deleted (%s)",
|
||||
formatNumber(result.FilesDeleted),
|
||||
humanize.Bytes(uint64(result.BytesDeleted)))
|
||||
}
|
||||
fmt.Println()
|
||||
_, _ = fmt.Fprintln(s.output)
|
||||
}
|
||||
|
||||
// finalizeScanResult populates final blob statistics in the scan result
|
||||
@@ -619,7 +627,7 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
|
||||
if err != nil {
|
||||
if s.skipErrors {
|
||||
log.Error("ERROR: Failed to access file (skipping due to --skip-errors)", "path", filePath, "error", err)
|
||||
fmt.Printf("ERROR: Failed to access %s: %v (skipping)\n", filePath, err)
|
||||
_, _ = fmt.Fprintf(s.output, "ERROR: Failed to access %s: %v (skipping)\n", filePath, err)
|
||||
return nil // Continue scanning
|
||||
}
|
||||
log.Debug("Error accessing filesystem entry", "path", filePath, "error", err)
|
||||
@@ -641,7 +649,40 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
|
||||
return nil
|
||||
}
|
||||
|
||||
// Skip non-regular files for processing (but still count them)
|
||||
// Handle symlinks
|
||||
if info.Mode()&os.ModeSymlink != 0 {
|
||||
file := s.buildSymlinkEntry(filePath, info)
|
||||
if file != nil {
|
||||
existingFiles[filePath] = struct{}{}
|
||||
mu.Lock()
|
||||
filesToProcess = append(filesToProcess, &FileToProcess{
|
||||
Path: filePath,
|
||||
FileInfo: info,
|
||||
File: file,
|
||||
})
|
||||
filesScanned++
|
||||
mu.Unlock()
|
||||
s.updateScanEntryStats(result, true, info)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Handle directories (record for permission/ownership preservation and empty-dir support)
|
||||
if info.IsDir() {
|
||||
file := s.buildDirectoryEntry(filePath, info)
|
||||
existingFiles[filePath] = struct{}{}
|
||||
mu.Lock()
|
||||
filesToProcess = append(filesToProcess, &FileToProcess{
|
||||
Path: filePath,
|
||||
FileInfo: info,
|
||||
File: file,
|
||||
})
|
||||
filesScanned++
|
||||
mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Skip other non-regular files (devices, sockets, etc.)
|
||||
if !info.Mode().IsRegular() {
|
||||
return nil
|
||||
}
|
||||
@@ -673,7 +714,7 @@ func (s *Scanner) scanPhase(ctx context.Context, path string, result *ScanResult
|
||||
|
||||
// Output periodic status
|
||||
if time.Since(lastStatusTime) >= statusInterval {
|
||||
printScanProgressLine(filesScanned, changedCount, estimatedTotal, startTime)
|
||||
s.printScanProgressLine(filesScanned, changedCount, estimatedTotal, startTime)
|
||||
lastStatusTime = time.Now()
|
||||
}
|
||||
|
||||
@@ -714,7 +755,7 @@ func (s *Scanner) updateScanEntryStats(result *ScanResult, needsProcessing bool,
|
||||
|
||||
// printScanProgressLine prints a periodic progress line during the scan phase,
|
||||
// showing files scanned, percentage complete (if estimate available), and ETA
|
||||
func printScanProgressLine(filesScanned int64, changedCount int, estimatedTotal int64, startTime time.Time) {
|
||||
func (s *Scanner) printScanProgressLine(filesScanned int64, changedCount int, estimatedTotal int64, startTime time.Time) {
|
||||
elapsed := time.Since(startTime)
|
||||
rate := float64(filesScanned) / elapsed.Seconds()
|
||||
|
||||
@@ -732,19 +773,19 @@ func printScanProgressLine(filesScanned int64, changedCount int, estimatedTotal
|
||||
if rate > 0 && remaining > 0 {
|
||||
eta = time.Duration(float64(remaining)/rate) * time.Second
|
||||
}
|
||||
fmt.Printf("Scan: %s files (~%.0f%%), %s changed/new, %.0f files/sec, %s elapsed",
|
||||
_, _ = fmt.Fprintf(s.output, "Scan: %s files (~%.0f%%), %s changed/new, %.0f files/sec, %s elapsed",
|
||||
formatNumber(int(filesScanned)),
|
||||
pct,
|
||||
formatNumber(changedCount),
|
||||
rate,
|
||||
elapsed.Round(time.Second))
|
||||
if eta > 0 {
|
||||
fmt.Printf(", ETA %s", eta.Round(time.Second))
|
||||
_, _ = fmt.Fprintf(s.output, ", ETA %s", eta.Round(time.Second))
|
||||
}
|
||||
fmt.Println()
|
||||
_, _ = fmt.Fprintln(s.output)
|
||||
} else {
|
||||
// First backup - no estimate available
|
||||
fmt.Printf("Scan: %s files, %s changed/new, %.0f files/sec, %s elapsed\n",
|
||||
_, _ = fmt.Fprintf(s.output, "Scan: %s files, %s changed/new, %.0f files/sec, %s elapsed\n",
|
||||
formatNumber(int(filesScanned)),
|
||||
formatNumber(changedCount),
|
||||
rate,
|
||||
@@ -752,6 +793,71 @@ func printScanProgressLine(filesScanned int64, changedCount int, estimatedTotal
|
||||
}
|
||||
}
|
||||
|
||||
// buildSymlinkEntry creates a File record for a symlink.
|
||||
// Returns nil if the link target cannot be read.
|
||||
func (s *Scanner) buildSymlinkEntry(path string, info os.FileInfo) *database.File {
|
||||
target, err := os.Readlink(path)
|
||||
if err != nil {
|
||||
log.Debug("Cannot read symlink target", "path", path, "error", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
var uid, gid uint32
|
||||
if stat, ok := info.Sys().(interface {
|
||||
Uid() uint32
|
||||
Gid() uint32
|
||||
}); ok {
|
||||
uid = stat.Uid()
|
||||
gid = stat.Gid()
|
||||
}
|
||||
|
||||
return &database.File{
|
||||
ID: types.NewFileID(),
|
||||
Path: types.FilePath(path),
|
||||
SourcePath: types.SourcePath(s.currentSourcePath),
|
||||
MTime: info.ModTime(),
|
||||
Size: 0,
|
||||
Mode: uint32(info.Mode()),
|
||||
UID: uid,
|
||||
GID: gid,
|
||||
LinkTarget: types.FilePath(target),
|
||||
}
|
||||
}
|
||||
|
||||
// buildDirectoryEntry creates a File record for a directory.
|
||||
func (s *Scanner) buildDirectoryEntry(path string, info os.FileInfo) *database.File {
|
||||
var uid, gid uint32
|
||||
if stat, ok := info.Sys().(interface {
|
||||
Uid() uint32
|
||||
Gid() uint32
|
||||
}); ok {
|
||||
uid = stat.Uid()
|
||||
gid = stat.Gid()
|
||||
}
|
||||
|
||||
return &database.File{
|
||||
ID: types.NewFileID(),
|
||||
Path: types.FilePath(path),
|
||||
SourcePath: types.SourcePath(s.currentSourcePath),
|
||||
MTime: info.ModTime(),
|
||||
Size: 0,
|
||||
Mode: uint32(info.Mode()),
|
||||
UID: uid,
|
||||
GID: gid,
|
||||
}
|
||||
}
|
||||
|
||||
// recordNonRegularFile writes a symlink or directory entry to the database
|
||||
// and associates it with the current snapshot. No chunking is performed.
|
||||
func (s *Scanner) recordNonRegularFile(ctx context.Context, ftp *FileToProcess) error {
|
||||
return s.repos.WithTx(ctx, func(txCtx context.Context, tx *sql.Tx) error {
|
||||
if err := s.repos.Files.Create(txCtx, tx, ftp.File); err != nil {
|
||||
return fmt.Errorf("creating non-regular file record: %w", err)
|
||||
}
|
||||
return s.repos.Snapshots.AddFileByID(txCtx, tx, s.snapshotID, ftp.File.ID)
|
||||
})
|
||||
}
|
||||
|
||||
// checkFileInMemory checks if a file needs processing using the in-memory map
|
||||
// No database access is performed - this is purely CPU/memory work
|
||||
func (s *Scanner) checkFileInMemory(path string, info os.FileInfo, knownFiles map[string]*database.File) (*database.File, bool) {
|
||||
@@ -849,7 +955,7 @@ func (s *Scanner) batchAddFilesToSnapshot(ctx context.Context, fileIDs []types.F
|
||||
elapsed := time.Since(startTime)
|
||||
rate := float64(end) / elapsed.Seconds()
|
||||
pct := float64(end) / float64(len(fileIDs)) * 100
|
||||
fmt.Printf("Associating files: %s/%s (%.1f%%), %.0f files/sec\n",
|
||||
_, _ = fmt.Fprintf(s.output, "Associating files: %s/%s (%.1f%%), %.0f files/sec\n",
|
||||
formatNumber(end), formatNumber(len(fileIDs)), pct, rate)
|
||||
lastStatusTime = time.Now()
|
||||
}
|
||||
@@ -857,7 +963,7 @@ func (s *Scanner) batchAddFilesToSnapshot(ctx context.Context, fileIDs []types.F
|
||||
|
||||
elapsed := time.Since(startTime)
|
||||
rate := float64(len(fileIDs)) / elapsed.Seconds()
|
||||
fmt.Printf("Associated %s unchanged files in %s (%.0f files/sec)\n",
|
||||
_, _ = fmt.Fprintf(s.output, "Associated %s unchanged files in %s (%.0f files/sec)\n",
|
||||
formatNumber(len(fileIDs)), elapsed.Round(time.Second), rate)
|
||||
|
||||
return nil
|
||||
@@ -905,7 +1011,7 @@ func (s *Scanner) processPhase(ctx context.Context, filesToProcess []*FileToProc
|
||||
|
||||
// Output periodic status
|
||||
if time.Since(lastStatusTime) >= statusInterval {
|
||||
printProcessingProgress(filesProcessed, totalFiles, bytesProcessed, totalBytes, startTime)
|
||||
s.printProcessingProgress(filesProcessed, totalFiles, bytesProcessed, totalBytes, startTime)
|
||||
lastStatusTime = time.Now()
|
||||
}
|
||||
}
|
||||
@@ -927,7 +1033,7 @@ func (s *Scanner) processFileWithErrorHandling(ctx context.Context, fileToProces
|
||||
// Skip file read errors if --skip-errors is enabled
|
||||
if s.skipErrors {
|
||||
log.Error("ERROR: Failed to process file (skipping due to --skip-errors)", "path", fileToProcess.Path, "error", err)
|
||||
fmt.Printf("ERROR: Failed to process %s: %v (skipping)\n", fileToProcess.Path, err)
|
||||
_, _ = fmt.Fprintf(s.output, "ERROR: Failed to process %s: %v (skipping)\n", fileToProcess.Path, err)
|
||||
result.FilesSkipped++
|
||||
return true, nil
|
||||
}
|
||||
@@ -938,7 +1044,7 @@ func (s *Scanner) processFileWithErrorHandling(ctx context.Context, fileToProces
|
||||
|
||||
// printProcessingProgress prints a periodic progress line during the process phase,
|
||||
// showing files processed, bytes transferred, throughput, and ETA
|
||||
func printProcessingProgress(filesProcessed, totalFiles int, bytesProcessed, totalBytes int64, startTime time.Time) {
|
||||
func (s *Scanner) printProcessingProgress(filesProcessed, totalFiles int, bytesProcessed, totalBytes int64, startTime time.Time) {
|
||||
elapsed := time.Since(startTime)
|
||||
pct := float64(bytesProcessed) / float64(totalBytes) * 100
|
||||
byteRate := float64(bytesProcessed) / elapsed.Seconds()
|
||||
@@ -952,7 +1058,7 @@ func printProcessingProgress(filesProcessed, totalFiles int, bytesProcessed, tot
|
||||
}
|
||||
|
||||
// Format: Progress [5.7k/610k] 6.7 GB/44 GB (15.4%), 106MB/sec, 500 files/sec, running for 1m30s, ETA: 5m49s
|
||||
fmt.Printf("Progress [%s/%s] %s/%s (%.1f%%), %s/sec, %.0f files/sec, running for %s",
|
||||
_, _ = fmt.Fprintf(s.output, "Progress [%s/%s] %s/%s (%.1f%%), %s/sec, %.0f files/sec, running for %s",
|
||||
formatCompact(filesProcessed),
|
||||
formatCompact(totalFiles),
|
||||
humanize.Bytes(uint64(bytesProcessed)),
|
||||
@@ -962,9 +1068,9 @@ func printProcessingProgress(filesProcessed, totalFiles int, bytesProcessed, tot
|
||||
fileRate,
|
||||
elapsed.Round(time.Second))
|
||||
if eta > 0 {
|
||||
fmt.Printf(", ETA: %s", eta.Round(time.Second))
|
||||
_, _ = fmt.Fprintf(s.output, ", ETA: %s", eta.Round(time.Second))
|
||||
}
|
||||
fmt.Println()
|
||||
_, _ = fmt.Fprintln(s.output)
|
||||
}
|
||||
|
||||
// finalizeProcessPhase flushes the packer, writes remaining pending files to the database,
|
||||
@@ -1056,7 +1162,7 @@ func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobW
|
||||
if _, err := s.storage.Stat(ctx, blobPath); err == nil {
|
||||
log.Info("Blob already exists in storage, skipping upload",
|
||||
"hash", finishedBlob.Hash, "size", humanize.Bytes(uint64(finishedBlob.Compressed)))
|
||||
fmt.Printf("Blob exists: %s (%s, skipped upload)\n",
|
||||
_, _ = fmt.Fprintf(s.output, "Blob exists: %s (%s, skipped upload)\n",
|
||||
finishedBlob.Hash[:12]+"...", humanize.Bytes(uint64(finishedBlob.Compressed)))
|
||||
return true, nil
|
||||
}
|
||||
@@ -1071,7 +1177,7 @@ func (s *Scanner) uploadBlobIfNeeded(ctx context.Context, blobPath string, blobW
|
||||
uploadDuration := time.Since(startTime)
|
||||
uploadSpeedBps := float64(finishedBlob.Compressed) / uploadDuration.Seconds()
|
||||
|
||||
fmt.Printf("Blob stored: %s (%s, %s/sec, %s)\n",
|
||||
_, _ = fmt.Fprintf(s.output, "Blob stored: %s (%s, %s/sec, %s)\n",
|
||||
finishedBlob.Hash[:12]+"...",
|
||||
humanize.Bytes(uint64(finishedBlob.Compressed)),
|
||||
humanize.Bytes(uint64(uploadSpeedBps)),
|
||||
@@ -1176,6 +1282,12 @@ type streamingChunkInfo struct {
|
||||
|
||||
// processFileStreaming processes a file by streaming chunks directly to the packer
|
||||
func (s *Scanner) processFileStreaming(ctx context.Context, fileToProcess *FileToProcess, result *ScanResult) error {
|
||||
// Symlinks and directories have no data to chunk — just record them in the DB.
|
||||
mode := os.FileMode(fileToProcess.File.Mode)
|
||||
if mode&os.ModeSymlink != 0 || mode.IsDir() {
|
||||
return s.recordNonRegularFile(ctx, fileToProcess)
|
||||
}
|
||||
|
||||
file, err := s.fs.Open(fileToProcess.Path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("opening file: %w", err)
|
||||
@@ -1329,7 +1441,7 @@ func (s *Scanner) detectDeletedFilesFromMap(ctx context.Context, knownFiles map[
|
||||
}
|
||||
|
||||
if result.FilesDeleted > 0 {
|
||||
fmt.Printf("Found %s deleted files\n", formatNumber(result.FilesDeleted))
|
||||
_, _ = fmt.Fprintf(s.output, "Found %s deleted files\n", formatNumber(result.FilesDeleted))
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -110,15 +110,15 @@ func TestScannerSimpleDirectory(t *testing.T) {
|
||||
t.Errorf("expected at least 97 bytes scanned, got %d", result.BytesScanned)
|
||||
}
|
||||
|
||||
// Verify files in database - only regular files are stored
|
||||
// Verify files in database - includes regular files and directories
|
||||
files, err := repos.Files.ListByPrefix(ctx, "/source")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to list files: %v", err)
|
||||
}
|
||||
|
||||
// We should have 6 files (directories are not stored)
|
||||
if len(files) != 6 {
|
||||
t.Errorf("expected 6 files in database, got %d", len(files))
|
||||
// 6 regular files + 3 directories (/source, /source/subdir, /source/subdir2)
|
||||
if len(files) != 9 {
|
||||
t.Errorf("expected 9 entries in database (6 files + 3 dirs), got %d", len(files))
|
||||
}
|
||||
|
||||
// Verify specific file
|
||||
|
||||
@@ -1,434 +0,0 @@
|
||||
package vaultik
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"git.eeqj.de/sneak/vaultik/internal/log"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
)
|
||||
|
||||
// daemonMinBackupInterval is the absolute minimum time allowed between backup runs,
|
||||
// regardless of config, to prevent runaway backup loops.
|
||||
const daemonMinBackupInterval = 1 * time.Minute
|
||||
|
||||
// daemonShutdownTimeout is the maximum time to wait for an in-progress backup
|
||||
// to complete during graceful shutdown before force-exiting.
|
||||
const daemonShutdownTimeout = 5 * time.Minute
|
||||
|
||||
// RunDaemon runs vaultik in daemon mode: it watches configured directories for
|
||||
// changes using filesystem notifications, runs periodic backups at the configured
|
||||
// interval, and performs full scans at the full_scan_interval. It handles
|
||||
// SIGTERM/SIGINT for graceful shutdown, completing any in-progress backup before
|
||||
// exiting.
|
||||
func (v *Vaultik) RunDaemon(opts *SnapshotCreateOptions) error {
|
||||
backupInterval := v.Config.BackupInterval
|
||||
if backupInterval < daemonMinBackupInterval {
|
||||
backupInterval = daemonMinBackupInterval
|
||||
}
|
||||
|
||||
minTimeBetween := v.Config.MinTimeBetweenRun
|
||||
if minTimeBetween < daemonMinBackupInterval {
|
||||
minTimeBetween = daemonMinBackupInterval
|
||||
}
|
||||
|
||||
fullScanInterval := v.Config.FullScanInterval
|
||||
if fullScanInterval <= 0 {
|
||||
fullScanInterval = 24 * time.Hour
|
||||
}
|
||||
|
||||
log.Info("Starting daemon mode",
|
||||
"backup_interval", backupInterval,
|
||||
"min_time_between_run", minTimeBetween,
|
||||
"full_scan_interval", fullScanInterval,
|
||||
)
|
||||
v.printfStdout("Daemon mode started\n")
|
||||
v.printfStdout(" Backup interval: %s\n", backupInterval)
|
||||
v.printfStdout(" Min time between: %s\n", minTimeBetween)
|
||||
v.printfStdout(" Full scan interval: %s\n", fullScanInterval)
|
||||
|
||||
// Create a daemon-scoped context that we cancel on signal.
|
||||
ctx, cancel := context.WithCancel(v.ctx)
|
||||
defer cancel()
|
||||
|
||||
// Set up signal handling for graceful shutdown.
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
// Tracker for filesystem change events.
|
||||
tracker := newChangeTracker()
|
||||
|
||||
// Start the filesystem watcher.
|
||||
watcher, err := v.startWatcher(ctx, tracker)
|
||||
if err != nil {
|
||||
return fmt.Errorf("starting filesystem watcher: %w", err)
|
||||
}
|
||||
defer func() { _ = watcher.Close() }()
|
||||
|
||||
// Timers
|
||||
backupTicker := time.NewTicker(backupInterval)
|
||||
defer backupTicker.Stop()
|
||||
|
||||
fullScanTicker := time.NewTicker(fullScanInterval)
|
||||
defer fullScanTicker.Stop()
|
||||
|
||||
var lastBackupTime time.Time
|
||||
backupRunning := make(chan struct{}, 1) // semaphore: 1 = backup in progress
|
||||
|
||||
// Run an initial full backup immediately on startup.
|
||||
log.Info("Running initial backup on daemon startup")
|
||||
v.printfStdout("Running initial backup...\n")
|
||||
if err := v.runDaemonBackup(ctx, opts, tracker, false); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
return nil // context cancelled, shutting down
|
||||
}
|
||||
log.Error("Initial backup failed", "error", err)
|
||||
v.printfStderr("Initial backup failed: %v\n", err)
|
||||
// Continue running — next scheduled backup may succeed.
|
||||
} else {
|
||||
lastBackupTime = time.Now()
|
||||
tracker.reset()
|
||||
}
|
||||
|
||||
v.printfStdout("Watching for changes...\n")
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Info("Daemon context cancelled, shutting down")
|
||||
return nil
|
||||
|
||||
case sig := <-sigCh:
|
||||
log.Info("Received signal, initiating graceful shutdown", "signal", sig)
|
||||
v.printfStdout("\nReceived %s, shutting down...\n", sig)
|
||||
cancel()
|
||||
|
||||
// Wait for any in-progress backup to finish.
|
||||
select {
|
||||
case backupRunning <- struct{}{}:
|
||||
// No backup running, we can exit immediately.
|
||||
<-backupRunning
|
||||
default:
|
||||
// Backup is running, wait for it to complete.
|
||||
v.printfStdout("Waiting for in-progress backup to complete...\n")
|
||||
shutdownTimer := time.NewTimer(daemonShutdownTimeout)
|
||||
select {
|
||||
case backupRunning <- struct{}{}:
|
||||
<-backupRunning
|
||||
shutdownTimer.Stop()
|
||||
case <-shutdownTimer.C:
|
||||
log.Warn("Shutdown timeout exceeded, forcing exit")
|
||||
v.printfStderr("Shutdown timeout exceeded, forcing exit\n")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
case <-backupTicker.C:
|
||||
// Periodic backup tick. Only run if there are changes and enough
|
||||
// time has elapsed since the last run.
|
||||
if !tracker.hasChanges() {
|
||||
log.Debug("Backup tick: no changes detected, skipping")
|
||||
continue
|
||||
}
|
||||
if time.Since(lastBackupTime) < minTimeBetween {
|
||||
log.Debug("Backup tick: too soon since last backup",
|
||||
"last_backup", lastBackupTime,
|
||||
"min_interval", minTimeBetween,
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
// Try to acquire the backup semaphore (non-blocking).
|
||||
select {
|
||||
case backupRunning <- struct{}{}:
|
||||
default:
|
||||
log.Debug("Backup tick: backup already in progress, skipping")
|
||||
continue
|
||||
}
|
||||
|
||||
log.Info("Running scheduled backup", "changes", tracker.changeCount())
|
||||
v.printfStdout("Running scheduled backup (%d changes detected)...\n", tracker.changeCount())
|
||||
if err := v.runDaemonBackup(ctx, opts, tracker, false); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
<-backupRunning
|
||||
return nil
|
||||
}
|
||||
log.Error("Scheduled backup failed", "error", err)
|
||||
v.printfStderr("Scheduled backup failed: %v\n", err)
|
||||
} else {
|
||||
lastBackupTime = time.Now()
|
||||
tracker.reset()
|
||||
}
|
||||
<-backupRunning
|
||||
|
||||
case <-fullScanTicker.C:
|
||||
// Full scan — ignore whether changes were detected; do a complete scan.
|
||||
if time.Since(lastBackupTime) < minTimeBetween {
|
||||
log.Debug("Full scan tick: too soon since last backup, deferring")
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case backupRunning <- struct{}{}:
|
||||
default:
|
||||
log.Debug("Full scan tick: backup already in progress, skipping")
|
||||
continue
|
||||
}
|
||||
|
||||
log.Info("Running full periodic scan")
|
||||
v.printfStdout("Running full periodic scan...\n")
|
||||
if err := v.runDaemonBackup(ctx, opts, tracker, true); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
<-backupRunning
|
||||
return nil
|
||||
}
|
||||
log.Error("Full scan backup failed", "error", err)
|
||||
v.printfStderr("Full scan backup failed: %v\n", err)
|
||||
} else {
|
||||
lastBackupTime = time.Now()
|
||||
tracker.reset()
|
||||
}
|
||||
<-backupRunning
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// runDaemonBackup executes a single backup run within the daemon loop.
|
||||
// If fullScan is true, all snapshots are processed regardless of tracked changes.
|
||||
// Otherwise, only snapshots whose paths overlap with tracked changes are processed.
|
||||
func (v *Vaultik) runDaemonBackup(ctx context.Context, opts *SnapshotCreateOptions, tracker *changeTracker, fullScan bool) error {
|
||||
startTime := time.Now()
|
||||
|
||||
// Build a one-shot create options for this run.
|
||||
runOpts := &SnapshotCreateOptions{
|
||||
Cron: opts.Cron,
|
||||
Prune: opts.Prune,
|
||||
SkipErrors: opts.SkipErrors,
|
||||
}
|
||||
|
||||
if !fullScan {
|
||||
// Filter to only snapshots whose paths had changes.
|
||||
changedPaths := tracker.changedPaths()
|
||||
affected := v.snapshotsAffectedByChanges(changedPaths)
|
||||
if len(affected) == 0 {
|
||||
log.Debug("No snapshots affected by changes")
|
||||
return nil
|
||||
}
|
||||
runOpts.Snapshots = affected
|
||||
log.Info("Running incremental backup for affected snapshots", "snapshots", affected)
|
||||
}
|
||||
// fullScan: leave runOpts.Snapshots empty → CreateSnapshot processes all.
|
||||
|
||||
// Use a child context so cancellation propagates but we can still finish
|
||||
// if the parent hasn't been cancelled.
|
||||
childCtx, childCancel := context.WithCancel(ctx)
|
||||
defer childCancel()
|
||||
|
||||
// Temporarily swap the Vaultik context.
|
||||
origCtx := v.ctx
|
||||
v.ctx = childCtx
|
||||
defer func() { v.ctx = origCtx }()
|
||||
|
||||
if err := v.CreateSnapshot(runOpts); err != nil {
|
||||
return fmt.Errorf("backup run failed: %w", err)
|
||||
}
|
||||
|
||||
log.Info("Daemon backup complete", "duration", time.Since(startTime))
|
||||
v.printfStdout("Backup complete in %s\n", formatDuration(time.Since(startTime)))
|
||||
return nil
|
||||
}
|
||||
|
||||
// snapshotsAffectedByChanges returns the names of configured snapshots whose
|
||||
// paths overlap with any of the changed paths.
|
||||
func (v *Vaultik) snapshotsAffectedByChanges(changedPaths []string) []string {
|
||||
var affected []string
|
||||
for _, snapName := range v.Config.SnapshotNames() {
|
||||
snapCfg := v.Config.Snapshots[snapName]
|
||||
for _, snapPath := range snapCfg.Paths {
|
||||
absSnapPath, err := filepath.Abs(snapPath)
|
||||
if err != nil {
|
||||
absSnapPath = snapPath
|
||||
}
|
||||
for _, changed := range changedPaths {
|
||||
if isSubpath(changed, absSnapPath) {
|
||||
affected = append(affected, snapName)
|
||||
goto nextSnapshot
|
||||
}
|
||||
}
|
||||
}
|
||||
nextSnapshot:
|
||||
}
|
||||
return affected
|
||||
}
|
||||
|
||||
// isSubpath reports whether child names the same location as parent or a
// location beneath it. Both arguments are cleaned with filepath.Clean first,
// and the comparison is purely lexical on path components, so "/ab" is not
// considered to be under "/a".
func isSubpath(child, parent string) bool {
	sep := string(filepath.Separator)
	c, p := filepath.Clean(child), filepath.Clean(parent)

	switch {
	case c == p:
		return true
	case strings.HasSuffix(p, sep):
		// Parent already ends in a separator (e.g. the root directory).
		return strings.HasPrefix(c, p)
	default:
		// Append a separator so only whole components match.
		return strings.HasPrefix(c, p+sep)
	}
}
|
||||
|
||||
// startWatcher creates an fsnotify watcher and adds all configured snapshot paths.
|
||||
// It spawns a goroutine that reads events and feeds the change tracker.
|
||||
func (v *Vaultik) startWatcher(ctx context.Context, tracker *changeTracker) (*fsnotify.Watcher, error) {
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating watcher: %w", err)
|
||||
}
|
||||
|
||||
// Collect unique absolute paths to watch.
|
||||
watchPaths := make(map[string]struct{})
|
||||
for _, snapName := range v.Config.SnapshotNames() {
|
||||
snapCfg := v.Config.Snapshots[snapName]
|
||||
for _, p := range snapCfg.Paths {
|
||||
absPath, err := filepath.Abs(p)
|
||||
if err != nil {
|
||||
log.Warn("Failed to resolve absolute path for watch", "path", p, "error", err)
|
||||
continue
|
||||
}
|
||||
watchPaths[absPath] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// Add paths to watcher. Walk the top-level to add subdirectories
|
||||
// since fsnotify doesn't recurse automatically.
|
||||
for p := range watchPaths {
|
||||
if err := v.addWatchRecursive(watcher, p); err != nil {
|
||||
log.Warn("Failed to watch path", "path", p, "error", err)
|
||||
// Non-fatal: the path might not exist yet.
|
||||
}
|
||||
}
|
||||
|
||||
// Spawn the event reader goroutine.
|
||||
go v.watcherLoop(ctx, watcher, tracker)
|
||||
|
||||
return watcher, nil
|
||||
}
|
||||
|
||||
// addWatchRecursive walks a directory tree and adds each directory to the watcher.
|
||||
func (v *Vaultik) addWatchRecursive(watcher *fsnotify.Watcher, root string) error {
|
||||
return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
// Can't read — skip this subtree.
|
||||
if info != nil && info.IsDir() {
|
||||
return filepath.SkipDir
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if info.IsDir() {
|
||||
// Skip common directories that don't need watching.
|
||||
base := filepath.Base(path)
|
||||
if base == ".git" || base == "node_modules" || base == "__pycache__" {
|
||||
return filepath.SkipDir
|
||||
}
|
||||
if err := watcher.Add(path); err != nil {
|
||||
log.Debug("Failed to watch directory", "path", path, "error", err)
|
||||
// Non-fatal: continue walking.
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// watcherLoop reads filesystem events from the watcher and records them
|
||||
// in the change tracker. It runs until the context is cancelled.
|
||||
func (v *Vaultik) watcherLoop(ctx context.Context, watcher *fsnotify.Watcher, tracker *changeTracker) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case event, ok := <-watcher.Events:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
// Only track write/create/remove/rename events.
|
||||
if event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Remove|fsnotify.Rename) != 0 {
|
||||
tracker.recordChange(event.Name)
|
||||
log.Debug("Filesystem change detected", "path", event.Name, "op", event.Op)
|
||||
}
|
||||
// If a new directory was created, watch it too.
|
||||
if event.Op&fsnotify.Create != 0 {
|
||||
if info, err := os.Stat(event.Name); err == nil && info.IsDir() {
|
||||
if err := v.addWatchRecursive(watcher, event.Name); err != nil {
|
||||
log.Debug("Failed to watch new directory", "path", event.Name, "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
case err, ok := <-watcher.Errors:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
log.Warn("Filesystem watcher error", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// changeTracker remembers which filesystem paths have changed since it was
// last reset. All methods take the mutex, so a single tracker may be shared
// between goroutines.
type changeTracker struct {
	mu      sync.Mutex
	changes map[string]time.Time // path → time of the most recent recorded change
}

// newChangeTracker returns a tracker with no recorded changes.
func newChangeTracker() *changeTracker {
	ct := new(changeTracker)
	ct.changes = map[string]time.Time{}
	return ct
}

// recordChange notes that path changed now. Recording the same path again
// only refreshes its timestamp.
func (ct *changeTracker) recordChange(path string) {
	ct.mu.Lock()
	defer ct.mu.Unlock()
	ct.changes[path] = time.Now()
}

// hasChanges reports whether at least one path is currently recorded.
func (ct *changeTracker) hasChanges() bool {
	return ct.changeCount() > 0
}

// changeCount returns how many distinct paths are currently recorded.
func (ct *changeTracker) changeCount() int {
	ct.mu.Lock()
	n := len(ct.changes)
	ct.mu.Unlock()
	return n
}

// changedPaths returns a new slice holding every recorded path, in no
// particular order. The slice is empty (not nil) when nothing is recorded.
func (ct *changeTracker) changedPaths() []string {
	ct.mu.Lock()
	defer ct.mu.Unlock()

	out := make([]string, len(ct.changes))
	i := 0
	for p := range ct.changes {
		out[i] = p
		i++
	}
	return out
}

// reset forgets every recorded change.
func (ct *changeTracker) reset() {
	ct.mu.Lock()
	defer ct.mu.Unlock()
	ct.changes = map[string]time.Time{}
}
|
||||
@@ -1,196 +0,0 @@
|
||||
package vaultik
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.eeqj.de/sneak/vaultik/internal/config"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewChangeTracker(t *testing.T) {
|
||||
ct := newChangeTracker()
|
||||
require.NotNil(t, ct)
|
||||
assert.False(t, ct.hasChanges())
|
||||
assert.Equal(t, 0, ct.changeCount())
|
||||
assert.Empty(t, ct.changedPaths())
|
||||
}
|
||||
|
||||
func TestChangeTrackerRecordChange(t *testing.T) {
|
||||
ct := newChangeTracker()
|
||||
|
||||
ct.recordChange("/home/user/file1.txt")
|
||||
assert.True(t, ct.hasChanges())
|
||||
assert.Equal(t, 1, ct.changeCount())
|
||||
|
||||
ct.recordChange("/home/user/file2.txt")
|
||||
assert.Equal(t, 2, ct.changeCount())
|
||||
|
||||
// Duplicate path should update time but not increase count.
|
||||
ct.recordChange("/home/user/file1.txt")
|
||||
assert.Equal(t, 2, ct.changeCount())
|
||||
|
||||
paths := ct.changedPaths()
|
||||
assert.Len(t, paths, 2)
|
||||
assert.Contains(t, paths, "/home/user/file1.txt")
|
||||
assert.Contains(t, paths, "/home/user/file2.txt")
|
||||
}
|
||||
|
||||
func TestChangeTrackerReset(t *testing.T) {
|
||||
ct := newChangeTracker()
|
||||
|
||||
ct.recordChange("/home/user/file1.txt")
|
||||
ct.recordChange("/home/user/file2.txt")
|
||||
assert.Equal(t, 2, ct.changeCount())
|
||||
|
||||
ct.reset()
|
||||
assert.False(t, ct.hasChanges())
|
||||
assert.Equal(t, 0, ct.changeCount())
|
||||
assert.Empty(t, ct.changedPaths())
|
||||
}
|
||||
|
||||
func TestChangeTrackerConcurrency(t *testing.T) {
|
||||
ct := newChangeTracker()
|
||||
done := make(chan struct{})
|
||||
|
||||
// Write from multiple goroutines simultaneously.
|
||||
for i := 0; i < 10; i++ {
|
||||
go func(n int) {
|
||||
for j := 0; j < 100; j++ {
|
||||
ct.recordChange("/path/" + string(rune('a'+n)))
|
||||
}
|
||||
done <- struct{}{}
|
||||
}(i)
|
||||
}
|
||||
|
||||
// Also read concurrently.
|
||||
go func() {
|
||||
for i := 0; i < 100; i++ {
|
||||
_ = ct.hasChanges()
|
||||
_ = ct.changeCount()
|
||||
_ = ct.changedPaths()
|
||||
}
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
// Wait for all goroutines.
|
||||
for i := 0; i < 11; i++ {
|
||||
<-done
|
||||
}
|
||||
|
||||
assert.True(t, ct.hasChanges())
|
||||
assert.LessOrEqual(t, ct.changeCount(), 10) // 10 unique paths
|
||||
}
|
||||
|
||||
func TestChangeTrackerRecordTimestamp(t *testing.T) {
|
||||
ct := newChangeTracker()
|
||||
|
||||
before := time.Now()
|
||||
ct.recordChange("/some/path")
|
||||
after := time.Now()
|
||||
|
||||
ct.mu.Lock()
|
||||
ts := ct.changes["/some/path"]
|
||||
ct.mu.Unlock()
|
||||
|
||||
assert.False(t, ts.Before(before))
|
||||
assert.False(t, ts.After(after))
|
||||
}
|
||||
|
||||
func TestIsSubpath(t *testing.T) {
|
||||
tests := []struct {
|
||||
child string
|
||||
parent string
|
||||
expected bool
|
||||
}{
|
||||
{"/home/user/file.txt", "/home/user", true},
|
||||
{"/home/user", "/home/user", true},
|
||||
{"/home/user/deep/nested/file.txt", "/home/user", true},
|
||||
{"/home/other/file.txt", "/home/user", false},
|
||||
{"/home/username/file.txt", "/home/user", false}, // not a subpath, just prefix match
|
||||
{"/etc/config", "/home/user", false},
|
||||
{"/", "/", true},
|
||||
{"/a", "/", true},
|
||||
{"/a/b", "/a", true},
|
||||
{"/ab", "/a", false},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.child+"_under_"+tt.parent, func(t *testing.T) {
|
||||
result := isSubpath(tt.child, tt.parent)
|
||||
assert.Equal(t, tt.expected, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotsAffectedByChanges(t *testing.T) {
|
||||
// We can't easily test this without a full Vaultik instance with config,
|
||||
// but we can verify the helper function isSubpath which it depends on.
|
||||
// The full integration is tested via the daemon integration test.
|
||||
|
||||
// Verify basic subpath logic used by snapshotsAffectedByChanges.
|
||||
assert.True(t, isSubpath("/home/user/docs/report.txt", "/home/user"))
|
||||
assert.False(t, isSubpath("/var/log/syslog", "/home/user"))
|
||||
}
|
||||
|
||||
func TestDaemonConstants(t *testing.T) {
|
||||
// Verify daemon constants are reasonable values.
|
||||
assert.GreaterOrEqual(t, daemonMinBackupInterval, 1*time.Minute)
|
||||
assert.GreaterOrEqual(t, daemonShutdownTimeout, 1*time.Minute)
|
||||
}
|
||||
|
||||
func TestRunDaemon_CancelledContext(t *testing.T) {
|
||||
// Create a temporary directory to use as a snapshot path.
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
// Write a file so the watched path is non-empty.
|
||||
err := os.WriteFile(filepath.Join(tmpDir, "testfile.txt"), []byte("hello"), 0o644)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Build a minimal Vaultik with daemon-friendly config.
|
||||
// RunDaemon will fail on the initial backup (no storage configured),
|
||||
// but it should continue running. We cancel the context to verify
|
||||
// graceful shutdown.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
stdout := &bytes.Buffer{}
|
||||
stderr := &bytes.Buffer{}
|
||||
|
||||
v := &Vaultik{
|
||||
Config: &config.Config{
|
||||
BackupInterval: 1 * time.Hour,
|
||||
FullScanInterval: 24 * time.Hour,
|
||||
MinTimeBetweenRun: 1 * time.Minute,
|
||||
Snapshots: map[string]config.SnapshotConfig{
|
||||
"test": {
|
||||
Paths: []string{tmpDir},
|
||||
},
|
||||
},
|
||||
},
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
Stdout: stdout,
|
||||
Stderr: stderr,
|
||||
}
|
||||
|
||||
// Cancel the context shortly after RunDaemon starts so the daemon
|
||||
// loop exits via its ctx.Done() path.
|
||||
go func() {
|
||||
// Wait for the initial backup to fail (it will, since there's no
|
||||
// storage backend), then cancel.
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
cancel()
|
||||
}()
|
||||
|
||||
err = v.RunDaemon(&SnapshotCreateOptions{})
|
||||
// RunDaemon should return nil on context cancellation (graceful shutdown).
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Verify daemon printed startup messages.
|
||||
output := stdout.String()
|
||||
assert.Contains(t, output, "Daemon mode started")
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package vaultik
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -95,18 +96,39 @@ func parseSnapshotName(snapshotID string) string {
|
||||
return strings.Join(parts[1:len(parts)-1], "_")
|
||||
}
|
||||
|
||||
// durationTokenRe matches one "<digits><unit>" component of a human-friendly
// duration such as "2w" or "3 days".
var durationTokenRe = regexp.MustCompile(`(\d+)\s*([a-zA-Z]+)`)

// durationFormRe accepts only strings that consist entirely of such
// components (optionally separated or surrounded by whitespace).
var durationFormRe = regexp.MustCompile(`^\s*(?:\d+\s*[a-zA-Z]+\s*)+$`)

// parseDuration parses a duration string with support for human-friendly units:
// d/day/days, w/week/weeks, mo/month/months, y/year/years, plus standard Go
// duration units (h, m, s).
//
// Anything time.ParseDuration accepts is returned as-is. Otherwise the input
// must be a sequence of "<integer><unit>" components (e.g. "2w3d"), which are
// summed using fixed lengths: a month is 30 days and a year is 365 days.
//
// An error is returned when the input contains anything besides such
// components (for example "1.5d" or "3d!"), when a unit is not recognised,
// or when the total does not fit in a time.Duration.
func parseDuration(s string) (time.Duration, error) {
	if d, err := time.ParseDuration(s); err == nil {
		return d, nil
	}

	// Reject partially matching input up front; otherwise stray characters
	// would be silently dropped and "1.5d" would be read as "5d".
	if !durationFormRe.MatchString(s) {
		return 0, fmt.Errorf("invalid duration: %q", s)
	}

	const maxDuration = time.Duration(1<<63 - 1)

	var total time.Duration
	for _, match := range durationTokenRe.FindAllStringSubmatch(s, -1) {
		n, err := strconv.Atoi(match[1])
		if err != nil {
			return 0, fmt.Errorf("invalid number %q: %w", match[1], err)
		}

		var unitLen time.Duration
		unit := strings.ToLower(match[2])
		switch unit {
		case "d", "day", "days":
			unitLen = 24 * time.Hour
		case "w", "week", "weeks":
			unitLen = 7 * 24 * time.Hour
		case "mo", "month", "months":
			unitLen = 30 * 24 * time.Hour
		case "y", "year", "years":
			unitLen = 365 * 24 * time.Hour
		default:
			return 0, fmt.Errorf("unknown time unit %q", unit)
		}

		// n is non-negative (digits only), so this is a complete overflow
		// check for total + n*unitLen.
		if time.Duration(n) > (maxDuration-total)/unitLen {
			return 0, fmt.Errorf("duration %q is too large", s)
		}
		total += time.Duration(n) * unitLen
	}
	return total, nil
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package vaultik
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestParseSnapshotName(t *testing.T) {
|
||||
@@ -37,6 +38,41 @@ func TestParseSnapshotName(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseDuration(t *testing.T) {
|
||||
tests := []struct {
|
||||
input string
|
||||
want time.Duration
|
||||
err bool
|
||||
}{
|
||||
{"30d", 30 * 24 * time.Hour, false},
|
||||
{"4w", 4 * 7 * 24 * time.Hour, false},
|
||||
{"6mo", 6 * 30 * 24 * time.Hour, false},
|
||||
{"1y", 365 * 24 * time.Hour, false},
|
||||
{"2w3d", 2*7*24*time.Hour + 3*24*time.Hour, false},
|
||||
{"1h", time.Hour, false},
|
||||
{"30s", 30 * time.Second, false},
|
||||
{"garbage", 0, true},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.input, func(t *testing.T) {
|
||||
got, err := parseDuration(tt.input)
|
||||
if tt.err {
|
||||
if err == nil {
|
||||
t.Fatalf("expected error for %q, got %v", tt.input, got)
|
||||
}
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error for %q: %v", tt.input, err)
|
||||
}
|
||||
if got != tt.want {
|
||||
t.Errorf("parseDuration(%q) = %v, want %v", tt.input, got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseSnapshotTimestamp(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
||||
@@ -66,18 +66,6 @@ func (v *Vaultik) ShowInfo() error {
|
||||
}
|
||||
v.printlnStdout()
|
||||
|
||||
// Daemon Settings (if applicable)
|
||||
if v.Config.BackupInterval > 0 || v.Config.MinTimeBetweenRun > 0 {
|
||||
v.printfStdout("=== Daemon Settings ===\n")
|
||||
if v.Config.BackupInterval > 0 {
|
||||
v.printfStdout("Backup Interval: %s\n", v.Config.BackupInterval)
|
||||
}
|
||||
if v.Config.MinTimeBetweenRun > 0 {
|
||||
v.printfStdout("Minimum Time: %s\n", v.Config.MinTimeBetweenRun)
|
||||
}
|
||||
v.printlnStdout()
|
||||
}
|
||||
|
||||
// Local Database
|
||||
v.printfStdout("=== Local Database ===\n")
|
||||
v.printfStdout("Index Path: %s\n", v.Config.IndexPath)
|
||||
|
||||
@@ -541,3 +541,174 @@ func TestBackupAndRestore(t *testing.T) {
|
||||
|
||||
t.Log("Backup and restore test completed successfully")
|
||||
}
|
||||
|
||||
// TestEndToEndFileStorage exercises the full backup → restore loop against the
|
||||
// real `file://` storage backend (FileStorer) on a real OS filesystem. This is
|
||||
// the closest local approximation of a production backup: encrypted blobs get
|
||||
// written to disk, the metadata SQLite database is exported through the same
|
||||
// blobgen pipeline as a real backup, and restoration reads them back through
|
||||
// the public Vaultik.Restore entrypoint. It is the canonical end-to-end smoke
|
||||
// test for 1.0.
|
||||
func TestEndToEndFileStorage(t *testing.T) {
|
||||
log.Initialize(log.Config{})
|
||||
|
||||
// Real OS filesystem (SQLite + FileStorer both need it).
|
||||
fs := afero.NewOsFs()
|
||||
tempDir, err := os.MkdirTemp("", "vaultik-e2e-")
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = os.RemoveAll(tempDir) }()
|
||||
|
||||
dataDir := filepath.Join(tempDir, "source")
|
||||
storeDir := filepath.Join(tempDir, "remote")
|
||||
restoreDir := filepath.Join(tempDir, "restored")
|
||||
dbPath := filepath.Join(tempDir, "index.sqlite")
|
||||
|
||||
// Write a representative mix of file sizes:
|
||||
// - empty file
|
||||
// - tiny text file
|
||||
// - file just under chunk boundary
|
||||
// - file forcing multiple chunks
|
||||
// - nested subdirectories
|
||||
chunkSize := int64(64 * 1024)
|
||||
maxBlobSize := int64(512 * 1024)
|
||||
|
||||
testFiles := map[string][]byte{
|
||||
filepath.Join(dataDir, "empty.txt"): {},
|
||||
filepath.Join(dataDir, "small.txt"): []byte("hello vaultik"),
|
||||
filepath.Join(dataDir, "subdir", "medium.bin"): bytesPattern("medium-", int(chunkSize/2)),
|
||||
filepath.Join(dataDir, "subdir", "large.bin"): bytesPattern("large-", int(chunkSize*4)),
|
||||
filepath.Join(dataDir, "deep", "nest", "leaf.txt"): []byte("leaf"),
|
||||
}
|
||||
|
||||
for path, content := range testFiles {
|
||||
require.NoError(t, fs.MkdirAll(filepath.Dir(path), 0o755))
|
||||
require.NoError(t, afero.WriteFile(fs, path, content, 0o644))
|
||||
}
|
||||
|
||||
// Create a file with non-default permissions.
|
||||
restrictedPath := filepath.Join(dataDir, "restricted.txt")
|
||||
require.NoError(t, afero.WriteFile(fs, restrictedPath, []byte("secret"), 0o600))
|
||||
testFiles[restrictedPath] = []byte("secret")
|
||||
|
||||
// Create an empty directory (should survive round-trip).
|
||||
emptyDir := filepath.Join(dataDir, "emptydir")
|
||||
require.NoError(t, fs.MkdirAll(emptyDir, 0o755))
|
||||
|
||||
// Create a symlink.
|
||||
symlinkPath := filepath.Join(dataDir, "link-to-small")
|
||||
require.NoError(t, os.Symlink("small.txt", symlinkPath))
|
||||
|
||||
// FileStorer is the real-world local-disk backend.
|
||||
storer, err := storage.NewFileStorer(storeDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
agePublicKey := "age1ezrjmfpwsc95svdg0y54mums3zevgzu0x0ecq2f7tp8a05gl0sjq9q9wjg"
|
||||
ageSecretKey := "AGE-SECRET-KEY-19CR5YSFW59HM4TLD6GXVEDMZFTVVF7PPHKUT68TXSFPK7APHXA2QS2NJA5"
|
||||
|
||||
cfg := &config.Config{
|
||||
AgeRecipients: []string{agePublicKey},
|
||||
AgeSecretKey: ageSecretKey,
|
||||
CompressionLevel: 3,
|
||||
Hostname: "test-host",
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
db, err := database.New(ctx, dbPath)
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = db.Close() }()
|
||||
|
||||
repos := database.NewRepositories(db)
|
||||
|
||||
sm := snapshot.NewSnapshotManager(snapshot.SnapshotManagerParams{
|
||||
Repos: repos,
|
||||
Storage: storer,
|
||||
Config: cfg,
|
||||
})
|
||||
sm.SetFilesystem(fs)
|
||||
|
||||
scanner := snapshot.NewScanner(snapshot.ScannerConfig{
|
||||
FS: fs,
|
||||
Storage: storer,
|
||||
ChunkSize: chunkSize,
|
||||
MaxBlobSize: maxBlobSize,
|
||||
CompressionLevel: cfg.CompressionLevel,
|
||||
AgeRecipients: cfg.AgeRecipients,
|
||||
Repositories: repos,
|
||||
})
|
||||
|
||||
snapshotID, err := sm.CreateSnapshotWithName(ctx, cfg.Hostname, "e2e", "test-version", "test-git")
|
||||
require.NoError(t, err)
|
||||
|
||||
scanResult, err := scanner.Scan(ctx, dataDir, snapshotID)
|
||||
require.NoError(t, err)
|
||||
require.Greater(t, scanResult.FilesScanned, 0)
|
||||
require.Greater(t, scanResult.BlobsCreated, 0)
|
||||
|
||||
require.NoError(t, sm.CompleteSnapshot(ctx, snapshotID))
|
||||
require.NoError(t, sm.ExportSnapshotMetadata(ctx, dbPath, snapshotID))
|
||||
|
||||
// Verify the backup actually landed on disk under blobs/ and metadata/.
|
||||
blobInfo, err := os.Stat(filepath.Join(storeDir, "blobs"))
|
||||
require.NoError(t, err)
|
||||
require.True(t, blobInfo.IsDir())
|
||||
metaInfo, err := os.Stat(filepath.Join(storeDir, "metadata", snapshotID))
|
||||
require.NoError(t, err)
|
||||
require.True(t, metaInfo.IsDir())
|
||||
|
||||
// Tear down the source DB before restore — restore must work using only
|
||||
// the remote bytes plus the secret key, with no help from the local index.
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
restoreVaultik := &vaultik.Vaultik{
|
||||
Config: cfg,
|
||||
Storage: storer,
|
||||
Fs: fs,
|
||||
Stdout: io.Discard,
|
||||
Stderr: io.Discard,
|
||||
}
|
||||
restoreVaultik.SetContext(ctx)
|
||||
|
||||
require.NoError(t, restoreVaultik.Restore(&vaultik.RestoreOptions{
|
||||
SnapshotID: snapshotID,
|
||||
TargetDir: restoreDir,
|
||||
Verify: true,
|
||||
}))
|
||||
|
||||
// Byte-equality compare every original against its restored copy.
|
||||
for origPath, expected := range testFiles {
|
||||
restoredPath := filepath.Join(restoreDir, origPath)
|
||||
got, err := afero.ReadFile(fs, restoredPath)
|
||||
require.NoError(t, err, "restored file missing: %s", restoredPath)
|
||||
require.Equalf(t, expected, got, "byte-equality failed for %s", origPath)
|
||||
}
|
||||
|
||||
// Verify the restricted file kept its permissions.
|
||||
restoredRestricted := filepath.Join(restoreDir, restrictedPath)
|
||||
rInfo, err := os.Stat(restoredRestricted)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, os.FileMode(0o600), rInfo.Mode().Perm(),
|
||||
"restricted file should preserve 0600 permissions")
|
||||
|
||||
// Verify the empty directory was restored.
|
||||
restoredEmptyDir := filepath.Join(restoreDir, emptyDir)
|
||||
dInfo, err := os.Stat(restoredEmptyDir)
|
||||
require.NoError(t, err, "empty directory should be restored")
|
||||
assert.True(t, dInfo.IsDir(), "emptydir should be a directory")
|
||||
|
||||
// Verify the symlink was restored with the correct target.
|
||||
restoredSymlink := filepath.Join(restoreDir, symlinkPath)
|
||||
target, err := os.Readlink(restoredSymlink)
|
||||
require.NoError(t, err, "symlink should be restored")
|
||||
assert.Equal(t, "small.txt", target, "symlink target should be preserved")
|
||||
}
|
||||
|
||||
// bytesPattern returns a deterministic byte slice of length n with a tag prefix,
// useful for forcing chunker behavior with reproducible content.
//
// Byte i is the i-th byte of tag (repeated cyclically) XORed with the low
// eight bits of i. An empty tag contributes zero bytes, so the result is then
// just the low byte of each index. A non-positive n yields an empty slice.
func bytesPattern(tag string, n int) []byte {
	if n <= 0 {
		return []byte{}
	}

	out := make([]byte, n)
	for i := range out {
		var t byte
		if len(tag) > 0 { // guard: i%len(tag) would panic for an empty tag
			t = tag[i%len(tag)]
		}
		out[i] = t ^ byte(i&0xff)
	}
	return out
}
|
||||
|
||||
@@ -156,7 +156,7 @@ func TestPurgeKeepLatest_WithNameFilter(t *testing.T) {
|
||||
err := v.PurgeSnapshotsWithOptions(&vaultik.SnapshotPurgeOptions{
|
||||
KeepLatest: true,
|
||||
Force: true,
|
||||
Name: "home",
|
||||
Names: []string{"home"},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -190,7 +190,7 @@ func TestPurgeKeepLatest_NameFilterNoMatch(t *testing.T) {
|
||||
err := v.PurgeSnapshotsWithOptions(&vaultik.SnapshotPurgeOptions{
|
||||
KeepLatest: true,
|
||||
Force: true,
|
||||
Name: "nonexistent",
|
||||
Names: []string{"nonexistent"},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -215,7 +215,7 @@ func TestPurgeOlderThan_WithNameFilter(t *testing.T) {
|
||||
err := v.PurgeSnapshotsWithOptions(&vaultik.SnapshotPurgeOptions{
|
||||
OlderThan: "365d",
|
||||
Force: true,
|
||||
Name: "home",
|
||||
Names: []string{"home"},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
|
||||
@@ -22,11 +22,11 @@ import (
|
||||
|
||||
// SnapshotCreateOptions contains options for the snapshot create command
|
||||
type SnapshotCreateOptions struct {
|
||||
Daemon bool
|
||||
Cron bool
|
||||
Prune bool
|
||||
SkipErrors bool // Skip file read errors (log them loudly but continue)
|
||||
Snapshots []string // Optional list of snapshot names to process (empty = all)
|
||||
Cron bool
|
||||
Prune bool
|
||||
KeepNewerThan string // With --prune: keep snapshots newer than this duration (e.g. "4w"); default: keep only latest
|
||||
SkipErrors bool // Skip file read errors (log them loudly but continue)
|
||||
Snapshots []string // Optional list of snapshot names to process (empty = all)
|
||||
}
|
||||
|
||||
// CreateSnapshot executes the snapshot creation operation
|
||||
@@ -57,10 +57,6 @@ func (v *Vaultik) CreateSnapshot(opts *SnapshotCreateOptions) error {
|
||||
return fmt.Errorf("prune database: %w", err)
|
||||
}
|
||||
|
||||
if opts.Daemon {
|
||||
return v.RunDaemon(opts)
|
||||
}
|
||||
|
||||
// Determine which snapshots to process
|
||||
snapshotNames := opts.Snapshots
|
||||
if len(snapshotNames) == 0 {
|
||||
@@ -90,25 +86,41 @@ func (v *Vaultik) CreateSnapshot(opts *SnapshotCreateOptions) error {
|
||||
v.printfStdout("\nAll %d snapshots completed in %s\n", len(snapshotNames), time.Since(overallStartTime).Round(time.Second))
|
||||
}
|
||||
|
||||
// Prune old snapshots and unreferenced blobs if --prune was specified
|
||||
if opts.Prune {
|
||||
log.Info("Pruning enabled - deleting old snapshots and unreferenced blobs")
|
||||
v.printlnStdout("\nPruning old snapshots (keeping latest)...")
|
||||
|
||||
if err := v.PurgeSnapshotsWithOptions(&SnapshotPurgeOptions{
|
||||
KeepLatest: true,
|
||||
Force: true,
|
||||
}); err != nil {
|
||||
return fmt.Errorf("prune: purging old snapshots: %w", err)
|
||||
if err := v.runPostBackupPrune(snapshotNames, opts.KeepNewerThan); err != nil {
|
||||
return fmt.Errorf("post-backup prune: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
v.printlnStdout("Pruning unreferenced blobs...")
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := v.PruneBlobs(&PruneOptions{Force: true}); err != nil {
|
||||
return fmt.Errorf("prune: removing unreferenced blobs: %w", err)
|
||||
}
|
||||
// runPostBackupPrune drops older snapshots of the given names and removes
|
||||
// orphan blobs from remote storage. If keepNewerThan is set (e.g. "4w"),
|
||||
// snapshots newer than that duration are kept. Otherwise only the latest
|
||||
// snapshot of each name is kept.
|
||||
func (v *Vaultik) runPostBackupPrune(snapshotNames []string, keepNewerThan string) error {
|
||||
log.Info("Running post-backup prune", "snapshots", snapshotNames, "keep_newer_than", keepNewerThan)
|
||||
v.printlnStdout("\n=== Post-backup prune ===")
|
||||
|
||||
log.Info("Pruning complete")
|
||||
purgeOpts := &SnapshotPurgeOptions{
|
||||
Force: true,
|
||||
Names: snapshotNames,
|
||||
Quiet: true,
|
||||
}
|
||||
|
||||
if keepNewerThan != "" {
|
||||
purgeOpts.OlderThan = keepNewerThan
|
||||
} else {
|
||||
purgeOpts.KeepLatest = true
|
||||
}
|
||||
|
||||
if err := v.PurgeSnapshotsWithOptions(purgeOpts); err != nil {
|
||||
return fmt.Errorf("purging old snapshots: %w", err)
|
||||
}
|
||||
|
||||
if err := v.PruneBlobs(&PruneOptions{Force: true}); err != nil {
|
||||
return fmt.Errorf("pruning orphaned blobs: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -397,7 +409,26 @@ func (v *Vaultik) ListSnapshots(jsonOutput bool) error {
|
||||
return encoder.Encode(snapshots)
|
||||
}
|
||||
|
||||
return v.printSnapshotTable(snapshots)
|
||||
if err := v.printSnapshotTable(snapshots); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Warn about local snapshots that don't exist in remote storage.
|
||||
var stale []string
|
||||
for id := range localSnapshotMap {
|
||||
if !remoteSnapshots[id] {
|
||||
stale = append(stale, id)
|
||||
}
|
||||
}
|
||||
if len(stale) > 0 {
|
||||
v.printfStdout("\nWarning: %d local snapshot(s) not found in remote storage:\n", len(stale))
|
||||
for _, id := range stale {
|
||||
v.printfStdout(" %s\n", id)
|
||||
}
|
||||
v.printlnStdout("Run 'vaultik snapshot cleanup' to remove stale local records.")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// listRemoteSnapshotIDs returns a set of snapshot IDs found in remote storage
|
||||
@@ -583,18 +614,19 @@ func (v *Vaultik) printSnapshotTable(snapshots []SnapshotInfo) error {
|
||||
return w.Flush()
|
||||
}
|
||||
|
||||
// SnapshotPurgeOptions contains options for the snapshot purge command
|
||||
// SnapshotPurgeOptions contains options for the snapshot purge command.
|
||||
type SnapshotPurgeOptions struct {
|
||||
KeepLatest bool
|
||||
OlderThan string
|
||||
Force bool
|
||||
Name string // Filter purge to a specific snapshot name
|
||||
KeepLatest bool // Keep only the most recent snapshot per name
|
||||
OlderThan string // Drop snapshots older than this duration (e.g. "30d", "6m", "1y")
|
||||
Force bool // Skip confirmation prompt
|
||||
Names []string // If non-empty, only operate on snapshots with one of these names
|
||||
Quiet bool // Suppress informational output (used by --prune flag)
|
||||
}
|
||||
|
||||
// PurgeSnapshotsWithOptions removes old snapshots based on criteria.
|
||||
// When KeepLatest is true, retention is applied per snapshot name — the latest
|
||||
// snapshot for each distinct name is kept. If Name is non-empty, only snapshots
|
||||
// matching that name are considered for purge.
|
||||
// Retention is per-snapshot-name: KeepLatest keeps the latest of EACH configured
|
||||
// snapshot name, not the latest globally. This prevents `home` and `system`
|
||||
// snapshots from cannibalizing each other.
|
||||
func (v *Vaultik) PurgeSnapshotsWithOptions(opts *SnapshotPurgeOptions) error {
|
||||
// Sync with remote first
|
||||
if err := v.syncWithRemote(); err != nil {
|
||||
@@ -607,27 +639,28 @@ func (v *Vaultik) PurgeSnapshotsWithOptions(opts *SnapshotPurgeOptions) error {
|
||||
return fmt.Errorf("listing snapshots: %w", err)
|
||||
}
|
||||
|
||||
// Convert to SnapshotInfo format, only including completed snapshots
|
||||
snapshots := make([]SnapshotInfo, 0, len(dbSnapshots))
|
||||
for _, s := range dbSnapshots {
|
||||
if s.CompletedAt != nil {
|
||||
snapshots = append(snapshots, SnapshotInfo{
|
||||
ID: s.ID,
|
||||
Timestamp: s.StartedAt,
|
||||
CompressedSize: s.BlobSize,
|
||||
})
|
||||
}
|
||||
// Build name filter set if --snapshot was specified.
|
||||
nameFilter := make(map[string]struct{}, len(opts.Names))
|
||||
for _, n := range opts.Names {
|
||||
nameFilter[n] = struct{}{}
|
||||
}
|
||||
|
||||
// If --name is specified, filter to only snapshots matching that name
|
||||
if opts.Name != "" {
|
||||
filtered := make([]SnapshotInfo, 0, len(snapshots))
|
||||
for _, snap := range snapshots {
|
||||
if parseSnapshotName(snap.ID.String()) == opts.Name {
|
||||
filtered = append(filtered, snap)
|
||||
// Collect completed snapshots, applying the name filter.
|
||||
snapshots := make([]SnapshotInfo, 0, len(dbSnapshots))
|
||||
for _, s := range dbSnapshots {
|
||||
if s.CompletedAt == nil {
|
||||
continue
|
||||
}
|
||||
if len(nameFilter) > 0 {
|
||||
if _, ok := nameFilter[parseSnapshotName(s.ID.String())]; !ok {
|
||||
continue
|
||||
}
|
||||
}
|
||||
snapshots = filtered
|
||||
snapshots = append(snapshots, SnapshotInfo{
|
||||
ID: s.ID,
|
||||
Timestamp: s.StartedAt,
|
||||
CompressedSize: s.BlobSize,
|
||||
})
|
||||
}
|
||||
|
||||
// Sort by timestamp (newest first)
|
||||
@@ -638,21 +671,18 @@ func (v *Vaultik) PurgeSnapshotsWithOptions(opts *SnapshotPurgeOptions) error {
|
||||
var toDelete []SnapshotInfo
|
||||
|
||||
if opts.KeepLatest {
|
||||
// Keep the latest snapshot per snapshot name
|
||||
// Group snapshots by name, then mark all but the newest in each group
|
||||
latestByName := make(map[string]bool) // tracks whether we've seen the latest for each name
|
||||
// Keep the latest snapshot per snapshot name. Snapshots are sorted
|
||||
// newest-first, so the first occurrence of each name is kept.
|
||||
seen := make(map[string]bool)
|
||||
for _, snap := range snapshots {
|
||||
name := parseSnapshotName(snap.ID.String())
|
||||
if latestByName[name] {
|
||||
// Already kept the latest for this name — delete this one
|
||||
if seen[name] {
|
||||
toDelete = append(toDelete, snap)
|
||||
} else {
|
||||
// This is the latest (sorted newest-first) — keep it
|
||||
latestByName[name] = true
|
||||
continue
|
||||
}
|
||||
seen[name] = true
|
||||
}
|
||||
} else if opts.OlderThan != "" {
|
||||
// Parse duration
|
||||
duration, err := parseDuration(opts.OlderThan)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid duration: %w", err)
|
||||
@@ -667,22 +697,25 @@ func (v *Vaultik) PurgeSnapshotsWithOptions(opts *SnapshotPurgeOptions) error {
|
||||
}
|
||||
|
||||
if len(toDelete) == 0 {
|
||||
v.printlnStdout("No snapshots to delete")
|
||||
if !opts.Quiet {
|
||||
v.printlnStdout("No snapshots to delete")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return v.confirmAndExecutePurge(toDelete, opts.Force)
|
||||
return v.confirmAndExecutePurge(toDelete, opts.Force, opts.Quiet)
|
||||
}
|
||||
|
||||
// confirmAndExecutePurge shows deletion candidates, confirms with user, and deletes snapshots
|
||||
func (v *Vaultik) confirmAndExecutePurge(toDelete []SnapshotInfo, force bool) error {
|
||||
// Show what will be deleted
|
||||
v.printfStdout("The following snapshots will be deleted:\n\n")
|
||||
for _, snap := range toDelete {
|
||||
v.printfStdout(" %s (%s, %s)\n",
|
||||
snap.ID,
|
||||
snap.Timestamp.Format("2006-01-02 15:04:05"),
|
||||
formatBytes(snap.CompressedSize))
|
||||
func (v *Vaultik) confirmAndExecutePurge(toDelete []SnapshotInfo, force, quiet bool) error {
|
||||
if !quiet {
|
||||
v.printfStdout("The following snapshots will be deleted:\n\n")
|
||||
for _, snap := range toDelete {
|
||||
v.printfStdout(" %s (%s, %s)\n",
|
||||
snap.ID,
|
||||
snap.Timestamp.Format("2006-01-02 15:04:05"),
|
||||
formatBytes(snap.CompressedSize))
|
||||
}
|
||||
}
|
||||
|
||||
// Confirm unless --force is used
|
||||
@@ -698,7 +731,7 @@ func (v *Vaultik) confirmAndExecutePurge(toDelete []SnapshotInfo, force bool) er
|
||||
v.printlnStdout("Cancelled")
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
} else if !quiet {
|
||||
v.printfStdout("\nDeleting %d snapshot(s) (--force specified)\n", len(toDelete))
|
||||
}
|
||||
|
||||
@@ -714,10 +747,10 @@ func (v *Vaultik) confirmAndExecutePurge(toDelete []SnapshotInfo, force bool) er
|
||||
}
|
||||
}
|
||||
|
||||
v.printfStdout("Deleted %d snapshot(s)\n", len(toDelete))
|
||||
|
||||
// Note: Run 'vaultik prune' separately to clean up unreferenced blobs
|
||||
v.printlnStdout("\nNote: Run 'vaultik prune' to clean up unreferenced blobs.")
|
||||
if !quiet {
|
||||
v.printfStdout("Deleted %d snapshot(s)\n", len(toDelete))
|
||||
v.printlnStdout("\nNote: Run 'vaultik prune' to clean up unreferenced blobs.")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -731,15 +764,17 @@ func (v *Vaultik) VerifySnapshot(snapshotID string, deep bool) error {
|
||||
return v.VerifySnapshotWithOptions(snapshotID, opts)
|
||||
}
|
||||
|
||||
// VerifySnapshotWithOptions checks snapshot integrity with full options
|
||||
// VerifySnapshotWithOptions checks snapshot integrity with full options.
|
||||
// Deep verification is delegated to RunDeepVerify so this function only
|
||||
// implements the shallow (existence-only) path.
|
||||
func (v *Vaultik) VerifySnapshotWithOptions(snapshotID string, opts *VerifyOptions) error {
|
||||
if opts.Deep {
|
||||
return v.RunDeepVerify(snapshotID, opts)
|
||||
}
|
||||
result := &VerifyResult{
|
||||
SnapshotID: snapshotID,
|
||||
Mode: "shallow",
|
||||
}
|
||||
if opts.Deep {
|
||||
result.Mode = "deep"
|
||||
}
|
||||
|
||||
v.printVerifyHeader(snapshotID, opts)
|
||||
|
||||
@@ -777,22 +812,12 @@ func (v *Vaultik) VerifySnapshotWithOptions(snapshotID string, opts *VerifyOptio
|
||||
return v.formatVerifyResult(result, manifest, opts)
|
||||
}
|
||||
|
||||
// printVerifyHeader prints the snapshot ID and parsed timestamp for verification output
|
||||
// printVerifyHeader prints the snapshot ID and parsed timestamp for verification output.
|
||||
// Snapshot ID format: hostname[_name]_<RFC3339>
|
||||
func (v *Vaultik) printVerifyHeader(snapshotID string, opts *VerifyOptions) {
|
||||
// Parse snapshot ID to extract timestamp
|
||||
parts := strings.Split(snapshotID, "-")
|
||||
var snapshotTime time.Time
|
||||
if len(parts) >= 3 {
|
||||
// Format: hostname-YYYYMMDD-HHMMSSZ
|
||||
dateStr := parts[len(parts)-2]
|
||||
timeStr := parts[len(parts)-1]
|
||||
if len(dateStr) == 8 && len(timeStr) == 7 && strings.HasSuffix(timeStr, "Z") {
|
||||
timeStr = timeStr[:6] // Remove Z
|
||||
timestamp, err := time.Parse("20060102150405", dateStr+timeStr)
|
||||
if err == nil {
|
||||
snapshotTime = timestamp
|
||||
}
|
||||
}
|
||||
if t, err := parseSnapshotTimestamp(snapshotID); err == nil {
|
||||
snapshotTime = t
|
||||
}
|
||||
|
||||
if !opts.JSON {
|
||||
@@ -809,7 +834,7 @@ func (v *Vaultik) verifyManifestBlobsExist(manifest *snapshot.Manifest, opts *Ve
|
||||
for _, blob := range manifest.Blobs {
|
||||
blobPath := fmt.Sprintf("blobs/%s/%s/%s", blob.Hash[:2], blob.Hash[2:4], blob.Hash)
|
||||
|
||||
// Just check existence (deep verification is handled by RunDeepVerify)
|
||||
// Shallow: just check existence (deep verification is handled by RunDeepVerify)
|
||||
_, err := v.Storage.Stat(v.ctx, blobPath)
|
||||
if err != nil {
|
||||
if !opts.JSON {
|
||||
@@ -867,6 +892,41 @@ func (v *Vaultik) outputVerifyJSON(result *VerifyResult) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// CleanupLocalSnapshots removes local snapshot records that have no
|
||||
// corresponding metadata in remote storage. These are typically left
|
||||
// behind by incomplete or interrupted backups.
|
||||
func (v *Vaultik) CleanupLocalSnapshots() error {
|
||||
remoteSnapshots, err := v.listRemoteSnapshotIDs()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
localSnapshots, err := v.Repositories.Snapshots.ListRecent(v.ctx, 10000)
|
||||
if err != nil {
|
||||
return fmt.Errorf("listing local snapshots: %w", err)
|
||||
}
|
||||
|
||||
var removed int
|
||||
for _, snap := range localSnapshots {
|
||||
id := snap.ID.String()
|
||||
if !remoteSnapshots[id] {
|
||||
v.printfStdout("Removing stale local record: %s\n", id)
|
||||
if err := v.deleteSnapshotFromLocalDB(id); err != nil {
|
||||
log.Error("Failed to delete local snapshot", "snapshot_id", id, "error", err)
|
||||
continue
|
||||
}
|
||||
removed++
|
||||
}
|
||||
}
|
||||
|
||||
if removed == 0 {
|
||||
v.printlnStdout("No stale local snapshots found.")
|
||||
} else {
|
||||
v.printfStdout("Removed %d stale local snapshot record(s).\n", removed)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Helper methods that were previously on SnapshotApp
|
||||
|
||||
func (v *Vaultik) downloadManifest(snapshotID string) (*snapshot.Manifest, error) {
|
||||
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
"git.eeqj.de/sneak/vaultik/internal/snapshot"
|
||||
"github.com/dustin/go-humanize"
|
||||
"github.com/klauspost/compress/zstd"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
// VerifyOptions contains options for the verify command
|
||||
@@ -57,9 +57,8 @@ func (v *Vaultik) RunDeepVerify(snapshotID string, opts *VerifyOptions) error {
|
||||
}
|
||||
|
||||
if !v.CanDecrypt() {
|
||||
return v.deepVerifyFailure(result, opts,
|
||||
"VAULTIK_AGE_SECRET_KEY environment variable not set - required for deep verification",
|
||||
fmt.Errorf("VAULTIK_AGE_SECRET_KEY environment variable not set - required for deep verification"))
|
||||
msg := "VAULTIK_AGE_SECRET_KEY not set; required for deep verification"
|
||||
return v.deepVerifyFailure(result, opts, msg, fmt.Errorf("%s", msg))
|
||||
}
|
||||
|
||||
log.Info("Starting snapshot verification", "snapshot_id", snapshotID, "mode", "deep")
|
||||
@@ -258,7 +257,7 @@ func (v *Vaultik) decryptAndLoadDatabase(reader io.ReadCloser, secretKey string)
|
||||
log.Info("Database decompressed", "size", humanize.Bytes(uint64(written)))
|
||||
|
||||
// Open the database
|
||||
db, err := sql.Open("sqlite3", tempPath)
|
||||
db, err := sql.Open("sqlite", tempPath)
|
||||
if err != nil {
|
||||
_ = os.Remove(tempPath)
|
||||
return nil, fmt.Errorf("failed to open database: %w", err)
|
||||
|
||||
@@ -20,9 +20,6 @@ s3:
|
||||
region: us-east-1
|
||||
use_ssl: true
|
||||
part_size: 5242880 # 5MB
|
||||
backup_interval: 1h
|
||||
full_scan_interval: 24h
|
||||
min_time_between_run: 15m
|
||||
index_path: /tmp/vaultik-test.sqlite
|
||||
chunk_size: 10MB
|
||||
blob_size_limit: 10GB
|
||||
|
||||
@@ -17,9 +17,6 @@ s3:
|
||||
region: us-east-1
|
||||
use_ssl: false
|
||||
part_size: 5242880 # 5MB
|
||||
backup_interval: 1h
|
||||
full_scan_interval: 24h
|
||||
min_time_between_run: 15m
|
||||
index_path: /tmp/vaultik-integration-test.sqlite
|
||||
chunk_size: 10MB
|
||||
blob_size_limit: 10GB
|
||||
|
||||
Reference in New Issue
Block a user