All checks were successful
check / check (pull_request) Successful in 2m30s
Replace the monolithic schema.sql embed with a numbered migration system following the pixa pattern:

- Add schema/000.sql: a bootstrap migration that creates the schema_migrations table with an INTEGER PRIMARY KEY version column and an applied_at timestamp.
- Move the full schema to schema/001.sql (renamed from schema.sql).
- Remove the redundant schema/008_uploads.sql (the uploads table is already in the main schema).
- Rewrite the database.go migration logic:
  - Embed the schema/*.sql directory instead of the single schema.sql file.
  - bootstrapMigrationsTable() checks whether the table exists and, if not, applies 000.sql.
  - applyMigrations() iterates over the numbered files and skips already-applied versions.
  - ParseMigrationVersion() extracts the version from filenames (e.g. 001.sql).
  - The Go code performs no INSERTs for the bootstrap; 000.sql is self-contained.
- Update database_test.go to verify that the schema_migrations table exists.
399 lines
13 KiB
Go
399 lines
13 KiB
Go
// Package database provides the local SQLite index for Vaultik backup operations.
|
|
// The database tracks files, chunks, and their associations with blobs.
|
|
//
|
|
// Blobs in Vaultik are the final storage units uploaded to S3. Each blob is a
|
|
// large (up to 10GB) file containing many compressed and encrypted chunks from
|
|
// multiple source files. Blobs are content-addressed, meaning their filename
|
|
// is derived from their SHA256 hash after compression and encryption.
|
|
//
|
|
// Schema is managed via numbered SQL migrations embedded in the schema/
|
|
// directory. Migration 000.sql bootstraps the schema_migrations tracking
|
|
// table; subsequent migrations (001, 002, …) are applied in order.
|
|
package database
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"embed"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"git.eeqj.de/sneak/vaultik/internal/log"
|
|
_ "modernc.org/sqlite"
|
|
)
|
|
|
|
//go:embed schema/*.sql
|
|
var schemaFS embed.FS
|
|
|
|
const (
	// bootstrapVersion is the version number of schema/000.sql. That
	// migration creates the schema_migrations table, so it must run before
	// any other migration can be tracked. It is applied separately from the
	// regular migration loop.
	bootstrapVersion = 0
)
|
|
|
|
// DB represents the Vaultik local index database connection.
|
|
// It uses SQLite to track file metadata, content-defined chunks, and blob associations.
|
|
// The database enables incremental backups by detecting changed files and
|
|
// supports deduplication by tracking which chunks are already stored in blobs.
|
|
// Write operations are synchronized through a mutex to ensure thread safety.
|
|
type DB struct {
|
|
conn *sql.DB
|
|
path string
|
|
}
|
|
|
|
// ParseMigrationVersion returns the integer version encoded at the start of a
// migration filename.
//
// Two shapes are accepted: "<version>.sql" and "<version>_<description>.sql".
// <version> must consist only of ASCII digits, and zero padding such as "001"
// is allowed. Any other shape, including an empty name, an empty version, a
// non-digit character, or a value too large for int, produces a descriptive
// error.
func ParseMigrationVersion(filename string) (int, error) {
	stem := strings.TrimSuffix(filename, filepath.Ext(filename))
	if len(stem) == 0 {
		return 0, fmt.Errorf("invalid migration filename %q: empty name", filename)
	}

	// The version is everything before the first underscore. Without an
	// underscore, the whole stem is the version.
	prefix, _, _ := strings.Cut(stem, "_")
	if len(prefix) == 0 {
		return 0, fmt.Errorf("invalid migration filename %q: empty version prefix", filename)
	}

	for _, r := range prefix {
		if '0' <= r && r <= '9' {
			continue
		}
		return 0, fmt.Errorf(
			"invalid migration filename %q: version %q contains non-numeric character %q",
			filename, prefix, string(r),
		)
	}

	// Atoi still guards against values that overflow int.
	n, convErr := strconv.Atoi(prefix)
	if convErr != nil {
		return 0, fmt.Errorf("invalid migration filename %q: %w", filename, convErr)
	}
	return n, nil
}
|
|
|
|
// New creates a new database connection at the specified path.
|
|
// It creates the schema if needed and configures SQLite with WAL mode for
|
|
// better concurrency. SQLite handles crash recovery automatically when
|
|
// opening a database with journal/WAL files present.
|
|
// The path parameter can be a file path for persistent storage or ":memory:"
|
|
// for an in-memory database (useful for testing).
|
|
func New(ctx context.Context, path string) (*DB, error) {
|
|
log.Debug("Opening database connection", "path", path)
|
|
|
|
// Note: We do NOT delete journal/WAL files before opening.
|
|
// SQLite handles crash recovery automatically when the database is opened.
|
|
// Deleting these files would corrupt the database after an unclean shutdown.
|
|
|
|
// First attempt with standard WAL mode
|
|
log.Debug("Attempting to open database with WAL mode", "path", path)
|
|
conn, err := sql.Open(
|
|
"sqlite",
|
|
path+"?_journal_mode=WAL&_synchronous=NORMAL&_busy_timeout=10000&_locking_mode=NORMAL&_foreign_keys=ON",
|
|
)
|
|
if err == nil {
|
|
// Set connection pool settings
|
|
// SQLite can handle multiple readers but only one writer at a time.
|
|
// Setting MaxOpenConns to 1 ensures all writes are serialized through
|
|
// a single connection, preventing SQLITE_BUSY errors.
|
|
conn.SetMaxOpenConns(1)
|
|
conn.SetMaxIdleConns(1)
|
|
|
|
if err := conn.PingContext(ctx); err == nil {
|
|
// Success on first try
|
|
log.Debug("Database opened successfully with WAL mode", "path", path)
|
|
|
|
// Enable foreign keys explicitly
|
|
if _, err := conn.ExecContext(ctx, "PRAGMA foreign_keys = ON"); err != nil {
|
|
log.Warn("Failed to enable foreign keys", "error", err)
|
|
}
|
|
|
|
db := &DB{conn: conn, path: path}
|
|
if err := applyMigrations(ctx, conn); err != nil {
|
|
_ = conn.Close()
|
|
return nil, fmt.Errorf("applying migrations: %w", err)
|
|
}
|
|
return db, nil
|
|
}
|
|
log.Debug("Failed to ping database, closing connection", "path", path, "error", err)
|
|
_ = conn.Close()
|
|
}
|
|
|
|
// If first attempt failed, try with TRUNCATE mode to clear any locks
|
|
log.Info(
|
|
"Database appears locked, attempting recovery with TRUNCATE mode",
|
|
"path", path,
|
|
)
|
|
conn, err = sql.Open(
|
|
"sqlite",
|
|
path+"?_journal_mode=TRUNCATE&_synchronous=NORMAL&_busy_timeout=10000&_foreign_keys=ON",
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("opening database in recovery mode: %w", err)
|
|
}
|
|
|
|
// Set connection pool settings
|
|
// SQLite can handle multiple readers but only one writer at a time.
|
|
// Setting MaxOpenConns to 1 ensures all writes are serialized through
|
|
// a single connection, preventing SQLITE_BUSY errors.
|
|
conn.SetMaxOpenConns(1)
|
|
conn.SetMaxIdleConns(1)
|
|
|
|
if err := conn.PingContext(ctx); err != nil {
|
|
log.Debug("Failed to ping database in recovery mode, closing", "path", path, "error", err)
|
|
_ = conn.Close()
|
|
return nil, fmt.Errorf(
|
|
"database still locked after recovery attempt: %w",
|
|
err,
|
|
)
|
|
}
|
|
|
|
log.Debug("Database opened in TRUNCATE mode", "path", path)
|
|
|
|
// Switch back to WAL mode
|
|
log.Debug("Switching database back to WAL mode", "path", path)
|
|
if _, err := conn.ExecContext(ctx, "PRAGMA journal_mode=WAL"); err != nil {
|
|
log.Warn("Failed to switch back to WAL mode", "path", path, "error", err)
|
|
}
|
|
|
|
// Ensure foreign keys are enabled
|
|
if _, err := conn.ExecContext(ctx, "PRAGMA foreign_keys=ON"); err != nil {
|
|
log.Warn("Failed to enable foreign keys", "path", path, "error", err)
|
|
}
|
|
|
|
db := &DB{conn: conn, path: path}
|
|
if err := applyMigrations(ctx, conn); err != nil {
|
|
_ = conn.Close()
|
|
return nil, fmt.Errorf("applying migrations: %w", err)
|
|
}
|
|
|
|
log.Debug("Database connection established successfully", "path", path)
|
|
return db, nil
|
|
}
|
|
|
|
// Close closes the database connection.
|
|
// It ensures all pending operations are completed before closing.
|
|
// Returns an error if the database connection cannot be closed properly.
|
|
func (db *DB) Close() error {
|
|
log.Debug("Closing database connection", "path", db.path)
|
|
if err := db.conn.Close(); err != nil {
|
|
log.Error("Failed to close database", "path", db.path, "error", err)
|
|
return fmt.Errorf("failed to close database: %w", err)
|
|
}
|
|
log.Debug("Database connection closed successfully", "path", db.path)
|
|
return nil
|
|
}
|
|
|
|
// Conn returns the underlying *sql.DB connection.
|
|
// This should be used sparingly and primarily for read operations.
|
|
// For write operations, prefer using the ExecWithLog method.
|
|
func (db *DB) Conn() *sql.DB {
|
|
return db.conn
|
|
}
|
|
|
|
// Path returns the path to the database file.
|
|
func (db *DB) Path() string {
|
|
return db.path
|
|
}
|
|
|
|
// BeginTx starts a new database transaction with the given options.
|
|
// The caller is responsible for committing or rolling back the transaction.
|
|
// For write transactions, consider using the Repositories.WithTx method instead,
|
|
// which handles locking and rollback automatically.
|
|
func (db *DB) BeginTx(
|
|
ctx context.Context,
|
|
opts *sql.TxOptions,
|
|
) (*sql.Tx, error) {
|
|
return db.conn.BeginTx(ctx, opts)
|
|
}
|
|
|
|
// Note: LockForWrite and UnlockWrite methods have been removed.
|
|
// SQLite handles its own locking internally, so explicit locking is not needed.
|
|
|
|
// ExecWithLog executes a write query with SQL logging.
|
|
// SQLite handles its own locking internally, so we just pass through to ExecContext.
|
|
// The query and args parameters follow the same format as sql.DB.ExecContext.
|
|
func (db *DB) ExecWithLog(
|
|
ctx context.Context,
|
|
query string,
|
|
args ...interface{},
|
|
) (sql.Result, error) {
|
|
LogSQL("Execute", query, args...)
|
|
return db.conn.ExecContext(ctx, query, args...)
|
|
}
|
|
|
|
// QueryRowWithLog executes a query that returns at most one row with SQL logging.
|
|
// This is useful for queries that modify data and return values (e.g., INSERT ... RETURNING).
|
|
// SQLite handles its own locking internally.
|
|
// The query and args parameters follow the same format as sql.DB.QueryRowContext.
|
|
func (db *DB) QueryRowWithLog(
|
|
ctx context.Context,
|
|
query string,
|
|
args ...interface{},
|
|
) *sql.Row {
|
|
LogSQL("QueryRow", query, args...)
|
|
return db.conn.QueryRowContext(ctx, query, args...)
|
|
}
|
|
|
|
// collectMigrations reads the embedded schema directory and returns
|
|
// migration filenames sorted lexicographically.
|
|
func collectMigrations() ([]string, error) {
|
|
entries, err := schemaFS.ReadDir("schema")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read schema directory: %w", err)
|
|
}
|
|
|
|
var migrations []string
|
|
|
|
for _, entry := range entries {
|
|
if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".sql") {
|
|
migrations = append(migrations, entry.Name())
|
|
}
|
|
}
|
|
|
|
sort.Strings(migrations)
|
|
|
|
return migrations, nil
|
|
}
|
|
|
|
// bootstrapMigrationsTable ensures the schema_migrations table exists
|
|
// by applying 000.sql if the table is missing.
|
|
func bootstrapMigrationsTable(ctx context.Context, db *sql.DB) error {
|
|
var tableExists int
|
|
|
|
err := db.QueryRowContext(ctx,
|
|
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_migrations'",
|
|
).Scan(&tableExists)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check for migrations table: %w", err)
|
|
}
|
|
|
|
if tableExists > 0 {
|
|
return nil
|
|
}
|
|
|
|
content, err := schemaFS.ReadFile("schema/000.sql")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read bootstrap migration 000.sql: %w", err)
|
|
}
|
|
|
|
log.Info("applying bootstrap migration", "version", bootstrapVersion)
|
|
|
|
_, err = db.ExecContext(ctx, string(content))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to apply bootstrap migration: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// applyMigrations applies all pending migrations to db. It first bootstraps
|
|
// the schema_migrations table via 000.sql, then iterates through remaining
|
|
// migration files in order.
|
|
func applyMigrations(ctx context.Context, db *sql.DB) error {
|
|
if err := bootstrapMigrationsTable(ctx, db); err != nil {
|
|
return err
|
|
}
|
|
|
|
migrations, err := collectMigrations()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, migration := range migrations {
|
|
version, parseErr := ParseMigrationVersion(migration)
|
|
if parseErr != nil {
|
|
return parseErr
|
|
}
|
|
|
|
// Check if already applied.
|
|
var count int
|
|
|
|
err := db.QueryRowContext(ctx,
|
|
"SELECT COUNT(*) FROM schema_migrations WHERE version = ?",
|
|
version,
|
|
).Scan(&count)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check migration status: %w", err)
|
|
}
|
|
|
|
if count > 0 {
|
|
log.Debug("migration already applied", "version", version)
|
|
|
|
continue
|
|
}
|
|
|
|
// Read and apply migration.
|
|
content, readErr := schemaFS.ReadFile(filepath.Join("schema", migration))
|
|
if readErr != nil {
|
|
return fmt.Errorf("failed to read migration %s: %w", migration, readErr)
|
|
}
|
|
|
|
log.Info("applying migration", "version", version)
|
|
|
|
_, execErr := db.ExecContext(ctx, string(content))
|
|
if execErr != nil {
|
|
return fmt.Errorf("failed to apply migration %s: %w", migration, execErr)
|
|
}
|
|
|
|
// Record migration as applied.
|
|
_, recErr := db.ExecContext(ctx,
|
|
"INSERT INTO schema_migrations (version) VALUES (?)",
|
|
version,
|
|
)
|
|
if recErr != nil {
|
|
return fmt.Errorf("failed to record migration %s: %w", migration, recErr)
|
|
}
|
|
|
|
log.Info("migration applied successfully", "version", version)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// NewTestDB creates an in-memory SQLite database for testing purposes.
|
|
// The database is automatically initialized with the schema and is ready for use.
|
|
// Each call creates a new independent database instance.
|
|
func NewTestDB() (*DB, error) {
|
|
return New(context.Background(), ":memory:")
|
|
}
|
|
|
|
// repeatPlaceholder builds the tail of an IN (...) placeholder list. It
// returns n copies of ", ?" concatenated, and "" when n is zero or negative.
// For example, repeatPlaceholder(2) == ", ?, ?".
func repeatPlaceholder(n int) string {
	if n > 0 {
		return strings.Repeat(", ?", n)
	}
	return ""
}
|
|
|
|
// LogSQL logs SQL queries and their arguments when debug mode is enabled.
|
|
// Debug mode is activated by setting the GODEBUG environment variable to include "vaultik".
|
|
// This is useful for troubleshooting database operations and understanding query patterns.
|
|
//
|
|
// The operation parameter describes the type of SQL operation (e.g., "Execute", "Query").
|
|
// The query parameter is the SQL statement being executed.
|
|
// The args parameter contains the query arguments that will be interpolated.
|
|
func LogSQL(operation, query string, args ...interface{}) {
|
|
if strings.Contains(os.Getenv("GODEBUG"), "vaultik") {
|
|
log.Debug(
|
|
"SQL "+operation,
|
|
"query",
|
|
strings.TrimSpace(query),
|
|
"args",
|
|
fmt.Sprintf("%v", args),
|
|
)
|
|
}
|
|
}
|