All checks were successful
check / check (push) Successful in 1m50s
Split data storage into main application DB (config only) and
per-webhook event databases (one SQLite file per webhook).
Architecture changes:
- New WebhookDBManager component manages per-webhook DB lifecycle
(create, open, cache, delete) with lazy connection pooling via sync.Map
- Main DB (DBURL) stores only config: Users, Webhooks, Entrypoints,
Targets, APIKeys
- Per-webhook DBs (DATA_DIR) store Events, Deliveries, DeliveryResults
in files named events-{webhook_uuid}.db
- New DATA_DIR env var (default: ./data dev, /data/events prod)
Behavioral changes:
- Webhook creation creates per-webhook DB file
- Webhook deletion hard-deletes per-webhook DB file (config soft-deleted)
- Event ingestion writes to per-webhook DB, not main DB
- Delivery engine polls all per-webhook DBs for pending deliveries
- Database target type marks delivery as immediately successful (events
are already in the dedicated per-webhook DB)
- Event log UI reads from per-webhook DBs with targets from main DB
- Existing webhooks without DB files get them created lazily
Removed:
- ArchivedEvent model (was a half-measure, replaced by per-webhook DBs)
- Event/Delivery/DeliveryResult removed from main DB migrations
Added:
- Comprehensive tests for WebhookDBManager (create, delete, lazy
creation, delivery workflow, multiple webhooks, close all)
- Dockerfile creates /data/events directory
README updates:
- Per-webhook event databases documented as implemented (was Phase 2)
- DATA_DIR added to configuration table
- Docker instructions updated with data volume mount
- Data model diagram updated
- TODO updated (database separation moved to completed)
Closes #15
184 lines
5.4 KiB
Go
184 lines
5.4 KiB
Go
package database
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
|
|
"go.uber.org/fx"
|
|
"gorm.io/driver/sqlite"
|
|
"gorm.io/gorm"
|
|
"sneak.berlin/go/webhooker/internal/config"
|
|
"sneak.berlin/go/webhooker/internal/logger"
|
|
)
|
|
|
|
// nolint:revive // WebhookDBManagerParams is a standard fx naming convention
|
|
type WebhookDBManagerParams struct {
|
|
fx.In
|
|
Config *config.Config
|
|
Logger *logger.Logger
|
|
}
|
|
|
|
// WebhookDBManager owns the per-webhook SQLite databases used for event
// storage. Every webhook has a dedicated database file holding its Events,
// Deliveries, and DeliveryResults. Connections are opened on first use and
// kept in a cache for subsequent calls.
type WebhookDBManager struct {
	// dataDir is the directory containing the per-webhook database files.
	dataDir string
	// dbs caches open connections, keyed by webhook ID with *gorm.DB values.
	dbs sync.Map
	// log receives the manager's lifecycle and error messages.
	log *slog.Logger
}
|
|
|
|
// NewWebhookDBManager creates a new WebhookDBManager and registers lifecycle hooks.
|
|
func NewWebhookDBManager(lc fx.Lifecycle, params WebhookDBManagerParams) (*WebhookDBManager, error) {
|
|
m := &WebhookDBManager{
|
|
dataDir: params.Config.DataDir,
|
|
log: params.Logger.Get(),
|
|
}
|
|
|
|
// Create data directory if it doesn't exist
|
|
if err := os.MkdirAll(m.dataDir, 0750); err != nil {
|
|
return nil, fmt.Errorf("creating data directory %s: %w", m.dataDir, err)
|
|
}
|
|
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(_ context.Context) error { //nolint:revive // ctx unused but required by fx
|
|
return m.CloseAll()
|
|
},
|
|
})
|
|
|
|
m.log.Info("webhook database manager initialized", "data_dir", m.dataDir)
|
|
return m, nil
|
|
}
|
|
|
|
// dbPath returns the filesystem path for a webhook's database file.
|
|
func (m *WebhookDBManager) dbPath(webhookID string) string {
|
|
return filepath.Join(m.dataDir, fmt.Sprintf("events-%s.db", webhookID))
|
|
}
|
|
|
|
// openDB opens (or creates) a per-webhook SQLite database and runs migrations.
|
|
func (m *WebhookDBManager) openDB(webhookID string) (*gorm.DB, error) {
|
|
path := m.dbPath(webhookID)
|
|
dbURL := fmt.Sprintf("file:%s?cache=shared&mode=rwc", path)
|
|
|
|
sqlDB, err := sql.Open("sqlite", dbURL)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("opening webhook database %s: %w", webhookID, err)
|
|
}
|
|
|
|
db, err := gorm.Open(sqlite.Dialector{
|
|
Conn: sqlDB,
|
|
}, &gorm.Config{})
|
|
if err != nil {
|
|
sqlDB.Close()
|
|
return nil, fmt.Errorf("connecting to webhook database %s: %w", webhookID, err)
|
|
}
|
|
|
|
// Run migrations for event-tier models only
|
|
if err := db.AutoMigrate(&Event{}, &Delivery{}, &DeliveryResult{}); err != nil {
|
|
sqlDB.Close()
|
|
return nil, fmt.Errorf("migrating webhook database %s: %w", webhookID, err)
|
|
}
|
|
|
|
m.log.Info("opened per-webhook database", "webhook_id", webhookID, "path", path)
|
|
return db, nil
|
|
}
|
|
|
|
// GetDB returns the database connection for a webhook, creating the database
|
|
// file lazily if it doesn't exist. This handles both new webhooks and existing
|
|
// webhooks that were created before per-webhook databases were introduced.
|
|
func (m *WebhookDBManager) GetDB(webhookID string) (*gorm.DB, error) {
|
|
// Fast path: already open
|
|
if val, ok := m.dbs.Load(webhookID); ok {
|
|
cachedDB, castOK := val.(*gorm.DB)
|
|
if !castOK {
|
|
return nil, fmt.Errorf("invalid cached database type for webhook %s", webhookID)
|
|
}
|
|
return cachedDB, nil
|
|
}
|
|
|
|
// Slow path: open/create the database
|
|
db, err := m.openDB(webhookID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Store it; if another goroutine beat us, close ours and use theirs
|
|
actual, loaded := m.dbs.LoadOrStore(webhookID, db)
|
|
if loaded {
|
|
// Another goroutine created it first; close our duplicate
|
|
if sqlDB, closeErr := db.DB(); closeErr == nil {
|
|
sqlDB.Close()
|
|
}
|
|
existingDB, castOK := actual.(*gorm.DB)
|
|
if !castOK {
|
|
return nil, fmt.Errorf("invalid cached database type for webhook %s", webhookID)
|
|
}
|
|
return existingDB, nil
|
|
}
|
|
|
|
return db, nil
|
|
}
|
|
|
|
// CreateDB explicitly creates a new per-webhook database file and runs migrations.
|
|
// This is called when a new webhook is created.
|
|
func (m *WebhookDBManager) CreateDB(webhookID string) error {
|
|
_, err := m.GetDB(webhookID)
|
|
return err
|
|
}
|
|
|
|
// DBExists checks if a per-webhook database file exists on disk.
|
|
func (m *WebhookDBManager) DBExists(webhookID string) bool {
|
|
_, err := os.Stat(m.dbPath(webhookID))
|
|
return err == nil
|
|
}
|
|
|
|
// DeleteDB closes the connection and deletes the database file for a webhook.
|
|
// This performs a hard delete — the file is permanently removed.
|
|
func (m *WebhookDBManager) DeleteDB(webhookID string) error {
|
|
// Close and remove from cache
|
|
if val, ok := m.dbs.LoadAndDelete(webhookID); ok {
|
|
if gormDB, castOK := val.(*gorm.DB); castOK {
|
|
if sqlDB, err := gormDB.DB(); err == nil {
|
|
sqlDB.Close()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Delete the main DB file and WAL/SHM files
|
|
path := m.dbPath(webhookID)
|
|
for _, suffix := range []string{"", "-wal", "-shm"} {
|
|
if err := os.Remove(path + suffix); err != nil && !os.IsNotExist(err) {
|
|
return fmt.Errorf("deleting webhook database file %s%s: %w", path, suffix, err)
|
|
}
|
|
}
|
|
|
|
m.log.Info("deleted per-webhook database", "webhook_id", webhookID)
|
|
return nil
|
|
}
|
|
|
|
// CloseAll closes all open per-webhook database connections.
|
|
// Called during application shutdown.
|
|
func (m *WebhookDBManager) CloseAll() error {
|
|
var lastErr error
|
|
m.dbs.Range(func(key, value interface{}) bool {
|
|
if gormDB, castOK := value.(*gorm.DB); castOK {
|
|
if sqlDB, err := gormDB.DB(); err == nil {
|
|
if closeErr := sqlDB.Close(); closeErr != nil {
|
|
lastErr = closeErr
|
|
m.log.Error("failed to close webhook database",
|
|
"webhook_id", key,
|
|
"error", closeErr,
|
|
)
|
|
}
|
|
}
|
|
}
|
|
m.dbs.Delete(key)
|
|
return true
|
|
})
|
|
return lastErr
|
|
}
|