From 43c22a9e9ae3a0dc513e6e00ca2ad357a9213a7f Mon Sep 17 00:00:00 2001 From: clawbot Date: Sun, 1 Mar 2026 17:06:43 -0800 Subject: [PATCH] feat: implement per-webhook event databases Split data storage into main application DB (config only) and per-webhook event databases (one SQLite file per webhook). Architecture changes: - New WebhookDBManager component manages per-webhook DB lifecycle (create, open, cache, delete) with lazy connection pooling via sync.Map - Main DB (DBURL) stores only config: Users, Webhooks, Entrypoints, Targets, APIKeys - Per-webhook DBs (DATA_DIR) store Events, Deliveries, DeliveryResults in files named events-{webhook_uuid}.db - New DATA_DIR env var (default: ./data dev, /data/events prod) Behavioral changes: - Webhook creation creates per-webhook DB file - Webhook deletion hard-deletes per-webhook DB file (config soft-deleted) - Event ingestion writes to per-webhook DB, not main DB - Delivery engine polls all per-webhook DBs for pending deliveries - Database target type marks delivery as immediately successful (events are already in the dedicated per-webhook DB) - Event log UI reads from per-webhook DBs with targets from main DB - Existing webhooks without DB files get them created lazily Removed: - ArchivedEvent model (was a half-measure, replaced by per-webhook DBs) - Event/Delivery/DeliveryResult removed from main DB migrations Added: - Comprehensive tests for WebhookDBManager (create, delete, lazy creation, delivery workflow, multiple webhooks, close all) - Dockerfile creates /data/events directory README updates: - Per-webhook event databases documented as implemented (was Phase 2) - DATA_DIR added to configuration table - Docker instructions updated with data volume mount - Data model diagram updated - TODO updated (database separation moved to completed) Closes #15 --- Dockerfile | 5 +- README.md | 113 ++++--- cmd/webhooker/main.go | 1 + internal/config/config.go | 12 + internal/database/model_archived_event.go | 19 -- internal/database/models.go | 9 +- internal/database/webhook_db_manager.go | 183 ++++++++++++ internal/database/webhook_db_manager_test.go | 294 +++++++++++++++++++ internal/delivery/engine.go | 199 ++++++++----- internal/handlers/handlers.go | 13 +- internal/handlers/handlers_test.go | 4 + internal/handlers/source_management.go | 125 +++++--- internal/handlers/webhook.go | 35 ++- 13 files changed, 814 insertions(+), 198 deletions(-) delete mode 100644 internal/database/model_archived_event.go create mode 100644 internal/database/webhook_db_manager.go create mode 100644 internal/database/webhook_db_manager_test.go diff --git a/Dockerfile b/Dockerfile index 9d2022d..a2ce1fe 100644 --- a/Dockerfile +++ b/Dockerfile @@ -57,7 +57,10 @@ WORKDIR /app # Copy binary from builder COPY --from=builder /build/bin/webhooker . -RUN chown -R webhooker:webhooker /app +# Create data directory for per-webhook event databases +RUN mkdir -p /data/events + +RUN chown -R webhooker:webhooker /app /data/events USER webhooker diff --git a/README.md b/README.md index 3ca00aa..235a19f 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,8 @@ Configuration is resolved in this order (highest priority first): | ----------------------- | ----------------------------------- | -------- | | `WEBHOOKER_ENVIRONMENT` | `dev` or `prod` | `dev` | | `PORT` | HTTP listen port | `8080` | -| `DBURL` | SQLite database connection string | *(required)* | +| `DBURL` | SQLite connection string (main app DB) | *(required)* | +| `DATA_DIR` | Directory for per-webhook event DBs | `./data` (dev) / `/data/events` (prod) | | `SESSION_KEY` | Base64-encoded 32-byte session key | *(required in prod)* | | `DEBUG` | Enable debug logging | `false` | | `METRICS_USERNAME` | Basic auth username for `/metrics` | `""` | @@ -84,6 +85,7 @@ docker run -d \ -p 8080:8080 \ -v /path/to/data:/data \ -e DBURL="file:/data/webhooker.db?cache=shared&mode=rwc" \ + -e DATA_DIR="/data/events" \ -e SESSION_KEY="" \ -e WEBHOOKER_ENVIRONMENT=prod \ webhooker:latest @@ -91,7 +93,10 @@ docker run -d \ The container runs as a non-root user (`webhooker`, UID 1000), exposes port 8080, and includes a health check against -`/.well-known/healthcheck`. +`/.well-known/healthcheck`. The `/data` volume holds both the main +application database and the per-webhook event databases (in +`/data/events/`). Mount this as a persistent volume to preserve data +across container restarts. ## Rationale @@ -195,7 +200,7 @@ tier** (event ingestion, delivery, and logging). ┌─────────────────────────────────────────────────────────────┐ │ EVENT TIER │ -│ (planned: per-webhook dedicated database) │ +│ (per-webhook dedicated databases) │ │ │ │ ┌──────────┐ ┌──────────┐ ┌─────────────────┐ │ │ │ Event │──1:N──│ Delivery │──1:N──│ DeliveryResult │ │ @@ -286,8 +291,10 @@ events should be forwarded. Fire-and-forget: a single attempt with no retries. - **`retry`** — Forward the event via HTTP POST with automatic retry on failure. Uses exponential backoff up to `max_retries` attempts. -- **`database`** — Store the event in the webhook's database only (no - external delivery). Useful for pure logging/archival. +- **`database`** — Confirm the event is stored in the webhook's + per-webhook database (no external delivery). Since events are always + written to the per-webhook DB on ingestion, this target marks delivery + as immediately successful. Useful for ensuring durable event archival. - **`log`** — Write the event to the application log (stdout). Useful for debugging. @@ -384,21 +391,13 @@ All entities include these fields from `BaseModel`: ### Database Architecture -#### Current Implementation +#### Per-Webhook Event Databases -webhooker currently uses a **single SQLite database** for all data — -application configuration, user accounts, and (once implemented) event -storage. The database connection is managed by GORM with a single -connection string configured via `DBURL`. On first startup the database -is auto-migrated and an `admin` user is created. +webhooker uses **separate SQLite database files**: a main application +database for configuration data and per-webhook databases for event +storage. -#### Planned: Per-Webhook Event Databases (Phase 2) - -In a future phase (see TODO Phase 2 below), webhooker will split into -**separate SQLite database files**: a main application database for -configuration data and per-webhook databases for event storage. - -**Main Application Database** — will store: +**Main Application Database** (`DBURL`) — stores configuration only: - **Users** — accounts and Argon2id password hashes - **Webhooks** — webhook configurations @@ -406,14 +405,22 @@ configuration data and per-webhook databases for event storage. - **Targets** — delivery destination configurations - **APIKeys** — programmatic access credentials -**Per-Webhook Event Databases** — each webhook will get its own -dedicated SQLite file containing: +On first startup the main database is auto-migrated and an `admin` user +is created. + +**Per-Webhook Event Databases** (`DATA_DIR`) — each webhook gets its own +dedicated SQLite file named `events-{webhook_uuid}.db`, containing: - **Events** — captured incoming webhook payloads - **Deliveries** — event-to-target pairings and their status - **DeliveryResults** — individual delivery attempt logs -This planned separation will provide: +Per-webhook databases are created automatically when a webhook is +created (and lazily on first access for webhooks that predate this +feature). They are managed by the `WebhookDBManager` component, which +handles connection pooling, lazy opening, migrations, and cleanup. + +This separation provides: - **Isolation** — a high-volume webhook won't cause lock contention or WAL bloat affecting the main application or other webhooks. @@ -421,14 +428,21 @@ This planned separation will provide: backed up, archived, rotated, or size-limited without impacting the application. - **Clean deletion** — removing a webhook and all its history is as - simple as deleting one file. + simple as deleting one file. Configuration is soft-deleted in the main + DB; the event database file is hard-deleted (permanently removed). - **Per-webhook retention** — the `retention_days` field on each webhook - will control automatic cleanup of old events in that webhook's - database only. -- **Performance** — each webhook's database will have its own WAL, its - own page cache, and its own lock, so concurrent event ingestion across + controls automatic cleanup of old events in that webhook's database + only. +- **Performance** — each webhook's database has its own WAL, its own + page cache, and its own lock, so concurrent event ingestion across webhooks won't contend. +The **database target type** leverages this architecture: since events +are already stored in the per-webhook database by design, the database +target simply marks the delivery as immediately successful. The +per-webhook DB IS the dedicated event database — that's the whole point +of the database target type. + The database uses the [modernc.org/sqlite](https://pkg.go.dev/modernc.org/sqlite) driver at runtime, though CGO is required at build time due to the transitive @@ -549,16 +563,17 @@ webhooker/ │ ├── database/ │ │ ├── base_model.go # BaseModel with UUID primary keys │ │ ├── database.go # GORM connection, migrations, admin seed -│ │ ├── models.go # AutoMigrate for all models +│ │ ├── models.go # AutoMigrate for config-tier models │ │ ├── model_user.go # User entity │ │ ├── model_webhook.go # Webhook entity │ │ ├── model_entrypoint.go # Entrypoint entity │ │ ├── model_target.go # Target entity and TargetType enum -│ │ ├── model_event.go # Event entity -│ │ ├── model_delivery.go # Delivery entity and DeliveryStatus enum -│ │ ├── model_delivery_result.go # DeliveryResult entity +│ │ ├── model_event.go # Event entity (per-webhook DB) +│ │ ├── model_delivery.go # Delivery entity (per-webhook DB) +│ │ ├── model_delivery_result.go # DeliveryResult entity (per-webhook DB) │ │ ├── model_apikey.go # APIKey entity -│ │ └── password.go # Argon2id hashing and verification +│ │ ├── password.go # Argon2id hashing and verification +│ │ └── webhook_db_manager.go # Per-webhook DB lifecycle manager │ ├── globals/ │ │ └── globals.go # Build-time variables (appname, version, arch) │ ├── delivery/ @@ -604,13 +619,16 @@ Components are wired via Uber fx in this order: 1. `globals.New` — Build-time variables (appname, version, arch) 2. `logger.New` — Structured logging (slog with TTY detection) 3. `config.New` — Configuration loading (pkg/config + environment) -4. `database.New` — SQLite connection, migrations, admin user seed -5. `healthcheck.New` — Health check service -6. `session.New` — Cookie-based session manager -7. `handlers.New` — HTTP handlers -8. `middleware.New` — HTTP middleware -9. `delivery.New` — Background delivery engine -10. `server.New` — HTTP server and router +4. `database.New` — Main SQLite connection, config migrations, admin + user seed +5. `database.NewWebhookDBManager` — Per-webhook event database + lifecycle manager +6. `healthcheck.New` — Health check service +7. `session.New` — Cookie-based session manager +8. `handlers.New` — HTTP handlers +9. `middleware.New` — HTTP middleware +10. `delivery.New` — Background delivery engine +11. `server.New` — HTTP server and router The server starts via `fx.Invoke(func(*server.Server, *delivery.Engine) {})` which triggers the fx lifecycle hooks in dependency order. @@ -657,7 +675,8 @@ The Dockerfile uses a multi-stage build: 1. **Builder stage** (Debian-based `golang:1.24`) — installs golangci-lint, downloads dependencies, copies source, runs `make check` (format verification, linting, tests, compilation). -2. **Runtime stage** (`alpine:3.21`) — copies the binary, runs as +2. **Runtime stage** (`alpine:3.21`) — copies the binary, creates the + `/data/events` directory for per-webhook event databases, runs as non-root user, exposes port 8080, includes a health check. The builder uses Debian rather than Alpine because GORM's SQLite @@ -690,12 +709,21 @@ linted, tested, and compiled. - [x] Build event processing and target delivery engine - [x] Implement HTTP target type (fire-and-forget POST) - [x] Implement retry target type (exponential backoff) -- [x] Implement database target type (store only) +- [x] Implement database target type (store events in per-webhook DB) - [x] Implement log target type (console output) - [x] Webhook management pages (list, create, edit, delete) - [x] Webhook request log viewer with pagination - [x] Entrypoint and target management UI +### Completed: Per-Webhook Event Databases +- [x] Split into main application DB + per-webhook event DBs +- [x] Per-webhook database lifecycle management (create on webhook + creation, delete on webhook removal) +- [x] `WebhookDBManager` component with lazy connection pooling +- [x] Delivery engine polls all per-webhook DBs for pending deliveries +- [x] Database target type marks delivery as immediately successful + (events are already in the per-webhook DB) + ### Remaining: Core Features - [ ] Per-webhook rate limiting in the receiver handler - [ ] Webhook signature verification (GitHub, Stripe formats) @@ -708,11 +736,8 @@ linted, tested, and compiled. - [ ] Analytics dashboard (success rates, response times) - [ ] Delivery status and retry management UI -### Remaining: Database Separation -- [ ] Split into main application DB + per-webhook event DBs +### Remaining: Event Maintenance - [ ] Automatic event retention cleanup based on `retention_days` -- [ ] Per-webhook database lifecycle management (create on webhook - creation, delete on webhook removal) ### Remaining: REST API - [ ] RESTful CRUD for webhooks, entrypoints, targets diff --git a/cmd/webhooker/main.go b/cmd/webhooker/main.go index f10b35c..e09436a 100644 --- a/cmd/webhooker/main.go +++ b/cmd/webhooker/main.go @@ -33,6 +33,7 @@ func main() { logger.New, config.New, database.New, + database.NewWebhookDBManager, healthcheck.New, session.New, handlers.New, diff --git a/internal/config/config.go b/internal/config/config.go index 5f9abed..465320d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -37,6 +37,7 @@ type ConfigParams struct { type Config struct { DBURL string + DataDir string Debug bool MaintenanceMode bool DevelopmentMode bool @@ -116,6 +117,7 @@ func New(lc fx.Lifecycle, params ConfigParams) (*Config, error) { // Load configuration values — env vars take precedence over config.yaml s := &Config{ DBURL: envString("DBURL", "dburl"), + DataDir: envString("DATA_DIR", "dataDir"), Debug: envBool("DEBUG", "debug"), MaintenanceMode: envBool("MAINTENANCE_MODE", "maintenanceMode"), DevelopmentMode: envBool("DEVELOPMENT_MODE", "developmentMode"), @@ -129,6 +131,15 @@ func New(lc fx.Lifecycle, params ConfigParams) (*Config, error) { params: ¶ms, } + // Set default DataDir based on environment + if s.DataDir == "" { + if s.IsProd() { + s.DataDir = "/data/events" + } else { + s.DataDir = "./data" + } + } + // Validate database URL if s.DBURL == "" { return nil, fmt.Errorf("database URL (DBURL) is required") @@ -156,6 +167,7 @@ func New(lc fx.Lifecycle, params ConfigParams) (*Config, error) { "debug", s.Debug, "maintenanceMode", s.MaintenanceMode, "developmentMode", s.DevelopmentMode, + "dataDir", s.DataDir, "hasSessionKey", s.SessionKey != "", "hasSentryDSN", s.SentryDSN != "", "hasMetricsAuth", s.MetricsUsername != "" && s.MetricsPassword != "", diff --git a/internal/database/model_archived_event.go b/internal/database/model_archived_event.go deleted file mode 100644 index 9f75d23..0000000 --- a/internal/database/model_archived_event.go +++ /dev/null @@ -1,19 +0,0 @@ -package database - -// ArchivedEvent stores webhook events delivered via the "database" target type. -// These records persist independently of internal event retention and pruning, -// providing a durable archive for downstream consumption. -type ArchivedEvent struct { - BaseModel - - WebhookID string `gorm:"type:uuid;not null;index" json:"webhook_id"` - EntrypointID string `gorm:"type:uuid;not null" json:"entrypoint_id"` - EventID string `gorm:"type:uuid;not null" json:"event_id"` - TargetID string `gorm:"type:uuid;not null" json:"target_id"` - - // Original request data (copied from Event at archive time) - Method string `gorm:"not null" json:"method"` - Headers string `gorm:"type:text" json:"headers"` // JSON - Body string `gorm:"type:text" json:"body"` - ContentType string `json:"content_type"` -} diff --git a/internal/database/models.go b/internal/database/models.go index 23dea14..c5fa30a 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -1,6 +1,9 @@ package database -// Migrate runs database migrations for all models +// Migrate runs database migrations for the main application database. +// Only configuration-tier models are stored in the main database. +// Event-tier models (Event, Delivery, DeliveryResult) live in +// per-webhook dedicated databases managed by WebhookDBManager. func (d *Database) Migrate() error { return d.db.AutoMigrate( &User{}, @@ -8,9 +11,5 @@ func (d *Database) Migrate() error { &Webhook{}, &Entrypoint{}, &Target{}, - &Event{}, - &Delivery{}, - &DeliveryResult{}, - &ArchivedEvent{}, ) } diff --git a/internal/database/webhook_db_manager.go b/internal/database/webhook_db_manager.go new file mode 100644 index 0000000..56e19be --- /dev/null +++ b/internal/database/webhook_db_manager.go @@ -0,0 +1,183 @@ +package database + +import ( + "context" + "database/sql" + "fmt" + "log/slog" + "os" + "path/filepath" + "sync" + + "go.uber.org/fx" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "sneak.berlin/go/webhooker/internal/config" + "sneak.berlin/go/webhooker/internal/logger" +) + +// nolint:revive // WebhookDBManagerParams is a standard fx naming convention +type WebhookDBManagerParams struct { + fx.In + Config *config.Config + Logger *logger.Logger +} + +// WebhookDBManager manages per-webhook SQLite database files for event storage. +// Each webhook gets its own dedicated database containing Events, Deliveries, +// and DeliveryResults. Database connections are opened lazily and cached. +type WebhookDBManager struct { + dataDir string + dbs sync.Map // map[webhookID]*gorm.DB + log *slog.Logger +} + +// NewWebhookDBManager creates a new WebhookDBManager and registers lifecycle hooks. +func NewWebhookDBManager(lc fx.Lifecycle, params WebhookDBManagerParams) (*WebhookDBManager, error) { + m := &WebhookDBManager{ + dataDir: params.Config.DataDir, + log: params.Logger.Get(), + } + + // Create data directory if it doesn't exist + if err := os.MkdirAll(m.dataDir, 0750); err != nil { + return nil, fmt.Errorf("creating data directory %s: %w", m.dataDir, err) + } + + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { //nolint:revive // ctx unused but required by fx + return m.CloseAll() + }, + }) + + m.log.Info("webhook database manager initialized", "data_dir", m.dataDir) + return m, nil +} + +// dbPath returns the filesystem path for a webhook's database file. +func (m *WebhookDBManager) dbPath(webhookID string) string { + return filepath.Join(m.dataDir, fmt.Sprintf("events-%s.db", webhookID)) +} + +// openDB opens (or creates) a per-webhook SQLite database and runs migrations. +func (m *WebhookDBManager) openDB(webhookID string) (*gorm.DB, error) { + path := m.dbPath(webhookID) + dbURL := fmt.Sprintf("file:%s?cache=shared&mode=rwc", path) + + sqlDB, err := sql.Open("sqlite", dbURL) + if err != nil { + return nil, fmt.Errorf("opening webhook database %s: %w", webhookID, err) + } + + db, err := gorm.Open(sqlite.Dialector{ + Conn: sqlDB, + }, &gorm.Config{}) + if err != nil { + sqlDB.Close() + return nil, fmt.Errorf("connecting to webhook database %s: %w", webhookID, err) + } + + // Run migrations for event-tier models only + if err := db.AutoMigrate(&Event{}, &Delivery{}, &DeliveryResult{}); err != nil { + sqlDB.Close() + return nil, fmt.Errorf("migrating webhook database %s: %w", webhookID, err) + } + + m.log.Info("opened per-webhook database", "webhook_id", webhookID, "path", path) + return db, nil +} + +// GetDB returns the database connection for a webhook, creating the database +// file lazily if it doesn't exist. This handles both new webhooks and existing +// webhooks that were created before per-webhook databases were introduced. +func (m *WebhookDBManager) GetDB(webhookID string) (*gorm.DB, error) { + // Fast path: already open + if val, ok := m.dbs.Load(webhookID); ok { + cachedDB, castOK := val.(*gorm.DB) + if !castOK { + return nil, fmt.Errorf("invalid cached database type for webhook %s", webhookID) + } + return cachedDB, nil + } + + // Slow path: open/create the database + db, err := m.openDB(webhookID) + if err != nil { + return nil, err + } + + // Store it; if another goroutine beat us, close ours and use theirs + actual, loaded := m.dbs.LoadOrStore(webhookID, db) + if loaded { + // Another goroutine created it first; close our duplicate + if sqlDB, closeErr := db.DB(); closeErr == nil { + sqlDB.Close() + } + existingDB, castOK := actual.(*gorm.DB) + if !castOK { + return nil, fmt.Errorf("invalid cached database type for webhook %s", webhookID) + } + return existingDB, nil + } + + return db, nil +} + +// CreateDB explicitly creates a new per-webhook database file and runs migrations. +// This is called when a new webhook is created. +func (m *WebhookDBManager) CreateDB(webhookID string) error { + _, err := m.GetDB(webhookID) + return err +} + +// DBExists checks if a per-webhook database file exists on disk. +func (m *WebhookDBManager) DBExists(webhookID string) bool { + _, err := os.Stat(m.dbPath(webhookID)) + return err == nil +} + +// DeleteDB closes the connection and deletes the database file for a webhook. +// This performs a hard delete — the file is permanently removed. +func (m *WebhookDBManager) DeleteDB(webhookID string) error { + // Close and remove from cache + if val, ok := m.dbs.LoadAndDelete(webhookID); ok { + if gormDB, castOK := val.(*gorm.DB); castOK { + if sqlDB, err := gormDB.DB(); err == nil { + sqlDB.Close() + } + } + } + + // Delete the main DB file and WAL/SHM files + path := m.dbPath(webhookID) + for _, suffix := range []string{"", "-wal", "-shm"} { + if err := os.Remove(path + suffix); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("deleting webhook database file %s%s: %w", path, suffix, err) + } + } + + m.log.Info("deleted per-webhook database", "webhook_id", webhookID) + return nil +} + +// CloseAll closes all open per-webhook database connections. +// Called during application shutdown. +func (m *WebhookDBManager) CloseAll() error { + var lastErr error + m.dbs.Range(func(key, value interface{}) bool { + if gormDB, castOK := value.(*gorm.DB); castOK { + if sqlDB, err := gormDB.DB(); err == nil { + if closeErr := sqlDB.Close(); closeErr != nil { + lastErr = closeErr + m.log.Error("failed to close webhook database", + "webhook_id", key, + "error", closeErr, + ) + } + } + } + m.dbs.Delete(key) + return true + }) + return lastErr +} diff --git a/internal/database/webhook_db_manager_test.go b/internal/database/webhook_db_manager_test.go new file mode 100644 index 0000000..5410787 --- /dev/null +++ b/internal/database/webhook_db_manager_test.go @@ -0,0 +1,294 @@ +package database + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/google/uuid" + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/fx/fxtest" + "sneak.berlin/go/webhooker/internal/config" + "sneak.berlin/go/webhooker/internal/globals" + "sneak.berlin/go/webhooker/internal/logger" + pkgconfig "sneak.berlin/go/webhooker/pkg/config" +) + +func setupTestWebhookDBManager(t *testing.T) (*WebhookDBManager, *fxtest.Lifecycle) { + t.Helper() + + fs := afero.NewMemMapFs() + testConfigYAML := ` +environments: + dev: + config: + port: 8080 + debug: false + dburl: "file::memory:?cache=shared" + secrets: + sessionKey: d2ViaG9va2VyLWRldi1zZXNzaW9uLWtleS1pbnNlY3VyZSE= +configDefaults: + port: 8080 +` + require.NoError(t, afero.WriteFile(fs, "config.yaml", []byte(testConfigYAML), 0644)) + pkgconfig.SetFs(fs) + + lc := fxtest.NewLifecycle(t) + + globals.Appname = "webhooker-test" + globals.Version = "test" + globals.Buildarch = "test" + + g, err := globals.New(lc) + require.NoError(t, err) + + l, err := logger.New(lc, logger.LoggerParams{Globals: g}) + require.NoError(t, err) + + dataDir := filepath.Join(t.TempDir(), "events") + + cfg := &config.Config{ + DBURL: "file::memory:?cache=shared", + DataDir: dataDir, + SessionKey: "d2ViaG9va2VyLWRldi1zZXNzaW9uLWtleS1pbnNlY3VyZSE=", + } + _ = cfg + + mgr, err := NewWebhookDBManager(lc, WebhookDBManagerParams{ + Config: cfg, + Logger: l, + }) + require.NoError(t, err) + + return mgr, lc +} + +func TestWebhookDBManager_CreateAndGetDB(t *testing.T) { + mgr, lc := setupTestWebhookDBManager(t) + ctx := context.Background() + require.NoError(t, lc.Start(ctx)) + defer func() { require.NoError(t, lc.Stop(ctx)) }() + + webhookID := uuid.New().String() + + // DB should not exist yet + assert.False(t, mgr.DBExists(webhookID)) + + // Create the DB + err := mgr.CreateDB(webhookID) + require.NoError(t, err) + + // DB file should now exist + assert.True(t, mgr.DBExists(webhookID)) + + // Get the DB again (should use cached connection) + db, err := mgr.GetDB(webhookID) + require.NoError(t, err) + require.NotNil(t, db) + + // Verify we can write an event + event := &Event{ + WebhookID: webhookID, + EntrypointID: uuid.New().String(), + Method: "POST", + Headers: `{"Content-Type":["application/json"]}`, + Body: `{"test": true}`, + ContentType: "application/json", + } + require.NoError(t, db.Create(event).Error) + assert.NotEmpty(t, event.ID) + + // Verify we can read it back + var readEvent Event + require.NoError(t, db.First(&readEvent, "id = ?", event.ID).Error) + assert.Equal(t, webhookID, readEvent.WebhookID) + assert.Equal(t, "POST", readEvent.Method) + assert.Equal(t, `{"test": true}`, readEvent.Body) +} + +func TestWebhookDBManager_DeleteDB(t *testing.T) { + mgr, lc := setupTestWebhookDBManager(t) + ctx := context.Background() + require.NoError(t, lc.Start(ctx)) + defer func() { require.NoError(t, lc.Stop(ctx)) }() + + webhookID := uuid.New().String() + + // Create the DB and write some data + require.NoError(t, mgr.CreateDB(webhookID)) + db, err := mgr.GetDB(webhookID) + require.NoError(t, err) + + event := &Event{ + WebhookID: webhookID, + EntrypointID: uuid.New().String(), + Method: "POST", + Body: `{"test": true}`, + ContentType: "application/json", + } + require.NoError(t, db.Create(event).Error) + + // Delete the DB + require.NoError(t, mgr.DeleteDB(webhookID)) + + // File should no longer exist + assert.False(t, mgr.DBExists(webhookID)) + + // Verify the file is actually gone from disk + dbPath := mgr.dbPath(webhookID) + _, err = os.Stat(dbPath) + assert.True(t, os.IsNotExist(err)) +} + +func TestWebhookDBManager_LazyCreation(t *testing.T) { + mgr, lc := setupTestWebhookDBManager(t) + ctx := context.Background() + require.NoError(t, lc.Start(ctx)) + defer func() { require.NoError(t, lc.Stop(ctx)) }() + + webhookID := uuid.New().String() + + // GetDB should lazily create the database + db, err := mgr.GetDB(webhookID) + require.NoError(t, err) + require.NotNil(t, db) + + // File should now exist + assert.True(t, mgr.DBExists(webhookID)) +} + +func TestWebhookDBManager_DeliveryWorkflow(t *testing.T) { + mgr, lc := setupTestWebhookDBManager(t) + ctx := context.Background() + require.NoError(t, lc.Start(ctx)) + defer func() { require.NoError(t, lc.Stop(ctx)) }() + + webhookID := uuid.New().String() + targetID := uuid.New().String() + + db, err := mgr.GetDB(webhookID) + require.NoError(t, err) + + // Create an event + event := &Event{ + WebhookID: webhookID, + EntrypointID: uuid.New().String(), + Method: "POST", + Headers: `{"Content-Type":["application/json"]}`, + Body: `{"payload": "test"}`, + ContentType: "application/json", + } + require.NoError(t, db.Create(event).Error) + + // Create a delivery + delivery := &Delivery{ + EventID: event.ID, + TargetID: targetID, + Status: DeliveryStatusPending, + } + require.NoError(t, db.Create(delivery).Error) + + // Query pending deliveries + var pending []Delivery + require.NoError(t, db.Where("status = ?", DeliveryStatusPending). + Preload("Event"). + Find(&pending).Error) + require.Len(t, pending, 1) + assert.Equal(t, event.ID, pending[0].EventID) + assert.Equal(t, "POST", pending[0].Event.Method) + + // Create a delivery result + result := &DeliveryResult{ + DeliveryID: delivery.ID, + AttemptNum: 1, + Success: true, + StatusCode: 200, + Duration: 42, + } + require.NoError(t, db.Create(result).Error) + + // Update delivery status + require.NoError(t, db.Model(delivery).Update("status", DeliveryStatusDelivered).Error) + + // Verify no more pending deliveries + var stillPending []Delivery + require.NoError(t, db.Where("status = ?", DeliveryStatusPending).Find(&stillPending).Error) + assert.Empty(t, stillPending) +} + +func TestWebhookDBManager_MultipleWebhooks(t *testing.T) { + mgr, lc := setupTestWebhookDBManager(t) + ctx := context.Background() + require.NoError(t, lc.Start(ctx)) + defer func() { require.NoError(t, lc.Stop(ctx)) }() + + webhook1 := uuid.New().String() + webhook2 := uuid.New().String() + + // Create DBs for two webhooks + require.NoError(t, mgr.CreateDB(webhook1)) + require.NoError(t, mgr.CreateDB(webhook2)) + + db1, err := mgr.GetDB(webhook1) + require.NoError(t, err) + db2, err := mgr.GetDB(webhook2) + require.NoError(t, err) + + // Write events to each webhook's DB + event1 := &Event{ + WebhookID: webhook1, + EntrypointID: uuid.New().String(), + Method: "POST", + Body: `{"webhook": 1}`, + ContentType: "application/json", + } + event2 := &Event{ + WebhookID: webhook2, + EntrypointID: uuid.New().String(), + Method: "PUT", + Body: `{"webhook": 2}`, + ContentType: "application/json", + } + require.NoError(t, db1.Create(event1).Error) + require.NoError(t, db2.Create(event2).Error) + + // Verify isolation: each DB only has its own events + var count1 int64 + db1.Model(&Event{}).Count(&count1) + assert.Equal(t, int64(1), count1) + + var count2 int64 + db2.Model(&Event{}).Count(&count2) + assert.Equal(t, int64(1), count2) + + // Delete webhook1's DB, webhook2 should be unaffected + require.NoError(t, mgr.DeleteDB(webhook1)) + assert.False(t, mgr.DBExists(webhook1)) + assert.True(t, mgr.DBExists(webhook2)) + + // webhook2's data should still be accessible + var events []Event + require.NoError(t, db2.Find(&events).Error) + assert.Len(t, events, 1) + assert.Equal(t, "PUT", events[0].Method) +} + +func TestWebhookDBManager_CloseAll(t *testing.T) { + mgr, lc := setupTestWebhookDBManager(t) + ctx := context.Background() + require.NoError(t, lc.Start(ctx)) + + // Create a few DBs + for i := 0; i < 3; i++ { + require.NoError(t, mgr.CreateDB(uuid.New().String())) + } + + // CloseAll should close all connections without error + require.NoError(t, mgr.CloseAll()) + + // Stop lifecycle (CloseAll already called, but shouldn't panic) + require.NoError(t, lc.Stop(ctx)) +} diff --git a/internal/delivery/engine.go b/internal/delivery/engine.go index a2c2e0d..af27d3d 100644 --- a/internal/delivery/engine.go +++ b/internal/delivery/engine.go @@ -12,6 +12,7 @@ import ( "time" "go.uber.org/fx" + "gorm.io/gorm" "sneak.berlin/go/webhooker/internal/database" "sneak.berlin/go/webhooker/internal/logger" ) @@ -39,24 +40,29 @@ type HTTPTargetConfig struct { //nolint:revive // EngineParams is a standard fx naming convention type EngineParams struct { fx.In - DB *database.Database - Logger *logger.Logger + DB *database.Database + DBManager *database.WebhookDBManager + Logger *logger.Logger } // Engine processes queued deliveries in the background. +// It iterates over all active webhooks and polls each webhook's +// per-webhook database for pending deliveries. type Engine struct { - database *database.Database - log *slog.Logger - client *http.Client - cancel context.CancelFunc - wg sync.WaitGroup + database *database.Database + dbManager *database.WebhookDBManager + log *slog.Logger + client *http.Client + cancel context.CancelFunc + wg sync.WaitGroup } // New creates and registers the delivery engine with the fx lifecycle. func New(lc fx.Lifecycle, params EngineParams) *Engine { e := &Engine{ - database: params.DB, - log: params.Logger.Get(), + database: params.DB, + dbManager: params.DBManager, + log: params.Logger.Get(), client: &http.Client{ Timeout: httpClientTimeout, }, @@ -107,60 +113,133 @@ func (e *Engine) run(ctx context.Context) { } } +// processPending iterates over all active webhooks and processes pending +// deliveries from each webhook's per-webhook database. func (e *Engine) processPending(ctx context.Context) { + // Get all active webhook IDs from the main application database + var webhookIDs []string + if err := e.database.DB().Model(&database.Webhook{}).Pluck("id", &webhookIDs).Error; err != nil { + e.log.Error("failed to query webhook IDs", "error", err) + return + } + + for _, webhookID := range webhookIDs { + select { + case <-ctx.Done(): + return + default: + // Only process webhooks that have an event database file + if !e.dbManager.DBExists(webhookID) { + continue + } + e.processWebhookDeliveries(ctx, webhookID) + } + } +} + +// processWebhookDeliveries polls a single webhook's database for pending +// deliveries and processes them. +func (e *Engine) processWebhookDeliveries(ctx context.Context, webhookID string) { + webhookDB, err := e.dbManager.GetDB(webhookID) + if err != nil { + e.log.Error("failed to get webhook database", + "webhook_id", webhookID, + "error", err, + ) + return + } + + // Query pending and retrying deliveries from the per-webhook DB. + // Preload Event (same DB) but NOT Target (Target is in the main DB). var deliveries []database.Delivery - result := e.database.DB(). + result := webhookDB. Where("status IN ?", []database.DeliveryStatus{ database.DeliveryStatusPending, database.DeliveryStatusRetrying, }). - Preload("Target"). Preload("Event"). Find(&deliveries) if result.Error != nil { - e.log.Error("failed to query pending deliveries", "error", result.Error) + e.log.Error("failed to query pending deliveries", + "webhook_id", webhookID, + "error", result.Error, + ) return } + if len(deliveries) == 0 { + return + } + + // Collect unique target IDs and load targets from the main DB + seen := make(map[string]bool) + targetIDs := make([]string, 0, len(deliveries)) + for _, d := range deliveries { + if !seen[d.TargetID] { + targetIDs = append(targetIDs, d.TargetID) + seen[d.TargetID] = true + } + } + + var targets []database.Target + if err := e.database.DB().Where("id IN ?", targetIDs).Find(&targets).Error; err != nil { + e.log.Error("failed to load targets from main DB", "error", err) + return + } + + targetMap := make(map[string]database.Target, len(targets)) + for _, t := range targets { + targetMap[t.ID] = t + } + for i := range deliveries { select { case <-ctx.Done(): return default: - e.processDelivery(ctx, &deliveries[i]) + target, ok := targetMap[deliveries[i].TargetID] + if !ok { + e.log.Error("target not found for delivery", + "delivery_id", deliveries[i].ID, + "target_id", deliveries[i].TargetID, + ) + continue + } + deliveries[i].Target = target + e.processDelivery(ctx, webhookDB, &deliveries[i]) } } } -func (e *Engine) processDelivery(ctx context.Context, d *database.Delivery) { +func (e *Engine) processDelivery(ctx context.Context, webhookDB *gorm.DB, d *database.Delivery) { switch d.Target.Type { case database.TargetTypeHTTP: - e.deliverHTTP(ctx, d) + e.deliverHTTP(ctx, webhookDB, d) case database.TargetTypeRetry: - e.deliverRetry(ctx, d) + e.deliverRetry(ctx, webhookDB, d) case database.TargetTypeDatabase: - e.deliverDatabase(d) + e.deliverDatabase(webhookDB, d) case database.TargetTypeLog: - e.deliverLog(d) + e.deliverLog(webhookDB, d) default: e.log.Error("unknown target type", "target_id", d.TargetID, "type", d.Target.Type, ) - e.updateDeliveryStatus(d, database.DeliveryStatusFailed) + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) } } -func (e *Engine) deliverHTTP(_ context.Context, d *database.Delivery) { +func (e *Engine) deliverHTTP(_ context.Context, webhookDB *gorm.DB, d *database.Delivery) { cfg, err := e.parseHTTPConfig(d.Target.Config) if err != nil { e.log.Error("invalid HTTP target config", "target_id", d.TargetID, "error", err, ) - e.recordResult(d, 1, false, 0, "", err.Error(), 0) - e.updateDeliveryStatus(d, database.DeliveryStatusFailed) + e.recordResult(webhookDB, d, 1, false, 0, "", err.Error(), 0) + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) return } @@ -172,36 +251,36 @@ func (e *Engine) deliverHTTP(_ context.Context, d *database.Delivery) { errMsg = err.Error() } - e.recordResult(d, 1, success, statusCode, respBody, errMsg, duration) + e.recordResult(webhookDB, d, 1, success, statusCode, respBody, errMsg, duration) if success { - e.updateDeliveryStatus(d, database.DeliveryStatusDelivered) + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) } else { - e.updateDeliveryStatus(d, database.DeliveryStatusFailed) + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) } } -func (e *Engine) deliverRetry(_ context.Context, d *database.Delivery) { +func (e *Engine) deliverRetry(_ context.Context, webhookDB *gorm.DB, d *database.Delivery) { cfg, err := e.parseHTTPConfig(d.Target.Config) if err != nil { e.log.Error("invalid retry target config", "target_id", d.TargetID, "error", err, ) - e.recordResult(d, 1, false, 0, "", err.Error(), 0) - e.updateDeliveryStatus(d, database.DeliveryStatusFailed) + e.recordResult(webhookDB, d, 1, false, 0, "", err.Error(), 0) + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) return } - // Determine attempt number from existing results + // Determine attempt number from existing results (in per-webhook DB) var resultCount int64 - e.database.DB().Model(&database.DeliveryResult{}).Where("delivery_id = ?", d.ID).Count(&resultCount) + webhookDB.Model(&database.DeliveryResult{}).Where("delivery_id = ?", d.ID).Count(&resultCount) attemptNum := int(resultCount) + 1 // Check if we should wait before retrying (exponential backoff) if attemptNum > 1 { var lastResult database.DeliveryResult - lookupErr := e.database.DB().Where("delivery_id = ?", d.ID).Order("created_at DESC").First(&lastResult).Error + lookupErr := webhookDB.Where("delivery_id = ?", d.ID).Order("created_at DESC").First(&lastResult).Error if lookupErr == nil { shift := attemptNum - 2 if shift > 30 { @@ -224,10 +303,10 @@ func (e *Engine) deliverRetry(_ context.Context, d *database.Delivery) { errMsg = err.Error() } - e.recordResult(d, attemptNum, success, statusCode, respBody, errMsg, duration) + e.recordResult(webhookDB, d, attemptNum, success, statusCode, respBody, errMsg, duration) if success { - e.updateDeliveryStatus(d, database.DeliveryStatusDelivered) + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) return } @@ -237,44 +316,22 @@ func (e *Engine) deliverRetry(_ context.Context, d *database.Delivery) { } if attemptNum >= maxRetries { - e.updateDeliveryStatus(d, database.DeliveryStatusFailed) + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) } else { - e.updateDeliveryStatus(d, database.DeliveryStatusRetrying) + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusRetrying) } } -func (e *Engine) deliverDatabase(d *database.Delivery) { - // Write the event to the dedicated archived_events table. This table - // persists independently of internal event retention/pruning, so the - // data remains available for external consumption even after the - // original event is cleaned up. - archived := &database.ArchivedEvent{ - WebhookID: d.Event.WebhookID, - EntrypointID: d.Event.EntrypointID, - EventID: d.EventID, - TargetID: d.TargetID, - Method: d.Event.Method, - Headers: d.Event.Headers, - Body: d.Event.Body, - ContentType: d.Event.ContentType, - } - - if err := e.database.DB().Create(archived).Error; err != nil { - e.log.Error("failed to archive event", - "delivery_id", d.ID, - "event_id", d.EventID, - "error", err, - ) - e.recordResult(d, 1, false, 0, "", err.Error(), 0) - e.updateDeliveryStatus(d, database.DeliveryStatusFailed) - return - } - - e.recordResult(d, 1, true, 0, "", "", 0) - e.updateDeliveryStatus(d, database.DeliveryStatusDelivered) +// deliverDatabase handles the database target type. Since events are already +// stored in the per-webhook database (that's the whole point of per-webhook +// databases), the database target simply marks the delivery as successful. +// The per-webhook DB IS the dedicated event database for this webhook. +func (e *Engine) deliverDatabase(webhookDB *gorm.DB, d *database.Delivery) { + e.recordResult(webhookDB, d, 1, true, 0, "", "", 0) + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) } -func (e *Engine) deliverLog(d *database.Delivery) { +func (e *Engine) deliverLog(webhookDB *gorm.DB, d *database.Delivery) { e.log.Info("webhook event delivered to log target", "delivery_id", d.ID, "event_id", d.EventID, @@ -284,8 +341,8 @@ func (e *Engine) deliverLog(d *database.Delivery) { "content_type", d.Event.ContentType, "body_length", len(d.Event.Body), ) - e.recordResult(d, 1, true, 0, "", "", 0) - e.updateDeliveryStatus(d, database.DeliveryStatusDelivered) + e.recordResult(webhookDB, d, 1, true, 0, "", "", 0) + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) } // doHTTPRequest performs the outbound HTTP POST to a target URL. @@ -343,7 +400,7 @@ func (e *Engine) doHTTPRequest(cfg *HTTPTargetConfig, event *database.Event) (st return resp.StatusCode, string(body), durationMs, nil } -func (e *Engine) recordResult(d *database.Delivery, attemptNum int, success bool, statusCode int, respBody, errMsg string, durationMs int64) { +func (e *Engine) recordResult(webhookDB *gorm.DB, d *database.Delivery, attemptNum int, success bool, statusCode int, respBody, errMsg string, durationMs int64) { result := &database.DeliveryResult{ DeliveryID: d.ID, AttemptNum: attemptNum, @@ -354,7 +411,7 @@ func (e *Engine) recordResult(d *database.Delivery, attemptNum int, success bool Duration: durationMs, } - if err := e.database.DB().Create(result).Error; err != nil { + if err := webhookDB.Create(result).Error; err != nil { e.log.Error("failed to record delivery result", "delivery_id", d.ID, "error", err, @@ -362,8 +419,8 @@ func (e *Engine) recordResult(d *database.Delivery, attemptNum int, success bool } } -func (e *Engine) updateDeliveryStatus(d *database.Delivery, status database.DeliveryStatus) { - if err := e.database.DB().Model(d).Update("status", status).Error; err != nil { +func (e *Engine) updateDeliveryStatus(webhookDB *gorm.DB, d *database.Delivery, status database.DeliveryStatus) { + if err := webhookDB.Model(d).Update("status", status).Error; err != nil { e.log.Error("failed to update delivery status", "delivery_id", d.ID, "status", status, diff --git a/internal/handlers/handlers.go b/internal/handlers/handlers.go index 55dadb1..417aed5 100644 --- a/internal/handlers/handlers.go +++ b/internal/handlers/handlers.go @@ -19,11 +19,12 @@ import ( // nolint:revive // HandlersParams is a standard fx naming convention type HandlersParams struct { fx.In - Logger *logger.Logger - Globals *globals.Globals - Database *database.Database - Healthcheck *healthcheck.Healthcheck - Session *session.Session + Logger *logger.Logger + Globals *globals.Globals + Database *database.Database + WebhookDBMgr *database.WebhookDBManager + Healthcheck *healthcheck.Healthcheck + Session *session.Session } type Handlers struct { @@ -31,6 +32,7 @@ type Handlers struct { log *slog.Logger hc *healthcheck.Healthcheck db *database.Database + dbMgr *database.WebhookDBManager session *session.Session templates map[string]*template.Template } @@ -53,6 +55,7 @@ func New(lc fx.Lifecycle, params HandlersParams) (*Handlers, error) { s.log = params.Logger.Get() s.hc = params.Healthcheck s.db = params.Database + s.dbMgr = params.WebhookDBMgr s.session = params.Session // Parse all page templates once at startup diff --git a/internal/handlers/handlers_test.go b/internal/handlers/handlers_test.go index 4acafca..5b12e59 100644 --- a/internal/handlers/handlers_test.go +++ b/internal/handlers/handlers_test.go @@ -30,6 +30,7 @@ func TestHandleIndex(t *testing.T) { return &config.Config{ // This is a base64 encoded 32-byte key: "test-session-key-32-bytes-long!!" SessionKey: "dGVzdC1zZXNzaW9uLWtleS0zMi1ieXRlcy1sb25nISE=", + DataDir: t.TempDir(), } }, func() *database.Database { @@ -37,6 +38,7 @@ func TestHandleIndex(t *testing.T) { db := &database.Database{} return db }, + database.NewWebhookDBManager, healthcheck.New, session.New, New, @@ -64,12 +66,14 @@ func TestRenderTemplate(t *testing.T) { return &config.Config{ // This is a base64 encoded 32-byte key: "test-session-key-32-bytes-long!!" SessionKey: "dGVzdC1zZXNzaW9uLWtleS0zMi1ieXRlcy1sb25nISE=", + DataDir: t.TempDir(), } }, func() *database.Database { // Mock database return &database.Database{} }, + database.NewWebhookDBManager, healthcheck.New, session.New, New, diff --git a/internal/handlers/source_management.go b/internal/handlers/source_management.go index 26441ca..0e3b197 100644 --- a/internal/handlers/source_management.go +++ b/internal/handlers/source_management.go @@ -40,7 +40,13 @@ func (h *Handlers) HandleSourceList() http.HandlerFunc { items[i].Webhook = webhooks[i] h.db.DB().Model(&database.Entrypoint{}).Where("webhook_id = ?", webhooks[i].ID).Count(&items[i].EntrypointCount) h.db.DB().Model(&database.Target{}).Where("webhook_id = ?", webhooks[i].ID).Count(&items[i].TargetCount) - h.db.DB().Model(&database.Event{}).Where("webhook_id = ?", webhooks[i].ID).Count(&items[i].EventCount) + + // Event count comes from per-webhook DB + if h.dbMgr.DBExists(webhooks[i].ID) { + if webhookDB, err := h.dbMgr.GetDB(webhooks[i].ID); err == nil { + webhookDB.Model(&database.Event{}).Count(&items[i].EventCount) + } + } } data := map[string]interface{}{ @@ -136,6 +142,15 @@ func (h *Handlers) HandleSourceCreateSubmit() http.HandlerFunc { return } + // Create per-webhook event database + if err := h.dbMgr.CreateDB(webhook.ID); err != nil { + h.log.Error("failed to create webhook event database", + "webhook_id", webhook.ID, + "error", err, + ) + // Non-fatal: the DB will be created lazily on first event + } + h.log.Info("webhook created", "webhook_id", webhook.ID, "name", name, @@ -169,9 +184,13 @@ func (h *Handlers) HandleSourceDetail() http.HandlerFunc { var targets []database.Target h.db.DB().Where("webhook_id = ?", webhook.ID).Find(&targets) - // Recent events with delivery info + // Recent events from per-webhook database var events []database.Event - h.db.DB().Where("webhook_id = ?", webhook.ID).Order("created_at DESC").Limit(20).Find(&events) + if h.dbMgr.DBExists(webhook.ID) { + if webhookDB, err := h.dbMgr.GetDB(webhook.ID); err == nil { + webhookDB.Where("webhook_id = ?", webhook.ID).Order("created_at DESC").Limit(20).Find(&events) + } + } // Build host URL for display host := r.Host @@ -271,7 +290,9 @@ func (h *Handlers) HandleSourceEditSubmit() http.HandlerFunc { } } -// HandleSourceDelete handles webhook deletion (soft delete). +// HandleSourceDelete handles webhook deletion. +// Configuration data is soft-deleted in the main DB. +// The per-webhook event database file is hard-deleted (permanently removed). func (h *Handlers) HandleSourceDelete() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { userID, ok := h.getUserID(r) @@ -288,6 +309,7 @@ func (h *Handlers) HandleSourceDelete() http.HandlerFunc { return } + // Soft-delete configuration in the main application database tx := h.db.DB().Begin() if tx.Error != nil { h.log.Error("failed to begin transaction", "error", tx.Error) @@ -295,28 +317,7 @@ func (h *Handlers) HandleSourceDelete() http.HandlerFunc { return } - // Soft-delete child records in dependency order (deepest first). - - // Collect event IDs for this webhook - var eventIDs []string - tx.Model(&database.Event{}).Where("webhook_id = ?", webhook.ID).Pluck("id", &eventIDs) - - if len(eventIDs) > 0 { - // Collect delivery IDs for these events - var deliveryIDs []string - tx.Model(&database.Delivery{}).Where("event_id IN ?", eventIDs).Pluck("id", &deliveryIDs) - - if len(deliveryIDs) > 0 { - // Soft-delete delivery results - tx.Where("delivery_id IN ?", deliveryIDs).Delete(&database.DeliveryResult{}) - } - - // Soft-delete deliveries - tx.Where("event_id IN ?", eventIDs).Delete(&database.Delivery{}) - } - - // Soft-delete events, entrypoints, targets, and the webhook itself - tx.Where("webhook_id = ?", webhook.ID).Delete(&database.Event{}) + // Soft-delete entrypoints and targets (config tier) tx.Where("webhook_id = ?", webhook.ID).Delete(&database.Entrypoint{}) tx.Where("webhook_id = ?", webhook.ID).Delete(&database.Target{}) tx.Delete(&webhook) @@ -327,12 +328,23 @@ func (h *Handlers) HandleSourceDelete() http.HandlerFunc { return } + // Hard-delete the per-webhook event database file + if err := h.dbMgr.DeleteDB(webhook.ID); err != nil { + h.log.Error("failed to delete webhook event database", + "webhook_id", webhook.ID, + "error", err, + ) + // Non-fatal: file may not exist if no events were ever received + } + h.log.Info("webhook deleted", "webhook_id", webhook.ID, "user_id", userID) http.Redirect(w, r, "/sources", http.StatusSeeOther) } } // HandleSourceLogs shows the request/response logs for a webhook. +// Events and deliveries are read from the per-webhook database. +// Target information is loaded from the main application database. func (h *Handlers) HandleSourceLogs() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { userID, ok := h.getUserID(r) @@ -349,6 +361,14 @@ func (h *Handlers) HandleSourceLogs() http.HandlerFunc { return } + // Load targets from main DB for display + var targets []database.Target + h.db.DB().Where("webhook_id = ?", webhook.ID).Find(&targets) + targetMap := make(map[string]database.Target, len(targets)) + for _, t := range targets { + targetMap[t.ID] = t + } + // Pagination page := 1 if p := r.URL.Query().Get("page"); p != "" { @@ -359,25 +379,48 @@ func (h *Handlers) HandleSourceLogs() http.HandlerFunc { perPage := 25 offset := (page - 1) * perPage - var totalEvents int64 - h.db.DB().Model(&database.Event{}).Where("webhook_id = ?", webhook.ID).Count(&totalEvents) - - var events []database.Event - h.db.DB().Where("webhook_id = ?", webhook.ID). - Order("created_at DESC"). - Offset(offset). - Limit(perPage). - Find(&events) - - // Load deliveries for each event + // EventWithDeliveries holds an event with its associated deliveries type EventWithDeliveries struct { database.Event Deliveries []database.Delivery } - eventsWithDeliveries := make([]EventWithDeliveries, len(events)) - for i := range events { - eventsWithDeliveries[i].Event = events[i] - h.db.DB().Where("event_id = ?", events[i].ID).Preload("Target").Find(&eventsWithDeliveries[i].Deliveries) + + var totalEvents int64 + var eventsWithDeliveries []EventWithDeliveries + + // Read events and deliveries from per-webhook database + if h.dbMgr.DBExists(webhook.ID) { + webhookDB, err := h.dbMgr.GetDB(webhook.ID) + if err != nil { + h.log.Error("failed to get webhook database", + "webhook_id", webhook.ID, + "error", err, + ) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + webhookDB.Model(&database.Event{}).Where("webhook_id = ?", webhook.ID).Count(&totalEvents) + + var events []database.Event + webhookDB.Where("webhook_id = ?", webhook.ID). + Order("created_at DESC"). + Offset(offset). + Limit(perPage). + Find(&events) + + eventsWithDeliveries = make([]EventWithDeliveries, len(events)) + for i := range events { + eventsWithDeliveries[i].Event = events[i] + // Load deliveries from per-webhook DB (without Target preload) + webhookDB.Where("event_id = ?", events[i].ID).Find(&eventsWithDeliveries[i].Deliveries) + // Manually assign targets from main DB + for j := range eventsWithDeliveries[i].Deliveries { + if target, ok := targetMap[eventsWithDeliveries[i].Deliveries[j].TargetID]; ok { + eventsWithDeliveries[i].Deliveries[j].Target = target + } + } + } } totalPages := int(totalEvents) / perPage diff --git a/internal/handlers/webhook.go b/internal/handlers/webhook.go index fbc1e26..8b3e44d 100644 --- a/internal/handlers/webhook.go +++ b/internal/handlers/webhook.go @@ -16,6 +16,7 @@ const ( // HandleWebhook handles incoming webhook requests at entrypoint URLs. // Only POST requests are accepted; all other methods return 405 Method Not Allowed. +// Events and deliveries are stored in the per-webhook database. func (h *Handlers) HandleWebhook() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { @@ -36,7 +37,7 @@ func (h *Handlers) HandleWebhook() http.HandlerFunc { "remote_addr", r.RemoteAddr, ) - // Look up entrypoint by path + // Look up entrypoint by path (from main application DB) var entrypoint database.Entrypoint result := h.db.DB().Where("path = ?", entrypointUUID).First(&entrypoint) if result.Error != nil { @@ -71,8 +72,27 @@ func (h *Handlers) HandleWebhook() http.HandlerFunc { return } - // Create the event in a transaction - tx := h.db.DB().Begin() + // Find all active targets for this webhook (from main application DB) + var targets []database.Target + if targetErr := h.db.DB().Where("webhook_id = ? AND active = ?", entrypoint.WebhookID, true).Find(&targets).Error; targetErr != nil { + h.log.Error("failed to query targets", "error", targetErr) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + // Get the per-webhook database for event storage + webhookDB, err := h.dbMgr.GetDB(entrypoint.WebhookID) + if err != nil { + h.log.Error("failed to get webhook database", + "webhook_id", entrypoint.WebhookID, + "error", err, + ) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + // Create the event and deliveries in a transaction on the per-webhook DB + tx := webhookDB.Begin() if tx.Error != nil { h.log.Error("failed to begin transaction", "error", tx.Error) http.Error(w, "Internal server error", http.StatusInternalServerError) @@ -95,15 +115,6 @@ func (h *Handlers) HandleWebhook() http.HandlerFunc { return } - // Find all active targets for this webhook - var targets []database.Target - if err := tx.Where("webhook_id = ? AND active = ?", entrypoint.WebhookID, true).Find(&targets).Error; err != nil { - tx.Rollback() - h.log.Error("failed to query targets", "error", err) - http.Error(w, "Internal server error", http.StatusInternalServerError) - return - } - // Create delivery records for each active target for i := range targets { delivery := &database.Delivery{