diff --git a/README.md b/README.md index 235a19f..2e5adbb 100644 --- a/README.md +++ b/README.md @@ -463,11 +463,12 @@ External Service 1. Look up Entrypoint by UUID 2. Capture full request as Event 3. Queue Delivery to each active Target + 4. Notify Engine via channel │ ▼ ┌──────────────┐ - │ Delivery │ - │ Engine │ + │ Delivery │◄── retry timers + │ Engine │ (backoff) └──────┬───────┘ │ ┌────────────────────┼────────────────────┐ @@ -577,7 +578,7 @@ webhooker/ │ ├── globals/ │ │ └── globals.go # Build-time variables (appname, version, arch) │ ├── delivery/ -│ │ └── engine.go # Background delivery engine (fx lifecycle) +│ │ └── engine.go # Event-driven delivery engine (channel + timer based) │ ├── handlers/ │ │ ├── handlers.go # Base handler struct, JSON helpers, template rendering │ │ ├── auth.go # Login, logout handlers @@ -627,11 +628,14 @@ Components are wired via Uber fx in this order: 7. `session.New` — Cookie-based session manager 8. `handlers.New` — HTTP handlers 9. `middleware.New` — HTTP middleware -10. `delivery.New` — Background delivery engine -11. `server.New` — HTTP server and router +10. `delivery.New` — Event-driven delivery engine +11. `delivery.Engine` → `handlers.DeliveryNotifier` — interface bridge +12. `server.New` — HTTP server and router The server starts via `fx.Invoke(func(*server.Server, *delivery.Engine) -{})` which triggers the fx lifecycle hooks in dependency order. +{})` which triggers the fx lifecycle hooks in dependency order. The +`DeliveryNotifier` interface allows the webhook handler to notify the +delivery engine of new work without a direct package dependency. ### Middleware Stack @@ -720,7 +724,7 @@ linted, tested, and compiled. - [x] Per-webhook database lifecycle management (create on webhook creation, delete on webhook removal) - [x] `WebhookDBManager` component with lazy connection pooling -- [x] Delivery engine polls all per-webhook DBs for pending deliveries +- [x] Event-driven delivery engine (channel notifications + timer-based retries) - [x] Database target type marks delivery as immediately successful (events are already in the per-webhook DB) diff --git a/cmd/webhooker/main.go b/cmd/webhooker/main.go index e09436a..9273dd8 100644 --- a/cmd/webhooker/main.go +++ b/cmd/webhooker/main.go @@ -39,6 +39,9 @@ func main() { handlers.New, middleware.New, delivery.New, + // Wire *delivery.Engine as delivery.Notifier so the + // webhook handler can notify the engine of new deliveries. + func(e *delivery.Engine) delivery.Notifier { return e }, server.New, ), fx.Invoke(func(*server.Server, *delivery.Engine) {}), diff --git a/internal/delivery/engine.go b/internal/delivery/engine.go index af27d3d..dffc053 100644 --- a/internal/delivery/engine.go +++ b/internal/delivery/engine.go @@ -18,8 +18,19 @@ import ( ) const ( - // pollInterval is how often the engine checks for pending deliveries. - pollInterval = 2 * time.Second + // notifyChannelSize is the buffer size for the delivery notification channel. + // Sized large enough that the webhook handler should never block. + notifyChannelSize = 1000 + + // retryChannelSize is the buffer size for the retry channel. Timer-fired + // retries are sent here for processing by the engine goroutine. + retryChannelSize = 1000 + + // MaxInlineBodySize is the maximum event body size that will be carried + // inline in a Notification through the channel. Bodies at or above this + // size are left nil and fetched from the per-webhook database on demand. + // This keeps channel buffer memory bounded under high traffic. + MaxInlineBodySize = 16 * 1024 // httpClientTimeout is the timeout for outbound HTTP requests. httpClientTimeout = 30 * time.Second @@ -28,6 +39,33 @@ const ( maxBodyLog = 4096 ) +// Notification carries event data through the delivery notification channel. +// The Body field is a pointer: non-nil for payloads under MaxInlineBodySize +// (16 KB), nil for larger payloads. When nil, the engine fetches the body +// from the per-webhook database using EventID. This keeps channel buffer +// memory bounded regardless of payload sizes during high traffic. +type Notification struct { + WebhookID string + EventID string + Method string + Headers string + ContentType string + Body *string // nil if body >= MaxInlineBodySize; fetch from DB by EventID +} + +// Notifier is the interface for notifying the delivery engine about new +// deliveries. Implemented by Engine and injected into handlers. +type Notifier interface { + Notify(n Notification) +} + +// retryRequest carries the information needed to retry a specific delivery. +// Sent from timer goroutines to the engine's retry channel. +type retryRequest struct { + webhookID string + deliveryID string +} + // HTTPTargetConfig holds configuration for http and retry target types. type HTTPTargetConfig struct { URL string `json:"url"` @@ -45,9 +83,14 @@ type EngineParams struct { Logger *logger.Logger } -// Engine processes queued deliveries in the background. -// It iterates over all active webhooks and polls each webhook's -// per-webhook database for pending deliveries. +// Engine processes queued deliveries in the background using an +// event-driven architecture. New deliveries are signaled via a buffered +// channel from the webhook handler and processed immediately. Failed +// deliveries that need retry are scheduled via Go timers with exponential +// backoff — each timer fires into a separate retry channel when the +// backoff period expires. The database stores delivery status for crash +// recovery only; on startup the engine scans for interrupted deliveries +// and re-queues them. type Engine struct { database *database.Database dbManager *database.WebhookDBManager @@ -55,6 +98,8 @@ type Engine struct { client *http.Client cancel context.CancelFunc wg sync.WaitGroup + notifyCh chan Notification + retryCh chan retryRequest } // New creates and registers the delivery engine with the fx lifecycle. @@ -66,6 +111,8 @@ func New(lc fx.Lifecycle, params EngineParams) *Engine { client: &http.Client{ Timeout: httpClientTimeout, }, + notifyCh: make(chan Notification, notifyChannelSize), + retryCh: make(chan retryRequest, retryChannelSize), } lc.Append(fx.Hook{ @@ -97,29 +144,52 @@ func (e *Engine) stop() { e.log.Info("delivery engine stopped") } +// Notify signals the delivery engine that new deliveries are available. +// This is called by the webhook handler after creating delivery records. +// The notification carries the event data inline (with body pointer +// semantics for memory efficiency). The call is non-blocking; if the +// channel is full, a warning is logged and the deliveries will be +// recovered on the next engine restart. +func (e *Engine) Notify(n Notification) { + select { + case e.notifyCh <- n: + default: + e.log.Warn("delivery notification channel full, deliveries will be recovered on restart", + "webhook_id", n.WebhookID, + "event_id", n.EventID, + ) + } +} + func (e *Engine) run(ctx context.Context) { defer e.wg.Done() - ticker := time.NewTicker(pollInterval) - defer ticker.Stop() + // On startup, recover any pending or retrying deliveries that were + // interrupted by an unexpected shutdown. Pending deliveries are + // processed immediately; retrying deliveries get timers scheduled + // for their remaining backoff. + e.recoverInFlight(ctx) for { select { case <-ctx.Done(): return - case <-ticker.C: - e.processPending(ctx) + case n := <-e.notifyCh: + e.processNotification(ctx, n) + case req := <-e.retryCh: + e.processRetryDelivery(ctx, req) } } } -// processPending iterates over all active webhooks and processes pending -// deliveries from each webhook's per-webhook database. -func (e *Engine) processPending(ctx context.Context) { - // Get all active webhook IDs from the main application database +// recoverInFlight scans all webhooks on startup for deliveries that were +// interrupted by an unexpected shutdown. Pending deliveries are processed +// immediately; retrying deliveries get timers scheduled for their +// remaining backoff period. +func (e *Engine) recoverInFlight(ctx context.Context) { var webhookIDs []string if err := e.database.DB().Model(&database.Webhook{}).Pluck("id", &webhookIDs).Error; err != nil { - e.log.Error("failed to query webhook IDs", "error", err) + e.log.Error("failed to query webhook IDs for recovery", "error", err) return } @@ -128,18 +198,200 @@ func (e *Engine) processPending(ctx context.Context) { case <-ctx.Done(): return default: - // Only process webhooks that have an event database file - if !e.dbManager.DBExists(webhookID) { + } + + if !e.dbManager.DBExists(webhookID) { + continue + } + + e.recoverWebhookDeliveries(ctx, webhookID) + } +} + +// recoverWebhookDeliveries recovers pending and retrying deliveries for +// a single webhook. Pending deliveries are processed directly (loading +// event data from DB); retrying deliveries get timers scheduled based on +// the elapsed time since the last attempt. +func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string) { + webhookDB, err := e.dbManager.GetDB(webhookID) + if err != nil { + e.log.Error("failed to get webhook database for recovery", + "webhook_id", webhookID, + "error", err, + ) + return + } + + // Check for pending deliveries and process them immediately + var pendingCount int64 + webhookDB.Model(&database.Delivery{}). + Where("status = ?", database.DeliveryStatusPending). + Count(&pendingCount) + + if pendingCount > 0 { + e.log.Info("recovering pending deliveries", + "webhook_id", webhookID, + "count", pendingCount, + ) + e.processWebhookPendingDeliveries(ctx, webhookID) + } + + // Schedule timers for retrying deliveries based on remaining backoff + var retrying []database.Delivery + if err := webhookDB.Where("status = ?", database.DeliveryStatusRetrying). + Find(&retrying).Error; err != nil { + e.log.Error("failed to query retrying deliveries for recovery", + "webhook_id", webhookID, + "error", err, + ) + return + } + + for i := range retrying { + d := &retrying[i] + + var resultCount int64 + webhookDB.Model(&database.DeliveryResult{}). + Where("delivery_id = ?", d.ID). + Count(&resultCount) + attemptNum := int(resultCount) + + // Calculate remaining backoff from last attempt + remaining := time.Duration(0) + + var lastResult database.DeliveryResult + if err := webhookDB.Where("delivery_id = ?", d.ID). + Order("created_at DESC"). + First(&lastResult).Error; err == nil { + shift := attemptNum - 1 + if shift < 0 { + shift = 0 + } + if shift > 30 { + shift = 30 + } + backoff := time.Duration(1< 1 { - var lastResult database.DeliveryResult - lookupErr := webhookDB.Where("delivery_id = ?", d.ID).Order("created_at DESC").First(&lastResult).Error - if lookupErr == nil { - shift := attemptNum - 2 - if shift > 30 { - shift = 30 - } - backoff := time.Duration(1<= 200 && statusCode < 300 @@ -319,6 +623,16 @@ func (e *Engine) deliverRetry(_ context.Context, webhookDB *gorm.DB, d *database e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) } else { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusRetrying) + + // Schedule a timer for the next retry with exponential backoff. + // The timer will fire and send a retryRequest to the engine's + // retry channel, which triggers processRetryDelivery. + shift := attemptNum - 1 + if shift > 30 { + shift = 30 + } + backoff := time.Duration(1<= 16KB) are left nil to keep channel memory + // bounded; the engine fetches them from DB on demand. + n := delivery.Notification{ + WebhookID: entrypoint.WebhookID, + EventID: event.ID, + Method: event.Method, + Headers: event.Headers, + ContentType: event.ContentType, + } + bodyStr := string(body) + if len(body) < delivery.MaxInlineBodySize { + n.Body = &bodyStr + } + h.notifier.Notify(n) + h.log.Info("webhook event created", "event_id", event.ID, "webhook_id", entrypoint.WebhookID,