diff --git a/internal/delivery/circuit_breaker_test.go b/internal/delivery/circuit_breaker_test.go
new file mode 100644
index 0000000..4ea68da
--- /dev/null
+++ b/internal/delivery/circuit_breaker_test.go
@@ -0,0 +1,243 @@
+package delivery
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCircuitBreaker_ClosedState_AllowsDeliveries(t *testing.T) {
+	t.Parallel()
+	cb := NewCircuitBreaker()
+
+	assert.Equal(t, CircuitClosed, cb.State())
+	assert.True(t, cb.Allow(), "closed circuit should allow deliveries")
+	// Multiple calls should all succeed
+	for i := 0; i < 10; i++ {
+		assert.True(t, cb.Allow())
+	}
+}
+
+func TestCircuitBreaker_FailureCounting(t *testing.T) {
+	t.Parallel()
+	cb := NewCircuitBreaker()
+
+	// Record failures below threshold — circuit should stay closed
+	for i := 0; i < defaultFailureThreshold-1; i++ {
+		cb.RecordFailure()
+		assert.Equal(t, CircuitClosed, cb.State(),
+			"circuit should remain closed after %d failures", i+1)
+		assert.True(t, cb.Allow(), "should still allow after %d failures", i+1)
+	}
+}
+
+func TestCircuitBreaker_OpenTransition(t *testing.T) {
+	t.Parallel()
+	cb := NewCircuitBreaker()
+
+	// Record exactly threshold failures
+	for i := 0; i < defaultFailureThreshold; i++ {
+		cb.RecordFailure()
+	}
+
+	assert.Equal(t, CircuitOpen, cb.State(), "circuit should be open after threshold failures")
+	assert.False(t, cb.Allow(), "open circuit should reject deliveries")
+}
+
+func TestCircuitBreaker_Cooldown_StaysOpen(t *testing.T) {
+	t.Parallel()
+	// Use a circuit with a known short cooldown for testing
+	cb := &CircuitBreaker{
+		state:     CircuitClosed,
+		threshold: defaultFailureThreshold,
+		cooldown:  200 * time.Millisecond,
+	}
+
+	// Trip the circuit open
+	for i := 0; i < defaultFailureThreshold; i++ {
+		cb.RecordFailure()
+	}
+	require.Equal(t, CircuitOpen, cb.State())
+
+	// During cooldown, Allow should return false
+	assert.False(t, cb.Allow(), "should be blocked during cooldown")
+
+	// CooldownRemaining should be positive
+	remaining := cb.CooldownRemaining()
+	assert.Greater(t, remaining, time.Duration(0), "cooldown should have remaining time")
+}
+
+func TestCircuitBreaker_HalfOpen_AfterCooldown(t *testing.T) {
+	t.Parallel()
+	cb := &CircuitBreaker{
+		state:     CircuitClosed,
+		threshold: defaultFailureThreshold,
+		cooldown:  50 * time.Millisecond,
+	}
+
+	// Trip the circuit open
+	for i := 0; i < defaultFailureThreshold; i++ {
+		cb.RecordFailure()
+	}
+	require.Equal(t, CircuitOpen, cb.State())
+
+	// Wait for cooldown to expire
+	time.Sleep(60 * time.Millisecond)
+
+	// CooldownRemaining should be zero after cooldown
+	assert.Equal(t, time.Duration(0), cb.CooldownRemaining())
+
+	// First Allow after cooldown should succeed (probe)
+	assert.True(t, cb.Allow(), "should allow one probe after cooldown")
+	assert.Equal(t, CircuitHalfOpen, cb.State(), "should be half-open after probe allowed")
+
+	// Second Allow should be rejected (only one probe at a time)
+	assert.False(t, cb.Allow(), "should reject additional probes while half-open")
+}
+
+func TestCircuitBreaker_ProbeSuccess_ClosesCircuit(t *testing.T) {
+	t.Parallel()
+	cb := &CircuitBreaker{
+		state:     CircuitClosed,
+		threshold: defaultFailureThreshold,
+		cooldown:  50 * time.Millisecond,
+	}
+
+	// Trip open → wait for cooldown → allow probe
+	for i := 0; i < defaultFailureThreshold; i++ {
+		cb.RecordFailure()
+	}
+	time.Sleep(60 * time.Millisecond)
+	require.True(t, cb.Allow()) // probe allowed, state → half-open
+
+	// Probe succeeds → circuit should close
+	cb.RecordSuccess()
+	assert.Equal(t, CircuitClosed, cb.State(), "successful probe should close circuit")
+
+	// Should allow deliveries again
+	assert.True(t, cb.Allow(), "closed circuit should allow deliveries")
+}
+
+func TestCircuitBreaker_ProbeFailure_ReopensCircuit(t *testing.T) {
+	t.Parallel()
+	cb := &CircuitBreaker{
+		state:     CircuitClosed,
+		threshold: defaultFailureThreshold,
+		cooldown:  50 * time.Millisecond,
+	}
+
+	// Trip open → wait for cooldown → allow probe
+	for i := 0; i < defaultFailureThreshold; i++ {
+		cb.RecordFailure()
+	}
+	time.Sleep(60 * time.Millisecond)
+	require.True(t, cb.Allow()) // probe allowed, state → half-open
+
+	// Probe fails → circuit should reopen
+	cb.RecordFailure()
+	assert.Equal(t, CircuitOpen, cb.State(), "failed probe should reopen circuit")
+	assert.False(t, cb.Allow(), "reopened circuit should reject deliveries")
+}
+
+func TestCircuitBreaker_SuccessResetsFailures(t *testing.T) {
+	t.Parallel()
+	cb := NewCircuitBreaker()
+
+	// Accumulate failures just below threshold
+	for i := 0; i < defaultFailureThreshold-1; i++ {
+		cb.RecordFailure()
+	}
+	require.Equal(t, CircuitClosed, cb.State())
+
+	// Success should reset the failure counter
+	cb.RecordSuccess()
+	assert.Equal(t, CircuitClosed, cb.State())
+
+	// Now we should need another full threshold of failures to trip
+	for i := 0; i < defaultFailureThreshold-1; i++ {
+		cb.RecordFailure()
+	}
+	assert.Equal(t, CircuitClosed, cb.State(),
+		"circuit should still be closed — success reset the counter")
+
+	// One more failure should trip it
+	cb.RecordFailure()
+	assert.Equal(t, CircuitOpen, cb.State())
+}
+
+func TestCircuitBreaker_ConcurrentAccess(t *testing.T) {
+	t.Parallel()
+	cb := NewCircuitBreaker()
+
+	const goroutines = 100
+	var wg sync.WaitGroup
+	wg.Add(goroutines * 3)
+
+	// Concurrent Allow calls
+	for i := 0; i < goroutines; i++ {
+		go func() {
+			defer wg.Done()
+			cb.Allow()
+		}()
+	}
+
+	// Concurrent RecordFailure calls
+	for i := 0; i < goroutines; i++ {
+		go func() {
+			defer wg.Done()
+			cb.RecordFailure()
+		}()
+	}
+
+	// Concurrent RecordSuccess calls
+	for i := 0; i < goroutines; i++ {
+		go func() {
+			defer wg.Done()
+			cb.RecordSuccess()
+		}()
+	}
+
+	wg.Wait()
+	// No panic or data race — the test passes if -race doesn't flag anything.
+	// State should be one of the valid states.
+	state := cb.State()
+	assert.Contains(t, []CircuitState{CircuitClosed, CircuitOpen, CircuitHalfOpen}, state,
+		"state should be valid after concurrent access")
+}
+
+func TestCircuitBreaker_CooldownRemaining_ClosedReturnsZero(t *testing.T) {
+	t.Parallel()
+	cb := NewCircuitBreaker()
+	assert.Equal(t, time.Duration(0), cb.CooldownRemaining(),
+		"closed circuit should have zero cooldown remaining")
+}
+
+func TestCircuitBreaker_CooldownRemaining_HalfOpenReturnsZero(t *testing.T) {
+	t.Parallel()
+	cb := &CircuitBreaker{
+		state:     CircuitClosed,
+		threshold: defaultFailureThreshold,
+		cooldown:  50 * time.Millisecond,
+	}
+
+	// Trip open, wait, transition to half-open
+	for i := 0; i < defaultFailureThreshold; i++ {
+		cb.RecordFailure()
+	}
+	time.Sleep(60 * time.Millisecond)
+	require.True(t, cb.Allow()) // → half-open
+
+	assert.Equal(t, time.Duration(0), cb.CooldownRemaining(),
+		"half-open circuit should have zero cooldown remaining")
+}
+
+func TestCircuitState_String(t *testing.T) {
+	t.Parallel()
+	assert.Equal(t, "closed", CircuitClosed.String())
+	assert.Equal(t, "open", CircuitOpen.String())
+	assert.Equal(t, "half-open", CircuitHalfOpen.String())
+	assert.Equal(t, "unknown", CircuitState(99).String())
+}
diff --git a/internal/delivery/engine_test.go b/internal/delivery/engine_test.go
new file mode 100644
index 0000000..d05ef42
--- /dev/null
+++ b/internal/delivery/engine_test.go
@@ -0,0 +1,895 @@
+package delivery
+
+import (
+	"context"
+	"database/sql"
+	"encoding/json"
+	"fmt"
+	"log/slog"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"gorm.io/driver/sqlite"
+	"gorm.io/gorm"
+	_ "modernc.org/sqlite"
+	"sneak.berlin/go/webhooker/internal/database"
+)
+
+// testWebhookDB creates a real SQLite per-webhook database in a temp dir
+// and runs the event-tier migrations (Event, Delivery, DeliveryResult).
+func testWebhookDB(t *testing.T) *gorm.DB {
+	t.Helper()
+	dbPath := filepath.Join(t.TempDir(), "events-test.db")
+	dsn := fmt.Sprintf("file:%s?cache=shared&mode=rwc", dbPath)
+
+	sqlDB, err := sql.Open("sqlite", dsn)
+	require.NoError(t, err)
+	t.Cleanup(func() { sqlDB.Close() })
+
+	db, err := gorm.Open(sqlite.Dialector{Conn: sqlDB}, &gorm.Config{})
+	require.NoError(t, err)
+
+	require.NoError(t, db.AutoMigrate(
+		&database.Event{},
+		&database.Delivery{},
+		&database.DeliveryResult{},
+	))
+
+	return db
+}
+
+// testEngine builds an Engine with custom settings for testing. It does
+// NOT call start() — callers control lifecycle for deterministic tests.
+func testEngine(t *testing.T, workers int) *Engine {
+	t.Helper()
+	return &Engine{
+		log:        slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})),
+		client:     &http.Client{Timeout: 5 * time.Second},
+		deliveryCh: make(chan DeliveryTask, deliveryChannelSize),
+		retryCh:    make(chan DeliveryTask, retryChannelSize),
+		workers:    workers,
+	}
+}
+
+// newHTTPTargetConfig returns a JSON config for an HTTP/retry target
+// pointing at the given URL.
+func newHTTPTargetConfig(url string) string {
+	cfg := HTTPTargetConfig{URL: url}
+	data, err := json.Marshal(cfg)
+	if err != nil {
+		panic("failed to marshal HTTPTargetConfig: " + err.Error())
+	}
+	return string(data)
+}
+
+// seedEvent inserts an event into the per-webhook DB and returns it.
+func seedEvent(t *testing.T, db *gorm.DB, body string) database.Event {
+	t.Helper()
+	event := database.Event{
+		WebhookID:    uuid.New().String(),
+		EntrypointID: uuid.New().String(),
+		Method:       "POST",
+		Headers:      `{"Content-Type":["application/json"]}`,
+		Body:         body,
+		ContentType:  "application/json",
+	}
+	require.NoError(t, db.Create(&event).Error)
+	return event
+}
+
+// seedDelivery inserts a delivery for an event + target and returns it.
+func seedDelivery(t *testing.T, db *gorm.DB, eventID, targetID string, status database.DeliveryStatus) database.Delivery {
+	t.Helper()
+	d := database.Delivery{
+		EventID:  eventID,
+		TargetID: targetID,
+		Status:   status,
+	}
+	require.NoError(t, db.Create(&d).Error)
+	return d
+}
+
+// --- Tests ---
+
+func TestNotify_NonBlocking(t *testing.T) {
+	t.Parallel()
+	e := testEngine(t, 1)
+
+	// Fill the delivery channel to capacity
+	for i := 0; i < deliveryChannelSize; i++ {
+		e.deliveryCh <- DeliveryTask{DeliveryID: fmt.Sprintf("fill-%d", i)}
+	}
+
+	// Notify should NOT block even though channel is full
+	done := make(chan struct{})
+	go func() {
+		e.Notify([]DeliveryTask{
+			{DeliveryID: "overflow-1"},
+			{DeliveryID: "overflow-2"},
+		})
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		// success: Notify returned without blocking
+	case <-time.After(2 * time.Second):
+		t.Fatal("Notify blocked when delivery channel was full")
+	}
+}
+
+func TestDeliverHTTP_Success(t *testing.T) {
+	t.Parallel()
+	db := testWebhookDB(t)
+
+	var received atomic.Bool
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		received.Store(true)
+		w.WriteHeader(http.StatusOK)
+		fmt.Fprint(w, `{"ok":true}`)
+	}))
+	defer ts.Close()
+
+	e := testEngine(t, 1)
+
+	event := seedEvent(t, db, `{"hello":"world"}`)
+	delivery := seedDelivery(t, db, event.ID, uuid.New().String(), database.DeliveryStatusPending)
+
+	d := &database.Delivery{
+		EventID:  event.ID,
+		TargetID: delivery.TargetID,
+		Status:   database.DeliveryStatusPending,
+		Event:    event,
+		Target: database.Target{
+			Name:   "test-http",
+			Type:   database.TargetTypeHTTP,
+			Config: newHTTPTargetConfig(ts.URL),
+		},
+	}
+	d.ID = delivery.ID
+
+	e.deliverHTTP(context.TODO(), db, d)
+
+	assert.True(t, received.Load(), "HTTP target should have received request")
+
+	// Check DB: delivery should be delivered
+	var updated database.Delivery
+	require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
+	assert.Equal(t, database.DeliveryStatusDelivered, updated.Status)
+
+	// Check that a result was recorded
+	var result database.DeliveryResult
+	require.NoError(t, db.Where("delivery_id = ?", delivery.ID).First(&result).Error)
+	assert.True(t, result.Success)
+	assert.Equal(t, http.StatusOK, result.StatusCode)
+}
+
+func TestDeliverHTTP_Failure(t *testing.T) {
+	t.Parallel()
+	db := testWebhookDB(t)
+
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprint(w, "internal error")
+	}))
+	defer ts.Close()
+
+	e := testEngine(t, 1)
+
+	event := seedEvent(t, db, `{"test":true}`)
+	delivery := seedDelivery(t, db, event.ID, uuid.New().String(), database.DeliveryStatusPending)
+
+	d := &database.Delivery{
+		EventID:  event.ID,
+		TargetID: delivery.TargetID,
+		Status:   database.DeliveryStatusPending,
+		Event:    event,
+		Target: database.Target{
+			Name:   "test-http-fail",
+			Type:   database.TargetTypeHTTP,
+			Config: newHTTPTargetConfig(ts.URL),
+		},
+	}
+	d.ID = delivery.ID
+
+	e.deliverHTTP(context.TODO(), db, d)
+
+	// HTTP (fire-and-forget) marks as failed on non-2xx
+	var updated database.Delivery
+	require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
+	assert.Equal(t, database.DeliveryStatusFailed, updated.Status)
+
+	var result database.DeliveryResult
+	require.NoError(t, db.Where("delivery_id = ?", delivery.ID).First(&result).Error)
+	assert.False(t, result.Success)
+	assert.Equal(t, http.StatusInternalServerError, result.StatusCode)
+}
+
+func TestDeliverDatabase_ImmediateSuccess(t *testing.T) {
+	t.Parallel()
+	db := testWebhookDB(t)
+	e := testEngine(t, 1)
+
+	event := seedEvent(t, db, `{"db":"target"}`)
+	delivery := seedDelivery(t, db, event.ID, uuid.New().String(), database.DeliveryStatusPending)
+
+	d := &database.Delivery{
+		EventID:  event.ID,
+		TargetID: delivery.TargetID,
+		Status:   database.DeliveryStatusPending,
+		Event:    event,
+		Target: database.Target{
+			Name: "test-db",
+			Type: database.TargetTypeDatabase,
+		},
+	}
+	d.ID = delivery.ID
+
+	e.deliverDatabase(db, d)
+
+	var updated database.Delivery
+	require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
+	assert.Equal(t, database.DeliveryStatusDelivered, updated.Status,
+		"database target should immediately succeed")
+
+	var result database.DeliveryResult
+	require.NoError(t, db.Where("delivery_id = ?", delivery.ID).First(&result).Error)
+	assert.True(t, result.Success)
+	assert.Equal(t, 0, result.StatusCode, "database target should not have an HTTP status code")
+}
+
+func TestDeliverLog_ImmediateSuccess(t *testing.T) {
+	t.Parallel()
+	db := testWebhookDB(t)
+	e := testEngine(t, 1)
+
+	event := seedEvent(t, db, `{"log":"target"}`)
+	delivery := seedDelivery(t, db, event.ID, uuid.New().String(), database.DeliveryStatusPending)
+
+	d := &database.Delivery{
+		EventID:  event.ID,
+		TargetID: delivery.TargetID,
+		Status:   database.DeliveryStatusPending,
+		Event:    event,
+		Target: database.Target{
+			Name: "test-log",
+			Type: database.TargetTypeLog,
+		},
+	}
+	d.ID = delivery.ID
+
+	e.deliverLog(db, d)
+
+	var updated database.Delivery
+	require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
+	assert.Equal(t, database.DeliveryStatusDelivered, updated.Status,
+		"log target should immediately succeed")
+
+	var result database.DeliveryResult
+	require.NoError(t, db.Where("delivery_id = ?", delivery.ID).First(&result).Error)
+	assert.True(t, result.Success)
+}
+
+func TestDeliverRetry_Success(t *testing.T) {
+	t.Parallel()
+	db := testWebhookDB(t)
+
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+		w.WriteHeader(http.StatusOK)
+	}))
+	defer ts.Close()
+
+	e := testEngine(t, 1)
+	targetID := uuid.New().String()
+
+	event := seedEvent(t, db, `{"retry":"ok"}`)
+	delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending)
+
+	task := &DeliveryTask{
+		DeliveryID:   delivery.ID,
+		EventID:      event.ID,
+		WebhookID:    event.WebhookID,
+		TargetID:     targetID,
+		TargetName:   "test-retry",
+		TargetType:   database.TargetTypeRetry,
+		TargetConfig: newHTTPTargetConfig(ts.URL),
+		MaxRetries:   5,
+		AttemptNum:   1,
+	}
+
+	d := &database.Delivery{
+		EventID:  event.ID,
+		TargetID: targetID,
+		Status:   database.DeliveryStatusPending,
+		Event:    event,
+		Target: database.Target{
+			Name:       "test-retry",
+			Type:       database.TargetTypeRetry,
+			Config:     newHTTPTargetConfig(ts.URL),
+			MaxRetries: 5,
+		},
+	}
+	d.ID = delivery.ID
+	d.Target.ID = targetID
+
+	e.deliverRetry(context.TODO(), db, d, task)
+
+	var updated database.Delivery
+	require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
?", delivery.ID).Error) + assert.Equal(t, database.DeliveryStatusDelivered, updated.Status) + + // Circuit breaker should have recorded success + cb := e.getCircuitBreaker(targetID) + assert.Equal(t, CircuitClosed, cb.State()) +} + +func TestDeliverRetry_MaxRetriesExhausted(t *testing.T) { + t.Parallel() + db := testWebhookDB(t) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusBadGateway) + })) + defer ts.Close() + + e := testEngine(t, 1) + targetID := uuid.New().String() + + event := seedEvent(t, db, `{"retry":"exhaust"}`) + delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusRetrying) + + maxRetries := 3 + task := &DeliveryTask{ + DeliveryID: delivery.ID, + EventID: event.ID, + WebhookID: event.WebhookID, + TargetID: targetID, + TargetName: "test-retry-exhaust", + TargetType: database.TargetTypeRetry, + TargetConfig: newHTTPTargetConfig(ts.URL), + MaxRetries: maxRetries, + AttemptNum: maxRetries, // final attempt + } + + d := &database.Delivery{ + EventID: event.ID, + TargetID: targetID, + Status: database.DeliveryStatusRetrying, + Event: event, + Target: database.Target{ + Name: "test-retry-exhaust", + Type: database.TargetTypeRetry, + Config: newHTTPTargetConfig(ts.URL), + MaxRetries: maxRetries, + }, + } + d.ID = delivery.ID + d.Target.ID = targetID + + e.deliverRetry(context.TODO(), db, d, task) + + // After max retries exhausted, delivery should be failed + var updated database.Delivery + require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error) + assert.Equal(t, database.DeliveryStatusFailed, updated.Status, + "delivery should be failed after max retries exhausted") +} + +func TestDeliverRetry_SchedulesRetryOnFailure(t *testing.T) { + t.Parallel() + db := testWebhookDB(t) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + })) + defer ts.Close() + + e := testEngine(t, 1) + targetID := uuid.New().String() + + event := seedEvent(t, db, `{"retry":"schedule"}`) + delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending) + + task := &DeliveryTask{ + DeliveryID: delivery.ID, + EventID: event.ID, + WebhookID: event.WebhookID, + TargetID: targetID, + TargetName: "test-retry-schedule", + TargetType: database.TargetTypeRetry, + TargetConfig: newHTTPTargetConfig(ts.URL), + MaxRetries: 5, + AttemptNum: 1, + } + + d := &database.Delivery{ + EventID: event.ID, + TargetID: targetID, + Status: database.DeliveryStatusPending, + Event: event, + Target: database.Target{ + Name: "test-retry-schedule", + Type: database.TargetTypeRetry, + Config: newHTTPTargetConfig(ts.URL), + MaxRetries: 5, + }, + } + d.ID = delivery.ID + d.Target.ID = targetID + + e.deliverRetry(context.TODO(), db, d, task) + + // Delivery should be in retrying status (not failed — retries remain) + var updated database.Delivery + require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error) + assert.Equal(t, database.DeliveryStatusRetrying, updated.Status, + "delivery should be retrying when retries remain") + + // The timer should fire a task into the retry channel. Wait briefly + // for the timer (backoff for attempt 1 is 1s, but we're just verifying + // the status was set correctly and a result was recorded). 
+	var result database.DeliveryResult
+	require.NoError(t, db.Where("delivery_id = ?", delivery.ID).First(&result).Error)
+	assert.False(t, result.Success)
+	assert.Equal(t, 1, result.AttemptNum)
+}
+
+func TestExponentialBackoff_Durations(t *testing.T) {
+	t.Parallel()
+	// The engine uses: backoff = 2^(attemptNum-1) seconds
+	// attempt 1 → shift=0 → 1s
+	// attempt 2 → shift=1 → 2s
+	// attempt 3 → shift=2 → 4s
+	// attempt 4 → shift=3 → 8s
+	// attempt 5 → shift=4 → 16s
+
+	expected := []time.Duration{
+		1 * time.Second,
+		2 * time.Second,
+		4 * time.Second,
+		8 * time.Second,
+		16 * time.Second,
+	}
+
+	for attemptNum := 1; attemptNum <= 5; attemptNum++ {
+		shift := attemptNum - 1
+		if shift > 30 {
+			shift = 30
+		}
+		backoff := time.Duration(1<<shift) * time.Second
+		assert.Equal(t, expected[attemptNum-1], backoff,
+			"backoff for attempt %d", attemptNum)
+	}
+
+	// The shift is capped at 30 so the backoff never overflows for very
+	// large attempt numbers.
+	shift := 100 - 1
+	if shift > 30 {
+		shift = 30
+	}
+	backoff := time.Duration(1<<shift) * time.Second
+	assert.Equal(t, time.Duration(1<<30)*time.Second, backoff)
+}
+
+func TestWorkerPool_LimitsConcurrentDeliveries(t *testing.T) {
+	t.Parallel()
+	db := testWebhookDB(t)
+
+	const numWorkers = 3
+
+	var (
+		mu         sync.Mutex
+		concurrent int
+		maxSeen    int
+	)
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+		mu.Lock()
+		concurrent++
+		if concurrent > maxSeen {
+			maxSeen = concurrent
+		}
+		mu.Unlock()
+
+		time.Sleep(100 * time.Millisecond) // simulate slow target
+
+		mu.Lock()
+		concurrent--
+		mu.Unlock()
+
+		w.WriteHeader(http.StatusOK)
+	}))
+	defer ts.Close()
+
+	e := testEngine(t, numWorkers)
+	// processNewTask needs a dbManager, so instead of driving the engine's
+	// own workers we simulate the pool: create tasks and process them
+	// through a bounded set of goroutines calling deliverHTTP directly.
+
+	// Create tasks for more work than workers
+	const numTasks = 10
+	tasks := make([]database.Delivery, numTasks)
+	targetCfg := newHTTPTargetConfig(ts.URL)
+
+	for i := 0; i < numTasks; i++ {
+		event := seedEvent(t, db, fmt.Sprintf(`{"task":%d}`, i))
+		delivery := seedDelivery(t, db, event.ID, uuid.New().String(), database.DeliveryStatusPending)
+		tasks[i] = database.Delivery{
+			EventID:  event.ID,
+			TargetID: delivery.TargetID,
+			Status:   database.DeliveryStatusPending,
+			Event:    event,
+			Target: database.Target{
+				Name:   fmt.Sprintf("task-%d", i),
+				Type:   database.TargetTypeHTTP,
+				Config: targetCfg,
+			},
+		}
+		tasks[i].ID = delivery.ID
+	}
+
+	// Process all tasks through a bounded pool of goroutines to simulate
+	// the engine's worker pool behavior
+	var wg sync.WaitGroup
+	taskCh := make(chan int, numTasks)
+	for i := 0; i < numTasks; i++ {
+		taskCh <- i
+	}
+	close(taskCh)
+
+	// Start exactly numWorkers goroutines
+	for w := 0; w < numWorkers; w++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for idx := range taskCh {
+				e.deliverHTTP(context.TODO(), db, &tasks[idx])
+			}
+		}()
+	}
+
+	wg.Wait()
+
+	mu.Lock()
+	observedMax := maxSeen
+	mu.Unlock()
+
+	assert.LessOrEqual(t, observedMax, numWorkers,
+		"should never exceed %d concurrent deliveries, saw %d", numWorkers, observedMax)
+
+	// All deliveries should be completed
+	for i := 0; i < numTasks; i++ {
+		var d database.Delivery
+		require.NoError(t, db.First(&d, "id = ?", tasks[i].ID).Error)
+		assert.Equal(t, database.DeliveryStatusDelivered, d.Status,
+			"task %d should be delivered", i)
+	}
+}
+
+func TestDeliverRetry_CircuitBreakerBlocks(t *testing.T) {
+	t.Parallel()
+	db := testWebhookDB(t)
+	e := testEngine(t, 1)
+	targetID := uuid.New().String()
+
+	// Pre-trip the circuit breaker for this target
+	cb := e.getCircuitBreaker(targetID)
+	for i := 0; i < defaultFailureThreshold; i++ {
+		cb.RecordFailure()
+	}
+	require.Equal(t, CircuitOpen, cb.State())
+
+	event := seedEvent(t, db, `{"cb":"blocked"}`)
+	delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending)
+
+	task := &DeliveryTask{
+		DeliveryID:   delivery.ID,
+		EventID:      event.ID,
+		WebhookID:    event.WebhookID,
+		TargetID:     targetID,
+		TargetName:   "test-cb-block",
+		TargetType:   database.TargetTypeRetry,
+		TargetConfig: newHTTPTargetConfig("http://will-not-be-called.invalid"),
+		MaxRetries:   5,
+		AttemptNum:   1,
+	}
+
+	d := &database.Delivery{
+		EventID:  event.ID,
+		TargetID: targetID,
+		Status:   database.DeliveryStatusPending,
+		Event:    event,
+		Target: database.Target{
+			Name:       "test-cb-block",
+			Type:       database.TargetTypeRetry,
+			Config:     newHTTPTargetConfig("http://will-not-be-called.invalid"),
+			MaxRetries: 5,
+		},
+	}
+	d.ID = delivery.ID
+	d.Target.ID = targetID
+
+	e.deliverRetry(context.TODO(), db, d, task)
+
+	// Delivery should be retrying (circuit open, no attempt made)
+	var updated database.Delivery
+	require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
+	assert.Equal(t, database.DeliveryStatusRetrying, updated.Status,
+		"delivery should be retrying when circuit breaker is open")
+
+	// No delivery result should have been recorded (no attempt was made)
+	var resultCount int64
+	db.Model(&database.DeliveryResult{}).Where("delivery_id = ?", delivery.ID).Count(&resultCount)
+	assert.Equal(t, int64(0), resultCount,
+		"no delivery result should be recorded when circuit is open")
+}
+
+func TestGetCircuitBreaker_CreatesOnDemand(t *testing.T) {
+	t.Parallel()
+	e := testEngine(t, 1)
+
+	targetID := uuid.New().String()
+	cb1 := e.getCircuitBreaker(targetID)
+	require.NotNil(t, cb1)
+	assert.Equal(t, CircuitClosed, cb1.State())
+
+	// Same target should return the same circuit breaker
+	cb2 := e.getCircuitBreaker(targetID)
+	assert.Same(t, cb1, cb2, "same target ID should return the same circuit breaker")
+
+	// Different target should return a different circuit breaker
+	otherID := uuid.New().String()
+	cb3 := e.getCircuitBreaker(otherID)
+	assert.NotSame(t, cb1, cb3, "different target ID should return a different circuit breaker")
+}
+
+func TestParseHTTPConfig_Valid(t *testing.T) {
+	t.Parallel()
+	e := testEngine(t, 1)
+
+	cfg, err := e.parseHTTPConfig(`{"url":"https://example.com/hook","headers":{"X-Token":"secret"}}`)
+	require.NoError(t, err)
+	assert.Equal(t, "https://example.com/hook", cfg.URL)
+	assert.Equal(t, "secret", cfg.Headers["X-Token"])
+}
+
+func TestParseHTTPConfig_Empty(t *testing.T) {
+	t.Parallel()
+	e := testEngine(t, 1)
+
+	_, err := e.parseHTTPConfig("")
+	assert.Error(t, err, "empty config should return error")
+}
+
+func TestParseHTTPConfig_MissingURL(t *testing.T) {
+	t.Parallel()
+	e := testEngine(t, 1)
+
+	_, err := e.parseHTTPConfig(`{"headers":{"X-Token":"secret"}}`)
+	assert.Error(t, err, "config without URL should return error")
+}
+
+func TestScheduleRetry_SendsToRetryChannel(t *testing.T) {
+	t.Parallel()
+	e := testEngine(t, 1)
+
+	task := DeliveryTask{
+		DeliveryID: uuid.New().String(),
+		EventID:    uuid.New().String(),
+		WebhookID:  uuid.New().String(),
+		TargetID:   uuid.New().String(),
+		AttemptNum: 2,
+	}
+
+	e.scheduleRetry(task, 10*time.Millisecond)
+
+	// Wait for the timer to fire
+	select {
+	case received := <-e.retryCh:
+		assert.Equal(t, task.DeliveryID, received.DeliveryID)
+		assert.Equal(t, task.AttemptNum, received.AttemptNum)
+	case <-time.After(2 * time.Second):
+		t.Fatal("retry task was not sent to retry channel within timeout")
+	}
+}
+
+func TestScheduleRetry_DropsWhenChannelFull(t *testing.T) {
+	t.Parallel()
+	e := &Engine{
+		log:     slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})),
+		retryCh: make(chan DeliveryTask, 1), // tiny buffer
+	}
+
+	// Fill the retry channel
+	e.retryCh <- DeliveryTask{DeliveryID: "fill"}
+
+	task := DeliveryTask{
+		DeliveryID: "overflow",
+		AttemptNum: 2,
+	}
+
+	// Should not panic or block
+	e.scheduleRetry(task, 0)
+
+	// Give timer a moment to fire
+	time.Sleep(50 * time.Millisecond)
+
+	// Only the original task should be in the channel
+	received := <-e.retryCh
+	assert.Equal(t, "fill", received.DeliveryID,
+		"only the original task should be in the channel (overflow was dropped)")
+}
+
+func TestIsForwardableHeader(t *testing.T) {
+	t.Parallel()
+	// Should forward
+	assert.True(t, isForwardableHeader("X-Custom-Header"))
+	assert.True(t, isForwardableHeader("Authorization"))
+	assert.True(t, isForwardableHeader("Accept"))
+	assert.True(t, isForwardableHeader("X-GitHub-Event"))
+
+	// Should NOT forward (hop-by-hop)
+	assert.False(t, isForwardableHeader("Host"))
+	assert.False(t, isForwardableHeader("Connection"))
+	assert.False(t, isForwardableHeader("Keep-Alive"))
+	assert.False(t, isForwardableHeader("Transfer-Encoding"))
+	assert.False(t, isForwardableHeader("Content-Length"))
+}
+
+func TestTruncate(t *testing.T) {
+	t.Parallel()
+	assert.Equal(t, "hello", truncate("hello", 10))
+	assert.Equal(t, "hello", truncate("hello", 5))
+	assert.Equal(t, "hel", truncate("hello", 3))
+	assert.Equal(t, "", truncate("", 5))
+}
+
+func TestDoHTTPRequest_ForwardsHeaders(t *testing.T) {
+	t.Parallel()
+
+	var receivedHeaders http.Header
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		receivedHeaders = r.Header.Clone()
+		w.WriteHeader(http.StatusOK)
+	}))
+	defer ts.Close()
+
+	e := testEngine(t, 1)
+	cfg := &HTTPTargetConfig{
+		URL:     ts.URL,
+		Headers: map[string]string{"X-Target-Auth": "bearer xyz"},
+	}
+
+	event := &database.Event{
+		Method:      "POST",
+		Headers:     `{"X-Custom":["value1"],"Content-Type":["application/json"]}`,
+		Body:        `{"test":true}`,
+		ContentType: "application/json",
+	}
+
+	statusCode, _, _, err := e.doHTTPRequest(cfg, event)
+	require.NoError(t, err)
+	assert.Equal(t, http.StatusOK, statusCode)
+
+	// Check forwarded headers
+	assert.Equal(t, "value1", receivedHeaders.Get("X-Custom"))
+	assert.Equal(t, "bearer xyz", receivedHeaders.Get("X-Target-Auth"))
+	assert.Equal(t, "application/json", receivedHeaders.Get("Content-Type"))
+	assert.Equal(t, "webhooker/1.0", receivedHeaders.Get("User-Agent"))
+}
+
+func TestProcessDelivery_RoutesToCorrectHandler(t *testing.T) {
+	t.Parallel()
+	db := testWebhookDB(t)
+	e := testEngine(t, 1)
+
+	tests := []struct {
+		name       string
+		targetType database.TargetType
+		wantStatus database.DeliveryStatus
+	}{
+		{"database target", database.TargetTypeDatabase, database.DeliveryStatusDelivered},
+		{"log target", database.TargetTypeLog, database.DeliveryStatusDelivered},
+	}
+
+	for _, tt := range tests {
+		tt := tt // capture range variable for the parallel subtest
+		t.Run(tt.name, func(t *testing.T) {
+			t.Parallel()
+			event := seedEvent(t, db, `{"routing":"test"}`)
+			delivery := seedDelivery(t, db, event.ID, uuid.New().String(), database.DeliveryStatusPending)
+
+			d := &database.Delivery{
+				EventID:  event.ID,
+				TargetID: delivery.TargetID,
+				Status:   database.DeliveryStatusPending,
+				Event:    event,
+				Target: database.Target{
+					Name: "test-" + string(tt.targetType),
+					Type: tt.targetType,
+				},
+			}
+			d.ID = delivery.ID
+
+			task := &DeliveryTask{
+				DeliveryID: delivery.ID,
+				TargetType: tt.targetType,
+			}
+
+			e.processDelivery(context.TODO(), db, d, task)
+
+			var updated database.Delivery
+			require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
+			assert.Equal(t, tt.wantStatus, updated.Status)
+		})
+	}
+}
+
+func TestMaxInlineBodySize_Constant(t *testing.T) {
+	t.Parallel()
+	// Verify the constant is 16KB as documented
+	assert.Equal(t, 16*1024, MaxInlineBodySize,
+		"MaxInlineBodySize should be 16KB (16384 bytes)")
+}