diff --git a/README.md b/README.md
index 3237579..b97ad7e 100644
--- a/README.md
+++ b/README.md
@@ -496,9 +496,11 @@ External Service
    ┌──────────────┐
    │   Delivery   │◄── retry timers
    │   Engine     │    (backoff)
+   │   (worker    │
+   │    pool)     │
    └──────┬───────┘
           │
-   ┌─── parallel goroutines (fan-out) ───┐
+   ┌── bounded worker pool (N workers) ──┐
   ▼              ▼              ▼
 ┌────────────┐ ┌────────────┐ ┌────────────┐
 │ HTTP Target│ │Retry Target│ │ Log Target │
@@ -508,28 +510,56 @@ External Service
 └────────────┘
 ```
 
-### Parallel Fan-Out Delivery
+### Bounded Worker Pool
 
-When the delivery engine receives a batch of tasks for an event, it
-fans out **all targets in parallel** — each `DeliveryTask` is dispatched
-in its own goroutine immediately. An HTTP target, a retry target, and
-a log target for the same event all start delivering simultaneously
-with no sequential bottleneck.
+The delivery engine uses a **fixed-size worker pool** (default: 10
+workers) to process all deliveries. At most N deliveries are in-flight
+at any time, preventing goroutine explosions regardless of queue depth.
+
+**Architecture:**
+
+- **Channels as queues:** Two buffered channels serve as bounded queues:
+  a delivery channel (new tasks from the webhook handler) and a retry
+  channel (tasks from backoff timers). Both are buffered to 10,000.
+- **Fan-out via channel, not goroutines:** When an event arrives with
+  multiple targets, each `DeliveryTask` is sent to the delivery channel.
+  Workers pick them up and process them — no goroutine-per-target.
+- **Worker goroutines:** A fixed number of worker goroutines select from
+  both channels. Each worker processes one task at a time, then picks up
+  the next. Workers are the ONLY goroutines doing actual HTTP delivery.
+- **Retry backpressure with DB fallback:** When a retry timer fires and
+  the retry channel is full, the task is dropped — the delivery stays
+  in `retrying` status in the database. A periodic sweep (every 60s)
+  scans for these "orphaned" retries and re-queues them. No blocked
+  goroutines, no unbounded timer chains.
+- **Bounded concurrency:** At most N deliveries (N = number of workers)
+  are in-flight simultaneously. Even if a circuit breaker is open for
+  hours and thousands of retries queue up in the channels, the workers
+  drain them at a controlled rate when the circuit closes.
 
 This means:
 
-- **No head-of-line blocking** — a slow HTTP target doesn't delay the
-  log target or other targets.
-- **Maximum throughput** — all targets receive the event as quickly as
-  possible.
-- **Independent results** — each goroutine records its own delivery
-  result in the per-webhook database without coordination.
-- **Fire-and-forget** — the engine doesn't wait for all goroutines to
-  finish; each delivery is completely independent.
+- **No goroutine explosion** — even with 10,000 queued retries, only
+  N worker goroutines exist.
+- **Natural backpressure** — if workers are busy, new tasks wait in the
+  channel buffer rather than spawning more goroutines.
+- **Independent results** — each worker records its own delivery result
+  in the per-webhook database without coordination.
+- **Graceful shutdown** — cancel the context, workers finish their
+  current task and exit. `WaitGroup.Wait()` ensures clean shutdown.
 
-The same parallel fan-out applies to crash recovery: when the engine
-restarts and finds pending deliveries in per-webhook databases, it
-recovers them and fans them out in parallel just like fresh deliveries.
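+A minimal sketch of the pool pattern (illustrative only: `Task`,
+`process`, and `startPool` are stand-ins for the engine's actual
+`DeliveryTask` and delivery logic, assuming `context` and `sync`
+imports):
+
+```go
+// Task stands in for the engine's DeliveryTask.
+type Task struct{ ID string }
+
+// startPool launches n workers that drain both queues until the
+// context is cancelled. Concurrency is bounded by n, not queue depth.
+func startPool(ctx context.Context, n int, deliveryCh, retryCh <-chan Task,
+	process func(Task)) *sync.WaitGroup {
+	var wg sync.WaitGroup
+	for i := 0; i < n; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case <-ctx.Done():
+					return // graceful shutdown: exit between tasks
+				case t := <-deliveryCh:
+					process(t)
+				case t := <-retryCh:
+					process(t)
+				}
+			}
+		}()
+	}
+	return &wg
+}
+```
+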
+**Recovery paths:**
+
+1. **Startup recovery:** When the engine starts, it scans all per-webhook
+   databases for `pending` and `retrying` deliveries. Pending deliveries
+   are sent to the delivery channel; retrying deliveries get backoff
+   timers scheduled.
+2. **Periodic retry sweep (DB-mediated fallback):** Every 60 seconds the
+   engine scans for `retrying` deliveries whose backoff period has
+   elapsed. This catches "orphaned" retries — ones whose retry task was
+   dropped because the retry channel was full. The database is the
+   durable fallback that ensures no retry is permanently lost, even under
+   extreme backpressure.
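+
+In sketch form, the sweep's orphan check and re-queue look roughly like
+this (illustrative: `attempts`, `lastAttempt`, `task`, and `retryCh` are
+stand-ins, and the one-second base delay is an assumption, not the
+engine's actual constant):
+
+```go
+// Exponential backoff with the exponent clamped to avoid overflow.
+shift := attempts - 1
+if shift < 0 {
+	shift = 0
+}
+if shift > 30 {
+	shift = 30
+}
+backoff := time.Duration(1<<shift) * time.Second // assumed 1s base
+
+// Orphaned once the backoff has elapsed: re-queue without blocking.
+if time.Since(lastAttempt) >= backoff {
+	select {
+	case retryCh <- task:
+	default:
+		// Still full: skip; the next sweep cycle will try again.
+	}
+}
+```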
 
 ### Circuit Breaker (Retry Targets)
diff --git a/internal/delivery/engine.go b/internal/delivery/engine.go
index 235283b..029752a 100644
--- a/internal/delivery/engine.go
+++ b/internal/delivery/engine.go
@@ -18,13 +18,26 @@ import (
 )
 
 const (
-	// notifyChannelSize is the buffer size for the delivery notification channel.
-	// Sized large enough that the webhook handler should never block.
-	notifyChannelSize = 1000
+	// deliveryChannelSize is the buffer size for the delivery channel.
+	// New DeliveryTasks from the webhook handler are sent here. Workers
+	// drain this channel. Sized large enough that the webhook handler
+	// should never block under normal load.
+	deliveryChannelSize = 10000
 
-	// retryChannelSize is the buffer size for the retry channel. Timer-fired
-	// retries are sent here for processing by the engine goroutine.
-	retryChannelSize = 1000
+	// retryChannelSize is the buffer size for the retry channel.
+	// Timer-fired retries are sent here for processing by workers.
+	retryChannelSize = 10000
+
+	// defaultWorkers is the number of worker goroutines in the delivery
+	// engine pool. At most this many deliveries are in-flight at any
+	// time, preventing goroutine explosions regardless of queue depth.
+	defaultWorkers = 10
+
+	// retrySweepInterval is how often the periodic retry sweep runs.
+	// The sweep scans all per-webhook databases for "orphaned" retrying
+	// deliveries — ones whose retry task was dropped because the retry
+	// channel was full. This is the DB-mediated fallback path.
+	retrySweepInterval = 60 * time.Second
 
 	// MaxInlineBodySize is the maximum event body size that will be carried
 	// inline in a DeliveryTask through the channel. Bodies at or above this
@@ -94,30 +107,30 @@ type EngineParams struct {
 	Logger *logger.Logger
 }
 
-// Engine processes queued deliveries in the background using an
-// event-driven architecture. New deliveries arrive as self-contained
-// DeliveryTask slices via a buffered channel from the webhook handler.
-// In the happy path (body ≤ 16KB), the engine delivers without reading
-// from any database — it only writes to record results. Failed deliveries
-// that need retry are scheduled via Go timers with exponential backoff;
-// each timer fires into a separate retry channel carrying the full
-// DeliveryTask so retries also avoid unnecessary DB reads. The database
-// stores delivery status for crash recovery only; on startup the engine
-// scans for interrupted deliveries and re-queues them.
+// Engine processes queued deliveries in the background using a bounded
+// worker pool architecture. New deliveries arrive as individual
+// DeliveryTask values via a buffered delivery channel from the webhook
+// handler. Failed deliveries that need retry are scheduled via Go timers
+// with exponential backoff; each timer fires into a separate retry
+// channel. A fixed number of worker goroutines drain both channels,
+// ensuring at most N deliveries are in-flight at any time (N = number
+// of workers). This prevents goroutine explosions when a circuit breaker
+// is open for a long period and many retries queue up.
 //
-// All targets for a single event are delivered in parallel — each
-// DeliveryTask is dispatched in its own goroutine for maximum fan-out
-// speed. Retry targets are protected by a per-target circuit breaker
-// that stops hammering a down target after consecutive failures.
+// In the happy path (body ≤ 16KB), a worker delivers without reading
+// from any database — it only writes to record results. The database
+// stores delivery status for crash recovery only; on startup the engine
+// scans for interrupted deliveries and re-queues them into the channels.
 type Engine struct {
-	database  *database.Database
-	dbManager *database.WebhookDBManager
-	log       *slog.Logger
-	client    *http.Client
-	cancel    context.CancelFunc
-	wg        sync.WaitGroup
-	notifyCh  chan []DeliveryTask
-	retryCh   chan DeliveryTask
+	database   *database.Database
+	dbManager  *database.WebhookDBManager
+	log        *slog.Logger
+	client     *http.Client
+	cancel     context.CancelFunc
+	wg         sync.WaitGroup
+	deliveryCh chan DeliveryTask
+	retryCh    chan DeliveryTask
+	workers    int
 
 	// circuitBreakers stores a *CircuitBreaker per target ID. Only used
 	// for retry targets — HTTP, database, and log targets do not need
@@ -134,8 +147,9 @@ func New(lc fx.Lifecycle, params EngineParams) *Engine {
 		client: &http.Client{
 			Timeout: httpClientTimeout,
 		},
-		notifyCh: make(chan []DeliveryTask, notifyChannelSize),
-		retryCh:  make(chan DeliveryTask, retryChannelSize),
+		deliveryCh: make(chan DeliveryTask, deliveryChannelSize),
+		retryCh:    make(chan DeliveryTask, retryChannelSize),
+		workers:    defaultWorkers,
 	}
 
 	lc.Append(fx.Hook{
@@ -155,9 +169,25 @@ func (e *Engine) start() {
 	ctx, cancel := context.WithCancel(context.Background())
 	e.cancel = cancel
+
+	// Start the worker pool. These are the ONLY goroutines that
+	// perform HTTP delivery. Bounded concurrency is guaranteed.
+	for i := 0; i < e.workers; i++ {
+		e.wg.Add(1)
+		go e.worker(ctx)
+	}
+
+	// Start the recovery scan in a separate goroutine. Recovered tasks
+	// are sent into the delivery/retry channels and picked up by workers.
 	e.wg.Add(1)
-	go e.run(ctx)
-	e.log.Info("delivery engine started")
+	go e.recoverPending(ctx)
+
+	// Start the periodic retry sweep. This is the DB-mediated fallback
+	// for retries whose tasks were dropped due to channel overflow.
+	e.wg.Add(1)
+	go e.retrySweep(ctx)
+
+	e.log.Info("delivery engine started", "workers", e.workers)
 }
 
 func (e *Engine) stop() {
@@ -170,43 +200,191 @@
 // Notify signals the delivery engine that new deliveries are ready.
 // Called by the webhook handler after creating delivery records. Each
 // DeliveryTask carries all data needed for delivery in the ≤16KB case.
-// The call is non-blocking; if the channel is full, a warning is logged
-// and the deliveries will be recovered on the next engine restart.
+// Tasks are sent individually to the delivery channel. The call is
+// non-blocking; if the channel is full, a warning is logged and the
+// delivery will be recovered on the next engine restart.
 func (e *Engine) Notify(tasks []DeliveryTask) {
-	select {
-	case e.notifyCh <- tasks:
-	default:
-		e.log.Warn("delivery notification channel full, deliveries will be recovered on restart",
-			"task_count", len(tasks),
-		)
-	}
-}
-
-func (e *Engine) run(ctx context.Context) {
-	defer e.wg.Done()
-
-	// On startup, recover any pending or retrying deliveries that were
-	// interrupted by an unexpected shutdown. Pending deliveries are
-	// processed immediately; retrying deliveries get timers scheduled
-	// for their remaining backoff.
-	e.recoverInFlight(ctx)
-
-	for {
+	for i := range tasks {
 		select {
-		case <-ctx.Done():
-			return
-		case tasks := <-e.notifyCh:
-			e.processDeliveryTasks(ctx, tasks)
-		case task := <-e.retryCh:
-			e.processRetryTask(ctx, task)
+		case e.deliveryCh <- tasks[i]:
+		default:
+			e.log.Warn("delivery channel full, task will be recovered on restart",
+				"delivery_id", tasks[i].DeliveryID,
+				"event_id", tasks[i].EventID,
+			)
 		}
 	}
 }
 
+// worker is the main loop for a worker goroutine. It selects from both
+// the delivery channel (new tasks from the handler) and the retry channel
+// (tasks from backoff timers). At most e.workers deliveries are in-flight
+// at any time.
+func (e *Engine) worker(ctx context.Context) {
+	defer e.wg.Done()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case task := <-e.deliveryCh:
+			e.processNewTask(ctx, &task)
+		case task := <-e.retryCh:
+			e.processRetryTask(ctx, &task)
+		}
+	}
+}
+
+// recoverPending runs on startup to recover any pending or retrying
+// deliveries that were interrupted by an unexpected shutdown. Recovered
+// tasks are sent into the delivery/retry channels for workers to pick up.
+func (e *Engine) recoverPending(ctx context.Context) {
+	defer e.wg.Done()
+	e.recoverInFlight(ctx)
+}
+
+// processNewTask handles a single new delivery task from the delivery
+// channel. It builds the event and target context from the task's inline
+// data and executes the delivery. For large bodies (≥ MaxInlineBodySize),
+// the body is fetched from the per-webhook database on demand.
+func (e *Engine) processNewTask(ctx context.Context, task *DeliveryTask) {
+	webhookDB, err := e.dbManager.GetDB(task.WebhookID)
+	if err != nil {
+		e.log.Error("failed to get webhook database",
+			"webhook_id", task.WebhookID,
+			"error", err,
+		)
+		return
+	}
+
+	// Build Event from task data
+	event := database.Event{
+		Method:      task.Method,
+		Headers:     task.Headers,
+		ContentType: task.ContentType,
+	}
+	event.ID = task.EventID
+	event.WebhookID = task.WebhookID
+
+	if task.Body != nil {
+		event.Body = *task.Body
+	} else {
+		// Large body: fetch from per-webhook DB
+		var dbEvent database.Event
+		if err := webhookDB.Select("body").
+			First(&dbEvent, "id = ?", task.EventID).Error; err != nil {
+			e.log.Error("failed to fetch event body from database",
+				"event_id", task.EventID,
+				"error", err,
+			)
+			return
+		}
+		event.Body = dbEvent.Body
+	}
+
+	// Build Target from task data (no main DB query needed)
+	target := database.Target{
+		Name:       task.TargetName,
+		Type:       task.TargetType,
+		Config:     task.TargetConfig,
+		MaxRetries: task.MaxRetries,
+	}
+	target.ID = task.TargetID
+
+	// Build Delivery struct for the processing chain
+	d := &database.Delivery{
+		EventID:  task.EventID,
+		TargetID: task.TargetID,
+		Status:   database.DeliveryStatusPending,
+		Event:    event,
+		Target:   target,
+	}
+	d.ID = task.DeliveryID
+
+	e.processDelivery(ctx, webhookDB, d, task)
+}
+
+// processRetryTask handles a single delivery task fired by a retry timer.
+// The task carries all data needed for delivery (same as the initial
+// notification). The only DB read is a status check to verify the delivery
+// hasn't been cancelled or resolved while the timer was pending.
+func (e *Engine) processRetryTask(ctx context.Context, task *DeliveryTask) {
+	webhookDB, err := e.dbManager.GetDB(task.WebhookID)
+	if err != nil {
+		e.log.Error("failed to get webhook database for retry",
+			"webhook_id", task.WebhookID,
+			"delivery_id", task.DeliveryID,
+			"error", err,
+		)
+		return
+	}
+
+	// Verify delivery is still in retrying status (may have been
+	// cancelled or manually resolved while the timer was pending)
+	var d database.Delivery
+	if err := webhookDB.Select("id", "status").
+		First(&d, "id = ?", task.DeliveryID).Error; err != nil {
+		e.log.Error("failed to load delivery for retry",
+			"delivery_id", task.DeliveryID,
+			"error", err,
+		)
+		return
+	}
+
+	if d.Status != database.DeliveryStatusRetrying {
+		e.log.Debug("skipping retry for delivery no longer in retrying status",
+			"delivery_id", d.ID,
+			"status", d.Status,
+		)
+		return
+	}
+
+	// Build Event from task data
+	event := database.Event{
+		Method:      task.Method,
+		Headers:     task.Headers,
+		ContentType: task.ContentType,
+	}
+	event.ID = task.EventID
+	event.WebhookID = task.WebhookID
+
+	if task.Body != nil {
+		event.Body = *task.Body
+	} else {
+		// Large body: fetch from per-webhook DB
+		var dbEvent database.Event
+		if err := webhookDB.Select("body").
+			First(&dbEvent, "id = ?", task.EventID).Error; err != nil {
+			e.log.Error("failed to fetch event body for retry",
+				"event_id", task.EventID,
+				"error", err,
+			)
+			return
+		}
+		event.Body = dbEvent.Body
+	}
+
+	// Build Target from task data
+	target := database.Target{
+		Name:       task.TargetName,
+		Type:       task.TargetType,
+		Config:     task.TargetConfig,
+		MaxRetries: task.MaxRetries,
+	}
+	target.ID = task.TargetID
+
+	// Populate the delivery with event and target for processing
+	d.EventID = task.EventID
+	d.TargetID = task.TargetID
+	d.Event = event
+	d.Target = target
+
+	e.processDelivery(ctx, webhookDB, &d, task)
+}
+
 // recoverInFlight scans all webhooks on startup for deliveries that were
-// interrupted by an unexpected shutdown. Pending deliveries are processed
-// immediately; retrying deliveries get timers scheduled for their
-// remaining backoff period.
+// interrupted by an unexpected shutdown. Pending deliveries are sent to
+// the delivery channel; retrying deliveries get timers scheduled for
+// their remaining backoff period.
 func (e *Engine) recoverInFlight(ctx context.Context) {
 	var webhookIDs []string
 	if err := e.database.DB().Model(&database.Webhook{}).Pluck("id", &webhookIDs).Error; err != nil {
@@ -230,9 +408,8 @@ func (e *Engine) recoverInFlight(ctx context.Context) {
 	}
 }
 
 // recoverWebhookDeliveries recovers pending and retrying deliveries for
-// a single webhook. This is the recovery path — it reads everything from
-// the database since there are no in-memory notifications available after
-// a restart.
+// a single webhook. Pending deliveries are sent to the delivery channel;
+// retrying deliveries get timers scheduled for their remaining backoff.
 func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string) {
 	webhookDB, err := e.dbManager.GetDB(webhookID)
 	if err != nil {
@@ -243,19 +420,8 @@ func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string)
 		return
 	}
 
-	// Check for pending deliveries and process them immediately
-	var pendingCount int64
-	webhookDB.Model(&database.Delivery{}).
-		Where("status = ?", database.DeliveryStatusPending).
-		Count(&pendingCount)
-
-	if pendingCount > 0 {
-		e.log.Info("recovering pending deliveries",
-			"webhook_id", webhookID,
-			"count", pendingCount,
-		)
-		e.processWebhookPendingDeliveries(ctx, webhookID)
-	}
+	// Recover pending deliveries by sending them to the delivery channel
+	e.recoverPendingDeliveries(ctx, webhookDB, webhookID)
 
 	// Schedule timers for retrying deliveries based on remaining backoff
 	var retrying []database.Delivery
@@ -356,201 +522,11 @@ func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string)
 	}
 }
 
-// processDeliveryTasks handles a batch of delivery tasks from the webhook
-// handler. Each task is dispatched in its own goroutine for parallel
-// fan-out — all targets for a single event start delivering simultaneously.
-// In the happy path (body ≤ 16KB), the engine delivers without reading
-// from any database — it trusts the handler's inline data and only touches
-// the DB to record results. For large bodies (body > 16KB), the body is
-// fetched once and shared across all goroutines in the batch.
-func (e *Engine) processDeliveryTasks(ctx context.Context, tasks []DeliveryTask) {
-	if len(tasks) == 0 {
-		return
-	}
-
-	// All tasks in a batch share the same webhook ID
-	webhookID := tasks[0].WebhookID
-	webhookDB, err := e.dbManager.GetDB(webhookID)
-	if err != nil {
-		e.log.Error("failed to get webhook database",
-			"webhook_id", webhookID,
-			"error", err,
-		)
-		return
-	}
-
-	// For the large-body case, pre-fetch the event body once before
-	// fanning out so all goroutines share the same data.
-	var fetchedBody *string
-	if tasks[0].Body == nil {
-		var dbEvent database.Event
-		if err := webhookDB.Select("body").
-			First(&dbEvent, "id = ?", tasks[0].EventID).Error; err != nil {
-			e.log.Error("failed to fetch event body from database",
-				"event_id", tasks[0].EventID,
-				"error", err,
-			)
-			return
-		}
-		fetchedBody = &dbEvent.Body
-	}
-
-	// Fan out: spin up a goroutine per task for parallel delivery.
-	// Each goroutine is independent (fire-and-forget) and records its
-	// own result. No need to wait for all goroutines to finish.
-	for i := range tasks {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-		}
-
-		task := tasks[i] // copy for goroutine closure safety
-
-		go func() {
-			e.deliverTask(ctx, webhookDB, &task, fetchedBody)
-		}()
-	}
-}
-
-// deliverTask prepares and executes a single delivery task. Called from
-// a dedicated goroutine for parallel fan-out.
-func (e *Engine) deliverTask(ctx context.Context, webhookDB *gorm.DB, task *DeliveryTask, fetchedBody *string) {
-	// Build Event from task data
-	event := database.Event{
-		Method:      task.Method,
-		Headers:     task.Headers,
-		ContentType: task.ContentType,
-	}
-	event.ID = task.EventID
-	event.WebhookID = task.WebhookID
-
-	switch {
-	case task.Body != nil:
-		event.Body = *task.Body
-	case fetchedBody != nil:
-		event.Body = *fetchedBody
-	default:
-		e.log.Error("no body available for delivery task",
-			"delivery_id", task.DeliveryID,
-			"event_id", task.EventID,
-		)
-		return
-	}
-
-	// Build Target from task data (no main DB query needed)
-	target := database.Target{
-		Name:       task.TargetName,
-		Type:       task.TargetType,
-		Config:     task.TargetConfig,
-		MaxRetries: task.MaxRetries,
-	}
-	target.ID = task.TargetID
-
-	// Build Delivery struct for the processing chain
-	d := &database.Delivery{
-		EventID:  task.EventID,
-		TargetID: task.TargetID,
-		Status:   database.DeliveryStatusPending,
-		Event:    event,
-		Target:   target,
-	}
-	d.ID = task.DeliveryID
-
-	e.processDelivery(ctx, webhookDB, d, task)
-}
-
-// processRetryTask handles a single delivery task fired by a retry timer.
-// The task carries all data needed for delivery (same as the initial
-// notification). The only DB read is a status check to verify the delivery
-// hasn't been cancelled or resolved while the timer was pending.
-func (e *Engine) processRetryTask(ctx context.Context, task DeliveryTask) {
-	webhookDB, err := e.dbManager.GetDB(task.WebhookID)
-	if err != nil {
-		e.log.Error("failed to get webhook database for retry",
-			"webhook_id", task.WebhookID,
-			"delivery_id", task.DeliveryID,
-			"error", err,
-		)
-		return
-	}
-
-	// Verify delivery is still in retrying status (may have been
-	// cancelled or manually resolved while the timer was pending)
-	var d database.Delivery
-	if err := webhookDB.Select("id", "status").
-		First(&d, "id = ?", task.DeliveryID).Error; err != nil {
-		e.log.Error("failed to load delivery for retry",
-			"delivery_id", task.DeliveryID,
-			"error", err,
-		)
-		return
-	}
-
-	if d.Status != database.DeliveryStatusRetrying {
-		e.log.Debug("skipping retry for delivery no longer in retrying status",
-			"delivery_id", d.ID,
-			"status", d.Status,
-		)
-		return
-	}
-
-	// Build Event from task data
-	event := database.Event{
-		Method:      task.Method,
-		Headers:     task.Headers,
-		ContentType: task.ContentType,
-	}
-	event.ID = task.EventID
-	event.WebhookID = task.WebhookID
-
-	if task.Body != nil {
-		event.Body = *task.Body
-	} else {
-		// Large body: fetch from per-webhook DB
-		var dbEvent database.Event
-		if err := webhookDB.Select("body").
-			First(&dbEvent, "id = ?", task.EventID).Error; err != nil {
-			e.log.Error("failed to fetch event body for retry",
-				"event_id", task.EventID,
-				"error", err,
-			)
-			return
-		}
-		event.Body = dbEvent.Body
-	}
-
-	// Build Target from task data
-	target := database.Target{
-		Name:       task.TargetName,
-		Type:       task.TargetType,
-		Config:     task.TargetConfig,
-		MaxRetries: task.MaxRetries,
-	}
-	target.ID = task.TargetID
-
-	// Populate the delivery with event and target for processing
-	d.EventID = task.EventID
-	d.TargetID = task.TargetID
-	d.Event = event
-	d.Target = target
-
-	e.processDelivery(ctx, webhookDB, &d, &task)
-}
-
-// processWebhookPendingDeliveries queries a single webhook's database for
-// all pending deliveries and processes them. Used for crash recovery where
-// we don't have in-memory notifications — everything is loaded from the DB.
-func (e *Engine) processWebhookPendingDeliveries(ctx context.Context, webhookID string) {
-	webhookDB, err := e.dbManager.GetDB(webhookID)
-	if err != nil {
-		e.log.Error("failed to get webhook database",
-			"webhook_id", webhookID,
-			"error", err,
-		)
-		return
-	}
-
+// recoverPendingDeliveries sends pending deliveries for a single webhook
+// into the delivery channel. Used for crash recovery where we don't have
+// in-memory notifications — everything is loaded from the DB and queued
+// for workers to pick up.
+func (e *Engine) recoverPendingDeliveries(ctx context.Context, webhookDB *gorm.DB, webhookID string) {
 	var deliveries []database.Delivery
 	result := webhookDB.
 		Where("status = ?", database.DeliveryStatusPending).
@@ -569,6 +545,11 @@ func (e *Engine) processWebhookPendingDeliveries(ctx context.Context, webhookID
 		return
 	}
 
+	e.log.Info("recovering pending deliveries",
+		"webhook_id", webhookID,
+		"count", len(deliveries),
+	)
+
 	// Collect unique target IDs and load targets from the main DB
 	seen := make(map[string]bool)
 	targetIDs := make([]string, 0, len(deliveries))
@@ -590,8 +571,7 @@ func (e *Engine) processWebhookPendingDeliveries(ctx context.Context, webhookID
 		targetMap[t.ID] = t
 	}
 
-	// Fan out recovered deliveries in parallel — same as the normal
-	// delivery path, each task gets its own goroutine.
+	// Send recovered deliveries to the delivery channel for workers
 	for i := range deliveries {
 		select {
 		case <-ctx.Done():
@@ -607,10 +587,14 @@ func (e *Engine) processWebhookPendingDeliveries(ctx context.Context, webhookID
 			)
 			continue
 		}
-		deliveries[i].Target = target
 
-		// Build task from DB data for the recovery path
-		bodyStr := deliveries[i].Event.Body
+		// Build task from DB data
+		var bodyPtr *string
+		if len(deliveries[i].Event.Body) < MaxInlineBodySize {
+			bodyStr := deliveries[i].Event.Body
+			bodyPtr = &bodyStr
+		}
+
 		task := DeliveryTask{
 			DeliveryID:  deliveries[i].ID,
 			EventID:     deliveries[i].EventID,
@@ -623,21 +607,30 @@ func (e *Engine) processWebhookPendingDeliveries(ctx context.Context, webhookID
 			Method:      deliveries[i].Event.Method,
 			Headers:     deliveries[i].Event.Headers,
 			ContentType: deliveries[i].Event.ContentType,
-			Body:        &bodyStr,
+			Body:        bodyPtr,
 			AttemptNum:  1,
 		}
 
-		d := deliveries[i] // copy for goroutine closure safety
-		go func() {
-			e.processDelivery(ctx, webhookDB, &d, &task)
-		}()
+		select {
+		case e.deliveryCh <- task:
+		default:
+			e.log.Warn("delivery channel full during recovery, remaining deliveries will be recovered on next restart",
+				"delivery_id", deliveries[i].ID,
+			)
+			return
+		}
 	}
 }
 
 // scheduleRetry creates a Go timer that fires after the given delay and
 // sends the full DeliveryTask to the engine's retry channel. The task
-// carries all data needed for the retry attempt, so when it fires, the
-// engine can deliver without reading event or target data from the DB.
+// carries all data needed for the retry attempt, so when it fires, a
+// worker can deliver without reading event or target data from the DB.
+//
+// If the retry channel is full when the timer fires, the task is
+// dropped. The delivery remains in `retrying` status in the database
+// and will be picked up by the periodic retry sweep (DB-mediated
+// fallback path). No goroutines are blocked or re-armed.
 func (e *Engine) scheduleRetry(task DeliveryTask, delay time.Duration) {
 	e.log.Debug("scheduling delivery retry",
 		"webhook_id", task.WebhookID,
@@ -650,13 +643,189 @@ func (e *Engine) scheduleRetry(task DeliveryTask, delay time.Duration) {
 		select {
 		case e.retryCh <- task:
 		default:
-			e.log.Warn("retry channel full, delivery will be recovered on restart",
+			// Retry channel full — drop the task. The delivery is
+			// already marked as `retrying` in the per-webhook DB, so
+			// the periodic retry sweep will pick it up. This is the
+			// DB-mediated fallback path: no blocked goroutines, no
+			// unbounded timer chains.
+			e.log.Warn("retry channel full, delivery will be recovered by periodic sweep",
 				"delivery_id", task.DeliveryID,
+				"webhook_id", task.WebhookID,
 			)
 		}
 	})
 }
 
+// retrySweep runs periodically to scan all per-webhook databases for
+// "orphaned" retrying deliveries — ones whose retry task was dropped
+// because the retry channel was full. This is the DB-mediated fallback
+// path that ensures no retries are permanently lost even under extreme
+// backpressure.
+//
+// The sweep is also the same mechanism used on startup recovery, making
+// the system resilient to both channel overflow and unexpected restarts.
+func (e *Engine) retrySweep(ctx context.Context) {
+	defer e.wg.Done()
+	ticker := time.NewTicker(retrySweepInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			e.sweepOrphanedRetries(ctx)
+		}
+	}
+}
+
+// sweepOrphanedRetries scans all webhooks for retrying deliveries whose
+// backoff period has elapsed. For each eligible delivery, it builds a
+// DeliveryTask and sends it to the retry channel. If the channel is
+// still full, the delivery is skipped and will be retried on the next
+// sweep cycle.
+func (e *Engine) sweepOrphanedRetries(ctx context.Context) {
+	var webhookIDs []string
+	if err := e.database.DB().Model(&database.Webhook{}).Pluck("id", &webhookIDs).Error; err != nil {
+		e.log.Error("retry sweep: failed to query webhook IDs", "error", err)
+		return
+	}
+
+	for _, webhookID := range webhookIDs {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
+		if !e.dbManager.DBExists(webhookID) {
+			continue
+		}
+
+		e.sweepWebhookRetries(ctx, webhookID)
+	}
+}
+
+// sweepWebhookRetries scans a single webhook's database for retrying
+// deliveries whose backoff period has elapsed and sends them to the
+// retry channel.
+func (e *Engine) sweepWebhookRetries(ctx context.Context, webhookID string) {
+	webhookDB, err := e.dbManager.GetDB(webhookID)
+	if err != nil {
+		e.log.Error("retry sweep: failed to get webhook database",
+			"webhook_id", webhookID,
+			"error", err,
+		)
+		return
+	}
+
+	var retrying []database.Delivery
+	if err := webhookDB.Where("status = ?", database.DeliveryStatusRetrying).
+		Find(&retrying).Error; err != nil {
+		e.log.Error("retry sweep: failed to query retrying deliveries",
+			"webhook_id", webhookID,
+			"error", err,
+		)
+		return
+	}
+
+	for i := range retrying {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
+		d := &retrying[i]
+
+		// Count prior attempts to determine backoff
+		var resultCount int64
+		webhookDB.Model(&database.DeliveryResult{}).
+			Where("delivery_id = ?", d.ID).
+			Count(&resultCount)
+		attemptNum := int(resultCount)
+
+		// Check if the backoff period has elapsed since the last attempt.
+		// If it hasn't, this delivery likely has an active in-memory
+		// timer and is not orphaned — skip it.
+		var lastResult database.DeliveryResult
+		if err := webhookDB.Where("delivery_id = ?", d.ID).
+			Order("created_at DESC").
+			First(&lastResult).Error; err == nil {
+			shift := attemptNum - 1
+			if shift < 0 {
+				shift = 0
+			}
+			if shift > 30 {
+				shift = 30
+			}
+			// 1s base delay, doubling per attempt, exponent capped at 30
+			backoff := time.Duration(1<<shift) * time.Second
+			if time.Since(lastResult.CreatedAt) < backoff {
+				continue
+			}
+		}