Implement local SQLite index database with repositories
- Add SQLite database connection management with proper error handling - Implement schema for files, chunks, blobs, and snapshots tables - Create repository pattern for each database table - Add transaction support with proper rollback handling - Integrate database module with fx dependency injection - Make index path configurable via VAULTIK_INDEX_PATH env var - Add fatal error handling for database integrity issues - Update DESIGN.md to clarify file_chunks vs chunk_files distinction - Remove FinalHash from BlobInfo (blobs are content-addressable) - Add file metadata support (mtime, ctime, mode, uid, gid, symlinks)
This commit is contained in:
88
internal/database/blob_chunks.go
Normal file
88
internal/database/blob_chunks.go
Normal file
@@ -0,0 +1,88 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type BlobChunkRepository struct {
|
||||
db *DB
|
||||
}
|
||||
|
||||
func NewBlobChunkRepository(db *DB) *BlobChunkRepository {
|
||||
return &BlobChunkRepository{db: db}
|
||||
}
|
||||
|
||||
func (r *BlobChunkRepository) Create(ctx context.Context, tx *sql.Tx, bc *BlobChunk) error {
|
||||
query := `
|
||||
INSERT INTO blob_chunks (blob_hash, chunk_hash, offset, length)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`
|
||||
|
||||
var err error
|
||||
if tx != nil {
|
||||
_, err = tx.ExecContext(ctx, query, bc.BlobHash, bc.ChunkHash, bc.Offset, bc.Length)
|
||||
} else {
|
||||
_, err = r.db.conn.ExecContext(ctx, query, bc.BlobHash, bc.ChunkHash, bc.Offset, bc.Length)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("inserting blob_chunk: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *BlobChunkRepository) GetByBlobHash(ctx context.Context, blobHash string) ([]*BlobChunk, error) {
|
||||
query := `
|
||||
SELECT blob_hash, chunk_hash, offset, length
|
||||
FROM blob_chunks
|
||||
WHERE blob_hash = ?
|
||||
ORDER BY offset
|
||||
`
|
||||
|
||||
rows, err := r.db.conn.QueryContext(ctx, query, blobHash)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying blob chunks: %w", err)
|
||||
}
|
||||
defer CloseRows(rows)
|
||||
|
||||
var blobChunks []*BlobChunk
|
||||
for rows.Next() {
|
||||
var bc BlobChunk
|
||||
err := rows.Scan(&bc.BlobHash, &bc.ChunkHash, &bc.Offset, &bc.Length)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scanning blob chunk: %w", err)
|
||||
}
|
||||
blobChunks = append(blobChunks, &bc)
|
||||
}
|
||||
|
||||
return blobChunks, rows.Err()
|
||||
}
|
||||
|
||||
func (r *BlobChunkRepository) GetByChunkHash(ctx context.Context, chunkHash string) (*BlobChunk, error) {
|
||||
query := `
|
||||
SELECT blob_hash, chunk_hash, offset, length
|
||||
FROM blob_chunks
|
||||
WHERE chunk_hash = ?
|
||||
LIMIT 1
|
||||
`
|
||||
|
||||
var bc BlobChunk
|
||||
err := r.db.conn.QueryRowContext(ctx, query, chunkHash).Scan(
|
||||
&bc.BlobHash,
|
||||
&bc.ChunkHash,
|
||||
&bc.Offset,
|
||||
&bc.Length,
|
||||
)
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying blob chunk: %w", err)
|
||||
}
|
||||
|
||||
return &bc, nil
|
||||
}
|
||||
96
internal/database/blobs.go
Normal file
96
internal/database/blobs.go
Normal file
@@ -0,0 +1,96 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
type BlobRepository struct {
|
||||
db *DB
|
||||
}
|
||||
|
||||
func NewBlobRepository(db *DB) *BlobRepository {
|
||||
return &BlobRepository{db: db}
|
||||
}
|
||||
|
||||
func (r *BlobRepository) Create(ctx context.Context, tx *sql.Tx, blob *Blob) error {
|
||||
query := `
|
||||
INSERT INTO blobs (blob_hash, created_ts)
|
||||
VALUES (?, ?)
|
||||
`
|
||||
|
||||
var err error
|
||||
if tx != nil {
|
||||
_, err = tx.ExecContext(ctx, query, blob.BlobHash, blob.CreatedTS.Unix())
|
||||
} else {
|
||||
_, err = r.db.conn.ExecContext(ctx, query, blob.BlobHash, blob.CreatedTS.Unix())
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("inserting blob: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *BlobRepository) GetByHash(ctx context.Context, hash string) (*Blob, error) {
|
||||
query := `
|
||||
SELECT blob_hash, created_ts
|
||||
FROM blobs
|
||||
WHERE blob_hash = ?
|
||||
`
|
||||
|
||||
var blob Blob
|
||||
var createdTSUnix int64
|
||||
|
||||
err := r.db.conn.QueryRowContext(ctx, query, hash).Scan(
|
||||
&blob.BlobHash,
|
||||
&createdTSUnix,
|
||||
)
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying blob: %w", err)
|
||||
}
|
||||
|
||||
blob.CreatedTS = time.Unix(createdTSUnix, 0)
|
||||
return &blob, nil
|
||||
}
|
||||
|
||||
func (r *BlobRepository) List(ctx context.Context, limit, offset int) ([]*Blob, error) {
|
||||
query := `
|
||||
SELECT blob_hash, created_ts
|
||||
FROM blobs
|
||||
ORDER BY blob_hash
|
||||
LIMIT ? OFFSET ?
|
||||
`
|
||||
|
||||
rows, err := r.db.conn.QueryContext(ctx, query, limit, offset)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying blobs: %w", err)
|
||||
}
|
||||
defer CloseRows(rows)
|
||||
|
||||
var blobs []*Blob
|
||||
for rows.Next() {
|
||||
var blob Blob
|
||||
var createdTSUnix int64
|
||||
|
||||
err := rows.Scan(
|
||||
&blob.BlobHash,
|
||||
&createdTSUnix,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scanning blob: %w", err)
|
||||
}
|
||||
|
||||
blob.CreatedTS = time.Unix(createdTSUnix, 0)
|
||||
blobs = append(blobs, &blob)
|
||||
}
|
||||
|
||||
return blobs, rows.Err()
|
||||
}
|
||||
88
internal/database/chunk_files.go
Normal file
88
internal/database/chunk_files.go
Normal file
@@ -0,0 +1,88 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type ChunkFileRepository struct {
|
||||
db *DB
|
||||
}
|
||||
|
||||
func NewChunkFileRepository(db *DB) *ChunkFileRepository {
|
||||
return &ChunkFileRepository{db: db}
|
||||
}
|
||||
|
||||
func (r *ChunkFileRepository) Create(ctx context.Context, tx *sql.Tx, cf *ChunkFile) error {
|
||||
query := `
|
||||
INSERT INTO chunk_files (chunk_hash, file_path, file_offset, length)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT(chunk_hash, file_path) DO NOTHING
|
||||
`
|
||||
|
||||
var err error
|
||||
if tx != nil {
|
||||
_, err = tx.ExecContext(ctx, query, cf.ChunkHash, cf.FilePath, cf.FileOffset, cf.Length)
|
||||
} else {
|
||||
_, err = r.db.conn.ExecContext(ctx, query, cf.ChunkHash, cf.FilePath, cf.FileOffset, cf.Length)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("inserting chunk_file: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ChunkFileRepository) GetByChunkHash(ctx context.Context, chunkHash string) ([]*ChunkFile, error) {
|
||||
query := `
|
||||
SELECT chunk_hash, file_path, file_offset, length
|
||||
FROM chunk_files
|
||||
WHERE chunk_hash = ?
|
||||
`
|
||||
|
||||
rows, err := r.db.conn.QueryContext(ctx, query, chunkHash)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying chunk files: %w", err)
|
||||
}
|
||||
defer CloseRows(rows)
|
||||
|
||||
var chunkFiles []*ChunkFile
|
||||
for rows.Next() {
|
||||
var cf ChunkFile
|
||||
err := rows.Scan(&cf.ChunkHash, &cf.FilePath, &cf.FileOffset, &cf.Length)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scanning chunk file: %w", err)
|
||||
}
|
||||
chunkFiles = append(chunkFiles, &cf)
|
||||
}
|
||||
|
||||
return chunkFiles, rows.Err()
|
||||
}
|
||||
|
||||
func (r *ChunkFileRepository) GetByFilePath(ctx context.Context, filePath string) ([]*ChunkFile, error) {
|
||||
query := `
|
||||
SELECT chunk_hash, file_path, file_offset, length
|
||||
FROM chunk_files
|
||||
WHERE file_path = ?
|
||||
`
|
||||
|
||||
rows, err := r.db.conn.QueryContext(ctx, query, filePath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying chunk files: %w", err)
|
||||
}
|
||||
defer CloseRows(rows)
|
||||
|
||||
var chunkFiles []*ChunkFile
|
||||
for rows.Next() {
|
||||
var cf ChunkFile
|
||||
err := rows.Scan(&cf.ChunkHash, &cf.FilePath, &cf.FileOffset, &cf.Length)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scanning chunk file: %w", err)
|
||||
}
|
||||
chunkFiles = append(chunkFiles, &cf)
|
||||
}
|
||||
|
||||
return chunkFiles, rows.Err()
|
||||
}
|
||||
141
internal/database/chunks.go
Normal file
141
internal/database/chunks.go
Normal file
@@ -0,0 +1,141 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type ChunkRepository struct {
|
||||
db *DB
|
||||
}
|
||||
|
||||
func NewChunkRepository(db *DB) *ChunkRepository {
|
||||
return &ChunkRepository{db: db}
|
||||
}
|
||||
|
||||
func (r *ChunkRepository) Create(ctx context.Context, tx *sql.Tx, chunk *Chunk) error {
|
||||
query := `
|
||||
INSERT INTO chunks (chunk_hash, sha256, size)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT(chunk_hash) DO NOTHING
|
||||
`
|
||||
|
||||
var err error
|
||||
if tx != nil {
|
||||
_, err = tx.ExecContext(ctx, query, chunk.ChunkHash, chunk.SHA256, chunk.Size)
|
||||
} else {
|
||||
_, err = r.db.conn.ExecContext(ctx, query, chunk.ChunkHash, chunk.SHA256, chunk.Size)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("inserting chunk: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ChunkRepository) GetByHash(ctx context.Context, hash string) (*Chunk, error) {
|
||||
query := `
|
||||
SELECT chunk_hash, sha256, size
|
||||
FROM chunks
|
||||
WHERE chunk_hash = ?
|
||||
`
|
||||
|
||||
var chunk Chunk
|
||||
|
||||
err := r.db.conn.QueryRowContext(ctx, query, hash).Scan(
|
||||
&chunk.ChunkHash,
|
||||
&chunk.SHA256,
|
||||
&chunk.Size,
|
||||
)
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying chunk: %w", err)
|
||||
}
|
||||
|
||||
return &chunk, nil
|
||||
}
|
||||
|
||||
func (r *ChunkRepository) GetByHashes(ctx context.Context, hashes []string) ([]*Chunk, error) {
|
||||
if len(hashes) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
query := `
|
||||
SELECT chunk_hash, sha256, size
|
||||
FROM chunks
|
||||
WHERE chunk_hash IN (`
|
||||
|
||||
args := make([]interface{}, len(hashes))
|
||||
for i, hash := range hashes {
|
||||
if i > 0 {
|
||||
query += ", "
|
||||
}
|
||||
query += "?"
|
||||
args[i] = hash
|
||||
}
|
||||
query += ") ORDER BY chunk_hash"
|
||||
|
||||
rows, err := r.db.conn.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying chunks: %w", err)
|
||||
}
|
||||
defer CloseRows(rows)
|
||||
|
||||
var chunks []*Chunk
|
||||
for rows.Next() {
|
||||
var chunk Chunk
|
||||
|
||||
err := rows.Scan(
|
||||
&chunk.ChunkHash,
|
||||
&chunk.SHA256,
|
||||
&chunk.Size,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scanning chunk: %w", err)
|
||||
}
|
||||
|
||||
chunks = append(chunks, &chunk)
|
||||
}
|
||||
|
||||
return chunks, rows.Err()
|
||||
}
|
||||
|
||||
func (r *ChunkRepository) ListUnpacked(ctx context.Context, limit int) ([]*Chunk, error) {
|
||||
query := `
|
||||
SELECT c.chunk_hash, c.sha256, c.size
|
||||
FROM chunks c
|
||||
LEFT JOIN blob_chunks bc ON c.chunk_hash = bc.chunk_hash
|
||||
WHERE bc.chunk_hash IS NULL
|
||||
ORDER BY c.chunk_hash
|
||||
LIMIT ?
|
||||
`
|
||||
|
||||
rows, err := r.db.conn.QueryContext(ctx, query, limit)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying unpacked chunks: %w", err)
|
||||
}
|
||||
defer CloseRows(rows)
|
||||
|
||||
var chunks []*Chunk
|
||||
for rows.Next() {
|
||||
var chunk Chunk
|
||||
|
||||
err := rows.Scan(
|
||||
&chunk.ChunkHash,
|
||||
&chunk.SHA256,
|
||||
&chunk.Size,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scanning chunk: %w", err)
|
||||
}
|
||||
|
||||
chunks = append(chunks, &chunk)
|
||||
}
|
||||
|
||||
return chunks, rows.Err()
|
||||
}
|
||||
114
internal/database/database.go
Normal file
114
internal/database/database.go
Normal file
@@ -0,0 +1,114 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
type DB struct {
|
||||
conn *sql.DB
|
||||
}
|
||||
|
||||
func New(ctx context.Context, path string) (*DB, error) {
|
||||
conn, err := sql.Open("sqlite3", path+"?_journal_mode=WAL&_synchronous=NORMAL&_busy_timeout=5000")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("opening database: %w", err)
|
||||
}
|
||||
|
||||
if err := conn.PingContext(ctx); err != nil {
|
||||
if closeErr := conn.Close(); closeErr != nil {
|
||||
Fatal("failed to close database connection: %v", closeErr)
|
||||
}
|
||||
return nil, fmt.Errorf("pinging database: %w", err)
|
||||
}
|
||||
|
||||
db := &DB{conn: conn}
|
||||
if err := db.createSchema(ctx); err != nil {
|
||||
if closeErr := conn.Close(); closeErr != nil {
|
||||
Fatal("failed to close database connection: %v", closeErr)
|
||||
}
|
||||
return nil, fmt.Errorf("creating schema: %w", err)
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func (db *DB) Close() error {
|
||||
if err := db.conn.Close(); err != nil {
|
||||
Fatal("failed to close database: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) Conn() *sql.DB {
|
||||
return db.conn
|
||||
}
|
||||
|
||||
func (db *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) {
|
||||
return db.conn.BeginTx(ctx, opts)
|
||||
}
|
||||
|
||||
func (db *DB) createSchema(ctx context.Context) error {
|
||||
schema := `
|
||||
CREATE TABLE IF NOT EXISTS files (
|
||||
path TEXT PRIMARY KEY,
|
||||
mtime INTEGER NOT NULL,
|
||||
ctime INTEGER NOT NULL,
|
||||
size INTEGER NOT NULL,
|
||||
mode INTEGER NOT NULL,
|
||||
uid INTEGER NOT NULL,
|
||||
gid INTEGER NOT NULL,
|
||||
link_target TEXT
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS file_chunks (
|
||||
path TEXT NOT NULL,
|
||||
idx INTEGER NOT NULL,
|
||||
chunk_hash TEXT NOT NULL,
|
||||
PRIMARY KEY (path, idx)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS chunks (
|
||||
chunk_hash TEXT PRIMARY KEY,
|
||||
sha256 TEXT NOT NULL,
|
||||
size INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS blobs (
|
||||
blob_hash TEXT PRIMARY KEY,
|
||||
created_ts INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS blob_chunks (
|
||||
blob_hash TEXT NOT NULL,
|
||||
chunk_hash TEXT NOT NULL,
|
||||
offset INTEGER NOT NULL,
|
||||
length INTEGER NOT NULL,
|
||||
PRIMARY KEY (blob_hash, chunk_hash)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS chunk_files (
|
||||
chunk_hash TEXT NOT NULL,
|
||||
file_path TEXT NOT NULL,
|
||||
file_offset INTEGER NOT NULL,
|
||||
length INTEGER NOT NULL,
|
||||
PRIMARY KEY (chunk_hash, file_path)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS snapshots (
|
||||
id TEXT PRIMARY KEY,
|
||||
hostname TEXT NOT NULL,
|
||||
vaultik_version TEXT NOT NULL,
|
||||
created_ts INTEGER NOT NULL,
|
||||
file_count INTEGER NOT NULL,
|
||||
chunk_count INTEGER NOT NULL,
|
||||
blob_count INTEGER NOT NULL
|
||||
);
|
||||
`
|
||||
|
||||
_, err := db.conn.ExecContext(ctx, schema)
|
||||
return err
|
||||
}
|
||||
20
internal/database/errors.go
Normal file
20
internal/database/errors.go
Normal file
@@ -0,0 +1,20 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
// Fatal prints an error message to stderr and exits with status 1
|
||||
func Fatal(format string, args ...interface{}) {
|
||||
fmt.Fprintf(os.Stderr, "FATAL: "+format+"\n", args...)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// CloseRows closes rows and exits on error
|
||||
func CloseRows(rows *sql.Rows) {
|
||||
if err := rows.Close(); err != nil {
|
||||
Fatal("failed to close rows: %v", err)
|
||||
}
|
||||
}
|
||||
80
internal/database/file_chunks.go
Normal file
80
internal/database/file_chunks.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type FileChunkRepository struct {
|
||||
db *DB
|
||||
}
|
||||
|
||||
func NewFileChunkRepository(db *DB) *FileChunkRepository {
|
||||
return &FileChunkRepository{db: db}
|
||||
}
|
||||
|
||||
func (r *FileChunkRepository) Create(ctx context.Context, tx *sql.Tx, fc *FileChunk) error {
|
||||
query := `
|
||||
INSERT INTO file_chunks (path, idx, chunk_hash)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT(path, idx) DO NOTHING
|
||||
`
|
||||
|
||||
var err error
|
||||
if tx != nil {
|
||||
_, err = tx.ExecContext(ctx, query, fc.Path, fc.Idx, fc.ChunkHash)
|
||||
} else {
|
||||
_, err = r.db.conn.ExecContext(ctx, query, fc.Path, fc.Idx, fc.ChunkHash)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("inserting file_chunk: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *FileChunkRepository) GetByPath(ctx context.Context, path string) ([]*FileChunk, error) {
|
||||
query := `
|
||||
SELECT path, idx, chunk_hash
|
||||
FROM file_chunks
|
||||
WHERE path = ?
|
||||
ORDER BY idx
|
||||
`
|
||||
|
||||
rows, err := r.db.conn.QueryContext(ctx, query, path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying file chunks: %w", err)
|
||||
}
|
||||
defer CloseRows(rows)
|
||||
|
||||
var fileChunks []*FileChunk
|
||||
for rows.Next() {
|
||||
var fc FileChunk
|
||||
err := rows.Scan(&fc.Path, &fc.Idx, &fc.ChunkHash)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scanning file chunk: %w", err)
|
||||
}
|
||||
fileChunks = append(fileChunks, &fc)
|
||||
}
|
||||
|
||||
return fileChunks, rows.Err()
|
||||
}
|
||||
|
||||
func (r *FileChunkRepository) DeleteByPath(ctx context.Context, tx *sql.Tx, path string) error {
|
||||
query := `DELETE FROM file_chunks WHERE path = ?`
|
||||
|
||||
var err error
|
||||
if tx != nil {
|
||||
_, err = tx.ExecContext(ctx, query, path)
|
||||
} else {
|
||||
_, err = r.db.conn.ExecContext(ctx, query, path)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("deleting file chunks: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
145
internal/database/files.go
Normal file
145
internal/database/files.go
Normal file
@@ -0,0 +1,145 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
type FileRepository struct {
|
||||
db *DB
|
||||
}
|
||||
|
||||
func NewFileRepository(db *DB) *FileRepository {
|
||||
return &FileRepository{db: db}
|
||||
}
|
||||
|
||||
func (r *FileRepository) Create(ctx context.Context, tx *sql.Tx, file *File) error {
|
||||
query := `
|
||||
INSERT INTO files (path, mtime, ctime, size, mode, uid, gid, link_target)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(path) DO UPDATE SET
|
||||
mtime = excluded.mtime,
|
||||
ctime = excluded.ctime,
|
||||
size = excluded.size,
|
||||
mode = excluded.mode,
|
||||
uid = excluded.uid,
|
||||
gid = excluded.gid,
|
||||
link_target = excluded.link_target
|
||||
`
|
||||
|
||||
var err error
|
||||
if tx != nil {
|
||||
_, err = tx.ExecContext(ctx, query, file.Path, file.MTime.Unix(), file.CTime.Unix(), file.Size, file.Mode, file.UID, file.GID, file.LinkTarget)
|
||||
} else {
|
||||
_, err = r.db.conn.ExecContext(ctx, query, file.Path, file.MTime.Unix(), file.CTime.Unix(), file.Size, file.Mode, file.UID, file.GID, file.LinkTarget)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("inserting file: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *FileRepository) GetByPath(ctx context.Context, path string) (*File, error) {
|
||||
query := `
|
||||
SELECT path, mtime, ctime, size, mode, uid, gid, link_target
|
||||
FROM files
|
||||
WHERE path = ?
|
||||
`
|
||||
|
||||
var file File
|
||||
var mtimeUnix, ctimeUnix int64
|
||||
var linkTarget sql.NullString
|
||||
|
||||
err := r.db.conn.QueryRowContext(ctx, query, path).Scan(
|
||||
&file.Path,
|
||||
&mtimeUnix,
|
||||
&ctimeUnix,
|
||||
&file.Size,
|
||||
&file.Mode,
|
||||
&file.UID,
|
||||
&file.GID,
|
||||
&linkTarget,
|
||||
)
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying file: %w", err)
|
||||
}
|
||||
|
||||
file.MTime = time.Unix(mtimeUnix, 0)
|
||||
file.CTime = time.Unix(ctimeUnix, 0)
|
||||
if linkTarget.Valid {
|
||||
file.LinkTarget = linkTarget.String
|
||||
}
|
||||
|
||||
return &file, nil
|
||||
}
|
||||
|
||||
func (r *FileRepository) ListModifiedSince(ctx context.Context, since time.Time) ([]*File, error) {
|
||||
query := `
|
||||
SELECT path, mtime, ctime, size, mode, uid, gid, link_target
|
||||
FROM files
|
||||
WHERE mtime >= ?
|
||||
ORDER BY path
|
||||
`
|
||||
|
||||
rows, err := r.db.conn.QueryContext(ctx, query, since.Unix())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying files: %w", err)
|
||||
}
|
||||
defer CloseRows(rows)
|
||||
|
||||
var files []*File
|
||||
for rows.Next() {
|
||||
var file File
|
||||
var mtimeUnix, ctimeUnix int64
|
||||
var linkTarget sql.NullString
|
||||
|
||||
err := rows.Scan(
|
||||
&file.Path,
|
||||
&mtimeUnix,
|
||||
&ctimeUnix,
|
||||
&file.Size,
|
||||
&file.Mode,
|
||||
&file.UID,
|
||||
&file.GID,
|
||||
&linkTarget,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scanning file: %w", err)
|
||||
}
|
||||
|
||||
file.MTime = time.Unix(mtimeUnix, 0)
|
||||
file.CTime = time.Unix(ctimeUnix, 0)
|
||||
if linkTarget.Valid {
|
||||
file.LinkTarget = linkTarget.String
|
||||
}
|
||||
|
||||
files = append(files, &file)
|
||||
}
|
||||
|
||||
return files, rows.Err()
|
||||
}
|
||||
|
||||
func (r *FileRepository) Delete(ctx context.Context, tx *sql.Tx, path string) error {
|
||||
query := `DELETE FROM files WHERE path = ?`
|
||||
|
||||
var err error
|
||||
if tx != nil {
|
||||
_, err = tx.ExecContext(ctx, query, path)
|
||||
} else {
|
||||
_, err = r.db.conn.ExecContext(ctx, query, path)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("deleting file: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
67
internal/database/models.go
Normal file
67
internal/database/models.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package database
|
||||
|
||||
import "time"
|
||||
|
||||
// File represents a file record in the database
|
||||
type File struct {
|
||||
Path string
|
||||
MTime time.Time
|
||||
CTime time.Time
|
||||
Size int64
|
||||
Mode uint32
|
||||
UID uint32
|
||||
GID uint32
|
||||
LinkTarget string // empty for regular files, target path for symlinks
|
||||
}
|
||||
|
||||
// IsSymlink returns true if this file is a symbolic link
|
||||
func (f *File) IsSymlink() bool {
|
||||
return f.LinkTarget != ""
|
||||
}
|
||||
|
||||
// FileChunk represents the mapping between files and chunks
|
||||
type FileChunk struct {
|
||||
Path string
|
||||
Idx int
|
||||
ChunkHash string
|
||||
}
|
||||
|
||||
// Chunk represents a chunk record in the database
|
||||
type Chunk struct {
|
||||
ChunkHash string
|
||||
SHA256 string
|
||||
Size int64
|
||||
}
|
||||
|
||||
// Blob represents a blob record in the database
|
||||
type Blob struct {
|
||||
BlobHash string
|
||||
CreatedTS time.Time
|
||||
}
|
||||
|
||||
// BlobChunk represents the mapping between blobs and chunks
|
||||
type BlobChunk struct {
|
||||
BlobHash string
|
||||
ChunkHash string
|
||||
Offset int64
|
||||
Length int64
|
||||
}
|
||||
|
||||
// ChunkFile represents the reverse mapping of chunks to files
|
||||
type ChunkFile struct {
|
||||
ChunkHash string
|
||||
FilePath string
|
||||
FileOffset int64
|
||||
Length int64
|
||||
}
|
||||
|
||||
// Snapshot represents a snapshot record in the database
|
||||
type Snapshot struct {
|
||||
ID string
|
||||
Hostname string
|
||||
VaultikVersion string
|
||||
CreatedTS time.Time
|
||||
FileCount int64
|
||||
ChunkCount int64
|
||||
BlobCount int64
|
||||
}
|
||||
40
internal/database/module.go
Normal file
40
internal/database/module.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"git.eeqj.de/sneak/vaultik/internal/config"
|
||||
"go.uber.org/fx"
|
||||
)
|
||||
|
||||
// Module provides database dependencies
|
||||
var Module = fx.Module("database",
|
||||
fx.Provide(
|
||||
provideDatabase,
|
||||
NewRepositories,
|
||||
),
|
||||
)
|
||||
|
||||
func provideDatabase(lc fx.Lifecycle, cfg *config.Config) (*DB, error) {
|
||||
// Ensure the index directory exists
|
||||
indexDir := filepath.Dir(cfg.IndexPath)
|
||||
if err := os.MkdirAll(indexDir, 0700); err != nil {
|
||||
return nil, fmt.Errorf("creating index directory: %w", err)
|
||||
}
|
||||
|
||||
db, err := New(context.Background(), cfg.IndexPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("opening database: %w", err)
|
||||
}
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(ctx context.Context) error {
|
||||
return db.Close()
|
||||
},
|
||||
})
|
||||
|
||||
return db, nil
|
||||
}
|
||||
90
internal/database/repositories.go
Normal file
90
internal/database/repositories.go
Normal file
@@ -0,0 +1,90 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type Repositories struct {
|
||||
db *DB
|
||||
Files *FileRepository
|
||||
Chunks *ChunkRepository
|
||||
Blobs *BlobRepository
|
||||
FileChunks *FileChunkRepository
|
||||
BlobChunks *BlobChunkRepository
|
||||
ChunkFiles *ChunkFileRepository
|
||||
Snapshots *SnapshotRepository
|
||||
}
|
||||
|
||||
func NewRepositories(db *DB) *Repositories {
|
||||
return &Repositories{
|
||||
db: db,
|
||||
Files: NewFileRepository(db),
|
||||
Chunks: NewChunkRepository(db),
|
||||
Blobs: NewBlobRepository(db),
|
||||
FileChunks: NewFileChunkRepository(db),
|
||||
BlobChunks: NewBlobChunkRepository(db),
|
||||
ChunkFiles: NewChunkFileRepository(db),
|
||||
Snapshots: NewSnapshotRepository(db),
|
||||
}
|
||||
}
|
||||
|
||||
type TxFunc func(ctx context.Context, tx *sql.Tx) error
|
||||
|
||||
func (r *Repositories) WithTx(ctx context.Context, fn TxFunc) error {
|
||||
tx, err := r.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("beginning transaction: %w", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
if rollbackErr := tx.Rollback(); rollbackErr != nil {
|
||||
Fatal("failed to rollback transaction: %v", rollbackErr)
|
||||
}
|
||||
panic(p)
|
||||
} else if err != nil {
|
||||
if rollbackErr := tx.Rollback(); rollbackErr != nil {
|
||||
Fatal("failed to rollback transaction: %v", rollbackErr)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
err = fn(ctx, tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (r *Repositories) WithReadTx(ctx context.Context, fn TxFunc) error {
|
||||
opts := &sql.TxOptions{
|
||||
ReadOnly: true,
|
||||
}
|
||||
tx, err := r.db.BeginTx(ctx, opts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("beginning read transaction: %w", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
if rollbackErr := tx.Rollback(); rollbackErr != nil {
|
||||
Fatal("failed to rollback transaction: %v", rollbackErr)
|
||||
}
|
||||
panic(p)
|
||||
} else if err != nil {
|
||||
if rollbackErr := tx.Rollback(); rollbackErr != nil {
|
||||
Fatal("failed to rollback transaction: %v", rollbackErr)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
err = fn(ctx, tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
131
internal/database/snapshots.go
Normal file
131
internal/database/snapshots.go
Normal file
@@ -0,0 +1,131 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
type SnapshotRepository struct {
|
||||
db *DB
|
||||
}
|
||||
|
||||
func NewSnapshotRepository(db *DB) *SnapshotRepository {
|
||||
return &SnapshotRepository{db: db}
|
||||
}
|
||||
|
||||
func (r *SnapshotRepository) Create(ctx context.Context, tx *sql.Tx, snapshot *Snapshot) error {
|
||||
query := `
|
||||
INSERT INTO snapshots (id, hostname, vaultik_version, created_ts, file_count, chunk_count, blob_count)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
`
|
||||
|
||||
var err error
|
||||
if tx != nil {
|
||||
_, err = tx.ExecContext(ctx, query, snapshot.ID, snapshot.Hostname, snapshot.VaultikVersion, snapshot.CreatedTS.Unix(), snapshot.FileCount, snapshot.ChunkCount, snapshot.BlobCount)
|
||||
} else {
|
||||
_, err = r.db.conn.ExecContext(ctx, query, snapshot.ID, snapshot.Hostname, snapshot.VaultikVersion, snapshot.CreatedTS.Unix(), snapshot.FileCount, snapshot.ChunkCount, snapshot.BlobCount)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("inserting snapshot: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *SnapshotRepository) UpdateCounts(ctx context.Context, tx *sql.Tx, snapshotID string, fileCount, chunkCount, blobCount int64) error {
|
||||
query := `
|
||||
UPDATE snapshots
|
||||
SET file_count = ?,
|
||||
chunk_count = ?,
|
||||
blob_count = ?
|
||||
WHERE id = ?
|
||||
`
|
||||
|
||||
var err error
|
||||
if tx != nil {
|
||||
_, err = tx.ExecContext(ctx, query, fileCount, chunkCount, blobCount, snapshotID)
|
||||
} else {
|
||||
_, err = r.db.conn.ExecContext(ctx, query, fileCount, chunkCount, blobCount, snapshotID)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("updating snapshot: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *SnapshotRepository) GetByID(ctx context.Context, snapshotID string) (*Snapshot, error) {
|
||||
query := `
|
||||
SELECT id, hostname, vaultik_version, created_ts, file_count, chunk_count, blob_count
|
||||
FROM snapshots
|
||||
WHERE id = ?
|
||||
`
|
||||
|
||||
var snapshot Snapshot
|
||||
var createdTSUnix int64
|
||||
|
||||
err := r.db.conn.QueryRowContext(ctx, query, snapshotID).Scan(
|
||||
&snapshot.ID,
|
||||
&snapshot.Hostname,
|
||||
&snapshot.VaultikVersion,
|
||||
&createdTSUnix,
|
||||
&snapshot.FileCount,
|
||||
&snapshot.ChunkCount,
|
||||
&snapshot.BlobCount,
|
||||
)
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying snapshot: %w", err)
|
||||
}
|
||||
|
||||
snapshot.CreatedTS = time.Unix(createdTSUnix, 0)
|
||||
|
||||
return &snapshot, nil
|
||||
}
|
||||
|
||||
func (r *SnapshotRepository) ListRecent(ctx context.Context, limit int) ([]*Snapshot, error) {
|
||||
query := `
|
||||
SELECT id, hostname, vaultik_version, created_ts, file_count, chunk_count, blob_count
|
||||
FROM snapshots
|
||||
ORDER BY created_ts DESC
|
||||
LIMIT ?
|
||||
`
|
||||
|
||||
rows, err := r.db.conn.QueryContext(ctx, query, limit)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying snapshots: %w", err)
|
||||
}
|
||||
defer CloseRows(rows)
|
||||
|
||||
var snapshots []*Snapshot
|
||||
for rows.Next() {
|
||||
var snapshot Snapshot
|
||||
var createdTSUnix int64
|
||||
|
||||
err := rows.Scan(
|
||||
&snapshot.ID,
|
||||
&snapshot.Hostname,
|
||||
&snapshot.VaultikVersion,
|
||||
&createdTSUnix,
|
||||
&snapshot.FileCount,
|
||||
&snapshot.ChunkCount,
|
||||
&snapshot.BlobCount,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scanning snapshot: %w", err)
|
||||
}
|
||||
|
||||
snapshot.CreatedTS = time.Unix(createdTSUnix, 0)
|
||||
|
||||
snapshots = append(snapshots, &snapshot)
|
||||
}
|
||||
|
||||
return snapshots, rows.Err()
|
||||
}
|
||||
Reference in New Issue
Block a user