From 9b4ae41c449b360cd636a54d48273d2c98edeb56 Mon Sep 17 00:00:00 2001 From: clawbot Date: Sun, 1 Mar 2026 22:20:33 -0800 Subject: [PATCH] feat: parallel fan-out delivery + circuit breaker for retry targets - Fan out all targets for an event in parallel goroutines (fire-and-forget) - Add per-target circuit breaker for retry targets (closed/open/half-open) - Circuit breaker trips after 5 consecutive failures, 30s cooldown - Open circuit skips delivery and reschedules after cooldown - Half-open allows one probe delivery to test recovery - HTTP/database/log targets unaffected (no circuit breaker) - Recovery path also fans out in parallel - Update README with parallel delivery and circuit breaker docs --- README.md | 95 ++++++++++- internal/delivery/circuit_breaker.go | 162 ++++++++++++++++++ internal/delivery/engine.go | 242 +++++++++++++++++---------- 3 files changed, 407 insertions(+), 92 deletions(-) create mode 100644 internal/delivery/circuit_breaker.go diff --git a/README.md b/README.md index 4c64d7f..3237579 100644 --- a/README.md +++ b/README.md @@ -498,14 +498,89 @@ External Service │ Engine │ (backoff) └──────┬───────┘ │ - ┌────────────────────┼────────────────────┐ - ▼ ▼ ▼ - ┌────────────┐ ┌────────────┐ ┌────────────┐ - │ HTTP Target│ │Retry Target│ │ Log Target │ - │ (1 attempt)│ │ (backoff) │ │ (stdout) │ - └────────────┘ └────────────┘ └────────────┘ + ┌─── parallel goroutines (fan-out) ───┐ + ▼ ▼ ▼ + ┌────────────┐ ┌────────────┐ ┌────────────┐ + │ HTTP Target│ │Retry Target│ │ Log Target │ + │ (1 attempt)│ │ (backoff + │ │ (stdout) │ + └────────────┘ │ circuit │ └────────────┘ + │ breaker) │ + └────────────┘ ``` +### Parallel Fan-Out Delivery + +When the delivery engine receives a batch of tasks for an event, it +fans out **all targets in parallel** — each `DeliveryTask` is dispatched +in its own goroutine immediately. 
An HTTP target, a retry target, and +a log target for the same event all start delivering simultaneously +with no sequential bottleneck. + +This means: + +- **No head-of-line blocking** — a slow HTTP target doesn't delay the + log target or other targets. +- **Maximum throughput** — all targets receive the event as quickly as + possible. +- **Independent results** — each goroutine records its own delivery + result in the per-webhook database without coordination. +- **Fire-and-forget** — the engine doesn't wait for all goroutines to + finish; each delivery is completely independent. + +The same parallel fan-out applies to crash recovery: when the engine +restarts and finds pending deliveries in per-webhook databases, it +recovers them and fans them out in parallel just like fresh deliveries. + +### Circuit Breaker (Retry Targets) + +Retry targets are protected by a **per-target circuit breaker** that +prevents hammering a down target with repeated failed delivery attempts. +The circuit breaker is in-memory only and resets on restart (which is +fine — startup recovery rescans the database anyway). + +**States:** + +| State | Behavior | +| ----------- | -------- | +| **Closed** | Normal operation. Deliveries flow through. Consecutive failures are counted. | +| **Open** | Target appears down. Deliveries are skipped and rescheduled for after the cooldown. | +| **Half-Open** | Cooldown expired. One probe delivery is allowed to test if the target has recovered. 
| + +**Transitions:** + +``` + success ┌──────────┐ + ┌────────────────────► │ Closed │ ◄─── probe succeeds + │ │ (normal) │ + │ └────┬─────┘ + │ │ N consecutive failures + │ ▼ + │ ┌──────────┐ + │ │ Open │ ◄─── probe fails + │ │(tripped) │ + │ └────┬─────┘ + │ │ cooldown expires + │ ▼ + │ ┌──────────┐ + └──────────────────────│Half-Open │ + │ (probe) │ + └──────────┘ +``` + +**Defaults:** + +- **Failure threshold:** 5 consecutive failures before opening +- **Cooldown:** 30 seconds in open state before probing + +**Scope:** Circuit breakers only apply to **retry** target types. HTTP +targets (fire-and-forget), database targets (local operations), and log +targets (stdout) do not use circuit breakers. + +When a circuit is open and a new delivery arrives, the engine marks the +delivery as `retrying` and schedules a retry timer for after the +remaining cooldown period. This ensures no deliveries are lost — they're +just delayed until the target is healthy again. + ### Rate Limiting Global rate limiting middleware (e.g., per-IP throttling applied at the @@ -606,7 +681,8 @@ webhooker/ │ ├── globals/ │ │ └── globals.go # Build-time variables (appname, version, arch) │ ├── delivery/ -│ │ └── engine.go # Event-driven delivery engine (channel + timer based) +│ │ ├── engine.go # Event-driven delivery engine (channel + timer based) +│ │ └── circuit_breaker.go # Per-target circuit breaker for retry targets │ ├── handlers/ │ │ ├── handlers.go # Base handler struct, JSON helpers, template rendering │ │ ├── auth.go # Login, logout handlers @@ -764,6 +840,11 @@ linted, tested, and compiled. Large bodies (≥16KB) are fetched from the per-webhook DB on demand. 
// CircuitState represents the current state of a circuit breaker.
type CircuitState int

const (
	// CircuitClosed is the normal operating state. Deliveries flow through.
	CircuitClosed CircuitState = iota
	// CircuitOpen means the circuit has tripped. Deliveries are skipped
	// until the cooldown expires.
	CircuitOpen
	// CircuitHalfOpen allows a single probe delivery to test whether
	// the target has recovered.
	CircuitHalfOpen
)

// String returns the human-readable name of a circuit state.
func (s CircuitState) String() string {
	switch s {
	case CircuitClosed:
		return "closed"
	case CircuitOpen:
		return "open"
	case CircuitHalfOpen:
		return "half-open"
	default:
		return "unknown"
	}
}

const (
	// defaultFailureThreshold is the number of consecutive failures
	// before a circuit breaker trips open.
	defaultFailureThreshold = 5

	// defaultCooldown is how long a circuit stays open before
	// transitioning to half-open for a probe delivery.
	defaultCooldown = 30 * time.Second

	// halfOpenRetryDelay is the wait reported by CooldownRemaining while
	// a probe is in flight. Deliveries rejected in the half-open state
	// are rescheduled by the caller using that value; a zero delay would
	// make them spin (reject, reschedule, reject, ...) until the probe
	// resolves.
	halfOpenRetryDelay = time.Second
)

// CircuitBreaker implements the circuit breaker pattern for a single
// delivery target. It tracks consecutive failures and prevents
// hammering a down target by temporarily stopping delivery attempts.
//
// States:
//   - Closed (normal): deliveries flow through; consecutive failures
//     are counted.
//   - Open (tripped): deliveries are skipped until the cooldown, measured
//     from the most recent recorded failure, has elapsed. The next call
//     to Allow after that moves the state to HalfOpen.
//   - HalfOpen (probing): one probe delivery is allowed. If it
//     succeeds the circuit closes; if it fails the circuit reopens.
//
// All methods are safe for concurrent use; every access to the fields
// below happens with mu held.
type CircuitBreaker struct {
	mu          sync.Mutex
	state       CircuitState
	failures    int           // failures recorded since the last success
	threshold   int           // failures needed to trip from Closed to Open
	cooldown    time.Duration // time spent Open before a probe is allowed
	lastFailure time.Time     // time of the most recent RecordFailure call
}

// NewCircuitBreaker creates a closed circuit breaker with the default
// failure threshold and cooldown.
func NewCircuitBreaker() *CircuitBreaker {
	return &CircuitBreaker{
		state:     CircuitClosed,
		threshold: defaultFailureThreshold,
		cooldown:  defaultCooldown,
	}
}

// Allow checks whether a delivery attempt should proceed. It returns
// true if the delivery should be attempted, false if the circuit is
// open (or a probe is already in flight) and the delivery should be
// skipped.
//
// When the circuit is open and the cooldown has elapsed, Allow
// transitions to half-open and permits exactly one probe delivery.
// The caller that receives true in that situation must report the
// outcome via RecordSuccess or RecordFailure; the half-open state is
// only left through one of those calls.
func (cb *CircuitBreaker) Allow() bool {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	switch cb.state {
	case CircuitClosed:
		return true

	case CircuitOpen:
		if time.Since(cb.lastFailure) < cb.cooldown {
			return false
		}
		// Cooldown elapsed: this caller becomes the probe.
		cb.state = CircuitHalfOpen
		return true

	case CircuitHalfOpen:
		// Only one probe at a time: reject additional attempts while
		// a probe is in flight.
		return false

	default:
		return true
	}
}

// CooldownRemaining returns how long a caller whose delivery was just
// rejected by Allow should wait before trying again:
//
//   - Open: the time left until the cooldown elapses, or zero if it
//     has already elapsed (the next Allow call will start a probe).
//   - HalfOpen: halfOpenRetryDelay, a short fixed back-off, so that
//     deliveries rejected while a probe is in flight are not
//     rescheduled with zero delay.
//   - Closed: zero.
func (cb *CircuitBreaker) CooldownRemaining() time.Duration {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	switch cb.state {
	case CircuitOpen:
		remaining := cb.cooldown - time.Since(cb.lastFailure)
		if remaining < 0 {
			return 0
		}
		return remaining

	case CircuitHalfOpen:
		return halfOpenRetryDelay

	default:
		return 0
	}
}

// RecordSuccess records a successful delivery and resets the circuit
// breaker to closed state with zero failures, whatever state it was in.
func (cb *CircuitBreaker) RecordSuccess() {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	cb.failures = 0
	cb.state = CircuitClosed
}

// RecordFailure records a failed delivery and restarts the cooldown
// clock. In the closed state the circuit trips open once the failure
// count reaches the threshold; in the half-open state (failed probe)
// it reopens immediately. In the open state only the counters change.
func (cb *CircuitBreaker) RecordFailure() {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	cb.failures++
	cb.lastFailure = time.Now()

	switch cb.state {
	case CircuitClosed:
		if cb.failures >= cb.threshold {
			cb.state = CircuitOpen
		}

	case CircuitHalfOpen:
		// Probe failed: reopen immediately.
		cb.state = CircuitOpen
	}
}

// State returns the current circuit state. Safe for concurrent use.
func (cb *CircuitBreaker) State() CircuitState {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	return cb.state
}
type Engine struct { database *database.Database dbManager *database.WebhookDBManager @@ -113,6 +118,11 @@ type Engine struct { wg sync.WaitGroup notifyCh chan []DeliveryTask retryCh chan DeliveryTask + + // circuitBreakers stores a *CircuitBreaker per target ID. Only used + // for retry targets — HTTP, database, and log targets do not need + // circuit breakers because they either fire once or are local ops. + circuitBreakers sync.Map } // New creates and registers the delivery engine with the fx lifecycle. @@ -347,10 +357,12 @@ func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string) } // processDeliveryTasks handles a batch of delivery tasks from the webhook -// handler. In the happy path (body ≤ 16KB), the engine delivers without -// reading from any database — it trusts the handler's inline data and -// only touches the DB to record results. For large bodies (body > 16KB), -// the body is fetched from the per-webhook database on demand. +// handler. Each task is dispatched in its own goroutine for parallel +// fan-out — all targets for a single event start delivering simultaneously. +// In the happy path (body ≤ 16KB), the engine delivers without reading +// from any database — it trusts the handler's inline data and only touches +// the DB to record results. For large bodies (body > 16KB), the body is +// fetched once and shared across all goroutines in the batch. func (e *Engine) processDeliveryTasks(ctx context.Context, tasks []DeliveryTask) { if len(tasks) == 0 { return @@ -367,10 +379,25 @@ func (e *Engine) processDeliveryTasks(ctx context.Context, tasks []DeliveryTask) return } - // For the large-body case, we may need to fetch the event body once - // for all tasks sharing the same event. Cache it here. + // For the large-body case, pre-fetch the event body once before + // fanning out so all goroutines share the same data. 
var fetchedBody *string + if tasks[0].Body == nil { + var dbEvent database.Event + if err := webhookDB.Select("body"). + First(&dbEvent, "id = ?", tasks[0].EventID).Error; err != nil { + e.log.Error("failed to fetch event body from database", + "event_id", tasks[0].EventID, + "error", err, + ) + return + } + fetchedBody = &dbEvent.Body + } + // Fan out: spin up a goroutine per task for parallel delivery. + // Each goroutine is independent (fire-and-forget) and records its + // own result. No need to wait for all goroutines to finish. for i := range tasks { select { case <-ctx.Done(): @@ -378,60 +405,61 @@ func (e *Engine) processDeliveryTasks(ctx context.Context, tasks []DeliveryTask) default: } - task := &tasks[i] + task := tasks[i] // copy for goroutine closure safety - // Build Event from task data - event := database.Event{ - Method: task.Method, - Headers: task.Headers, - ContentType: task.ContentType, - } - event.ID = task.EventID - event.WebhookID = task.WebhookID - - if task.Body != nil { - // Happy path: body inline, no DB read needed - event.Body = *task.Body - } else { - // Large body path: fetch from per-webhook DB (once per batch) - if fetchedBody == nil { - var dbEvent database.Event - if err := webhookDB.Select("body"). 
- First(&dbEvent, "id = ?", task.EventID).Error; err != nil { - e.log.Error("failed to fetch event body from database", - "event_id", task.EventID, - "error", err, - ) - continue - } - fetchedBody = &dbEvent.Body - } - event.Body = *fetchedBody - } - - // Build Target from task data (no main DB query needed) - target := database.Target{ - Name: task.TargetName, - Type: task.TargetType, - Config: task.TargetConfig, - MaxRetries: task.MaxRetries, - } - target.ID = task.TargetID - - // Build Delivery struct for the processing chain - d := &database.Delivery{ - EventID: task.EventID, - TargetID: task.TargetID, - Status: database.DeliveryStatusPending, - Event: event, - Target: target, - } - d.ID = task.DeliveryID - - e.processDelivery(ctx, webhookDB, d, task) + go func() { + e.deliverTask(ctx, webhookDB, &task, fetchedBody) + }() } } +// deliverTask prepares and executes a single delivery task. Called from +// a dedicated goroutine for parallel fan-out. +func (e *Engine) deliverTask(ctx context.Context, webhookDB *gorm.DB, task *DeliveryTask, fetchedBody *string) { + // Build Event from task data + event := database.Event{ + Method: task.Method, + Headers: task.Headers, + ContentType: task.ContentType, + } + event.ID = task.EventID + event.WebhookID = task.WebhookID + + switch { + case task.Body != nil: + event.Body = *task.Body + case fetchedBody != nil: + event.Body = *fetchedBody + default: + e.log.Error("no body available for delivery task", + "delivery_id", task.DeliveryID, + "event_id", task.EventID, + ) + return + } + + // Build Target from task data (no main DB query needed) + target := database.Target{ + Name: task.TargetName, + Type: task.TargetType, + Config: task.TargetConfig, + MaxRetries: task.MaxRetries, + } + target.ID = task.TargetID + + // Build Delivery struct for the processing chain + d := &database.Delivery{ + EventID: task.EventID, + TargetID: task.TargetID, + Status: database.DeliveryStatusPending, + Event: event, + Target: target, + } + d.ID = 
task.DeliveryID + + e.processDelivery(ctx, webhookDB, d, task) +} + // processRetryTask handles a single delivery task fired by a retry timer. // The task carries all data needed for delivery (same as the initial // notification). The only DB read is a status check to verify the delivery @@ -562,41 +590,47 @@ func (e *Engine) processWebhookPendingDeliveries(ctx context.Context, webhookID targetMap[t.ID] = t } + // Fan out recovered deliveries in parallel — same as the normal + // delivery path, each task gets its own goroutine. for i := range deliveries { select { case <-ctx.Done(): return default: - target, ok := targetMap[deliveries[i].TargetID] - if !ok { - e.log.Error("target not found for delivery", - "delivery_id", deliveries[i].ID, - "target_id", deliveries[i].TargetID, - ) - continue - } - deliveries[i].Target = target - - // Build task from DB data for the recovery path - bodyStr := deliveries[i].Event.Body - task := &DeliveryTask{ - DeliveryID: deliveries[i].ID, - EventID: deliveries[i].EventID, - WebhookID: webhookID, - TargetID: target.ID, - TargetName: target.Name, - TargetType: target.Type, - TargetConfig: target.Config, - MaxRetries: target.MaxRetries, - Method: deliveries[i].Event.Method, - Headers: deliveries[i].Event.Headers, - ContentType: deliveries[i].Event.ContentType, - Body: &bodyStr, - AttemptNum: 1, - } - - e.processDelivery(ctx, webhookDB, &deliveries[i], task) } + + target, ok := targetMap[deliveries[i].TargetID] + if !ok { + e.log.Error("target not found for delivery", + "delivery_id", deliveries[i].ID, + "target_id", deliveries[i].TargetID, + ) + continue + } + deliveries[i].Target = target + + // Build task from DB data for the recovery path + bodyStr := deliveries[i].Event.Body + task := DeliveryTask{ + DeliveryID: deliveries[i].ID, + EventID: deliveries[i].EventID, + WebhookID: webhookID, + TargetID: target.ID, + TargetName: target.Name, + TargetType: target.Type, + TargetConfig: target.Config, + MaxRetries: target.MaxRetries, + 
Method: deliveries[i].Event.Method, + Headers: deliveries[i].Event.Headers, + ContentType: deliveries[i].Event.ContentType, + Body: &bodyStr, + AttemptNum: 1, + } + + d := deliveries[i] // copy for goroutine closure safety + go func() { + e.processDelivery(ctx, webhookDB, &d, &task) + }() } } @@ -683,6 +717,26 @@ func (e *Engine) deliverRetry(_ context.Context, webhookDB *gorm.DB, d *database return } + // Check the circuit breaker for this target before attempting delivery. + cb := e.getCircuitBreaker(task.TargetID) + if !cb.Allow() { + // Circuit is open — skip delivery, mark as retrying, and + // schedule a retry for after the cooldown expires. + remaining := cb.CooldownRemaining() + e.log.Info("circuit breaker open, skipping delivery", + "target_id", task.TargetID, + "target_name", task.TargetName, + "delivery_id", d.ID, + "cooldown_remaining", remaining, + ) + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusRetrying) + + retryTask := *task + // Don't increment AttemptNum — this wasn't a real attempt + e.scheduleRetry(retryTask, remaining) + return + } + attemptNum := task.AttemptNum // Attempt delivery immediately — backoff is handled by the timer @@ -698,10 +752,14 @@ func (e *Engine) deliverRetry(_ context.Context, webhookDB *gorm.DB, d *database e.recordResult(webhookDB, d, attemptNum, success, statusCode, respBody, errMsg, duration) if success { + cb.RecordSuccess() e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) return } + // Delivery failed — record failure in circuit breaker + cb.RecordFailure() + maxRetries := d.Target.MaxRetries if maxRetries <= 0 { maxRetries = 5 // default @@ -727,6 +785,20 @@ func (e *Engine) deliverRetry(_ context.Context, webhookDB *gorm.DB, d *database } } +// getCircuitBreaker returns the circuit breaker for the given target ID, +// creating one if it doesn't exist yet. Circuit breakers are in-memory +// only and reset on restart (startup recovery rescans the DB anyway). 
+func (e *Engine) getCircuitBreaker(targetID string) *CircuitBreaker { + if val, ok := e.circuitBreakers.Load(targetID); ok { + cb, _ := val.(*CircuitBreaker) //nolint:errcheck // type is guaranteed by LoadOrStore below + return cb + } + fresh := NewCircuitBreaker() + actual, _ := e.circuitBreakers.LoadOrStore(targetID, fresh) + cb, _ := actual.(*CircuitBreaker) //nolint:errcheck // we only store *CircuitBreaker values + return cb +} + // deliverDatabase handles the database target type. Since events are already // stored in the per-webhook database (that's the whole point of per-webhook // databases), the database target simply marks the delivery as successful.