From 32bd40b313c4344901145bd57a2a5ffad02858fa Mon Sep 17 00:00:00 2001 From: clawbot Date: Sun, 1 Mar 2026 22:09:41 -0800 Subject: [PATCH] =?UTF-8?q?refactor:=20self-contained=20delivery=20tasks?= =?UTF-8?q?=20=E2=80=94=20engine=20delivers=20without=20DB=20reads=20in=20?= =?UTF-8?q?happy=20path?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The webhook handler now builds DeliveryTask structs carrying all target config and event data inline (for bodies ≤16KB) and sends them through the delivery channel. In the happy path, the engine delivers without reading from any database — it only writes to record delivery results. For large bodies (≥16KB), Body is nil and the engine fetches it from the per-webhook database on demand. Retry timers also carry the full DeliveryTask, so retries avoid unnecessary DB reads. The database is used for crash recovery only: on startup the engine scans for interrupted pending/retrying deliveries and re-queues them. Implements owner feedback from issue #15: > the message in the <=16KB case should have everything it needs to do > its delivery. it shouldn't touch the db until it has a success or > failure to record. --- README.md | 18 +- internal/delivery/engine.go | 483 +++++++++++++++++------------ internal/handlers/handlers_test.go | 2 +- internal/handlers/webhook.go | 53 +++- 4 files changed, 338 insertions(+), 218 deletions(-) diff --git a/README.md b/README.md index 6c29a3f..4c64d7f 100644 --- a/README.md +++ b/README.md @@ -487,8 +487,10 @@ External Service │ 1. Look up Entrypoint by UUID 2. Capture full request as Event - 3. Queue Delivery to each active Target - 4. Notify Engine via channel + 3. Create Delivery records for each active Target + 4. Build self-contained DeliveryTask structs + (target config + event data inline for ≤16KB) + 5. Notify Engine via channel (no DB read needed) │ ▼ ┌──────────────┐ @@ -660,8 +662,11 @@ Components are wired via Uber fx in this order: The server starts via `fx.Invoke(func(*server.Server, *delivery.Engine) {})` which triggers the fx lifecycle hooks in dependency order. The -`DeliveryNotifier` interface allows the webhook handler to notify the -delivery engine of new work without a direct package dependency. +`DeliveryNotifier` interface allows the webhook handler to send +self-contained `DeliveryTask` slices to the engine without a direct +package dependency. Each task carries all target config and event data +inline (for bodies ≤16KB), so the engine can deliver without reading +from any database — it only writes to record results. ### Middleware Stack @@ -752,6 +757,11 @@ linted, tested, and compiled. creation, delete on webhook removal) - [x] `WebhookDBManager` component with lazy connection pooling - [x] Event-driven delivery engine (channel notifications + timer-based retries) +- [x] Self-contained delivery tasks: in the ≤16KB happy path, the engine + delivers without reading from any database — target config, event + headers, and body are all carried inline in the channel notification. + The engine only touches the DB to record results (success/failure). + Large bodies (≥16KB) are fetched from the per-webhook DB on demand. - [x] Database target type marks delivery as immediately successful (events are already in the per-webhook DB) diff --git a/internal/delivery/engine.go b/internal/delivery/engine.go index dffc053..58d9293 100644 --- a/internal/delivery/engine.go +++ b/internal/delivery/engine.go @@ -27,7 +27,7 @@ const ( retryChannelSize = 1000 // MaxInlineBodySize is the maximum event body size that will be carried - // inline in a Notification through the channel. Bodies at or above this + // inline in a DeliveryTask through the channel. Bodies at or above this // size are left nil and fetched from the per-webhook database on demand. // This keeps channel buffer memory bounded under high traffic. MaxInlineBodySize = 16 * 1024 @@ -39,31 +39,42 @@ const ( maxBodyLog = 4096 ) -// Notification carries event data through the delivery notification channel. -// The Body field is a pointer: non-nil for payloads under MaxInlineBodySize -// (16 KB), nil for larger payloads. When nil, the engine fetches the body -// from the per-webhook database using EventID. This keeps channel buffer -// memory bounded regardless of payload sizes during high traffic. -type Notification struct { - WebhookID string - EventID string +// DeliveryTask contains everything needed to deliver an event to a single +// target. In the ≤16KB happy path, Body is non-nil and the engine delivers +// without touching any database — it trusts that the webhook handler wrote +// the records correctly. Only after a delivery attempt (success or failure) +// does the engine write to the DB to record the result. +// +// When Body is nil (payload ≥ MaxInlineBodySize), the engine fetches the +// body from the per-webhook database using EventID before delivering. +type DeliveryTask struct { + DeliveryID string // ID of the Delivery record (for recording results) + EventID string // Event ID (for DB lookup if body is nil) + WebhookID string // Webhook ID (for per-webhook DB access) + + // Target info (from main DB, included at notification time) + TargetID string + TargetName string + TargetType database.TargetType + TargetConfig string // JSON config (URL, headers, etc.) + MaxRetries int + + // Event data (inline for ≤16KB bodies) Method string - Headers string + Headers string // JSON ContentType string - Body *string // nil if body >= MaxInlineBodySize; fetch from DB by EventID + Body *string // nil if body ≥ MaxInlineBodySize; fetch from DB by EventID + + // AttemptNum tracks the delivery attempt number. Set to 1 for the + // initial delivery and incremented for each retry. This avoids a DB + // query to count prior results in the hot path. + AttemptNum int } // Notifier is the interface for notifying the delivery engine about new // deliveries. Implemented by Engine and injected into handlers. type Notifier interface { - Notify(n Notification) -} - -// retryRequest carries the information needed to retry a specific delivery. -// Sent from timer goroutines to the engine's retry channel. -type retryRequest struct { - webhookID string - deliveryID string + Notify(tasks []DeliveryTask) } // HTTPTargetConfig holds configuration for http and retry target types. @@ -84,13 +95,15 @@ type EngineParams struct { } // Engine processes queued deliveries in the background using an -// event-driven architecture. New deliveries are signaled via a buffered -// channel from the webhook handler and processed immediately. Failed -// deliveries that need retry are scheduled via Go timers with exponential -// backoff — each timer fires into a separate retry channel when the -// backoff period expires. The database stores delivery status for crash -// recovery only; on startup the engine scans for interrupted deliveries -// and re-queues them. +// event-driven architecture. New deliveries arrive as self-contained +// DeliveryTask slices via a buffered channel from the webhook handler. +// In the happy path (body ≤ 16KB), the engine delivers without reading +// from any database — it only writes to record results. Failed deliveries +// that need retry are scheduled via Go timers with exponential backoff; +// each timer fires into a separate retry channel carrying the full +// DeliveryTask so retries also avoid unnecessary DB reads. The database +// stores delivery status for crash recovery only; on startup the engine +// scans for interrupted deliveries and re-queues them. type Engine struct { database *database.Database dbManager *database.WebhookDBManager @@ -98,8 +111,8 @@ type Engine struct { client *http.Client cancel context.CancelFunc wg sync.WaitGroup - notifyCh chan Notification - retryCh chan retryRequest + notifyCh chan []DeliveryTask + retryCh chan DeliveryTask } // New creates and registers the delivery engine with the fx lifecycle. @@ -111,8 +124,8 @@ func New(lc fx.Lifecycle, params EngineParams) *Engine { client: &http.Client{ Timeout: httpClientTimeout, }, - notifyCh: make(chan Notification, notifyChannelSize), - retryCh: make(chan retryRequest, retryChannelSize), + notifyCh: make(chan []DeliveryTask, notifyChannelSize), + retryCh: make(chan DeliveryTask, retryChannelSize), } lc.Append(fx.Hook{ @@ -144,19 +157,17 @@ func (e *Engine) stop() { e.log.Info("delivery engine stopped") } -// Notify signals the delivery engine that new deliveries are available. -// This is called by the webhook handler after creating delivery records. -// The notification carries the event data inline (with body pointer -// semantics for memory efficiency). The call is non-blocking; if the -// channel is full, a warning is logged and the deliveries will be -// recovered on the next engine restart. -func (e *Engine) Notify(n Notification) { +// Notify signals the delivery engine that new deliveries are ready. +// Called by the webhook handler after creating delivery records. Each +// DeliveryTask carries all data needed for delivery in the ≤16KB case. +// The call is non-blocking; if the channel is full, a warning is logged +// and the deliveries will be recovered on the next engine restart. +func (e *Engine) Notify(tasks []DeliveryTask) { select { - case e.notifyCh <- n: + case e.notifyCh <- tasks: default: e.log.Warn("delivery notification channel full, deliveries will be recovered on restart", - "webhook_id", n.WebhookID, - "event_id", n.EventID, + "task_count", len(tasks), ) } } @@ -174,10 +185,10 @@ func (e *Engine) run(ctx context.Context) { select { case <-ctx.Done(): return - case n := <-e.notifyCh: - e.processNotification(ctx, n) - case req := <-e.retryCh: - e.processRetryDelivery(ctx, req) + case tasks := <-e.notifyCh: + e.processDeliveryTasks(ctx, tasks) + case task := <-e.retryCh: + e.processRetryTask(ctx, task) } } } @@ -209,9 +220,9 @@ func (e *Engine) recoverInFlight(ctx context.Context) { } // recoverWebhookDeliveries recovers pending and retrying deliveries for -// a single webhook. Pending deliveries are processed directly (loading -// event data from DB); retrying deliveries get timers scheduled based on -// the elapsed time since the last attempt. +// a single webhook. This is the recovery path — it reads everything from +// the database since there are no in-memory notifications available after +// a restart. func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string) { webhookDB, err := e.dbManager.GetDB(webhookID) if err != nil { @@ -256,6 +267,28 @@ func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string) Count(&resultCount) attemptNum := int(resultCount) + // Load event for this delivery + var event database.Event + if err := webhookDB.First(&event, "id = ?", d.EventID).Error; err != nil { + e.log.Error("failed to load event for retrying delivery recovery", + "delivery_id", d.ID, + "event_id", d.EventID, + "error", err, + ) + continue + } + + // Load target from main DB + var target database.Target + if err := e.database.DB().First(&target, "id = ?", d.TargetID).Error; err != nil { + e.log.Error("failed to load target for retrying delivery recovery", + "delivery_id", d.ID, + "target_id", d.TargetID, + "error", err, + ) + continue + } + // Calculate remaining backoff from last attempt remaining := time.Duration(0) @@ -278,6 +311,30 @@ func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string) } } + // Build task from DB data. Use body pointer semantics: inline + // for small bodies, nil for large ones (will be fetched on retry). + var bodyPtr *string + if len(event.Body) < MaxInlineBodySize { + bodyStr := event.Body + bodyPtr = &bodyStr + } + + task := DeliveryTask{ + DeliveryID: d.ID, + EventID: d.EventID, + WebhookID: webhookID, + TargetID: target.ID, + TargetName: target.Name, + TargetType: target.Type, + TargetConfig: target.Config, + MaxRetries: target.MaxRetries, + Method: event.Method, + Headers: event.Headers, + ContentType: event.ContentType, + Body: bodyPtr, + AttemptNum: attemptNum + 1, + } + e.log.Info("recovering retrying delivery", "webhook_id", webhookID, "delivery_id", d.ID, @@ -285,42 +342,149 @@ func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string) "remaining_backoff", remaining, ) - e.scheduleRetry(webhookID, d.ID, remaining) + e.scheduleRetry(task, remaining) } } -// processNotification handles a delivery notification from the webhook -// handler. It uses the inline event data from the notification (avoiding -// a DB round-trip for the event) and only fetches the body from DB when -// it was too large to carry inline (Body pointer is nil). -func (e *Engine) processNotification(ctx context.Context, n Notification) { - webhookDB, err := e.dbManager.GetDB(n.WebhookID) +// processDeliveryTasks handles a batch of delivery tasks from the webhook +// handler. In the happy path (body ≤ 16KB), the engine delivers without +// reading from any database — it trusts the handler's inline data and +// only touches the DB to record results. For large bodies (body > 16KB), +// the body is fetched from the per-webhook database on demand. +func (e *Engine) processDeliveryTasks(ctx context.Context, tasks []DeliveryTask) { + if len(tasks) == 0 { + return + } + + // All tasks in a batch share the same webhook ID + webhookID := tasks[0].WebhookID + webhookDB, err := e.dbManager.GetDB(webhookID) if err != nil { e.log.Error("failed to get webhook database", - "webhook_id", n.WebhookID, + "webhook_id", webhookID, "error", err, ) return } - // Build the Event from the notification's inline data - event := database.Event{ - Method: n.Method, - Headers: n.Headers, - ContentType: n.ContentType, - } - event.ID = n.EventID - event.WebhookID = n.WebhookID + // For the large-body case, we may need to fetch the event body once + // for all tasks sharing the same event. Cache it here. + var fetchedBody *string - if n.Body != nil { - event.Body = *n.Body + for i := range tasks { + select { + case <-ctx.Done(): + return + default: + } + + task := &tasks[i] + + // Build Event from task data + event := database.Event{ + Method: task.Method, + Headers: task.Headers, + ContentType: task.ContentType, + } + event.ID = task.EventID + event.WebhookID = task.WebhookID + + if task.Body != nil { + // Happy path: body inline, no DB read needed + event.Body = *task.Body + } else { + // Large body path: fetch from per-webhook DB (once per batch) + if fetchedBody == nil { + var dbEvent database.Event + if err := webhookDB.Select("body"). + First(&dbEvent, "id = ?", task.EventID).Error; err != nil { + e.log.Error("failed to fetch event body from database", + "event_id", task.EventID, + "error", err, + ) + continue + } + fetchedBody = &dbEvent.Body + } + event.Body = *fetchedBody + } + + // Build Target from task data (no main DB query needed) + target := database.Target{ + Name: task.TargetName, + Type: task.TargetType, + Config: task.TargetConfig, + MaxRetries: task.MaxRetries, + } + target.ID = task.TargetID + + // Build Delivery struct for the processing chain + d := &database.Delivery{ + EventID: task.EventID, + TargetID: task.TargetID, + Status: database.DeliveryStatusPending, + Event: event, + Target: target, + } + d.ID = task.DeliveryID + + e.processDelivery(ctx, webhookDB, d, task) + } +} + +// processRetryTask handles a single delivery task fired by a retry timer. +// The task carries all data needed for delivery (same as the initial +// notification). The only DB read is a status check to verify the delivery +// hasn't been cancelled or resolved while the timer was pending. +func (e *Engine) processRetryTask(ctx context.Context, task DeliveryTask) { + webhookDB, err := e.dbManager.GetDB(task.WebhookID) + if err != nil { + e.log.Error("failed to get webhook database for retry", + "webhook_id", task.WebhookID, + "delivery_id", task.DeliveryID, + "error", err, + ) + return + } + + // Verify delivery is still in retrying status (may have been + // cancelled or manually resolved while the timer was pending) + var d database.Delivery + if err := webhookDB.Select("id", "status"). + First(&d, "id = ?", task.DeliveryID).Error; err != nil { + e.log.Error("failed to load delivery for retry", + "delivery_id", task.DeliveryID, + "error", err, + ) + return + } + + if d.Status != database.DeliveryStatusRetrying { + e.log.Debug("skipping retry for delivery no longer in retrying status", + "delivery_id", d.ID, + "status", d.Status, + ) + return + } + + // Build Event from task data + event := database.Event{ + Method: task.Method, + Headers: task.Headers, + ContentType: task.ContentType, + } + event.ID = task.EventID + event.WebhookID = task.WebhookID + + if task.Body != nil { + event.Body = *task.Body } else { - // Body was too large for inline transport — fetch from DB + // Large body: fetch from per-webhook DB var dbEvent database.Event if err := webhookDB.Select("body"). - First(&dbEvent, "id = ?", n.EventID).Error; err != nil { - e.log.Error("failed to fetch event body from database", - "event_id", n.EventID, + First(&dbEvent, "id = ?", task.EventID).Error; err != nil { + e.log.Error("failed to fetch event body for retry", + "event_id", task.EventID, "error", err, ) return @@ -328,69 +492,27 @@ func (e *Engine) processNotification(ctx context.Context, n Notification) { event.Body = dbEvent.Body } - // Query pending deliveries for this specific event - var deliveries []database.Delivery - result := webhookDB. - Where("event_id = ? AND status = ?", n.EventID, database.DeliveryStatusPending). - Find(&deliveries) - - if result.Error != nil { - e.log.Error("failed to query pending deliveries", - "webhook_id", n.WebhookID, - "event_id", n.EventID, - "error", result.Error, - ) - return + // Build Target from task data + target := database.Target{ + Name: task.TargetName, + Type: task.TargetType, + Config: task.TargetConfig, + MaxRetries: task.MaxRetries, } + target.ID = task.TargetID - if len(deliveries) == 0 { - return - } + // Populate the delivery with event and target for processing + d.EventID = task.EventID + d.TargetID = task.TargetID + d.Event = event + d.Target = target - // Collect unique target IDs and load targets from the main DB - seen := make(map[string]bool) - targetIDs := make([]string, 0, len(deliveries)) - for _, d := range deliveries { - if !seen[d.TargetID] { - targetIDs = append(targetIDs, d.TargetID) - seen[d.TargetID] = true - } - } - - var targets []database.Target - if err := e.database.DB().Where("id IN ?", targetIDs).Find(&targets).Error; err != nil { - e.log.Error("failed to load targets from main DB", "error", err) - return - } - - targetMap := make(map[string]database.Target, len(targets)) - for _, t := range targets { - targetMap[t.ID] = t - } - - for i := range deliveries { - select { - case <-ctx.Done(): - return - default: - target, ok := targetMap[deliveries[i].TargetID] - if !ok { - e.log.Error("target not found for delivery", - "delivery_id", deliveries[i].ID, - "target_id", deliveries[i].TargetID, - ) - continue - } - deliveries[i].Event = event - deliveries[i].Target = target - e.processDelivery(ctx, webhookDB, &deliveries[i]) - } - } + e.processDelivery(ctx, webhookDB, &d, &task) } // processWebhookPendingDeliveries queries a single webhook's database for // all pending deliveries and processes them. Used for crash recovery where -// we don't have inline event data — everything is loaded from the DB. +// we don't have in-memory notifications — everything is loaded from the DB. func (e *Engine) processWebhookPendingDeliveries(ctx context.Context, webhookID string) { webhookDB, err := e.dbManager.GetDB(webhookID) if err != nil { @@ -454,90 +576,59 @@ func (e *Engine) processWebhookPendingDeliveries(ctx context.Context, webhookID continue } deliveries[i].Target = target - e.processDelivery(ctx, webhookDB, &deliveries[i]) + + // Build task from DB data for the recovery path + bodyStr := deliveries[i].Event.Body + task := &DeliveryTask{ + DeliveryID: deliveries[i].ID, + EventID: deliveries[i].EventID, + WebhookID: webhookID, + TargetID: target.ID, + TargetName: target.Name, + TargetType: target.Type, + TargetConfig: target.Config, + MaxRetries: target.MaxRetries, + Method: deliveries[i].Event.Method, + Headers: deliveries[i].Event.Headers, + ContentType: deliveries[i].Event.ContentType, + Body: &bodyStr, + AttemptNum: 1, + } + + e.processDelivery(ctx, webhookDB, &deliveries[i], task) } } } -// processRetryDelivery handles a single retry delivery triggered by a -// backoff timer. It loads the delivery and target from the database and -// re-attempts delivery. -func (e *Engine) processRetryDelivery(ctx context.Context, req retryRequest) { - webhookDB, err := e.dbManager.GetDB(req.webhookID) - if err != nil { - e.log.Error("failed to get webhook database for retry", - "webhook_id", req.webhookID, - "delivery_id", req.deliveryID, - "error", err, - ) - return - } - - var d database.Delivery - if err := webhookDB.Preload("Event"). - First(&d, "id = ?", req.deliveryID).Error; err != nil { - e.log.Error("failed to load delivery for retry", - "delivery_id", req.deliveryID, - "error", err, - ) - return - } - - // Verify delivery is still in retrying status (may have been - // cancelled or manually resolved while the timer was pending) - if d.Status != database.DeliveryStatusRetrying { - e.log.Debug("skipping retry for delivery no longer in retrying status", - "delivery_id", d.ID, - "status", d.Status, - ) - return - } - - // Load target from main DB - var target database.Target - if err := e.database.DB().First(&target, "id = ?", d.TargetID).Error; err != nil { - e.log.Error("failed to load target for retry", - "delivery_id", d.ID, - "target_id", d.TargetID, - "error", err, - ) - return - } - d.Target = target - - e.processDelivery(ctx, webhookDB, &d) -} - // scheduleRetry creates a Go timer that fires after the given delay and -// sends a retry request to the engine's retry channel. This is the -// mechanism for exponential backoff — no periodic DB scanning needed. -func (e *Engine) scheduleRetry(webhookID, deliveryID string, delay time.Duration) { +// sends the full DeliveryTask to the engine's retry channel. The task +// carries all data needed for the retry attempt, so when it fires, the +// engine can deliver without reading event or target data from the DB. +func (e *Engine) scheduleRetry(task DeliveryTask, delay time.Duration) { e.log.Debug("scheduling delivery retry", - "webhook_id", webhookID, - "delivery_id", deliveryID, + "webhook_id", task.WebhookID, + "delivery_id", task.DeliveryID, "delay", delay, + "next_attempt", task.AttemptNum, ) time.AfterFunc(delay, func() { select { - case e.retryCh <- retryRequest{ - webhookID: webhookID, - deliveryID: deliveryID, - }: + case e.retryCh <- task: default: e.log.Warn("retry channel full, delivery will be recovered on restart", - "delivery_id", deliveryID, + "delivery_id", task.DeliveryID, ) } }) } -func (e *Engine) processDelivery(ctx context.Context, webhookDB *gorm.DB, d *database.Delivery) { +func (e *Engine) processDelivery(ctx context.Context, webhookDB *gorm.DB, d *database.Delivery, task *DeliveryTask) { switch d.Target.Type { case database.TargetTypeHTTP: e.deliverHTTP(ctx, webhookDB, d) case database.TargetTypeRetry: - e.deliverRetry(ctx, webhookDB, d) + e.deliverRetry(ctx, webhookDB, d, task) case database.TargetTypeDatabase: e.deliverDatabase(webhookDB, d) case database.TargetTypeLog: @@ -580,22 +671,19 @@ func (e *Engine) deliverHTTP(_ context.Context, webhookDB *gorm.DB, d *database. } } -func (e *Engine) deliverRetry(_ context.Context, webhookDB *gorm.DB, d *database.Delivery) { +func (e *Engine) deliverRetry(_ context.Context, webhookDB *gorm.DB, d *database.Delivery, task *DeliveryTask) { cfg, err := e.parseHTTPConfig(d.Target.Config) if err != nil { e.log.Error("invalid retry target config", "target_id", d.TargetID, "error", err, ) - e.recordResult(webhookDB, d, 1, false, 0, "", err.Error(), 0) + e.recordResult(webhookDB, d, task.AttemptNum, false, 0, "", err.Error(), 0) e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) return } - // Determine attempt number from existing results (in per-webhook DB) - var resultCount int64 - webhookDB.Model(&database.DeliveryResult{}).Where("delivery_id = ?", d.ID).Count(&resultCount) - attemptNum := int(resultCount) + 1 + attemptNum := task.AttemptNum // Attempt delivery immediately — backoff is handled by the timer // that triggered this call, not by polling. @@ -625,14 +713,17 @@ func (e *Engine) deliverRetry(_ context.Context, webhookDB *gorm.DB, d *database e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusRetrying) // Schedule a timer for the next retry with exponential backoff. - // The timer will fire and send a retryRequest to the engine's - // retry channel, which triggers processRetryDelivery. + // The timer fires a DeliveryTask into the retry channel carrying + // all data needed for the next attempt. shift := attemptNum - 1 if shift > 30 { shift = 30 } backoff := time.Duration(1<= 16KB) are left nil to keep channel memory - // bounded; the engine fetches them from DB on demand. - n := delivery.Notification{ - WebhookID: entrypoint.WebhookID, - EventID: event.ID, - Method: event.Method, - Headers: event.Headers, - ContentType: event.ContentType, + // Notify the delivery engine with self-contained delivery tasks. + // Each task carries all target config and event data inline so + // the engine can deliver without touching any database (in the + // ≤16KB happy path). The engine only writes to the DB to record + // delivery results after each attempt. + if len(tasks) > 0 { + h.notifier.Notify(tasks) } - bodyStr := string(body) - if len(body) < delivery.MaxInlineBodySize { - n.Body = &bodyStr - } - h.notifier.Notify(n) h.log.Info("webhook event created", "event_id", event.ID,