diff --git a/README.md b/README.md index 3d476a5..c96300d 100644 --- a/README.md +++ b/README.md @@ -291,20 +291,23 @@ events should be forwarded. | `id` | UUID | Primary key | | `webhook_id` | UUID | Foreign key → Webhook | | `name` | string | Human-readable name | -| `type` | TargetType | One of: `http`, `retry`, `database`, `log` | +| `type` | TargetType | One of: `http`, `database`, `log` | | `active` | boolean | Whether deliveries are enabled (default: true) | | `config` | JSON text | Type-specific configuration | -| `max_retries` | integer | Maximum retry attempts (for retry targets) | -| `max_queue_size` | integer | Maximum queued deliveries (for retry targets) | +| `max_retries` | integer | Maximum retry attempts for HTTP targets (0 = fire-and-forget, >0 = retries with backoff) | +| `max_queue_size` | integer | Maximum queued deliveries (for HTTP targets with retries) | **Relations:** Belongs to Webhook. Has many Deliveries. **Target types:** - **`http`** — Forward the event as an HTTP POST to a configured URL. - Fire-and-forget: a single attempt with no retries. -- **`retry`** — Forward the event via HTTP POST with automatic retry on - failure. Uses exponential backoff up to `max_retries` attempts. + Behavior depends on `max_retries`: when `max_retries` is 0 (the + default), the target operates in fire-and-forget mode — a single + attempt with no retries and no circuit breaker. When `max_retries` is + greater than 0, failed deliveries are retried with exponential backoff + up to `max_retries` attempts, protected by a per-target circuit + breaker. - **`database`** — Confirm the event is stored in the webhook's per-webhook database (no external delivery). Since events are always written to the per-webhook DB on ingestion, this target marks delivery @@ -495,10 +498,12 @@ External Service ┌── bounded worker pool (N workers) ──┐ ▼ ▼ ▼ ┌────────────┐ ┌────────────┐ ┌────────────┐ - │ HTTP Target│ │Retry Target│ │ Log Target │ - │ (1 attempt)│ │ (backoff + │ │ (stdout) │ - └────────────┘ │ circuit │ └────────────┘ - │ breaker) │ + │ HTTP Target│ │ HTTP Target│ │ Log Target │ + │(max_retries│ │(max_retries│ │ (stdout) │ + │ == 0) │ │ > 0, │ └────────────┘ + │ fire+forget│ │ backoff + │ + └────────────┘ │ circuit │ + │ breaker) │ └────────────┘ ``` @@ -553,9 +558,9 @@ This means: durable fallback that ensures no retry is permanently lost, even under extreme backpressure. -### Circuit Breaker (Retry Targets) +### Circuit Breaker (HTTP Targets with Retries) -Retry targets are protected by a **per-target circuit breaker** that +HTTP targets with `max_retries` > 0 are protected by a **per-target circuit breaker** that prevents hammering a down target with repeated failed delivery attempts. The circuit breaker is in-memory only and resets on restart (which is fine — startup recovery rescans the database anyway). @@ -594,9 +599,10 @@ fine — startup recovery rescans the database anyway). - **Failure threshold:** 5 consecutive failures before opening - **Cooldown:** 30 seconds in open state before probing -**Scope:** Circuit breakers only apply to **retry** target types. HTTP -targets (fire-and-forget), database targets (local operations), and log -targets (stdout) do not use circuit breakers. +**Scope:** Circuit breakers only apply to **HTTP targets with +`max_retries` > 0**. Fire-and-forget HTTP targets (`max_retries` == 0), +database targets (local operations), and log targets (stdout) do not use +circuit breakers. When a circuit is open and a new delivery arrives, the engine marks the delivery as `retrying` and schedules a retry timer for after the @@ -704,7 +710,7 @@ webhooker/ │ │ └── globals.go # Build-time variables (appname, version, arch) │ ├── delivery/ │ │ ├── engine.go # Event-driven delivery engine (channel + timer based) -│ │ └── circuit_breaker.go # Per-target circuit breaker for retry targets +│ │ └── circuit_breaker.go # Per-target circuit breaker for HTTP targets with retries │ ├── handlers/ │ │ ├── handlers.go # Base handler struct, JSON helpers, template rendering │ │ ├── auth.go # Login, logout handlers @@ -838,8 +844,8 @@ linted, tested, and compiled. ### Completed: Core Webhook Engine (Phase 2 of MVP) - [x] Implement webhook reception and event storage at `/webhook/{uuid}` - [x] Build event processing and target delivery engine -- [x] Implement HTTP target type (fire-and-forget POST) -- [x] Implement retry target type (exponential backoff) +- [x] Implement HTTP target type (fire-and-forget with max_retries=0, + retries with exponential backoff when max_retries>0) - [x] Implement database target type (store events in per-webhook DB) - [x] Implement log target type (console output) - [x] Webhook management pages (list, create, edit, delete) @@ -861,9 +867,9 @@ linted, tested, and compiled. (events are already in the per-webhook DB) - [x] Parallel fan-out: all targets for an event are delivered via the bounded worker pool (no goroutine-per-target) -- [x] Circuit breaker for retry targets: tracks consecutive failures - per target, opens after 5 failures (30s cooldown), half-open - probe to test recovery +- [x] Circuit breaker for HTTP targets with retries: tracks consecutive + failures per target, opens after 5 failures (30s cooldown), + half-open probe to test recovery ### Remaining: Core Features - [ ] Per-webhook rate limiting in the receiver handler diff --git a/internal/database/database.go b/internal/database/database.go index 933a9ca..531f8b1 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -92,6 +92,16 @@ func (d *Database) migrate() error { } d.log.Info("database migrations completed") + // Data migration: merge "retry" target type into "http". + // Previously there were two separate HTTP-based target types: "http" + // (fire-and-forget) and "retry" (with retries). Now "http" handles + // both: max_retries=0 means fire-and-forget, max_retries>0 enables + // retries with exponential backoff and circuit breaker. + if err := d.db.Exec("UPDATE targets SET type = 'http' WHERE type = 'retry'").Error; err != nil { + d.log.Error("failed to migrate retry targets to http", "error", err) + return err + } + // Check if admin user exists var userCount int64 if err := d.db.Model(&User{}).Count(&userCount).Error; err != nil { diff --git a/internal/database/model_target.go b/internal/database/model_target.go index 1c1c842..e9c4628 100644 --- a/internal/database/model_target.go +++ b/internal/database/model_target.go @@ -5,7 +5,6 @@ type TargetType string const ( TargetTypeHTTP TargetType = "http" - TargetTypeRetry TargetType = "retry" TargetTypeDatabase TargetType = "database" TargetTypeLog TargetType = "log" ) @@ -22,7 +21,7 @@ type Target struct { // Configuration fields (JSON stored based on type) Config string `gorm:"type:text" json:"config"` // JSON configuration - // For retry targets + // For HTTP targets (max_retries=0 means fire-and-forget, >0 enables retries with backoff) MaxRetries int `json:"max_retries,omitempty"` MaxQueueSize int `json:"max_queue_size,omitempty"` diff --git a/internal/delivery/engine.go b/internal/delivery/engine.go index 029752a..ca254f1 100644 --- a/internal/delivery/engine.go +++ b/internal/delivery/engine.go @@ -133,7 +133,8 @@ type Engine struct { workers int // circuitBreakers stores a *CircuitBreaker per target ID. Only used - // for retry targets — HTTP, database, and log targets do not need + // for HTTP targets with MaxRetries > 0 — fire-and-forget HTTP targets + // (MaxRetries == 0), database targets, and log targets do not need // circuit breakers because they either fire once or are local ops. circuitBreakers sync.Map } @@ -829,9 +830,7 @@ func (e *Engine) sweepWebhookRetries(ctx context.Context, webhookID string) { func (e *Engine) processDelivery(ctx context.Context, webhookDB *gorm.DB, d *database.Delivery, task *DeliveryTask) { switch d.Target.Type { case database.TargetTypeHTTP: - e.deliverHTTP(ctx, webhookDB, d) - case database.TargetTypeRetry: - e.deliverRetry(ctx, webhookDB, d, task) + e.deliverHTTP(ctx, webhookDB, d, task) case database.TargetTypeDatabase: e.deliverDatabase(webhookDB, d) case database.TargetTypeLog: @@ -845,47 +844,43 @@ func (e *Engine) processDelivery(ctx context.Context, webhookDB *gorm.DB, d *dat } } -func (e *Engine) deliverHTTP(_ context.Context, webhookDB *gorm.DB, d *database.Delivery) { +func (e *Engine) deliverHTTP(_ context.Context, webhookDB *gorm.DB, d *database.Delivery, task *DeliveryTask) { cfg, err := e.parseHTTPConfig(d.Target.Config) if err != nil { e.log.Error("invalid HTTP target config", "target_id", d.TargetID, "error", err, ) - e.recordResult(webhookDB, d, 1, false, 0, "", err.Error(), 0) - e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) - return - } - - statusCode, respBody, duration, err := e.doHTTPRequest(cfg, &d.Event) - - success := err == nil && statusCode >= 200 && statusCode < 300 - errMsg := "" - if err != nil { - errMsg = err.Error() - } - - e.recordResult(webhookDB, d, 1, success, statusCode, respBody, errMsg, duration) - - if success { - e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) - } else { - e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) - } -} - -func (e *Engine) deliverRetry(_ context.Context, webhookDB *gorm.DB, d *database.Delivery, task *DeliveryTask) { - cfg, err := e.parseHTTPConfig(d.Target.Config) - if err != nil { - e.log.Error("invalid retry target config", - "target_id", d.TargetID, - "error", err, - ) e.recordResult(webhookDB, d, task.AttemptNum, false, 0, "", err.Error(), 0) e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) return } + maxRetries := d.Target.MaxRetries + + // Fire-and-forget mode: max_retries == 0 means attempt once with no + // circuit breaker and no retry scheduling. + if maxRetries == 0 { + statusCode, respBody, duration, reqErr := e.doHTTPRequest(cfg, &d.Event) + + success := reqErr == nil && statusCode >= 200 && statusCode < 300 + errMsg := "" + if reqErr != nil { + errMsg = reqErr.Error() + } + + e.recordResult(webhookDB, d, 1, success, statusCode, respBody, errMsg, duration) + + if success { + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) + } else { + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) + } + return + } + + // Retry mode: max_retries > 0 — use circuit breaker and exponential backoff. + // Check the circuit breaker for this target before attempting delivery. cb := e.getCircuitBreaker(task.TargetID) if !cb.Allow() { @@ -910,12 +905,12 @@ func (e *Engine) deliverRetry(_ context.Context, webhookDB *gorm.DB, d *database // Attempt delivery immediately — backoff is handled by the timer // that triggered this call, not by polling. - statusCode, respBody, duration, err := e.doHTTPRequest(cfg, &d.Event) + statusCode, respBody, duration, reqErr := e.doHTTPRequest(cfg, &d.Event) - success := err == nil && statusCode >= 200 && statusCode < 300 + success := reqErr == nil && statusCode >= 200 && statusCode < 300 errMsg := "" - if err != nil { - errMsg = err.Error() + if reqErr != nil { + errMsg = reqErr.Error() } e.recordResult(webhookDB, d, attemptNum, success, statusCode, respBody, errMsg, duration) @@ -929,11 +924,6 @@ func (e *Engine) deliverRetry(_ context.Context, webhookDB *gorm.DB, d *database // Delivery failed — record failure in circuit breaker cb.RecordFailure() - maxRetries := d.Target.MaxRetries - if maxRetries <= 0 { - maxRetries = 5 // default - } - if attemptNum >= maxRetries { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) } else { diff --git a/internal/delivery/engine_test.go b/internal/delivery/engine_test.go index d05ef42..279629d 100644 --- a/internal/delivery/engine_test.go +++ b/internal/delivery/engine_test.go @@ -141,13 +141,26 @@ func TestDeliverHTTP_Success(t *testing.T) { defer ts.Close() e := testEngine(t, 1) + targetID := uuid.New().String() event := seedEvent(t, db, `{"hello":"world"}`) - delivery := seedDelivery(t, db, event.ID, uuid.New().String(), database.DeliveryStatusPending) + delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending) + + task := &DeliveryTask{ + DeliveryID: delivery.ID, + EventID: event.ID, + WebhookID: event.WebhookID, + TargetID: targetID, + TargetName: "test-http", + TargetType: database.TargetTypeHTTP, + TargetConfig: newHTTPTargetConfig(ts.URL), + MaxRetries: 0, + AttemptNum: 1, + } d := &database.Delivery{ EventID: event.ID, - TargetID: delivery.TargetID, + TargetID: targetID, Status: database.DeliveryStatusPending, Event: event, Target: database.Target{ @@ -158,7 +171,7 @@ func TestDeliverHTTP_Success(t *testing.T) { } d.ID = delivery.ID - e.deliverHTTP(context.TODO(), db, d) + e.deliverHTTP(context.TODO(), db, d, task) assert.True(t, received.Load(), "HTTP target should have received request") @@ -185,13 +198,26 @@ func TestDeliverHTTP_Failure(t *testing.T) { defer ts.Close() e := testEngine(t, 1) + targetID := uuid.New().String() event := seedEvent(t, db, `{"test":true}`) - delivery := seedDelivery(t, db, event.ID, uuid.New().String(), database.DeliveryStatusPending) + delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending) + + task := &DeliveryTask{ + DeliveryID: delivery.ID, + EventID: event.ID, + WebhookID: event.WebhookID, + TargetID: targetID, + TargetName: "test-http-fail", + TargetType: database.TargetTypeHTTP, + TargetConfig: newHTTPTargetConfig(ts.URL), + MaxRetries: 0, + AttemptNum: 1, + } d := &database.Delivery{ EventID: event.ID, - TargetID: delivery.TargetID, + TargetID: targetID, Status: database.DeliveryStatusPending, Event: event, Target: database.Target{ @@ -202,7 +228,7 @@ func TestDeliverHTTP_Failure(t *testing.T) { } d.ID = delivery.ID - e.deliverHTTP(context.TODO(), db, d) + e.deliverHTTP(context.TODO(), db, d, task) // HTTP (fire-and-forget) marks as failed on non-2xx var updated database.Delivery @@ -280,7 +306,7 @@ func TestDeliverLog_ImmediateSuccess(t *testing.T) { assert.True(t, result.Success) } -func TestDeliverRetry_Success(t *testing.T) { +func TestDeliverHTTP_WithRetries_Success(t *testing.T) { t.Parallel() db := testWebhookDB(t) @@ -300,8 +326,8 @@ func TestDeliverRetry_Success(t *testing.T) { EventID: event.ID, WebhookID: event.WebhookID, TargetID: targetID, - TargetName: "test-retry", - TargetType: database.TargetTypeRetry, + TargetName: "test-http-retry", + TargetType: database.TargetTypeHTTP, TargetConfig: newHTTPTargetConfig(ts.URL), MaxRetries: 5, AttemptNum: 1, @@ -313,8 +339,8 @@ func TestDeliverRetry_Success(t *testing.T) { Status: database.DeliveryStatusPending, Event: event, Target: database.Target{ - Name: "test-retry", - Type: database.TargetTypeRetry, + Name: "test-http-retry", + Type: database.TargetTypeHTTP, Config: newHTTPTargetConfig(ts.URL), MaxRetries: 5, }, @@ -322,7 +348,7 @@ func TestDeliverRetry_Success(t *testing.T) { d.ID = delivery.ID d.Target.ID = targetID - e.deliverRetry(context.TODO(), db, d, task) + e.deliverHTTP(context.TODO(), db, d, task) var updated database.Delivery require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error) @@ -333,7 +359,7 @@ func TestDeliverRetry_Success(t *testing.T) { assert.Equal(t, CircuitClosed, cb.State()) } -func TestDeliverRetry_MaxRetriesExhausted(t *testing.T) { +func TestDeliverHTTP_MaxRetriesExhausted(t *testing.T) { t.Parallel() db := testWebhookDB(t) @@ -354,8 +380,8 @@ func TestDeliverRetry_MaxRetriesExhausted(t *testing.T) { EventID: event.ID, WebhookID: event.WebhookID, TargetID: targetID, - TargetName: "test-retry-exhaust", - TargetType: database.TargetTypeRetry, + TargetName: "test-http-exhaust", + TargetType: database.TargetTypeHTTP, TargetConfig: newHTTPTargetConfig(ts.URL), MaxRetries: maxRetries, AttemptNum: maxRetries, // final attempt @@ -367,8 +393,8 @@ func TestDeliverRetry_MaxRetriesExhausted(t *testing.T) { Status: database.DeliveryStatusRetrying, Event: event, Target: database.Target{ - Name: "test-retry-exhaust", - Type: database.TargetTypeRetry, + Name: "test-http-exhaust", + Type: database.TargetTypeHTTP, Config: newHTTPTargetConfig(ts.URL), MaxRetries: maxRetries, }, @@ -376,7 +402,7 @@ func TestDeliverRetry_MaxRetriesExhausted(t *testing.T) { d.ID = delivery.ID d.Target.ID = targetID - e.deliverRetry(context.TODO(), db, d, task) + e.deliverHTTP(context.TODO(), db, d, task) // After max retries exhausted, delivery should be failed var updated database.Delivery @@ -385,7 +411,7 @@ func TestDeliverRetry_MaxRetriesExhausted(t *testing.T) { "delivery should be failed after max retries exhausted") } -func TestDeliverRetry_SchedulesRetryOnFailure(t *testing.T) { +func TestDeliverHTTP_SchedulesRetryOnFailure(t *testing.T) { t.Parallel() db := testWebhookDB(t) @@ -405,8 +431,8 @@ func TestDeliverRetry_SchedulesRetryOnFailure(t *testing.T) { EventID: event.ID, WebhookID: event.WebhookID, TargetID: targetID, - TargetName: "test-retry-schedule", - TargetType: database.TargetTypeRetry, + TargetName: "test-http-schedule", + TargetType: database.TargetTypeHTTP, TargetConfig: newHTTPTargetConfig(ts.URL), MaxRetries: 5, AttemptNum: 1, @@ -418,8 +444,8 @@ func TestDeliverRetry_SchedulesRetryOnFailure(t *testing.T) { Status: database.DeliveryStatusPending, Event: event, Target: database.Target{ - Name: "test-retry-schedule", - Type: database.TargetTypeRetry, + Name: "test-http-schedule", + Type: database.TargetTypeHTTP, Config: newHTTPTargetConfig(ts.URL), MaxRetries: 5, }, @@ -427,7 +453,7 @@ func TestDeliverRetry_SchedulesRetryOnFailure(t *testing.T) { d.ID = delivery.ID d.Target.ID = targetID - e.deliverRetry(context.TODO(), db, d, task) + e.deliverHTTP(context.TODO(), db, d, task) // Delivery should be in retrying status (not failed — retries remain) var updated database.Delivery @@ -591,6 +617,21 @@ func TestWorkerPool_BoundedConcurrency(t *testing.T) { tasks[i].ID = delivery.ID } + // Build DeliveryTask structs for each delivery (needed by deliverHTTP) + deliveryTasks := make([]DeliveryTask, numTasks) + for i := 0; i < numTasks; i++ { + deliveryTasks[i] = DeliveryTask{ + DeliveryID: tasks[i].ID, + EventID: tasks[i].EventID, + TargetID: tasks[i].TargetID, + TargetName: tasks[i].Target.Name, + TargetType: tasks[i].Target.Type, + TargetConfig: tasks[i].Target.Config, + MaxRetries: 0, + AttemptNum: 1, + } + } + // Process all tasks through a bounded pool of goroutines to simulate // the engine's worker pool behavior var wg sync.WaitGroup @@ -606,7 +647,7 @@ func TestWorkerPool_BoundedConcurrency(t *testing.T) { go func() { defer wg.Done() for idx := range taskCh { - e.deliverHTTP(context.TODO(), db, &tasks[idx]) + e.deliverHTTP(context.TODO(), db, &tasks[idx], &deliveryTasks[idx]) } }() } @@ -629,7 +670,7 @@ func TestWorkerPool_BoundedConcurrency(t *testing.T) { } } -func TestDeliverRetry_CircuitBreakerBlocks(t *testing.T) { +func TestDeliverHTTP_CircuitBreakerBlocks(t *testing.T) { t.Parallel() db := testWebhookDB(t) e := testEngine(t, 1) @@ -651,7 +692,7 @@ func TestDeliverRetry_CircuitBreakerBlocks(t *testing.T) { WebhookID: event.WebhookID, TargetID: targetID, TargetName: "test-cb-block", - TargetType: database.TargetTypeRetry, + TargetType: database.TargetTypeHTTP, TargetConfig: newHTTPTargetConfig("http://will-not-be-called.invalid"), MaxRetries: 5, AttemptNum: 1, @@ -664,7 +705,7 @@ func TestDeliverRetry_CircuitBreakerBlocks(t *testing.T) { Event: event, Target: database.Target{ Name: "test-cb-block", - Type: database.TargetTypeRetry, + Type: database.TargetTypeHTTP, Config: newHTTPTargetConfig("http://will-not-be-called.invalid"), MaxRetries: 5, }, @@ -672,7 +713,7 @@ func TestDeliverRetry_CircuitBreakerBlocks(t *testing.T) { d.ID = delivery.ID d.Target.ID = targetID - e.deliverRetry(context.TODO(), db, d, task) + e.deliverHTTP(context.TODO(), db, d, task) // Delivery should be retrying (circuit open, no attempt made) var updated database.Delivery diff --git a/internal/handlers/source_management.go b/internal/handlers/source_management.go index 0e3b197..66a4873 100644 --- a/internal/handlers/source_management.go +++ b/internal/handlers/source_management.go @@ -519,16 +519,16 @@ func (h *Handlers) HandleTargetCreate() http.HandlerFunc { // Validate target type switch targetType { - case database.TargetTypeHTTP, database.TargetTypeRetry, database.TargetTypeDatabase, database.TargetTypeLog: + case database.TargetTypeHTTP, database.TargetTypeDatabase, database.TargetTypeLog: // valid default: http.Error(w, "Invalid target type", http.StatusBadRequest) return } - // Build config JSON for HTTP-based targets + // Build config JSON for HTTP targets var configJSON string - if targetType == database.TargetTypeHTTP || targetType == database.TargetTypeRetry { + if targetType == database.TargetTypeHTTP { if url == "" { http.Error(w, "URL is required for HTTP targets", http.StatusBadRequest) return @@ -544,9 +544,9 @@ func (h *Handlers) HandleTargetCreate() http.HandlerFunc { configJSON = string(configBytes) } - maxRetries := 5 + maxRetries := 0 // default: fire-and-forget (no retries) if maxRetriesStr != "" { - if v, err := strconv.Atoi(maxRetriesStr); err == nil && v > 0 { + if v, err := strconv.Atoi(maxRetriesStr); err == nil && v >= 0 { maxRetries = v } } diff --git a/templates/source_detail.html b/templates/source_detail.html index 437ad4b..7e034f2 100644 --- a/templates/source_detail.html +++ b/templates/source_detail.html @@ -92,17 +92,16 @@ -