refactor: self-contained delivery tasks — engine delivers without DB reads in happy path
All checks were successful
check / check (push) Successful in 58s
All checks were successful
check / check (push) Successful in 58s
The webhook handler now builds DeliveryTask structs carrying all target config and event data inline (for bodies ≤16KB) and sends them through the delivery channel. In the happy path, the engine delivers without reading from any database — it only writes to record delivery results. For large bodies (≥16KB), Body is nil and the engine fetches it from the per-webhook database on demand. Retry timers also carry the full DeliveryTask, so retries avoid unnecessary DB reads. The database is used for crash recovery only: on startup the engine scans for interrupted pending/retrying deliveries and re-queues them. Implements owner feedback from issue #15: > the message in the <=16KB case should have everything it needs to do > its delivery. it shouldn't touch the db until it has a success or > failure to record.
This commit is contained in:
18
README.md
18
README.md
@@ -487,8 +487,10 @@ External Service
|
|||||||
│
|
│
|
||||||
1. Look up Entrypoint by UUID
|
1. Look up Entrypoint by UUID
|
||||||
2. Capture full request as Event
|
2. Capture full request as Event
|
||||||
3. Queue Delivery to each active Target
|
3. Create Delivery records for each active Target
|
||||||
4. Notify Engine via channel
|
4. Build self-contained DeliveryTask structs
|
||||||
|
(target config + event data inline for ≤16KB)
|
||||||
|
5. Notify Engine via channel (no DB read needed)
|
||||||
│
|
│
|
||||||
▼
|
▼
|
||||||
┌──────────────┐
|
┌──────────────┐
|
||||||
@@ -660,8 +662,11 @@ Components are wired via Uber fx in this order:
|
|||||||
|
|
||||||
The server starts via `fx.Invoke(func(*server.Server, *delivery.Engine)
|
The server starts via `fx.Invoke(func(*server.Server, *delivery.Engine)
|
||||||
{})` which triggers the fx lifecycle hooks in dependency order. The
|
{})` which triggers the fx lifecycle hooks in dependency order. The
|
||||||
`DeliveryNotifier` interface allows the webhook handler to notify the
|
`DeliveryNotifier` interface allows the webhook handler to send
|
||||||
delivery engine of new work without a direct package dependency.
|
self-contained `DeliveryTask` slices to the engine without a direct
|
||||||
|
package dependency. Each task carries all target config and event data
|
||||||
|
inline (for bodies ≤16KB), so the engine can deliver without reading
|
||||||
|
from any database — it only writes to record results.
|
||||||
|
|
||||||
### Middleware Stack
|
### Middleware Stack
|
||||||
|
|
||||||
@@ -752,6 +757,11 @@ linted, tested, and compiled.
|
|||||||
creation, delete on webhook removal)
|
creation, delete on webhook removal)
|
||||||
- [x] `WebhookDBManager` component with lazy connection pooling
|
- [x] `WebhookDBManager` component with lazy connection pooling
|
||||||
- [x] Event-driven delivery engine (channel notifications + timer-based retries)
|
- [x] Event-driven delivery engine (channel notifications + timer-based retries)
|
||||||
|
- [x] Self-contained delivery tasks: in the ≤16KB happy path, the engine
|
||||||
|
delivers without reading from any database — target config, event
|
||||||
|
headers, and body are all carried inline in the channel notification.
|
||||||
|
The engine only touches the DB to record results (success/failure).
|
||||||
|
Large bodies (≥16KB) are fetched from the per-webhook DB on demand.
|
||||||
- [x] Database target type marks delivery as immediately successful
|
- [x] Database target type marks delivery as immediately successful
|
||||||
(events are already in the per-webhook DB)
|
(events are already in the per-webhook DB)
|
||||||
|
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ const (
|
|||||||
retryChannelSize = 1000
|
retryChannelSize = 1000
|
||||||
|
|
||||||
// MaxInlineBodySize is the maximum event body size that will be carried
|
// MaxInlineBodySize is the maximum event body size that will be carried
|
||||||
// inline in a Notification through the channel. Bodies at or above this
|
// inline in a DeliveryTask through the channel. Bodies at or above this
|
||||||
// size are left nil and fetched from the per-webhook database on demand.
|
// size are left nil and fetched from the per-webhook database on demand.
|
||||||
// This keeps channel buffer memory bounded under high traffic.
|
// This keeps channel buffer memory bounded under high traffic.
|
||||||
MaxInlineBodySize = 16 * 1024
|
MaxInlineBodySize = 16 * 1024
|
||||||
@@ -39,31 +39,42 @@ const (
|
|||||||
maxBodyLog = 4096
|
maxBodyLog = 4096
|
||||||
)
|
)
|
||||||
|
|
||||||
// Notification carries event data through the delivery notification channel.
|
// DeliveryTask contains everything needed to deliver an event to a single
|
||||||
// The Body field is a pointer: non-nil for payloads under MaxInlineBodySize
|
// target. In the ≤16KB happy path, Body is non-nil and the engine delivers
|
||||||
// (16 KB), nil for larger payloads. When nil, the engine fetches the body
|
// without touching any database — it trusts that the webhook handler wrote
|
||||||
// from the per-webhook database using EventID. This keeps channel buffer
|
// the records correctly. Only after a delivery attempt (success or failure)
|
||||||
// memory bounded regardless of payload sizes during high traffic.
|
// does the engine write to the DB to record the result.
|
||||||
type Notification struct {
|
//
|
||||||
WebhookID string
|
// When Body is nil (payload ≥ MaxInlineBodySize), the engine fetches the
|
||||||
EventID string
|
// body from the per-webhook database using EventID before delivering.
|
||||||
|
type DeliveryTask struct {
|
||||||
|
DeliveryID string // ID of the Delivery record (for recording results)
|
||||||
|
EventID string // Event ID (for DB lookup if body is nil)
|
||||||
|
WebhookID string // Webhook ID (for per-webhook DB access)
|
||||||
|
|
||||||
|
// Target info (from main DB, included at notification time)
|
||||||
|
TargetID string
|
||||||
|
TargetName string
|
||||||
|
TargetType database.TargetType
|
||||||
|
TargetConfig string // JSON config (URL, headers, etc.)
|
||||||
|
MaxRetries int
|
||||||
|
|
||||||
|
// Event data (inline for ≤16KB bodies)
|
||||||
Method string
|
Method string
|
||||||
Headers string
|
Headers string // JSON
|
||||||
ContentType string
|
ContentType string
|
||||||
Body *string // nil if body >= MaxInlineBodySize; fetch from DB by EventID
|
Body *string // nil if body ≥ MaxInlineBodySize; fetch from DB by EventID
|
||||||
|
|
||||||
|
// AttemptNum tracks the delivery attempt number. Set to 1 for the
|
||||||
|
// initial delivery and incremented for each retry. This avoids a DB
|
||||||
|
// query to count prior results in the hot path.
|
||||||
|
AttemptNum int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notifier is the interface for notifying the delivery engine about new
|
// Notifier is the interface for notifying the delivery engine about new
|
||||||
// deliveries. Implemented by Engine and injected into handlers.
|
// deliveries. Implemented by Engine and injected into handlers.
|
||||||
type Notifier interface {
|
type Notifier interface {
|
||||||
Notify(n Notification)
|
Notify(tasks []DeliveryTask)
|
||||||
}
|
|
||||||
|
|
||||||
// retryRequest carries the information needed to retry a specific delivery.
|
|
||||||
// Sent from timer goroutines to the engine's retry channel.
|
|
||||||
type retryRequest struct {
|
|
||||||
webhookID string
|
|
||||||
deliveryID string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// HTTPTargetConfig holds configuration for http and retry target types.
|
// HTTPTargetConfig holds configuration for http and retry target types.
|
||||||
@@ -84,13 +95,15 @@ type EngineParams struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Engine processes queued deliveries in the background using an
|
// Engine processes queued deliveries in the background using an
|
||||||
// event-driven architecture. New deliveries are signaled via a buffered
|
// event-driven architecture. New deliveries arrive as self-contained
|
||||||
// channel from the webhook handler and processed immediately. Failed
|
// DeliveryTask slices via a buffered channel from the webhook handler.
|
||||||
// deliveries that need retry are scheduled via Go timers with exponential
|
// In the happy path (body ≤ 16KB), the engine delivers without reading
|
||||||
// backoff — each timer fires into a separate retry channel when the
|
// from any database — it only writes to record results. Failed deliveries
|
||||||
// backoff period expires. The database stores delivery status for crash
|
// that need retry are scheduled via Go timers with exponential backoff;
|
||||||
// recovery only; on startup the engine scans for interrupted deliveries
|
// each timer fires into a separate retry channel carrying the full
|
||||||
// and re-queues them.
|
// DeliveryTask so retries also avoid unnecessary DB reads. The database
|
||||||
|
// stores delivery status for crash recovery only; on startup the engine
|
||||||
|
// scans for interrupted deliveries and re-queues them.
|
||||||
type Engine struct {
|
type Engine struct {
|
||||||
database *database.Database
|
database *database.Database
|
||||||
dbManager *database.WebhookDBManager
|
dbManager *database.WebhookDBManager
|
||||||
@@ -98,8 +111,8 @@ type Engine struct {
|
|||||||
client *http.Client
|
client *http.Client
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
notifyCh chan Notification
|
notifyCh chan []DeliveryTask
|
||||||
retryCh chan retryRequest
|
retryCh chan DeliveryTask
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates and registers the delivery engine with the fx lifecycle.
|
// New creates and registers the delivery engine with the fx lifecycle.
|
||||||
@@ -111,8 +124,8 @@ func New(lc fx.Lifecycle, params EngineParams) *Engine {
|
|||||||
client: &http.Client{
|
client: &http.Client{
|
||||||
Timeout: httpClientTimeout,
|
Timeout: httpClientTimeout,
|
||||||
},
|
},
|
||||||
notifyCh: make(chan Notification, notifyChannelSize),
|
notifyCh: make(chan []DeliveryTask, notifyChannelSize),
|
||||||
retryCh: make(chan retryRequest, retryChannelSize),
|
retryCh: make(chan DeliveryTask, retryChannelSize),
|
||||||
}
|
}
|
||||||
|
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
@@ -144,19 +157,17 @@ func (e *Engine) stop() {
|
|||||||
e.log.Info("delivery engine stopped")
|
e.log.Info("delivery engine stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify signals the delivery engine that new deliveries are available.
|
// Notify signals the delivery engine that new deliveries are ready.
|
||||||
// This is called by the webhook handler after creating delivery records.
|
// Called by the webhook handler after creating delivery records. Each
|
||||||
// The notification carries the event data inline (with body pointer
|
// DeliveryTask carries all data needed for delivery in the ≤16KB case.
|
||||||
// semantics for memory efficiency). The call is non-blocking; if the
|
// The call is non-blocking; if the channel is full, a warning is logged
|
||||||
// channel is full, a warning is logged and the deliveries will be
|
// and the deliveries will be recovered on the next engine restart.
|
||||||
// recovered on the next engine restart.
|
func (e *Engine) Notify(tasks []DeliveryTask) {
|
||||||
func (e *Engine) Notify(n Notification) {
|
|
||||||
select {
|
select {
|
||||||
case e.notifyCh <- n:
|
case e.notifyCh <- tasks:
|
||||||
default:
|
default:
|
||||||
e.log.Warn("delivery notification channel full, deliveries will be recovered on restart",
|
e.log.Warn("delivery notification channel full, deliveries will be recovered on restart",
|
||||||
"webhook_id", n.WebhookID,
|
"task_count", len(tasks),
|
||||||
"event_id", n.EventID,
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -174,10 +185,10 @@ func (e *Engine) run(ctx context.Context) {
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case n := <-e.notifyCh:
|
case tasks := <-e.notifyCh:
|
||||||
e.processNotification(ctx, n)
|
e.processDeliveryTasks(ctx, tasks)
|
||||||
case req := <-e.retryCh:
|
case task := <-e.retryCh:
|
||||||
e.processRetryDelivery(ctx, req)
|
e.processRetryTask(ctx, task)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -209,9 +220,9 @@ func (e *Engine) recoverInFlight(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// recoverWebhookDeliveries recovers pending and retrying deliveries for
|
// recoverWebhookDeliveries recovers pending and retrying deliveries for
|
||||||
// a single webhook. Pending deliveries are processed directly (loading
|
// a single webhook. This is the recovery path — it reads everything from
|
||||||
// event data from DB); retrying deliveries get timers scheduled based on
|
// the database since there are no in-memory notifications available after
|
||||||
// the elapsed time since the last attempt.
|
// a restart.
|
||||||
func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string) {
|
func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string) {
|
||||||
webhookDB, err := e.dbManager.GetDB(webhookID)
|
webhookDB, err := e.dbManager.GetDB(webhookID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -256,6 +267,28 @@ func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string)
|
|||||||
Count(&resultCount)
|
Count(&resultCount)
|
||||||
attemptNum := int(resultCount)
|
attemptNum := int(resultCount)
|
||||||
|
|
||||||
|
// Load event for this delivery
|
||||||
|
var event database.Event
|
||||||
|
if err := webhookDB.First(&event, "id = ?", d.EventID).Error; err != nil {
|
||||||
|
e.log.Error("failed to load event for retrying delivery recovery",
|
||||||
|
"delivery_id", d.ID,
|
||||||
|
"event_id", d.EventID,
|
||||||
|
"error", err,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load target from main DB
|
||||||
|
var target database.Target
|
||||||
|
if err := e.database.DB().First(&target, "id = ?", d.TargetID).Error; err != nil {
|
||||||
|
e.log.Error("failed to load target for retrying delivery recovery",
|
||||||
|
"delivery_id", d.ID,
|
||||||
|
"target_id", d.TargetID,
|
||||||
|
"error", err,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// Calculate remaining backoff from last attempt
|
// Calculate remaining backoff from last attempt
|
||||||
remaining := time.Duration(0)
|
remaining := time.Duration(0)
|
||||||
|
|
||||||
@@ -278,6 +311,30 @@ func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Build task from DB data. Use body pointer semantics: inline
|
||||||
|
// for small bodies, nil for large ones (will be fetched on retry).
|
||||||
|
var bodyPtr *string
|
||||||
|
if len(event.Body) < MaxInlineBodySize {
|
||||||
|
bodyStr := event.Body
|
||||||
|
bodyPtr = &bodyStr
|
||||||
|
}
|
||||||
|
|
||||||
|
task := DeliveryTask{
|
||||||
|
DeliveryID: d.ID,
|
||||||
|
EventID: d.EventID,
|
||||||
|
WebhookID: webhookID,
|
||||||
|
TargetID: target.ID,
|
||||||
|
TargetName: target.Name,
|
||||||
|
TargetType: target.Type,
|
||||||
|
TargetConfig: target.Config,
|
||||||
|
MaxRetries: target.MaxRetries,
|
||||||
|
Method: event.Method,
|
||||||
|
Headers: event.Headers,
|
||||||
|
ContentType: event.ContentType,
|
||||||
|
Body: bodyPtr,
|
||||||
|
AttemptNum: attemptNum + 1,
|
||||||
|
}
|
||||||
|
|
||||||
e.log.Info("recovering retrying delivery",
|
e.log.Info("recovering retrying delivery",
|
||||||
"webhook_id", webhookID,
|
"webhook_id", webhookID,
|
||||||
"delivery_id", d.ID,
|
"delivery_id", d.ID,
|
||||||
@@ -285,42 +342,149 @@ func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string)
|
|||||||
"remaining_backoff", remaining,
|
"remaining_backoff", remaining,
|
||||||
)
|
)
|
||||||
|
|
||||||
e.scheduleRetry(webhookID, d.ID, remaining)
|
e.scheduleRetry(task, remaining)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// processNotification handles a delivery notification from the webhook
|
// processDeliveryTasks handles a batch of delivery tasks from the webhook
|
||||||
// handler. It uses the inline event data from the notification (avoiding
|
// handler. In the happy path (body ≤ 16KB), the engine delivers without
|
||||||
// a DB round-trip for the event) and only fetches the body from DB when
|
// reading from any database — it trusts the handler's inline data and
|
||||||
// it was too large to carry inline (Body pointer is nil).
|
// only touches the DB to record results. For large bodies (body > 16KB),
|
||||||
func (e *Engine) processNotification(ctx context.Context, n Notification) {
|
// the body is fetched from the per-webhook database on demand.
|
||||||
webhookDB, err := e.dbManager.GetDB(n.WebhookID)
|
func (e *Engine) processDeliveryTasks(ctx context.Context, tasks []DeliveryTask) {
|
||||||
|
if len(tasks) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// All tasks in a batch share the same webhook ID
|
||||||
|
webhookID := tasks[0].WebhookID
|
||||||
|
webhookDB, err := e.dbManager.GetDB(webhookID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.log.Error("failed to get webhook database",
|
e.log.Error("failed to get webhook database",
|
||||||
"webhook_id", n.WebhookID,
|
"webhook_id", webhookID,
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build the Event from the notification's inline data
|
// For the large-body case, we may need to fetch the event body once
|
||||||
event := database.Event{
|
// for all tasks sharing the same event. Cache it here.
|
||||||
Method: n.Method,
|
var fetchedBody *string
|
||||||
Headers: n.Headers,
|
|
||||||
ContentType: n.ContentType,
|
|
||||||
}
|
|
||||||
event.ID = n.EventID
|
|
||||||
event.WebhookID = n.WebhookID
|
|
||||||
|
|
||||||
if n.Body != nil {
|
for i := range tasks {
|
||||||
event.Body = *n.Body
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
task := &tasks[i]
|
||||||
|
|
||||||
|
// Build Event from task data
|
||||||
|
event := database.Event{
|
||||||
|
Method: task.Method,
|
||||||
|
Headers: task.Headers,
|
||||||
|
ContentType: task.ContentType,
|
||||||
|
}
|
||||||
|
event.ID = task.EventID
|
||||||
|
event.WebhookID = task.WebhookID
|
||||||
|
|
||||||
|
if task.Body != nil {
|
||||||
|
// Happy path: body inline, no DB read needed
|
||||||
|
event.Body = *task.Body
|
||||||
} else {
|
} else {
|
||||||
// Body was too large for inline transport — fetch from DB
|
// Large body path: fetch from per-webhook DB (once per batch)
|
||||||
|
if fetchedBody == nil {
|
||||||
var dbEvent database.Event
|
var dbEvent database.Event
|
||||||
if err := webhookDB.Select("body").
|
if err := webhookDB.Select("body").
|
||||||
First(&dbEvent, "id = ?", n.EventID).Error; err != nil {
|
First(&dbEvent, "id = ?", task.EventID).Error; err != nil {
|
||||||
e.log.Error("failed to fetch event body from database",
|
e.log.Error("failed to fetch event body from database",
|
||||||
"event_id", n.EventID,
|
"event_id", task.EventID,
|
||||||
|
"error", err,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fetchedBody = &dbEvent.Body
|
||||||
|
}
|
||||||
|
event.Body = *fetchedBody
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build Target from task data (no main DB query needed)
|
||||||
|
target := database.Target{
|
||||||
|
Name: task.TargetName,
|
||||||
|
Type: task.TargetType,
|
||||||
|
Config: task.TargetConfig,
|
||||||
|
MaxRetries: task.MaxRetries,
|
||||||
|
}
|
||||||
|
target.ID = task.TargetID
|
||||||
|
|
||||||
|
// Build Delivery struct for the processing chain
|
||||||
|
d := &database.Delivery{
|
||||||
|
EventID: task.EventID,
|
||||||
|
TargetID: task.TargetID,
|
||||||
|
Status: database.DeliveryStatusPending,
|
||||||
|
Event: event,
|
||||||
|
Target: target,
|
||||||
|
}
|
||||||
|
d.ID = task.DeliveryID
|
||||||
|
|
||||||
|
e.processDelivery(ctx, webhookDB, d, task)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// processRetryTask handles a single delivery task fired by a retry timer.
|
||||||
|
// The task carries all data needed for delivery (same as the initial
|
||||||
|
// notification). The only DB read is a status check to verify the delivery
|
||||||
|
// hasn't been cancelled or resolved while the timer was pending.
|
||||||
|
func (e *Engine) processRetryTask(ctx context.Context, task DeliveryTask) {
|
||||||
|
webhookDB, err := e.dbManager.GetDB(task.WebhookID)
|
||||||
|
if err != nil {
|
||||||
|
e.log.Error("failed to get webhook database for retry",
|
||||||
|
"webhook_id", task.WebhookID,
|
||||||
|
"delivery_id", task.DeliveryID,
|
||||||
|
"error", err,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify delivery is still in retrying status (may have been
|
||||||
|
// cancelled or manually resolved while the timer was pending)
|
||||||
|
var d database.Delivery
|
||||||
|
if err := webhookDB.Select("id", "status").
|
||||||
|
First(&d, "id = ?", task.DeliveryID).Error; err != nil {
|
||||||
|
e.log.Error("failed to load delivery for retry",
|
||||||
|
"delivery_id", task.DeliveryID,
|
||||||
|
"error", err,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if d.Status != database.DeliveryStatusRetrying {
|
||||||
|
e.log.Debug("skipping retry for delivery no longer in retrying status",
|
||||||
|
"delivery_id", d.ID,
|
||||||
|
"status", d.Status,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build Event from task data
|
||||||
|
event := database.Event{
|
||||||
|
Method: task.Method,
|
||||||
|
Headers: task.Headers,
|
||||||
|
ContentType: task.ContentType,
|
||||||
|
}
|
||||||
|
event.ID = task.EventID
|
||||||
|
event.WebhookID = task.WebhookID
|
||||||
|
|
||||||
|
if task.Body != nil {
|
||||||
|
event.Body = *task.Body
|
||||||
|
} else {
|
||||||
|
// Large body: fetch from per-webhook DB
|
||||||
|
var dbEvent database.Event
|
||||||
|
if err := webhookDB.Select("body").
|
||||||
|
First(&dbEvent, "id = ?", task.EventID).Error; err != nil {
|
||||||
|
e.log.Error("failed to fetch event body for retry",
|
||||||
|
"event_id", task.EventID,
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
@@ -328,69 +492,27 @@ func (e *Engine) processNotification(ctx context.Context, n Notification) {
|
|||||||
event.Body = dbEvent.Body
|
event.Body = dbEvent.Body
|
||||||
}
|
}
|
||||||
|
|
||||||
// Query pending deliveries for this specific event
|
// Build Target from task data
|
||||||
var deliveries []database.Delivery
|
target := database.Target{
|
||||||
result := webhookDB.
|
Name: task.TargetName,
|
||||||
Where("event_id = ? AND status = ?", n.EventID, database.DeliveryStatusPending).
|
Type: task.TargetType,
|
||||||
Find(&deliveries)
|
Config: task.TargetConfig,
|
||||||
|
MaxRetries: task.MaxRetries,
|
||||||
|
}
|
||||||
|
target.ID = task.TargetID
|
||||||
|
|
||||||
if result.Error != nil {
|
// Populate the delivery with event and target for processing
|
||||||
e.log.Error("failed to query pending deliveries",
|
d.EventID = task.EventID
|
||||||
"webhook_id", n.WebhookID,
|
d.TargetID = task.TargetID
|
||||||
"event_id", n.EventID,
|
d.Event = event
|
||||||
"error", result.Error,
|
d.Target = target
|
||||||
)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(deliveries) == 0 {
|
e.processDelivery(ctx, webhookDB, &d, &task)
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Collect unique target IDs and load targets from the main DB
|
|
||||||
seen := make(map[string]bool)
|
|
||||||
targetIDs := make([]string, 0, len(deliveries))
|
|
||||||
for _, d := range deliveries {
|
|
||||||
if !seen[d.TargetID] {
|
|
||||||
targetIDs = append(targetIDs, d.TargetID)
|
|
||||||
seen[d.TargetID] = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var targets []database.Target
|
|
||||||
if err := e.database.DB().Where("id IN ?", targetIDs).Find(&targets).Error; err != nil {
|
|
||||||
e.log.Error("failed to load targets from main DB", "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
targetMap := make(map[string]database.Target, len(targets))
|
|
||||||
for _, t := range targets {
|
|
||||||
targetMap[t.ID] = t
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range deliveries {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
target, ok := targetMap[deliveries[i].TargetID]
|
|
||||||
if !ok {
|
|
||||||
e.log.Error("target not found for delivery",
|
|
||||||
"delivery_id", deliveries[i].ID,
|
|
||||||
"target_id", deliveries[i].TargetID,
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
deliveries[i].Event = event
|
|
||||||
deliveries[i].Target = target
|
|
||||||
e.processDelivery(ctx, webhookDB, &deliveries[i])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// processWebhookPendingDeliveries queries a single webhook's database for
|
// processWebhookPendingDeliveries queries a single webhook's database for
|
||||||
// all pending deliveries and processes them. Used for crash recovery where
|
// all pending deliveries and processes them. Used for crash recovery where
|
||||||
// we don't have inline event data — everything is loaded from the DB.
|
// we don't have in-memory notifications — everything is loaded from the DB.
|
||||||
func (e *Engine) processWebhookPendingDeliveries(ctx context.Context, webhookID string) {
|
func (e *Engine) processWebhookPendingDeliveries(ctx context.Context, webhookID string) {
|
||||||
webhookDB, err := e.dbManager.GetDB(webhookID)
|
webhookDB, err := e.dbManager.GetDB(webhookID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -454,90 +576,59 @@ func (e *Engine) processWebhookPendingDeliveries(ctx context.Context, webhookID
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
deliveries[i].Target = target
|
deliveries[i].Target = target
|
||||||
e.processDelivery(ctx, webhookDB, &deliveries[i])
|
|
||||||
}
|
// Build task from DB data for the recovery path
|
||||||
}
|
bodyStr := deliveries[i].Event.Body
|
||||||
|
task := &DeliveryTask{
|
||||||
|
DeliveryID: deliveries[i].ID,
|
||||||
|
EventID: deliveries[i].EventID,
|
||||||
|
WebhookID: webhookID,
|
||||||
|
TargetID: target.ID,
|
||||||
|
TargetName: target.Name,
|
||||||
|
TargetType: target.Type,
|
||||||
|
TargetConfig: target.Config,
|
||||||
|
MaxRetries: target.MaxRetries,
|
||||||
|
Method: deliveries[i].Event.Method,
|
||||||
|
Headers: deliveries[i].Event.Headers,
|
||||||
|
ContentType: deliveries[i].Event.ContentType,
|
||||||
|
Body: &bodyStr,
|
||||||
|
AttemptNum: 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
// processRetryDelivery handles a single retry delivery triggered by a
|
e.processDelivery(ctx, webhookDB, &deliveries[i], task)
|
||||||
// backoff timer. It loads the delivery and target from the database and
|
|
||||||
// re-attempts delivery.
|
|
||||||
func (e *Engine) processRetryDelivery(ctx context.Context, req retryRequest) {
|
|
||||||
webhookDB, err := e.dbManager.GetDB(req.webhookID)
|
|
||||||
if err != nil {
|
|
||||||
e.log.Error("failed to get webhook database for retry",
|
|
||||||
"webhook_id", req.webhookID,
|
|
||||||
"delivery_id", req.deliveryID,
|
|
||||||
"error", err,
|
|
||||||
)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var d database.Delivery
|
|
||||||
if err := webhookDB.Preload("Event").
|
|
||||||
First(&d, "id = ?", req.deliveryID).Error; err != nil {
|
|
||||||
e.log.Error("failed to load delivery for retry",
|
|
||||||
"delivery_id", req.deliveryID,
|
|
||||||
"error", err,
|
|
||||||
)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify delivery is still in retrying status (may have been
|
|
||||||
// cancelled or manually resolved while the timer was pending)
|
|
||||||
if d.Status != database.DeliveryStatusRetrying {
|
|
||||||
e.log.Debug("skipping retry for delivery no longer in retrying status",
|
|
||||||
"delivery_id", d.ID,
|
|
||||||
"status", d.Status,
|
|
||||||
)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Load target from main DB
|
|
||||||
var target database.Target
|
|
||||||
if err := e.database.DB().First(&target, "id = ?", d.TargetID).Error; err != nil {
|
|
||||||
e.log.Error("failed to load target for retry",
|
|
||||||
"delivery_id", d.ID,
|
|
||||||
"target_id", d.TargetID,
|
|
||||||
"error", err,
|
|
||||||
)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
d.Target = target
|
|
||||||
|
|
||||||
e.processDelivery(ctx, webhookDB, &d)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// scheduleRetry creates a Go timer that fires after the given delay and
|
// scheduleRetry creates a Go timer that fires after the given delay and
|
||||||
// sends a retry request to the engine's retry channel. This is the
|
// sends the full DeliveryTask to the engine's retry channel. The task
|
||||||
// mechanism for exponential backoff — no periodic DB scanning needed.
|
// carries all data needed for the retry attempt, so when it fires, the
|
||||||
func (e *Engine) scheduleRetry(webhookID, deliveryID string, delay time.Duration) {
|
// engine can deliver without reading event or target data from the DB.
|
||||||
|
func (e *Engine) scheduleRetry(task DeliveryTask, delay time.Duration) {
|
||||||
e.log.Debug("scheduling delivery retry",
|
e.log.Debug("scheduling delivery retry",
|
||||||
"webhook_id", webhookID,
|
"webhook_id", task.WebhookID,
|
||||||
"delivery_id", deliveryID,
|
"delivery_id", task.DeliveryID,
|
||||||
"delay", delay,
|
"delay", delay,
|
||||||
|
"next_attempt", task.AttemptNum,
|
||||||
)
|
)
|
||||||
|
|
||||||
time.AfterFunc(delay, func() {
|
time.AfterFunc(delay, func() {
|
||||||
select {
|
select {
|
||||||
case e.retryCh <- retryRequest{
|
case e.retryCh <- task:
|
||||||
webhookID: webhookID,
|
|
||||||
deliveryID: deliveryID,
|
|
||||||
}:
|
|
||||||
default:
|
default:
|
||||||
e.log.Warn("retry channel full, delivery will be recovered on restart",
|
e.log.Warn("retry channel full, delivery will be recovered on restart",
|
||||||
"delivery_id", deliveryID,
|
"delivery_id", task.DeliveryID,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) processDelivery(ctx context.Context, webhookDB *gorm.DB, d *database.Delivery) {
|
func (e *Engine) processDelivery(ctx context.Context, webhookDB *gorm.DB, d *database.Delivery, task *DeliveryTask) {
|
||||||
switch d.Target.Type {
|
switch d.Target.Type {
|
||||||
case database.TargetTypeHTTP:
|
case database.TargetTypeHTTP:
|
||||||
e.deliverHTTP(ctx, webhookDB, d)
|
e.deliverHTTP(ctx, webhookDB, d)
|
||||||
case database.TargetTypeRetry:
|
case database.TargetTypeRetry:
|
||||||
e.deliverRetry(ctx, webhookDB, d)
|
e.deliverRetry(ctx, webhookDB, d, task)
|
||||||
case database.TargetTypeDatabase:
|
case database.TargetTypeDatabase:
|
||||||
e.deliverDatabase(webhookDB, d)
|
e.deliverDatabase(webhookDB, d)
|
||||||
case database.TargetTypeLog:
|
case database.TargetTypeLog:
|
||||||
@@ -580,22 +671,19 @@ func (e *Engine) deliverHTTP(_ context.Context, webhookDB *gorm.DB, d *database.
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) deliverRetry(_ context.Context, webhookDB *gorm.DB, d *database.Delivery) {
|
func (e *Engine) deliverRetry(_ context.Context, webhookDB *gorm.DB, d *database.Delivery, task *DeliveryTask) {
|
||||||
cfg, err := e.parseHTTPConfig(d.Target.Config)
|
cfg, err := e.parseHTTPConfig(d.Target.Config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.log.Error("invalid retry target config",
|
e.log.Error("invalid retry target config",
|
||||||
"target_id", d.TargetID,
|
"target_id", d.TargetID,
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
e.recordResult(webhookDB, d, 1, false, 0, "", err.Error(), 0)
|
e.recordResult(webhookDB, d, task.AttemptNum, false, 0, "", err.Error(), 0)
|
||||||
e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed)
|
e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Determine attempt number from existing results (in per-webhook DB)
|
attemptNum := task.AttemptNum
|
||||||
var resultCount int64
|
|
||||||
webhookDB.Model(&database.DeliveryResult{}).Where("delivery_id = ?", d.ID).Count(&resultCount)
|
|
||||||
attemptNum := int(resultCount) + 1
|
|
||||||
|
|
||||||
// Attempt delivery immediately — backoff is handled by the timer
|
// Attempt delivery immediately — backoff is handled by the timer
|
||||||
// that triggered this call, not by polling.
|
// that triggered this call, not by polling.
|
||||||
@@ -625,14 +713,17 @@ func (e *Engine) deliverRetry(_ context.Context, webhookDB *gorm.DB, d *database
|
|||||||
e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusRetrying)
|
e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusRetrying)
|
||||||
|
|
||||||
// Schedule a timer for the next retry with exponential backoff.
|
// Schedule a timer for the next retry with exponential backoff.
|
||||||
// The timer will fire and send a retryRequest to the engine's
|
// The timer fires a DeliveryTask into the retry channel carrying
|
||||||
// retry channel, which triggers processRetryDelivery.
|
// all data needed for the next attempt.
|
||||||
shift := attemptNum - 1
|
shift := attemptNum - 1
|
||||||
if shift > 30 {
|
if shift > 30 {
|
||||||
shift = 30
|
shift = 30
|
||||||
}
|
}
|
||||||
backoff := time.Duration(1<<uint(shift)) * time.Second //nolint:gosec // bounded above
|
backoff := time.Duration(1<<uint(shift)) * time.Second //nolint:gosec // bounded above
|
||||||
e.scheduleRetry(d.Event.WebhookID, d.ID, backoff)
|
|
||||||
|
retryTask := *task
|
||||||
|
retryTask.AttemptNum = attemptNum + 1
|
||||||
|
e.scheduleRetry(retryTask, backoff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ import (
|
|||||||
// noopNotifier is a no-op delivery.Notifier for tests.
|
// noopNotifier is a no-op delivery.Notifier for tests.
|
||||||
type noopNotifier struct{}
|
type noopNotifier struct{}
|
||||||
|
|
||||||
func (n *noopNotifier) Notify(delivery.Notification) {}
|
func (n *noopNotifier) Notify([]delivery.DeliveryTask) {}
|
||||||
|
|
||||||
func TestHandleIndex(t *testing.T) {
|
func TestHandleIndex(t *testing.T) {
|
||||||
var h *Handlers
|
var h *Handlers
|
||||||
|
|||||||
@@ -17,7 +17,9 @@ const (
|
|||||||
|
|
||||||
// HandleWebhook handles incoming webhook requests at entrypoint URLs.
|
// HandleWebhook handles incoming webhook requests at entrypoint URLs.
|
||||||
// Only POST requests are accepted; all other methods return 405 Method Not Allowed.
|
// Only POST requests are accepted; all other methods return 405 Method Not Allowed.
|
||||||
// Events and deliveries are stored in the per-webhook database.
|
// Events and deliveries are stored in the per-webhook database. The handler
|
||||||
|
// builds self-contained DeliveryTask structs with all target and event data
|
||||||
|
// so the delivery engine can process them without additional DB reads.
|
||||||
func (h *Handlers) HandleWebhook() http.HandlerFunc {
|
func (h *Handlers) HandleWebhook() http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
if r.Method != http.MethodPost {
|
if r.Method != http.MethodPost {
|
||||||
@@ -116,7 +118,16 @@ func (h *Handlers) HandleWebhook() http.HandlerFunc {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create delivery records for each active target
|
// Prepare body pointer for inline transport (≤16KB bodies are
|
||||||
|
// included in the DeliveryTask so the engine needs no DB read).
|
||||||
|
var bodyPtr *string
|
||||||
|
if len(body) < delivery.MaxInlineBodySize {
|
||||||
|
bodyStr := string(body)
|
||||||
|
bodyPtr = &bodyStr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create delivery records and build self-contained delivery tasks
|
||||||
|
tasks := make([]delivery.DeliveryTask, 0, len(targets))
|
||||||
for i := range targets {
|
for i := range targets {
|
||||||
dlv := &database.Delivery{
|
dlv := &database.Delivery{
|
||||||
EventID: event.ID,
|
EventID: event.ID,
|
||||||
@@ -132,6 +143,22 @@ func (h *Handlers) HandleWebhook() http.HandlerFunc {
|
|||||||
http.Error(w, "Internal server error", http.StatusInternalServerError)
|
http.Error(w, "Internal server error", http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tasks = append(tasks, delivery.DeliveryTask{
|
||||||
|
DeliveryID: dlv.ID,
|
||||||
|
EventID: event.ID,
|
||||||
|
WebhookID: entrypoint.WebhookID,
|
||||||
|
TargetID: targets[i].ID,
|
||||||
|
TargetName: targets[i].Name,
|
||||||
|
TargetType: targets[i].Type,
|
||||||
|
TargetConfig: targets[i].Config,
|
||||||
|
MaxRetries: targets[i].MaxRetries,
|
||||||
|
Method: event.Method,
|
||||||
|
Headers: event.Headers,
|
||||||
|
ContentType: event.ContentType,
|
||||||
|
Body: bodyPtr,
|
||||||
|
AttemptNum: 1,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tx.Commit().Error; err != nil {
|
if err := tx.Commit().Error; err != nil {
|
||||||
@@ -140,22 +167,14 @@ func (h *Handlers) HandleWebhook() http.HandlerFunc {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify the delivery engine with inline event data so it can
|
// Notify the delivery engine with self-contained delivery tasks.
|
||||||
// process deliveries immediately without a DB round-trip.
|
// Each task carries all target config and event data inline so
|
||||||
// Large bodies (>= 16KB) are left nil to keep channel memory
|
// the engine can deliver without touching any database (in the
|
||||||
// bounded; the engine fetches them from DB on demand.
|
// ≤16KB happy path). The engine only writes to the DB to record
|
||||||
n := delivery.Notification{
|
// delivery results after each attempt.
|
||||||
WebhookID: entrypoint.WebhookID,
|
if len(tasks) > 0 {
|
||||||
EventID: event.ID,
|
h.notifier.Notify(tasks)
|
||||||
Method: event.Method,
|
|
||||||
Headers: event.Headers,
|
|
||||||
ContentType: event.ContentType,
|
|
||||||
}
|
}
|
||||||
bodyStr := string(body)
|
|
||||||
if len(body) < delivery.MaxInlineBodySize {
|
|
||||||
n.Body = &bodyStr
|
|
||||||
}
|
|
||||||
h.notifier.Notify(n)
|
|
||||||
|
|
||||||
h.log.Info("webhook event created",
|
h.log.Info("webhook event created",
|
||||||
"event_id", event.ID,
|
"event_id", event.ID,
|
||||||
|
|||||||
Reference in New Issue
Block a user