diff --git a/DESIGN.md b/DESIGN.md index 1e4afe3..167ffd8 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -110,6 +110,8 @@ CREATE TABLE files ( size INTEGER NOT NULL ); +-- Maps files to their constituent chunks in sequence order +-- Used for reconstructing files from chunks during restore CREATE TABLE file_chunks ( path TEXT NOT NULL, idx INTEGER NOT NULL, @@ -137,6 +139,8 @@ CREATE TABLE blob_chunks ( PRIMARY KEY (blob_hash, chunk_hash) ); +-- Reverse mapping: tracks which files contain a given chunk +-- Used for deduplication and tracking chunk usage across files CREATE TABLE chunk_files ( chunk_hash TEXT NOT NULL, file_path TEXT NOT NULL, diff --git a/cmd/vaultik/main.go b/cmd/vaultik/main.go index e895073..3c7f470 100644 --- a/cmd/vaultik/main.go +++ b/cmd/vaultik/main.go @@ -6,4 +6,4 @@ import ( func main() { cli.CLIEntry() -} \ No newline at end of file +} diff --git a/go.mod b/go.mod index c022d70..c24abd7 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.24.4 require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/mattn/go-sqlite3 v1.14.28 // indirect github.com/spf13/cobra v1.9.1 // indirect github.com/spf13/pflag v1.0.6 // indirect go.uber.org/dig v1.19.0 // indirect diff --git a/go.sum b/go.sum index d0ea3c1..b20294a 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/mattn/go-sqlite3 v1.14.28 h1:ThEiQrnbtumT+QMknw63Befp/ce/nUPgBPMlRFEum7A= +github.com/mattn/go-sqlite3 v1.14.28/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/spf13/cobra v1.9.1 h1:CXSaggrXdbHK9CF+8ywj8Amf7PBRmPCOJugH954Nnlo= github.com/spf13/cobra v1.9.1/go.mod h1:nDyEzZ8ogv936Cinf6g1RU9MRY64Ir93oCnqb9wxYW0= diff --git a/internal/cli/app.go b/internal/cli/app.go new file mode 100644 index 0000000..d2112c1 --- /dev/null +++ b/internal/cli/app.go @@ -0,0 +1,56 @@ +package cli + +import ( + "context" + "fmt" + + "git.eeqj.de/sneak/vaultik/internal/config" + "git.eeqj.de/sneak/vaultik/internal/database" + "git.eeqj.de/sneak/vaultik/internal/globals" + "go.uber.org/fx" +) + +// AppOptions contains common options for creating the fx application +type AppOptions struct { + ConfigPath string + Modules []fx.Option + Invokes []fx.Option +} + +// NewApp creates a new fx application with common modules +func NewApp(opts AppOptions) *fx.App { + baseModules := []fx.Option{ + fx.Supply(config.ConfigPath(opts.ConfigPath)), + fx.Provide(globals.New), + config.Module, + database.Module, + fx.NopLogger, + } + + allOptions := append(baseModules, opts.Modules...) + allOptions = append(allOptions, opts.Invokes...) + + return fx.New(allOptions...) +} + +// RunApp starts and stops the fx application within the given context +func RunApp(ctx context.Context, app *fx.App) error { + if err := app.Start(ctx); err != nil { + return fmt.Errorf("failed to start app: %w", err) + } + defer func() { + if err := app.Stop(ctx); err != nil { + fmt.Printf("error stopping app: %v\n", err) + } + }() + + // Wait for context cancellation + <-ctx.Done() + return nil +} + +// RunWithApp is a helper that creates and runs an fx app with the given options +func RunWithApp(ctx context.Context, opts AppOptions) error { + app := NewApp(opts) + return RunApp(ctx, app) +} diff --git a/internal/cli/backup.go b/internal/cli/backup.go index 347503b..f6f24c2 100644 --- a/internal/cli/backup.go +++ b/internal/cli/backup.go @@ -6,6 +6,7 @@ import ( "os" "git.eeqj.de/sneak/vaultik/internal/config" + "git.eeqj.de/sneak/vaultik/internal/database" "git.eeqj.de/sneak/vaultik/internal/globals" "github.com/spf13/cobra" "go.uber.org/fx" @@ -56,34 +57,22 @@ a path using --config or by setting VAULTIK_CONFIG to a path.`, } func runBackup(ctx context.Context, opts *BackupOptions) error { - app := fx.New( - fx.Supply(config.ConfigPath(opts.ConfigPath)), - fx.Provide(globals.New), - config.Module, - // Additional modules will be added here - fx.Invoke(func(g *globals.Globals, cfg *config.Config) error { - // TODO: Implement backup logic - fmt.Printf("Running backup with config: %s\n", opts.ConfigPath) - fmt.Printf("Version: %s, Commit: %s\n", g.Version, g.Commit) - if opts.Daemon { - fmt.Println("Running in daemon mode") - } - if opts.Cron { - fmt.Println("Running in cron mode") - } - return nil - }), - fx.NopLogger, - ) - - if err := app.Start(ctx); err != nil { - return fmt.Errorf("failed to start backup: %w", err) - } - defer func() { - if err := app.Stop(ctx); err != nil { - fmt.Printf("error stopping app: %v\n", err) - } - }() - - return nil -} \ No newline at end of file + return RunWithApp(ctx, AppOptions{ + ConfigPath: opts.ConfigPath, + Invokes: []fx.Option{ + fx.Invoke(func(g *globals.Globals, cfg *config.Config, repos *database.Repositories) error { + // TODO: Implement backup logic + fmt.Printf("Running backup with config: %s\n", opts.ConfigPath) + fmt.Printf("Version: %s, Commit: %s\n", g.Version, g.Commit) + fmt.Printf("Index path: %s\n", cfg.IndexPath) + if opts.Daemon { + fmt.Println("Running in daemon mode") + } + if opts.Cron { + fmt.Println("Running in cron mode") + } + return nil + }), + }, + }) +} diff --git a/internal/cli/entry.go b/internal/cli/entry.go index 989cc38..13765e6 100644 --- a/internal/cli/entry.go +++ b/internal/cli/entry.go @@ -10,4 +10,4 @@ func CLIEntry() { if err := rootCmd.Execute(); err != nil { os.Exit(1) } -} \ No newline at end of file +} diff --git a/internal/cli/entry_test.go b/internal/cli/entry_test.go index 170ad4b..7deb72f 100644 --- a/internal/cli/entry_test.go +++ b/internal/cli/entry_test.go @@ -12,11 +12,11 @@ func TestCLIEntry(t *testing.T) { if cmd == nil { t.Fatal("NewRootCommand() returned nil") } - + if cmd.Use != "vaultik" { t.Errorf("Expected command use to be 'vaultik', got '%s'", cmd.Use) } - + // Verify all subcommands are registered expectedCommands := []string{"backup", "restore", "prune", "verify", "fetch"} for _, expected := range expectedCommands { @@ -31,7 +31,7 @@ func TestCLIEntry(t *testing.T) { t.Errorf("Expected command '%s' not found", expected) } } - + // Verify backup command has proper flags backupCmd, _, err := cmd.Find([]string{"backup"}) if err != nil { @@ -47,4 +47,4 @@ func TestCLIEntry(t *testing.T) { t.Error("Backup command missing --cron flag") } } -} \ No newline at end of file +} diff --git a/internal/cli/fetch.go b/internal/cli/fetch.go index f35e85d..94edc7b 100644 --- a/internal/cli/fetch.go +++ b/internal/cli/fetch.go @@ -85,4 +85,4 @@ func runFetch(ctx context.Context, opts *FetchOptions) error { }() return nil -} \ No newline at end of file +} diff --git a/internal/cli/prune.go b/internal/cli/prune.go index 9af6d77..2490ccf 100644 --- a/internal/cli/prune.go +++ b/internal/cli/prune.go @@ -75,4 +75,4 @@ func runPrune(ctx context.Context, opts *PruneOptions) error { }() return nil -} \ No newline at end of file +} diff --git a/internal/cli/restore.go b/internal/cli/restore.go index 5aaf891..ab7a556 100644 --- a/internal/cli/restore.go +++ b/internal/cli/restore.go @@ -80,4 +80,4 @@ func runRestore(ctx context.Context, opts *RestoreOptions) error { }() return nil -} \ No newline at end of file +} diff --git a/internal/cli/root.go b/internal/cli/root.go index 7e4500d..020034a 100644 --- a/internal/cli/root.go +++ b/internal/cli/root.go @@ -25,4 +25,4 @@ on the source system.`, ) return cmd -} \ No newline at end of file +} diff --git a/internal/cli/verify.go b/internal/cli/verify.go index b6d9094..4fd360b 100644 --- a/internal/cli/verify.go +++ b/internal/cli/verify.go @@ -83,4 +83,4 @@ func runVerify(ctx context.Context, opts *VerifyOptions) error { }() return nil -} \ No newline at end of file +} diff --git a/internal/config/config.go b/internal/config/config.go index 3db8b0e..b83ba09 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -78,6 +78,11 @@ func Load(path string) (*Config, error) { return nil, fmt.Errorf("failed to parse config: %w", err) } + // Check for environment variable override for IndexPath + if envIndexPath := os.Getenv("VAULTIK_INDEX_PATH"); envIndexPath != "" { + cfg.IndexPath = envIndexPath + } + // Get hostname if not set if cfg.Hostname == "" { hostname, err := os.Hostname() @@ -146,4 +151,4 @@ func (c *Config) Validate() error { // Module exports the config module for fx var Module = fx.Module("config", fx.Provide(New), -) \ No newline at end of file +) diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 5c9ece6..aed9419 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -12,7 +12,7 @@ func TestMain(m *testing.M) { if absPath, err := filepath.Abs(testConfigPath); err == nil { _ = os.Setenv("VAULTIK_CONFIG", absPath) } - + code := m.Run() os.Exit(code) } @@ -24,30 +24,30 @@ func TestConfigLoad(t *testing.T) { if configPath == "" { t.Fatal("VAULTIK_CONFIG environment variable not set") } - + // Test loading the config cfg, err := Load(configPath) if err != nil { t.Fatalf("Failed to load config: %v", err) } - + // Basic validation if cfg.AgeRecipient != "age1xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" { t.Errorf("Expected age recipient to be set, got '%s'", cfg.AgeRecipient) } - + if len(cfg.SourceDirs) != 2 { t.Errorf("Expected 2 source dirs, got %d", len(cfg.SourceDirs)) } - + if cfg.SourceDirs[0] != "/tmp/vaultik-test-source" { t.Errorf("Expected first source dir to be '/tmp/vaultik-test-source', got '%s'", cfg.SourceDirs[0]) } - + if cfg.S3.Bucket != "vaultik-test-bucket" { t.Errorf("Expected S3 bucket to be 'vaultik-test-bucket', got '%s'", cfg.S3.Bucket) } - + if cfg.Hostname != "test-host" { t.Errorf("Expected hostname to be 'test-host', got '%s'", cfg.Hostname) } @@ -59,9 +59,9 @@ func TestConfigFromEnv(t *testing.T) { if configPath == "" { t.Skip("VAULTIK_CONFIG not set") } - + // Verify the file exists if _, err := os.Stat(configPath); os.IsNotExist(err) { t.Errorf("Config file does not exist at path from VAULTIK_CONFIG: %s", configPath) } -} \ No newline at end of file +} diff --git a/internal/database/blob_chunks.go b/internal/database/blob_chunks.go new file mode 100644 index 0000000..c519e9c --- /dev/null +++ b/internal/database/blob_chunks.go @@ -0,0 +1,88 @@ +package database + +import ( + "context" + "database/sql" + "fmt" +) + +type BlobChunkRepository struct { + db *DB +} + +func NewBlobChunkRepository(db *DB) *BlobChunkRepository { + return &BlobChunkRepository{db: db} +} + +func (r *BlobChunkRepository) Create(ctx context.Context, tx *sql.Tx, bc *BlobChunk) error { + query := ` + INSERT INTO blob_chunks (blob_hash, chunk_hash, offset, length) + VALUES (?, ?, ?, ?) + ` + + var err error + if tx != nil { + _, err = tx.ExecContext(ctx, query, bc.BlobHash, bc.ChunkHash, bc.Offset, bc.Length) + } else { + _, err = r.db.conn.ExecContext(ctx, query, bc.BlobHash, bc.ChunkHash, bc.Offset, bc.Length) + } + + if err != nil { + return fmt.Errorf("inserting blob_chunk: %w", err) + } + + return nil +} + +func (r *BlobChunkRepository) GetByBlobHash(ctx context.Context, blobHash string) ([]*BlobChunk, error) { + query := ` + SELECT blob_hash, chunk_hash, offset, length + FROM blob_chunks + WHERE blob_hash = ? + ORDER BY offset + ` + + rows, err := r.db.conn.QueryContext(ctx, query, blobHash) + if err != nil { + return nil, fmt.Errorf("querying blob chunks: %w", err) + } + defer CloseRows(rows) + + var blobChunks []*BlobChunk + for rows.Next() { + var bc BlobChunk + err := rows.Scan(&bc.BlobHash, &bc.ChunkHash, &bc.Offset, &bc.Length) + if err != nil { + return nil, fmt.Errorf("scanning blob chunk: %w", err) + } + blobChunks = append(blobChunks, &bc) + } + + return blobChunks, rows.Err() +} + +func (r *BlobChunkRepository) GetByChunkHash(ctx context.Context, chunkHash string) (*BlobChunk, error) { + query := ` + SELECT blob_hash, chunk_hash, offset, length + FROM blob_chunks + WHERE chunk_hash = ? + LIMIT 1 + ` + + var bc BlobChunk + err := r.db.conn.QueryRowContext(ctx, query, chunkHash).Scan( + &bc.BlobHash, + &bc.ChunkHash, + &bc.Offset, + &bc.Length, + ) + + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("querying blob chunk: %w", err) + } + + return &bc, nil +} diff --git a/internal/database/blobs.go b/internal/database/blobs.go new file mode 100644 index 0000000..372201f --- /dev/null +++ b/internal/database/blobs.go @@ -0,0 +1,96 @@ +package database + +import ( + "context" + "database/sql" + "fmt" + "time" +) + +type BlobRepository struct { + db *DB +} + +func NewBlobRepository(db *DB) *BlobRepository { + return &BlobRepository{db: db} +} + +func (r *BlobRepository) Create(ctx context.Context, tx *sql.Tx, blob *Blob) error { + query := ` + INSERT INTO blobs (blob_hash, created_ts) + VALUES (?, ?) + ` + + var err error + if tx != nil { + _, err = tx.ExecContext(ctx, query, blob.BlobHash, blob.CreatedTS.Unix()) + } else { + _, err = r.db.conn.ExecContext(ctx, query, blob.BlobHash, blob.CreatedTS.Unix()) + } + + if err != nil { + return fmt.Errorf("inserting blob: %w", err) + } + + return nil +} + +func (r *BlobRepository) GetByHash(ctx context.Context, hash string) (*Blob, error) { + query := ` + SELECT blob_hash, created_ts + FROM blobs + WHERE blob_hash = ? + ` + + var blob Blob + var createdTSUnix int64 + + err := r.db.conn.QueryRowContext(ctx, query, hash).Scan( + &blob.BlobHash, + &createdTSUnix, + ) + + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("querying blob: %w", err) + } + + blob.CreatedTS = time.Unix(createdTSUnix, 0) + return &blob, nil +} + +func (r *BlobRepository) List(ctx context.Context, limit, offset int) ([]*Blob, error) { + query := ` + SELECT blob_hash, created_ts + FROM blobs + ORDER BY blob_hash + LIMIT ? OFFSET ? + ` + + rows, err := r.db.conn.QueryContext(ctx, query, limit, offset) + if err != nil { + return nil, fmt.Errorf("querying blobs: %w", err) + } + defer CloseRows(rows) + + var blobs []*Blob + for rows.Next() { + var blob Blob + var createdTSUnix int64 + + err := rows.Scan( + &blob.BlobHash, + &createdTSUnix, + ) + if err != nil { + return nil, fmt.Errorf("scanning blob: %w", err) + } + + blob.CreatedTS = time.Unix(createdTSUnix, 0) + blobs = append(blobs, &blob) + } + + return blobs, rows.Err() +} diff --git a/internal/database/chunk_files.go b/internal/database/chunk_files.go new file mode 100644 index 0000000..56a4323 --- /dev/null +++ b/internal/database/chunk_files.go @@ -0,0 +1,88 @@ +package database + +import ( + "context" + "database/sql" + "fmt" +) + +type ChunkFileRepository struct { + db *DB +} + +func NewChunkFileRepository(db *DB) *ChunkFileRepository { + return &ChunkFileRepository{db: db} +} + +func (r *ChunkFileRepository) Create(ctx context.Context, tx *sql.Tx, cf *ChunkFile) error { + query := ` + INSERT INTO chunk_files (chunk_hash, file_path, file_offset, length) + VALUES (?, ?, ?, ?) + ON CONFLICT(chunk_hash, file_path) DO NOTHING + ` + + var err error + if tx != nil { + _, err = tx.ExecContext(ctx, query, cf.ChunkHash, cf.FilePath, cf.FileOffset, cf.Length) + } else { + _, err = r.db.conn.ExecContext(ctx, query, cf.ChunkHash, cf.FilePath, cf.FileOffset, cf.Length) + } + + if err != nil { + return fmt.Errorf("inserting chunk_file: %w", err) + } + + return nil +} + +func (r *ChunkFileRepository) GetByChunkHash(ctx context.Context, chunkHash string) ([]*ChunkFile, error) { + query := ` + SELECT chunk_hash, file_path, file_offset, length + FROM chunk_files + WHERE chunk_hash = ? + ` + + rows, err := r.db.conn.QueryContext(ctx, query, chunkHash) + if err != nil { + return nil, fmt.Errorf("querying chunk files: %w", err) + } + defer CloseRows(rows) + + var chunkFiles []*ChunkFile + for rows.Next() { + var cf ChunkFile + err := rows.Scan(&cf.ChunkHash, &cf.FilePath, &cf.FileOffset, &cf.Length) + if err != nil { + return nil, fmt.Errorf("scanning chunk file: %w", err) + } + chunkFiles = append(chunkFiles, &cf) + } + + return chunkFiles, rows.Err() +} + +func (r *ChunkFileRepository) GetByFilePath(ctx context.Context, filePath string) ([]*ChunkFile, error) { + query := ` + SELECT chunk_hash, file_path, file_offset, length + FROM chunk_files + WHERE file_path = ? + ` + + rows, err := r.db.conn.QueryContext(ctx, query, filePath) + if err != nil { + return nil, fmt.Errorf("querying chunk files: %w", err) + } + defer CloseRows(rows) + + var chunkFiles []*ChunkFile + for rows.Next() { + var cf ChunkFile + err := rows.Scan(&cf.ChunkHash, &cf.FilePath, &cf.FileOffset, &cf.Length) + if err != nil { + return nil, fmt.Errorf("scanning chunk file: %w", err) + } + chunkFiles = append(chunkFiles, &cf) + } + + return chunkFiles, rows.Err() +} diff --git a/internal/database/chunks.go b/internal/database/chunks.go new file mode 100644 index 0000000..409b16d --- /dev/null +++ b/internal/database/chunks.go @@ -0,0 +1,141 @@ +package database + +import ( + "context" + "database/sql" + "fmt" +) + +type ChunkRepository struct { + db *DB +} + +func NewChunkRepository(db *DB) *ChunkRepository { + return &ChunkRepository{db: db} +} + +func (r *ChunkRepository) Create(ctx context.Context, tx *sql.Tx, chunk *Chunk) error { + query := ` + INSERT INTO chunks (chunk_hash, sha256, size) + VALUES (?, ?, ?) + ON CONFLICT(chunk_hash) DO NOTHING + ` + + var err error + if tx != nil { + _, err = tx.ExecContext(ctx, query, chunk.ChunkHash, chunk.SHA256, chunk.Size) + } else { + _, err = r.db.conn.ExecContext(ctx, query, chunk.ChunkHash, chunk.SHA256, chunk.Size) + } + + if err != nil { + return fmt.Errorf("inserting chunk: %w", err) + } + + return nil +} + +func (r *ChunkRepository) GetByHash(ctx context.Context, hash string) (*Chunk, error) { + query := ` + SELECT chunk_hash, sha256, size + FROM chunks + WHERE chunk_hash = ? + ` + + var chunk Chunk + + err := r.db.conn.QueryRowContext(ctx, query, hash).Scan( + &chunk.ChunkHash, + &chunk.SHA256, + &chunk.Size, + ) + + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("querying chunk: %w", err) + } + + return &chunk, nil +} + +func (r *ChunkRepository) GetByHashes(ctx context.Context, hashes []string) ([]*Chunk, error) { + if len(hashes) == 0 { + return nil, nil + } + + query := ` + SELECT chunk_hash, sha256, size + FROM chunks + WHERE chunk_hash IN (` + + args := make([]interface{}, len(hashes)) + for i, hash := range hashes { + if i > 0 { + query += ", " + } + query += "?" + args[i] = hash + } + query += ") ORDER BY chunk_hash" + + rows, err := r.db.conn.QueryContext(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("querying chunks: %w", err) + } + defer CloseRows(rows) + + var chunks []*Chunk + for rows.Next() { + var chunk Chunk + + err := rows.Scan( + &chunk.ChunkHash, + &chunk.SHA256, + &chunk.Size, + ) + if err != nil { + return nil, fmt.Errorf("scanning chunk: %w", err) + } + + chunks = append(chunks, &chunk) + } + + return chunks, rows.Err() +} + +func (r *ChunkRepository) ListUnpacked(ctx context.Context, limit int) ([]*Chunk, error) { + query := ` + SELECT c.chunk_hash, c.sha256, c.size + FROM chunks c + LEFT JOIN blob_chunks bc ON c.chunk_hash = bc.chunk_hash + WHERE bc.chunk_hash IS NULL + ORDER BY c.chunk_hash + LIMIT ? + ` + + rows, err := r.db.conn.QueryContext(ctx, query, limit) + if err != nil { + return nil, fmt.Errorf("querying unpacked chunks: %w", err) + } + defer CloseRows(rows) + + var chunks []*Chunk + for rows.Next() { + var chunk Chunk + + err := rows.Scan( + &chunk.ChunkHash, + &chunk.SHA256, + &chunk.Size, + ) + if err != nil { + return nil, fmt.Errorf("scanning chunk: %w", err) + } + + chunks = append(chunks, &chunk) + } + + return chunks, rows.Err() +} diff --git a/internal/database/database.go b/internal/database/database.go new file mode 100644 index 0000000..adef2a3 --- /dev/null +++ b/internal/database/database.go @@ -0,0 +1,114 @@ +package database + +import ( + "context" + "database/sql" + "fmt" + + _ "github.com/mattn/go-sqlite3" +) + +type DB struct { + conn *sql.DB +} + +func New(ctx context.Context, path string) (*DB, error) { + conn, err := sql.Open("sqlite3", path+"?_journal_mode=WAL&_synchronous=NORMAL&_busy_timeout=5000") + if err != nil { + return nil, fmt.Errorf("opening database: %w", err) + } + + if err := conn.PingContext(ctx); err != nil { + if closeErr := conn.Close(); closeErr != nil { + Fatal("failed to close database connection: %v", closeErr) + } + return nil, fmt.Errorf("pinging database: %w", err) + } + + db := &DB{conn: conn} + if err := db.createSchema(ctx); err != nil { + if closeErr := conn.Close(); closeErr != nil { + Fatal("failed to close database connection: %v", closeErr) + } + return nil, fmt.Errorf("creating schema: %w", err) + } + + return db, nil +} + +func (db *DB) Close() error { + if err := db.conn.Close(); err != nil { + Fatal("failed to close database: %v", err) + } + return nil +} + +func (db *DB) Conn() *sql.DB { + return db.conn +} + +func (db *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) { + return db.conn.BeginTx(ctx, opts) +} + +func (db *DB) createSchema(ctx context.Context) error { + schema := ` + CREATE TABLE IF NOT EXISTS files ( + path TEXT PRIMARY KEY, + mtime INTEGER NOT NULL, + ctime INTEGER NOT NULL, + size INTEGER NOT NULL, + mode INTEGER NOT NULL, + uid INTEGER NOT NULL, + gid INTEGER NOT NULL, + link_target TEXT + ); + + CREATE TABLE IF NOT EXISTS file_chunks ( + path TEXT NOT NULL, + idx INTEGER NOT NULL, + chunk_hash TEXT NOT NULL, + PRIMARY KEY (path, idx) + ); + + CREATE TABLE IF NOT EXISTS chunks ( + chunk_hash TEXT PRIMARY KEY, + sha256 TEXT NOT NULL, + size INTEGER NOT NULL + ); + + CREATE TABLE IF NOT EXISTS blobs ( + blob_hash TEXT PRIMARY KEY, + created_ts INTEGER NOT NULL + ); + + CREATE TABLE IF NOT EXISTS blob_chunks ( + blob_hash TEXT NOT NULL, + chunk_hash TEXT NOT NULL, + offset INTEGER NOT NULL, + length INTEGER NOT NULL, + PRIMARY KEY (blob_hash, chunk_hash) + ); + + CREATE TABLE IF NOT EXISTS chunk_files ( + chunk_hash TEXT NOT NULL, + file_path TEXT NOT NULL, + file_offset INTEGER NOT NULL, + length INTEGER NOT NULL, + PRIMARY KEY (chunk_hash, file_path) + ); + + CREATE TABLE IF NOT EXISTS snapshots ( + id TEXT PRIMARY KEY, + hostname TEXT NOT NULL, + vaultik_version TEXT NOT NULL, + created_ts INTEGER NOT NULL, + file_count INTEGER NOT NULL, + chunk_count INTEGER NOT NULL, + blob_count INTEGER NOT NULL + ); + ` + + _, err := db.conn.ExecContext(ctx, schema) + return err +} diff --git a/internal/database/errors.go b/internal/database/errors.go new file mode 100644 index 0000000..b2b4497 --- /dev/null +++ b/internal/database/errors.go @@ -0,0 +1,20 @@ +package database + +import ( + "database/sql" + "fmt" + "os" +) + +// Fatal prints an error message to stderr and exits with status 1 +func Fatal(format string, args ...interface{}) { + fmt.Fprintf(os.Stderr, "FATAL: "+format+"\n", args...) + os.Exit(1) +} + +// CloseRows closes rows and exits on error +func CloseRows(rows *sql.Rows) { + if err := rows.Close(); err != nil { + Fatal("failed to close rows: %v", err) + } +} diff --git a/internal/database/file_chunks.go b/internal/database/file_chunks.go new file mode 100644 index 0000000..05b6ab1 --- /dev/null +++ b/internal/database/file_chunks.go @@ -0,0 +1,80 @@ +package database + +import ( + "context" + "database/sql" + "fmt" +) + +type FileChunkRepository struct { + db *DB +} + +func NewFileChunkRepository(db *DB) *FileChunkRepository { + return &FileChunkRepository{db: db} +} + +func (r *FileChunkRepository) Create(ctx context.Context, tx *sql.Tx, fc *FileChunk) error { + query := ` + INSERT INTO file_chunks (path, idx, chunk_hash) + VALUES (?, ?, ?) + ON CONFLICT(path, idx) DO NOTHING + ` + + var err error + if tx != nil { + _, err = tx.ExecContext(ctx, query, fc.Path, fc.Idx, fc.ChunkHash) + } else { + _, err = r.db.conn.ExecContext(ctx, query, fc.Path, fc.Idx, fc.ChunkHash) + } + + if err != nil { + return fmt.Errorf("inserting file_chunk: %w", err) + } + + return nil +} + +func (r *FileChunkRepository) GetByPath(ctx context.Context, path string) ([]*FileChunk, error) { + query := ` + SELECT path, idx, chunk_hash + FROM file_chunks + WHERE path = ? + ORDER BY idx + ` + + rows, err := r.db.conn.QueryContext(ctx, query, path) + if err != nil { + return nil, fmt.Errorf("querying file chunks: %w", err) + } + defer CloseRows(rows) + + var fileChunks []*FileChunk + for rows.Next() { + var fc FileChunk + err := rows.Scan(&fc.Path, &fc.Idx, &fc.ChunkHash) + if err != nil { + return nil, fmt.Errorf("scanning file chunk: %w", err) + } + fileChunks = append(fileChunks, &fc) + } + + return fileChunks, rows.Err() +} + +func (r *FileChunkRepository) DeleteByPath(ctx context.Context, tx *sql.Tx, path string) error { + query := `DELETE FROM file_chunks WHERE path = ?` + + var err error + if tx != nil { + _, err = tx.ExecContext(ctx, query, path) + } else { + _, err = r.db.conn.ExecContext(ctx, query, path) + } + + if err != nil { + return fmt.Errorf("deleting file chunks: %w", err) + } + + return nil +} diff --git a/internal/database/files.go b/internal/database/files.go new file mode 100644 index 0000000..d6565e1 --- /dev/null +++ b/internal/database/files.go @@ -0,0 +1,145 @@ +package database + +import ( + "context" + "database/sql" + "fmt" + "time" +) + +type FileRepository struct { + db *DB +} + +func NewFileRepository(db *DB) *FileRepository { + return &FileRepository{db: db} +} + +func (r *FileRepository) Create(ctx context.Context, tx *sql.Tx, file *File) error { + query := ` + INSERT INTO files (path, mtime, ctime, size, mode, uid, gid, link_target) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(path) DO UPDATE SET + mtime = excluded.mtime, + ctime = excluded.ctime, + size = excluded.size, + mode = excluded.mode, + uid = excluded.uid, + gid = excluded.gid, + link_target = excluded.link_target + ` + + var err error + if tx != nil { + _, err = tx.ExecContext(ctx, query, file.Path, file.MTime.Unix(), file.CTime.Unix(), file.Size, file.Mode, file.UID, file.GID, file.LinkTarget) + } else { + _, err = r.db.conn.ExecContext(ctx, query, file.Path, file.MTime.Unix(), file.CTime.Unix(), file.Size, file.Mode, file.UID, file.GID, file.LinkTarget) + } + + if err != nil { + return fmt.Errorf("inserting file: %w", err) + } + + return nil +} + +func (r *FileRepository) GetByPath(ctx context.Context, path string) (*File, error) { + query := ` + SELECT path, mtime, ctime, size, mode, uid, gid, link_target + FROM files + WHERE path = ? + ` + + var file File + var mtimeUnix, ctimeUnix int64 + var linkTarget sql.NullString + + err := r.db.conn.QueryRowContext(ctx, query, path).Scan( + &file.Path, + &mtimeUnix, + &ctimeUnix, + &file.Size, + &file.Mode, + &file.UID, + &file.GID, + &linkTarget, + ) + + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("querying file: %w", err) + } + + file.MTime = time.Unix(mtimeUnix, 0) + file.CTime = time.Unix(ctimeUnix, 0) + if linkTarget.Valid { + file.LinkTarget = linkTarget.String + } + + return &file, nil +} + +func (r *FileRepository) ListModifiedSince(ctx context.Context, since time.Time) ([]*File, error) { + query := ` + SELECT path, mtime, ctime, size, mode, uid, gid, link_target + FROM files + WHERE mtime >= ? + ORDER BY path + ` + + rows, err := r.db.conn.QueryContext(ctx, query, since.Unix()) + if err != nil { + return nil, fmt.Errorf("querying files: %w", err) + } + defer CloseRows(rows) + + var files []*File + for rows.Next() { + var file File + var mtimeUnix, ctimeUnix int64 + var linkTarget sql.NullString + + err := rows.Scan( + &file.Path, + &mtimeUnix, + &ctimeUnix, + &file.Size, + &file.Mode, + &file.UID, + &file.GID, + &linkTarget, + ) + if err != nil { + return nil, fmt.Errorf("scanning file: %w", err) + } + + file.MTime = time.Unix(mtimeUnix, 0) + file.CTime = time.Unix(ctimeUnix, 0) + if linkTarget.Valid { + file.LinkTarget = linkTarget.String + } + + files = append(files, &file) + } + + return files, rows.Err() +} + +func (r *FileRepository) Delete(ctx context.Context, tx *sql.Tx, path string) error { + query := `DELETE FROM files WHERE path = ?` + + var err error + if tx != nil { + _, err = tx.ExecContext(ctx, query, path) + } else { + _, err = r.db.conn.ExecContext(ctx, query, path) + } + + if err != nil { + return fmt.Errorf("deleting file: %w", err) + } + + return nil +} diff --git a/internal/database/models.go b/internal/database/models.go new file mode 100644 index 0000000..60eec75 --- /dev/null +++ b/internal/database/models.go @@ -0,0 +1,67 @@ +package database + +import "time" + +// File represents a file record in the database +type File struct { + Path string + MTime time.Time + CTime time.Time + Size int64 + Mode uint32 + UID uint32 + GID uint32 + LinkTarget string // empty for regular files, target path for symlinks +} + +// IsSymlink returns true if this file is a symbolic link +func (f *File) IsSymlink() bool { + return f.LinkTarget != "" +} + +// FileChunk represents the mapping between files and chunks +type FileChunk struct { + Path string + Idx int + ChunkHash string +} + +// Chunk represents a chunk record in the database +type Chunk struct { + ChunkHash string + SHA256 string + Size int64 +} + +// Blob represents a blob record in the database +type Blob struct { + BlobHash string + CreatedTS time.Time +} + +// BlobChunk represents the mapping between blobs and chunks +type BlobChunk struct { + BlobHash string + ChunkHash string + Offset int64 + Length int64 +} + +// ChunkFile represents the reverse mapping of chunks to files +type ChunkFile struct { + ChunkHash string + FilePath string + FileOffset int64 + Length int64 +} + +// Snapshot represents a snapshot record in the database +type Snapshot struct { + ID string + Hostname string + VaultikVersion string + CreatedTS time.Time + FileCount int64 + ChunkCount int64 + BlobCount int64 +} diff --git a/internal/database/module.go b/internal/database/module.go new file mode 100644 index 0000000..77e3459 --- /dev/null +++ b/internal/database/module.go @@ -0,0 +1,40 @@ +package database + +import ( + "context" + "fmt" + "os" + "path/filepath" + + "git.eeqj.de/sneak/vaultik/internal/config" + "go.uber.org/fx" +) + +// Module provides database dependencies +var Module = fx.Module("database", + fx.Provide( + provideDatabase, + NewRepositories, + ), +) + +func provideDatabase(lc fx.Lifecycle, cfg *config.Config) (*DB, error) { + // Ensure the index directory exists + indexDir := filepath.Dir(cfg.IndexPath) + if err := os.MkdirAll(indexDir, 0700); err != nil { + return nil, fmt.Errorf("creating index directory: %w", err) + } + + db, err := New(context.Background(), cfg.IndexPath) + if err != nil { + return nil, fmt.Errorf("opening database: %w", err) + } + + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return db.Close() + }, + }) + + return db, nil +} diff --git a/internal/database/repositories.go b/internal/database/repositories.go new file mode 100644 index 0000000..d033e3b --- /dev/null +++ b/internal/database/repositories.go @@ -0,0 +1,90 @@ +package database + +import ( + "context" + "database/sql" + "fmt" +) + +type Repositories struct { + db *DB + Files *FileRepository + Chunks *ChunkRepository + Blobs *BlobRepository + FileChunks *FileChunkRepository + BlobChunks *BlobChunkRepository + ChunkFiles *ChunkFileRepository + Snapshots *SnapshotRepository +} + +func NewRepositories(db *DB) *Repositories { + return &Repositories{ + db: db, + Files: NewFileRepository(db), + Chunks: NewChunkRepository(db), + Blobs: NewBlobRepository(db), + FileChunks: NewFileChunkRepository(db), + BlobChunks: NewBlobChunkRepository(db), + ChunkFiles: NewChunkFileRepository(db), + Snapshots: NewSnapshotRepository(db), + } +} + +type TxFunc func(ctx context.Context, tx *sql.Tx) error + +func (r *Repositories) WithTx(ctx context.Context, fn TxFunc) error { + tx, err := r.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("beginning transaction: %w", err) + } + + defer func() { + if p := recover(); p != nil { + if rollbackErr := tx.Rollback(); rollbackErr != nil { + Fatal("failed to rollback transaction: %v", rollbackErr) + } + panic(p) + } else if err != nil { + if rollbackErr := tx.Rollback(); rollbackErr != nil { + Fatal("failed to rollback transaction: %v", rollbackErr) + } + } + }() + + err = fn(ctx, tx) + if err != nil { + return err + } + + return tx.Commit() +} + +func (r *Repositories) WithReadTx(ctx context.Context, fn TxFunc) error { + opts := &sql.TxOptions{ + ReadOnly: true, + } + tx, err := r.db.BeginTx(ctx, opts) + if err != nil { + return fmt.Errorf("beginning read transaction: %w", err) + } + + defer func() { + if p := recover(); p != nil { + if rollbackErr := tx.Rollback(); rollbackErr != nil { + Fatal("failed to rollback transaction: %v", rollbackErr) + } + panic(p) + } else if err != nil { + if rollbackErr := tx.Rollback(); rollbackErr != nil { + Fatal("failed to rollback transaction: %v", rollbackErr) + } + } + }() + + err = fn(ctx, tx) + if err != nil { + return err + } + + return tx.Commit() +} diff --git a/internal/database/snapshots.go b/internal/database/snapshots.go new file mode 100644 index 0000000..ea5e8f6 --- /dev/null +++ b/internal/database/snapshots.go @@ -0,0 +1,131 @@ +package database + +import ( + "context" + "database/sql" + "fmt" + "time" +) + +type SnapshotRepository struct { + db *DB +} + +func NewSnapshotRepository(db *DB) *SnapshotRepository { + return &SnapshotRepository{db: db} +} + +func (r *SnapshotRepository) Create(ctx context.Context, tx *sql.Tx, snapshot *Snapshot) error { + query := ` + INSERT INTO snapshots (id, hostname, vaultik_version, created_ts, file_count, chunk_count, blob_count) + VALUES (?, ?, ?, ?, ?, ?, ?) + ` + + var err error + if tx != nil { + _, err = tx.ExecContext(ctx, query, snapshot.ID, snapshot.Hostname, snapshot.VaultikVersion, snapshot.CreatedTS.Unix(), snapshot.FileCount, snapshot.ChunkCount, snapshot.BlobCount) + } else { + _, err = r.db.conn.ExecContext(ctx, query, snapshot.ID, snapshot.Hostname, snapshot.VaultikVersion, snapshot.CreatedTS.Unix(), snapshot.FileCount, snapshot.ChunkCount, snapshot.BlobCount) + } + + if err != nil { + return fmt.Errorf("inserting snapshot: %w", err) + } + + return nil +} + +func (r *SnapshotRepository) UpdateCounts(ctx context.Context, tx *sql.Tx, snapshotID string, fileCount, chunkCount, blobCount int64) error { + query := ` + UPDATE snapshots + SET file_count = ?, + chunk_count = ?, + blob_count = ? + WHERE id = ? + ` + + var err error + if tx != nil { + _, err = tx.ExecContext(ctx, query, fileCount, chunkCount, blobCount, snapshotID) + } else { + _, err = r.db.conn.ExecContext(ctx, query, fileCount, chunkCount, blobCount, snapshotID) + } + + if err != nil { + return fmt.Errorf("updating snapshot: %w", err) + } + + return nil +} + +func (r *SnapshotRepository) GetByID(ctx context.Context, snapshotID string) (*Snapshot, error) { + query := ` + SELECT id, hostname, vaultik_version, created_ts, file_count, chunk_count, blob_count + FROM snapshots + WHERE id = ? + ` + + var snapshot Snapshot + var createdTSUnix int64 + + err := r.db.conn.QueryRowContext(ctx, query, snapshotID).Scan( + &snapshot.ID, + &snapshot.Hostname, + &snapshot.VaultikVersion, + &createdTSUnix, + &snapshot.FileCount, + &snapshot.ChunkCount, + &snapshot.BlobCount, + ) + + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("querying snapshot: %w", err) + } + + snapshot.CreatedTS = time.Unix(createdTSUnix, 0) + + return &snapshot, nil +} + +func (r *SnapshotRepository) ListRecent(ctx context.Context, limit int) ([]*Snapshot, error) { + query := ` + SELECT id, hostname, vaultik_version, created_ts, file_count, chunk_count, blob_count + FROM snapshots + ORDER BY created_ts DESC + LIMIT ? + ` + + rows, err := r.db.conn.QueryContext(ctx, query, limit) + if err != nil { + return nil, fmt.Errorf("querying snapshots: %w", err) + } + defer CloseRows(rows) + + var snapshots []*Snapshot + for rows.Next() { + var snapshot Snapshot + var createdTSUnix int64 + + err := rows.Scan( + &snapshot.ID, + &snapshot.Hostname, + &snapshot.VaultikVersion, + &createdTSUnix, + &snapshot.FileCount, + &snapshot.ChunkCount, + &snapshot.BlobCount, + ) + if err != nil { + return nil, fmt.Errorf("scanning snapshot: %w", err) + } + + snapshot.CreatedTS = time.Unix(createdTSUnix, 0) + + snapshots = append(snapshots, &snapshot) + } + + return snapshots, rows.Err() +} diff --git a/internal/globals/globals.go b/internal/globals/globals.go index c341ef8..53ad9aa 100644 --- a/internal/globals/globals.go +++ b/internal/globals/globals.go @@ -1,10 +1,7 @@ package globals import ( - "context" "time" - - "go.uber.org/fx" ) // these get populated from main() and copied into the Globals object. @@ -21,19 +18,13 @@ type Globals struct { StartTime time.Time } -func New(lc fx.Lifecycle) (*Globals, error) { +func New() (*Globals, error) { n := &Globals{ - Appname: Appname, - Version: Version, - Commit: Commit, + Appname: Appname, + Version: Version, + Commit: Commit, + StartTime: time.Now(), } - - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - n.StartTime = time.Now() - return nil - }, - }) - + return n, nil } diff --git a/internal/globals/globals_test.go b/internal/globals/globals_test.go index 2c4ad56..82fe7c3 100644 --- a/internal/globals/globals_test.go +++ b/internal/globals/globals_test.go @@ -2,7 +2,7 @@ package globals import ( "testing" - + "go.uber.org/fx" "go.uber.org/fx/fxtest" ) @@ -15,22 +15,22 @@ func TestGlobalsNew(t *testing.T) { if g == nil { t.Fatal("Globals instance is nil") } - + if g.Appname != "vaultik" { t.Errorf("Expected Appname to be 'vaultik', got '%s'", g.Appname) } - + // Version and Commit will be "dev" and "unknown" by default if g.Version == "" { t.Error("Version should not be empty") } - + if g.Commit == "" { t.Error("Commit should not be empty") } }), ) - + app.RequireStart() app.RequireStop() -} \ No newline at end of file +} diff --git a/internal/models/models.go b/internal/models/models.go index f3324ae..2541d6a 100644 --- a/internal/models/models.go +++ b/internal/models/models.go @@ -27,8 +27,7 @@ type ChunkRef struct { // BlobInfo represents an encrypted blob containing multiple chunks type BlobInfo struct { - Hash string // Hash of encrypted blob - FinalHash string // Hash after compression and encryption + Hash string // SHA256 hash of the blob content (content-addressable) CreatedAt time.Time Size int64 ChunkCount int @@ -36,7 +35,7 @@ type BlobInfo struct { // Snapshot represents a backup snapshot type Snapshot struct { - ID string // ISO8601 timestamp + ID string // ISO8601 timestamp Hostname string Version string CreatedAt time.Time @@ -70,4 +69,4 @@ type DirtyPath struct { Path string MarkedAt time.Time EventType string // "create", "modify", "delete" -} \ No newline at end of file +} diff --git a/internal/models/models_test.go b/internal/models/models_test.go index d4f1892..1f44ac9 100644 --- a/internal/models/models_test.go +++ b/internal/models/models_test.go @@ -9,7 +9,7 @@ import ( func TestModelsCompilation(t *testing.T) { // This test primarily serves as a compilation test // to ensure all types are properly defined - + // Test FileInfo fi := &FileInfo{ Path: "/test/file.txt", @@ -19,7 +19,7 @@ func TestModelsCompilation(t *testing.T) { if fi.Path != "/test/file.txt" { t.Errorf("FileInfo.Path not set correctly") } - + // Test ChunkInfo ci := &ChunkInfo{ Hash: "abc123", @@ -29,11 +29,10 @@ func TestModelsCompilation(t *testing.T) { if ci.Hash != "abc123" { t.Errorf("ChunkInfo.Hash not set correctly") } - + // Test BlobInfo bi := &BlobInfo{ Hash: "blob123", - FinalHash: "final123", CreatedAt: time.Now(), Size: 1024, ChunkCount: 2, @@ -41,7 +40,7 @@ func TestModelsCompilation(t *testing.T) { if bi.Hash != "blob123" { t.Errorf("BlobInfo.Hash not set correctly") } - + // Test Snapshot s := &Snapshot{ ID: "2024-01-01T00:00:00Z", @@ -52,4 +51,4 @@ func TestModelsCompilation(t *testing.T) { if s.ID != "2024-01-01T00:00:00Z" { t.Errorf("Snapshot.ID not set correctly") } -} \ No newline at end of file +}