refactor: bounded worker pool with DB-mediated retry fallback
All checks were successful
check / check (push) Successful in 58s
Replace the unbounded goroutine-per-delivery fan-out with a fixed-size worker pool (10 workers). Buffered channels serve as bounded queues (10,000-entry buffers), and the workers are the only goroutines performing HTTP delivery. When the retry channel overflows, the timer is dropped instead of re-armed: the delivery stays in 'retrying' status in the DB, and a periodic sweep (every 60s) recovers these orphaned retries. The database is the durable fallback, the same path used for startup recovery. Addresses owner feedback on the circuit-breaker recovery goroutine flood.
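The core pattern described here, a fixed pool of workers draining two bounded channels, can be sketched in a few lines of Go. All names in this sketch (`pool`, `newPool`, `worker`) are illustrative stand-ins, not the engine's actual API:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool drains two bounded channels with a fixed number of workers, so
// concurrency never exceeds the pool size regardless of queue depth.
type pool struct {
	deliveryCh chan int
	retryCh    chan int
	wg         sync.WaitGroup
	processed  atomic.Int64
}

func newPool(workers, buf int) *pool {
	p := &pool{
		deliveryCh: make(chan int, buf),
		retryCh:    make(chan int, buf),
	}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go p.worker()
	}
	return p
}

// worker selects from both channels; a channel is set to nil locally
// once closed (receives on a nil channel block forever, so that select
// arm is effectively disabled) and the worker exits when both are done.
func (p *pool) worker() {
	defer p.wg.Done()
	d, r := p.deliveryCh, p.retryCh
	for d != nil || r != nil {
		select {
		case _, ok := <-d:
			if !ok {
				d = nil
				continue
			}
			p.processed.Add(1)
		case _, ok := <-r:
			if !ok {
				r = nil
				continue
			}
			p.processed.Add(1)
		}
	}
}

func main() {
	p := newPool(3, 16)
	for i := 0; i < 5; i++ {
		p.deliveryCh <- i
	}
	for i := 0; i < 4; i++ {
		p.retryCh <- i
	}
	close(p.deliveryCh)
	close(p.retryCh)
	p.wg.Wait()
	fmt.Println("processed", p.processed.Load()) // prints "processed 9"
}
```

Receives on a closed-but-buffered channel drain the remaining items before reporting closed, so the pool processes everything queued before shutdown.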
66
README.md
@@ -496,9 +496,11 @@ External Service
 ┌──────────────┐
 │ Delivery │◄── retry timers
 │ Engine │ (backoff)
+│ (worker │
+│ pool) │
 └──────┬───────┘
        │
-┌─── parallel goroutines (fan-out) ───┐
+┌── bounded worker pool (N workers) ──┐
   ▼            ▼            ▼
 ┌────────────┐ ┌────────────┐ ┌────────────┐
 │ HTTP Target│ │Retry Target│ │ Log Target │
@@ -508,28 +510,56 @@ External Service
 └────────────┘
 ```

-### Parallel Fan-Out Delivery
+### Bounded Worker Pool

-When the delivery engine receives a batch of tasks for an event, it
-fans out **all targets in parallel** — each `DeliveryTask` is dispatched
-in its own goroutine immediately. An HTTP target, a retry target, and
-a log target for the same event all start delivering simultaneously
-with no sequential bottleneck.
+The delivery engine uses a **fixed-size worker pool** (default: 10
+workers) to process all deliveries. At most N deliveries are in-flight
+at any time, preventing goroutine explosions regardless of queue depth.
+
+**Architecture:**
+
+- **Channels as queues:** Two buffered channels serve as bounded queues:
+  a delivery channel (new tasks from the webhook handler) and a retry
+  channel (tasks from backoff timers). Both are buffered to 10,000.
+- **Fan-out via channel, not goroutines:** When an event arrives with
+  multiple targets, each `DeliveryTask` is sent to the delivery channel.
+  Workers pick them up and process them — no goroutine-per-target.
+- **Worker goroutines:** A fixed number of worker goroutines select from
+  both channels. Each worker processes one task at a time, then picks up
+  the next. Workers are the ONLY goroutines doing actual HTTP delivery.
+- **Retry backpressure with DB fallback:** When a retry timer fires and
+  the retry channel is full, the timer is dropped — the delivery stays
+  in `retrying` status in the database. A periodic sweep (every 60s)
+  scans for these "orphaned" retries and re-queues them. No blocked
+  goroutines, no unbounded timer chains.
+- **Bounded concurrency:** At most N deliveries (N = number of workers)
+  are in-flight simultaneously. Even if a circuit breaker is open for
+  hours and thousands of retries queue up in the channels, the workers
+  drain them at a controlled rate when the circuit closes.

 This means:

-- **No head-of-line blocking** — a slow HTTP target doesn't delay the
-  log target or other targets.
-- **Maximum throughput** — all targets receive the event as quickly as
-  possible.
-- **Independent results** — each goroutine records its own delivery
-  result in the per-webhook database without coordination.
-- **Fire-and-forget** — the engine doesn't wait for all goroutines to
-  finish; each delivery is completely independent.
+- **No goroutine explosion** — even with 10,000 queued retries, only
+  N worker goroutines exist.
+- **Natural backpressure** — if workers are busy, new tasks wait in the
+  channel buffer rather than spawning more goroutines.
+- **Independent results** — each worker records its own delivery result
+  in the per-webhook database without coordination.
+- **Graceful shutdown** — cancel the context, workers finish their
+  current task and exit. `WaitGroup.Wait()` ensures clean shutdown.

-The same parallel fan-out applies to crash recovery: when the engine
-restarts and finds pending deliveries in per-webhook databases, it
-recovers them and fans them out in parallel just like fresh deliveries.
+**Recovery paths:**
+
+1. **Startup recovery:** When the engine starts, it scans all per-webhook
+   databases for `pending` and `retrying` deliveries. Pending deliveries
+   are sent to the delivery channel; retrying deliveries get backoff
+   timers scheduled.
+2. **Periodic retry sweep (DB-mediated fallback):** Every 60 seconds the
+   engine scans for `retrying` deliveries whose backoff period has
+   elapsed. This catches "orphaned" retries — ones whose in-memory timer
+   was dropped because the retry channel was full. The database is the
+   durable fallback that ensures no retry is permanently lost, even under
+   extreme backpressure.

 ### Circuit Breaker (Retry Targets)

@@ -18,13 +18,26 @@ import (
 )

 const (
-	// notifyChannelSize is the buffer size for the delivery notification channel.
-	// Sized large enough that the webhook handler should never block.
-	notifyChannelSize = 1000
+	// deliveryChannelSize is the buffer size for the delivery channel.
+	// New DeliveryTasks from the webhook handler are sent here. Workers
+	// drain this channel. Sized large enough that the webhook handler
+	// should never block under normal load.
+	deliveryChannelSize = 10000

-	// retryChannelSize is the buffer size for the retry channel. Timer-fired
-	// retries are sent here for processing by the engine goroutine.
-	retryChannelSize = 1000
+	// retryChannelSize is the buffer size for the retry channel.
+	// Timer-fired retries are sent here for processing by workers.
+	retryChannelSize = 10000
+
+	// defaultWorkers is the number of worker goroutines in the delivery
+	// engine pool. At most this many deliveries are in-flight at any
+	// time, preventing goroutine explosions regardless of queue depth.
+	defaultWorkers = 10
+
+	// retrySweepInterval is how often the periodic retry sweep runs.
+	// The sweep scans all per-webhook databases for "orphaned" retrying
+	// deliveries — ones whose in-memory timer was dropped because the
+	// retry channel was full. This is the DB-mediated fallback path.
+	retrySweepInterval = 60 * time.Second

 	// MaxInlineBodySize is the maximum event body size that will be carried
 	// inline in a DeliveryTask through the channel. Bodies at or above this
@@ -94,21 +107,20 @@ type EngineParams struct {
 	Logger *logger.Logger
 }

-// Engine processes queued deliveries in the background using an
-// event-driven architecture. New deliveries arrive as self-contained
-// DeliveryTask slices via a buffered channel from the webhook handler.
-// In the happy path (body ≤ 16KB), the engine delivers without reading
-// from any database — it only writes to record results. Failed deliveries
-// that need retry are scheduled via Go timers with exponential backoff;
-// each timer fires into a separate retry channel carrying the full
-// DeliveryTask so retries also avoid unnecessary DB reads. The database
-// stores delivery status for crash recovery only; on startup the engine
-// scans for interrupted deliveries and re-queues them.
+// Engine processes queued deliveries in the background using a bounded
+// worker pool architecture. New deliveries arrive as individual
+// DeliveryTask values via a buffered delivery channel from the webhook
+// handler. Failed deliveries that need retry are scheduled via Go timers
+// with exponential backoff; each timer fires into a separate retry
+// channel. A fixed number of worker goroutines drain both channels,
+// ensuring at most N deliveries are in-flight at any time (N = number
+// of workers). This prevents goroutine explosions when a circuit breaker
+// is open for a long period and many retries queue up.
 //
-// All targets for a single event are delivered in parallel — each
-// DeliveryTask is dispatched in its own goroutine for maximum fan-out
-// speed. Retry targets are protected by a per-target circuit breaker
-// that stops hammering a down target after consecutive failures.
+// In the happy path (body ≤ 16KB), a worker delivers without reading
+// from any database — it only writes to record results. The database
+// stores delivery status for crash recovery only; on startup the engine
+// scans for interrupted deliveries and re-queues them into the channels.
 type Engine struct {
 	database  *database.Database
 	dbManager *database.WebhookDBManager
@@ -116,8 +128,9 @@ type Engine struct {
 	client *http.Client
 	cancel context.CancelFunc
 	wg     sync.WaitGroup
-	notifyCh chan []DeliveryTask
+	deliveryCh chan DeliveryTask
 	retryCh chan DeliveryTask
+	workers int

 	// circuitBreakers stores a *CircuitBreaker per target ID. Only used
 	// for retry targets — HTTP, database, and log targets do not need
@@ -134,8 +147,9 @@ func New(lc fx.Lifecycle, params EngineParams) *Engine {
 		client: &http.Client{
 			Timeout: httpClientTimeout,
 		},
-		notifyCh: make(chan []DeliveryTask, notifyChannelSize),
+		deliveryCh: make(chan DeliveryTask, deliveryChannelSize),
 		retryCh: make(chan DeliveryTask, retryChannelSize),
+		workers: defaultWorkers,
 	}

 	lc.Append(fx.Hook{
@@ -155,9 +169,25 @@ func New(lc fx.Lifecycle, params EngineParams) *Engine {
 func (e *Engine) start() {
 	ctx, cancel := context.WithCancel(context.Background())
 	e.cancel = cancel

+	// Start the worker pool. These are the ONLY goroutines that
+	// perform HTTP delivery. Bounded concurrency is guaranteed.
+	for i := 0; i < e.workers; i++ {
 		e.wg.Add(1)
-	go e.run(ctx)
-	e.log.Info("delivery engine started")
+		go e.worker(ctx)
+	}
+
+	// Start recovery scan in a separate goroutine. Recovered tasks
+	// are sent into the delivery/retry channels and picked up by workers.
+	e.wg.Add(1)
+	go e.recoverPending(ctx)
+
+	// Start the periodic retry sweep. This is the DB-mediated fallback
+	// for retries whose timers were dropped due to channel overflow.
+	e.wg.Add(1)
+	go e.retrySweep(ctx)
+
+	e.log.Info("delivery engine started", "workers", e.workers)
 }

 func (e *Engine) stop() {
@@ -170,43 +200,191 @@ func (e *Engine) stop() {
 // Notify signals the delivery engine that new deliveries are ready.
 // Called by the webhook handler after creating delivery records. Each
 // DeliveryTask carries all data needed for delivery in the ≤16KB case.
-// The call is non-blocking; if the channel is full, a warning is logged
-// and the deliveries will be recovered on the next engine restart.
+// Tasks are sent individually to the delivery channel. The call is
+// non-blocking; if the channel is full, a warning is logged and the
+// delivery will be recovered on the next engine restart.
 func (e *Engine) Notify(tasks []DeliveryTask) {
+	for i := range tasks {
 		select {
-	case e.notifyCh <- tasks:
+		case e.deliveryCh <- tasks[i]:
 		default:
-		e.log.Warn("delivery notification channel full, deliveries will be recovered on restart",
-			"task_count", len(tasks),
+			e.log.Warn("delivery channel full, task will be recovered on restart",
+				"delivery_id", tasks[i].DeliveryID,
+				"event_id", tasks[i].EventID,
 			)
 		}
 	}
+}

-func (e *Engine) run(ctx context.Context) {
+// worker is the main loop for a worker goroutine. It selects from both
+// the delivery channel (new tasks from the handler) and the retry channel
+// (tasks from backoff timers). At most e.workers deliveries are in-flight
+// at any time.
+func (e *Engine) worker(ctx context.Context) {
 	defer e.wg.Done()

-	// On startup, recover any pending or retrying deliveries that were
-	// interrupted by an unexpected shutdown. Pending deliveries are
-	// processed immediately; retrying deliveries get timers scheduled
-	// for their remaining backoff.
-	e.recoverInFlight(ctx)
-
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case tasks := <-e.notifyCh:
-			e.processDeliveryTasks(ctx, tasks)
+		case task := <-e.deliveryCh:
+			e.processNewTask(ctx, &task)
 		case task := <-e.retryCh:
-			e.processRetryTask(ctx, task)
+			e.processRetryTask(ctx, &task)
 		}
 	}
 }
+
+// recoverPending runs on startup to recover any pending or retrying
+// deliveries that were interrupted by an unexpected shutdown. Recovered
+// tasks are sent into the delivery/retry channels for workers to pick up.
+func (e *Engine) recoverPending(ctx context.Context) {
+	defer e.wg.Done()
+	e.recoverInFlight(ctx)
+}
+
+// processNewTask handles a single new delivery task from the delivery
+// channel. It builds the event and target context from the task's inline
+// data and executes the delivery. For large bodies (≥ MaxInlineBodySize),
+// the body is fetched from the per-webhook database on demand.
+func (e *Engine) processNewTask(ctx context.Context, task *DeliveryTask) {
+	webhookDB, err := e.dbManager.GetDB(task.WebhookID)
+	if err != nil {
+		e.log.Error("failed to get webhook database",
+			"webhook_id", task.WebhookID,
+			"error", err,
+		)
+		return
+	}
+
+	// Build Event from task data
+	event := database.Event{
+		Method:      task.Method,
+		Headers:     task.Headers,
+		ContentType: task.ContentType,
+	}
+	event.ID = task.EventID
+	event.WebhookID = task.WebhookID
+
+	if task.Body != nil {
+		event.Body = *task.Body
+	} else {
+		// Large body: fetch from per-webhook DB
+		var dbEvent database.Event
+		if err := webhookDB.Select("body").
+			First(&dbEvent, "id = ?", task.EventID).Error; err != nil {
+			e.log.Error("failed to fetch event body from database",
+				"event_id", task.EventID,
+				"error", err,
+			)
+			return
+		}
+		event.Body = dbEvent.Body
+	}
+
+	// Build Target from task data (no main DB query needed)
+	target := database.Target{
+		Name:       task.TargetName,
+		Type:       task.TargetType,
+		Config:     task.TargetConfig,
+		MaxRetries: task.MaxRetries,
+	}
+	target.ID = task.TargetID
+
+	// Build Delivery struct for the processing chain
+	d := &database.Delivery{
+		EventID:  task.EventID,
+		TargetID: task.TargetID,
+		Status:   database.DeliveryStatusPending,
+		Event:    event,
+		Target:   target,
+	}
+	d.ID = task.DeliveryID
+
+	e.processDelivery(ctx, webhookDB, d, task)
+}
+
+// processRetryTask handles a single delivery task fired by a retry timer.
+// The task carries all data needed for delivery (same as the initial
+// notification). The only DB read is a status check to verify the delivery
+// hasn't been cancelled or resolved while the timer was pending.
+func (e *Engine) processRetryTask(ctx context.Context, task *DeliveryTask) {
+	webhookDB, err := e.dbManager.GetDB(task.WebhookID)
+	if err != nil {
+		e.log.Error("failed to get webhook database for retry",
+			"webhook_id", task.WebhookID,
+			"delivery_id", task.DeliveryID,
+			"error", err,
+		)
+		return
+	}
+
+	// Verify delivery is still in retrying status (may have been
+	// cancelled or manually resolved while the timer was pending)
+	var d database.Delivery
+	if err := webhookDB.Select("id", "status").
+		First(&d, "id = ?", task.DeliveryID).Error; err != nil {
+		e.log.Error("failed to load delivery for retry",
+			"delivery_id", task.DeliveryID,
+			"error", err,
+		)
+		return
+	}
+
+	if d.Status != database.DeliveryStatusRetrying {
+		e.log.Debug("skipping retry for delivery no longer in retrying status",
+			"delivery_id", d.ID,
+			"status", d.Status,
+		)
+		return
+	}
+
+	// Build Event from task data
+	event := database.Event{
+		Method:      task.Method,
+		Headers:     task.Headers,
+		ContentType: task.ContentType,
+	}
+	event.ID = task.EventID
+	event.WebhookID = task.WebhookID
+
+	if task.Body != nil {
+		event.Body = *task.Body
+	} else {
+		// Large body: fetch from per-webhook DB
+		var dbEvent database.Event
+		if err := webhookDB.Select("body").
+			First(&dbEvent, "id = ?", task.EventID).Error; err != nil {
+			e.log.Error("failed to fetch event body for retry",
+				"event_id", task.EventID,
+				"error", err,
+			)
+			return
+		}
+		event.Body = dbEvent.Body
+	}
+
+	// Build Target from task data
+	target := database.Target{
+		Name:       task.TargetName,
+		Type:       task.TargetType,
+		Config:     task.TargetConfig,
+		MaxRetries: task.MaxRetries,
+	}
+	target.ID = task.TargetID
+
+	// Populate the delivery with event and target for processing
+	d.EventID = task.EventID
+	d.TargetID = task.TargetID
+	d.Event = event
+	d.Target = target
+
+	e.processDelivery(ctx, webhookDB, &d, task)
+}

 // recoverInFlight scans all webhooks on startup for deliveries that were
-// interrupted by an unexpected shutdown. Pending deliveries are processed
-// immediately; retrying deliveries get timers scheduled for their
-// remaining backoff period.
+// interrupted by an unexpected shutdown. Pending deliveries are sent to
+// the delivery channel; retrying deliveries get timers scheduled for
+// their remaining backoff period.
 func (e *Engine) recoverInFlight(ctx context.Context) {
 	var webhookIDs []string
 	if err := e.database.DB().Model(&database.Webhook{}).Pluck("id", &webhookIDs).Error; err != nil {
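Notify's overflow behaviour in the hunk above relies on Go's `select` with a `default` arm, which makes a channel send non-blocking. A small sketch of just that mechanism (`tryEnqueue` is an illustrative name, not the engine's API):

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send. The default arm fires only
// when the buffered channel is full, so the caller never blocks; in the
// engine, the DB row is left for the recovery/sweep path instead.
func tryEnqueue(ch chan<- string, id string) bool {
	select {
	case ch <- id:
		return true
	default:
		return false // caller logs a warning and moves on
	}
}

func main() {
	ch := make(chan string, 2) // capacity 2, no receiver running
	accepted := 0
	for _, id := range []string{"a", "b", "c"} {
		if tryEnqueue(ch, id) {
			accepted++
		}
	}
	fmt.Println(accepted, "accepted,", 3-accepted, "dropped") // prints "2 accepted, 1 dropped"
}
```

The trade-off is explicit: the handler's latency is protected at the cost of occasionally deferring a task to the durable recovery path.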
@@ -230,9 +408,8 @@ func (e *Engine) recoverInFlight(ctx context.Context) {
 }

 // recoverWebhookDeliveries recovers pending and retrying deliveries for
-// a single webhook. This is the recovery path — it reads everything from
-// the database since there are no in-memory notifications available after
-// a restart.
+// a single webhook. Pending deliveries are sent to the delivery channel;
+// retrying deliveries get timers scheduled for their remaining backoff.
 func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string) {
 	webhookDB, err := e.dbManager.GetDB(webhookID)
 	if err != nil {
@@ -243,19 +420,8 @@ func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string)
 		return
 	}

-	// Check for pending deliveries and process them immediately
-	var pendingCount int64
-	webhookDB.Model(&database.Delivery{}).
-		Where("status = ?", database.DeliveryStatusPending).
-		Count(&pendingCount)
-
-	if pendingCount > 0 {
-		e.log.Info("recovering pending deliveries",
-			"webhook_id", webhookID,
-			"count", pendingCount,
-		)
-		e.processWebhookPendingDeliveries(ctx, webhookID)
-	}
+	// Recover pending deliveries by sending them to the delivery channel
+	e.recoverPendingDeliveries(ctx, webhookDB, webhookID)

 	// Schedule timers for retrying deliveries based on remaining backoff
 	var retrying []database.Delivery
@@ -356,201 +522,11 @@ func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string)
 	}
 }

-// processDeliveryTasks handles a batch of delivery tasks from the webhook
-// handler. Each task is dispatched in its own goroutine for parallel
-// fan-out — all targets for a single event start delivering simultaneously.
-// In the happy path (body ≤ 16KB), the engine delivers without reading
-// from any database — it trusts the handler's inline data and only touches
-// the DB to record results. For large bodies (body > 16KB), the body is
-// fetched once and shared across all goroutines in the batch.
-func (e *Engine) processDeliveryTasks(ctx context.Context, tasks []DeliveryTask) {
-	if len(tasks) == 0 {
-		return
-	}
-
-	// All tasks in a batch share the same webhook ID
-	webhookID := tasks[0].WebhookID
-	webhookDB, err := e.dbManager.GetDB(webhookID)
-	if err != nil {
-		e.log.Error("failed to get webhook database",
-			"webhook_id", webhookID,
-			"error", err,
-		)
-		return
-	}
-
-	// For the large-body case, pre-fetch the event body once before
-	// fanning out so all goroutines share the same data.
-	var fetchedBody *string
-	if tasks[0].Body == nil {
-		var dbEvent database.Event
-		if err := webhookDB.Select("body").
-			First(&dbEvent, "id = ?", tasks[0].EventID).Error; err != nil {
-			e.log.Error("failed to fetch event body from database",
-				"event_id", tasks[0].EventID,
-				"error", err,
-			)
-			return
-		}
-		fetchedBody = &dbEvent.Body
-	}
-
-	// Fan out: spin up a goroutine per task for parallel delivery.
-	// Each goroutine is independent (fire-and-forget) and records its
-	// own result. No need to wait for all goroutines to finish.
-	for i := range tasks {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-		}
-
-		task := tasks[i] // copy for goroutine closure safety
-
-		go func() {
-			e.deliverTask(ctx, webhookDB, &task, fetchedBody)
-		}()
-	}
-}
-
-// deliverTask prepares and executes a single delivery task. Called from
-// a dedicated goroutine for parallel fan-out.
-func (e *Engine) deliverTask(ctx context.Context, webhookDB *gorm.DB, task *DeliveryTask, fetchedBody *string) {
-	// Build Event from task data
-	event := database.Event{
-		Method:      task.Method,
-		Headers:     task.Headers,
-		ContentType: task.ContentType,
-	}
-	event.ID = task.EventID
-	event.WebhookID = task.WebhookID
-
-	switch {
-	case task.Body != nil:
-		event.Body = *task.Body
-	case fetchedBody != nil:
-		event.Body = *fetchedBody
-	default:
-		e.log.Error("no body available for delivery task",
-			"delivery_id", task.DeliveryID,
-			"event_id", task.EventID,
-		)
-		return
-	}
-
-	// Build Target from task data (no main DB query needed)
-	target := database.Target{
-		Name:       task.TargetName,
-		Type:       task.TargetType,
-		Config:     task.TargetConfig,
-		MaxRetries: task.MaxRetries,
-	}
-	target.ID = task.TargetID
-
-	// Build Delivery struct for the processing chain
-	d := &database.Delivery{
-		EventID:  task.EventID,
-		TargetID: task.TargetID,
-		Status:   database.DeliveryStatusPending,
-		Event:    event,
-		Target:   target,
-	}
-	d.ID = task.DeliveryID
-
-	e.processDelivery(ctx, webhookDB, d, task)
-}
-
-// processRetryTask handles a single delivery task fired by a retry timer.
-// The task carries all data needed for delivery (same as the initial
-// notification). The only DB read is a status check to verify the delivery
-// hasn't been cancelled or resolved while the timer was pending.
-func (e *Engine) processRetryTask(ctx context.Context, task DeliveryTask) {
-	webhookDB, err := e.dbManager.GetDB(task.WebhookID)
-	if err != nil {
-		e.log.Error("failed to get webhook database for retry",
-			"webhook_id", task.WebhookID,
-			"delivery_id", task.DeliveryID,
-			"error", err,
-		)
-		return
-	}
-
-	// Verify delivery is still in retrying status (may have been
-	// cancelled or manually resolved while the timer was pending)
-	var d database.Delivery
-	if err := webhookDB.Select("id", "status").
-		First(&d, "id = ?", task.DeliveryID).Error; err != nil {
-		e.log.Error("failed to load delivery for retry",
-			"delivery_id", task.DeliveryID,
-			"error", err,
-		)
-		return
-	}
-
-	if d.Status != database.DeliveryStatusRetrying {
-		e.log.Debug("skipping retry for delivery no longer in retrying status",
-			"delivery_id", d.ID,
-			"status", d.Status,
-		)
-		return
-	}
-
-	// Build Event from task data
-	event := database.Event{
-		Method:      task.Method,
-		Headers:     task.Headers,
-		ContentType: task.ContentType,
-	}
-	event.ID = task.EventID
-	event.WebhookID = task.WebhookID
-
-	if task.Body != nil {
-		event.Body = *task.Body
-	} else {
-		// Large body: fetch from per-webhook DB
-		var dbEvent database.Event
-		if err := webhookDB.Select("body").
-			First(&dbEvent, "id = ?", task.EventID).Error; err != nil {
-			e.log.Error("failed to fetch event body for retry",
-				"event_id", task.EventID,
-				"error", err,
-			)
-			return
-		}
-		event.Body = dbEvent.Body
-	}
-
-	// Build Target from task data
-	target := database.Target{
-		Name:       task.TargetName,
-		Type:       task.TargetType,
-		Config:     task.TargetConfig,
-		MaxRetries: task.MaxRetries,
-	}
-	target.ID = task.TargetID
-
-	// Populate the delivery with event and target for processing
-	d.EventID = task.EventID
-	d.TargetID = task.TargetID
-	d.Event = event
-	d.Target = target
-
-	e.processDelivery(ctx, webhookDB, &d, &task)
-}
-
-// processWebhookPendingDeliveries queries a single webhook's database for
-// all pending deliveries and processes them. Used for crash recovery where
-// we don't have in-memory notifications — everything is loaded from the DB.
-func (e *Engine) processWebhookPendingDeliveries(ctx context.Context, webhookID string) {
-	webhookDB, err := e.dbManager.GetDB(webhookID)
-	if err != nil {
-		e.log.Error("failed to get webhook database",
-			"webhook_id", webhookID,
-			"error", err,
-		)
-		return
-	}
-
+// recoverPendingDeliveries sends pending deliveries for a single webhook
+// into the delivery channel. Used for crash recovery where we don't have
+// in-memory notifications — everything is loaded from the DB and queued
+// for workers to pick up.
+func (e *Engine) recoverPendingDeliveries(ctx context.Context, webhookDB *gorm.DB, webhookID string) {
 	var deliveries []database.Delivery
 	result := webhookDB.
 		Where("status = ?", database.DeliveryStatusPending).
@@ -569,6 +545,11 @@ func (e *Engine) processWebhookPendingDeliveries(ctx context.Context, webhookID
 		return
 	}

+	e.log.Info("recovering pending deliveries",
+		"webhook_id", webhookID,
+		"count", len(deliveries),
+	)
+
 	// Collect unique target IDs and load targets from the main DB
 	seen := make(map[string]bool)
 	targetIDs := make([]string, 0, len(deliveries))
@@ -590,8 +571,7 @@ func (e *Engine) processWebhookPendingDeliveries(ctx context.Context, webhookID
 		targetMap[t.ID] = t
 	}

-	// Fan out recovered deliveries in parallel — same as the normal
-	// delivery path, each task gets its own goroutine.
+	// Send recovered deliveries to the delivery channel for workers
 	for i := range deliveries {
 		select {
 		case <-ctx.Done():
@@ -607,10 +587,14 @@ func (e *Engine) processWebhookPendingDeliveries(ctx context.Context, webhookID
 			)
 			continue
 		}
-		deliveries[i].Target = target

-		// Build task from DB data for the recovery path
-		bodyStr := deliveries[i].Event.Body
+		// Build task from DB data
+		var bodyPtr *string
+		if len(deliveries[i].Event.Body) < MaxInlineBodySize {
+			bodyStr := deliveries[i].Event.Body
+			bodyPtr = &bodyStr
+		}
+
 		task := DeliveryTask{
 			DeliveryID: deliveries[i].ID,
 			EventID:    deliveries[i].EventID,
@@ -623,21 +607,30 @@ func (e *Engine) processWebhookPendingDeliveries(ctx context.Context, webhookID
 			Method:      deliveries[i].Event.Method,
 			Headers:     deliveries[i].Event.Headers,
 			ContentType: deliveries[i].Event.ContentType,
-			Body:        &bodyStr,
+			Body:        bodyPtr,
 			AttemptNum:  1,
 		}

-		d := deliveries[i] // copy for goroutine closure safety
-		go func() {
-			e.processDelivery(ctx, webhookDB, &d, &task)
-		}()
+		select {
+		case e.deliveryCh <- task:
+		default:
+			e.log.Warn("delivery channel full during recovery, remaining deliveries will be recovered on next restart",
+				"delivery_id", deliveries[i].ID,
+			)
+			return
+		}
 	}
 }

 // scheduleRetry creates a Go timer that fires after the given delay and
 // sends the full DeliveryTask to the engine's retry channel. The task
-// carries all data needed for the retry attempt, so when it fires, the
-// engine can deliver without reading event or target data from the DB.
+// carries all data needed for the retry attempt, so when it fires, a
+// worker can deliver without reading event or target data from the DB.
+//
+// If the retry channel is full when the timer fires, the timer is
+// dropped. The delivery remains in `retrying` status in the database
+// and will be picked up by the periodic retry sweep (DB-mediated
+// fallback path). No goroutines are blocked or re-armed.
 func (e *Engine) scheduleRetry(task DeliveryTask, delay time.Duration) {
 	e.log.Debug("scheduling delivery retry",
 		"webhook_id", task.WebhookID,
@@ -650,13 +643,189 @@ func (e *Engine) scheduleRetry(task DeliveryTask, delay time.Duration) {
 		select {
 		case e.retryCh <- task:
 		default:
-			e.log.Warn("retry channel full, delivery will be recovered on restart",
+			// Retry channel full — drop the timer. The delivery is
+			// already marked as `retrying` in the per-webhook DB, so
+			// the periodic retry sweep will pick it up. This is the
+			// DB-mediated fallback path: no blocked goroutines, no
+			// unbounded timer chains.
+			e.log.Warn("retry channel full, delivery will be recovered by periodic sweep",
 				"delivery_id", task.DeliveryID,
+				"webhook_id", task.WebhookID,
 			)
 		}
 	})
 }
+
+// retrySweep runs periodically to scan all per-webhook databases for
+// "orphaned" retrying deliveries — ones whose in-memory retry timer was
+// dropped because the retry channel was full. This is the DB-mediated
+// fallback path that ensures no retries are permanently lost even under
+// extreme backpressure.
+//
+// The sweep is also the same mechanism used on startup recovery, making
+// the system resilient to both channel overflow and unexpected restarts.
+func (e *Engine) retrySweep(ctx context.Context) {
+	defer e.wg.Done()
+	ticker := time.NewTicker(retrySweepInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			e.sweepOrphanedRetries(ctx)
+		}
+	}
+}
+
+// sweepOrphanedRetries scans all webhooks for retrying deliveries whose
+// backoff period has elapsed. For each eligible delivery, it builds a
+// DeliveryTask and sends it to the retry channel. If the channel is
+// still full, the delivery is skipped and will be retried on the next
+// sweep cycle.
+func (e *Engine) sweepOrphanedRetries(ctx context.Context) {
+	var webhookIDs []string
+	if err := e.database.DB().Model(&database.Webhook{}).Pluck("id", &webhookIDs).Error; err != nil {
+		e.log.Error("retry sweep: failed to query webhook IDs", "error", err)
+		return
+	}
+
+	for _, webhookID := range webhookIDs {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
+		if !e.dbManager.DBExists(webhookID) {
+			continue
+		}
+
+		e.sweepWebhookRetries(ctx, webhookID)
+	}
+}
+
+// sweepWebhookRetries scans a single webhook's database for retrying
+// deliveries whose backoff period has elapsed and sends them to the
+// retry channel.
+func (e *Engine) sweepWebhookRetries(ctx context.Context, webhookID string) {
+	webhookDB, err := e.dbManager.GetDB(webhookID)
+	if err != nil {
+		e.log.Error("retry sweep: failed to get webhook database",
+			"webhook_id", webhookID,
+			"error", err,
+		)
+		return
+	}
+
+	var retrying []database.Delivery
+	if err := webhookDB.Where("status = ?", database.DeliveryStatusRetrying).
+		Find(&retrying).Error; err != nil {
+		e.log.Error("retry sweep: failed to query retrying deliveries",
+			"webhook_id", webhookID,
+			"error", err,
+		)
+		return
+	}
+
+	for i := range retrying {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
+		d := &retrying[i]
+
+		// Count prior attempts to determine backoff
+		var resultCount int64
+		webhookDB.Model(&database.DeliveryResult{}).
+			Where("delivery_id = ?", d.ID).
+			Count(&resultCount)
+		attemptNum := int(resultCount)
+
+		// Check if the backoff period has elapsed since the last attempt.
+		// If it hasn't, this delivery likely has an active in-memory
+		// timer and is not orphaned — skip it.
+		var lastResult database.DeliveryResult
+		if err := webhookDB.Where("delivery_id = ?", d.ID).
+			Order("created_at DESC").
+			First(&lastResult).Error; err == nil {
+			shift := attemptNum - 1
+			if shift < 0 {
+				shift = 0
+			}
+			if shift > 30 {
+				shift = 30
+			}
+			backoff := time.Duration(1<<uint(shift)) * time.Second //nolint:gosec // bounded above
+			if time.Since(lastResult.CreatedAt) < backoff {
+				continue // Backoff hasn't elapsed; likely has an active timer
+			}
+		}
+		// If no result exists, the delivery was set to retrying but
+		// never actually attempted — process immediately.
+
+		// Load event for this delivery
+		var event database.Event
+		if err := webhookDB.First(&event, "id = ?", d.EventID).Error; err != nil {
+			e.log.Error("retry sweep: failed to load event",
+				"delivery_id", d.ID,
+				"event_id", d.EventID,
+				"error", err,
+			)
+			continue
+		}
+
+		// Load target from main DB
+		var target database.Target
+		if err := e.database.DB().First(&target, "id = ?", d.TargetID).Error; err != nil {
+			e.log.Error("retry sweep: failed to load target",
+				"delivery_id", d.ID,
+				"target_id", d.TargetID,
+				"error", err,
+			)
+			continue
+		}
+
+		// Build task from DB data
+		var bodyPtr *string
+		if len(event.Body) < MaxInlineBodySize {
+			bodyStr := event.Body
+			bodyPtr = &bodyStr
+		}
+
+		task := DeliveryTask{
+			DeliveryID:   d.ID,
+			EventID:      d.EventID,
+			WebhookID:    webhookID,
+			TargetID:     target.ID,
+			TargetName:   target.Name,
+			TargetType:   target.Type,
+			TargetConfig: target.Config,
+			MaxRetries:   target.MaxRetries,
+			Method:       event.Method,
+			Headers:      event.Headers,
+			ContentType:  event.ContentType,
+			Body:         bodyPtr,
+			AttemptNum:   attemptNum + 1,
+		}
+
+		// Try to send to retry channel; skip if still full
+		select {
+		case e.retryCh <- task:
+			e.log.Info("retry sweep: recovered orphaned retrying delivery",
+				"delivery_id", d.ID,
+				"webhook_id", webhookID,
+				"attempt", attemptNum+1,
+			)
+		default:
+			// Channel still full — will try again on the next sweep
+		}
+	}
+}
+
 func (e *Engine) processDelivery(ctx context.Context, webhookDB *gorm.DB, d *database.Delivery, task *DeliveryTask) {
 	switch d.Target.Type {
 	case database.TargetTypeHTTP: