package delivery import ( "context" "database/sql" "encoding/json" "fmt" "io" "log/slog" "net/http" "net/http/httptest" "os" "path/filepath" "strings" "sync/atomic" "testing" "time" "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gorm.io/driver/sqlite" "gorm.io/gorm" _ "modernc.org/sqlite" "sneak.berlin/go/webhooker/internal/database" ) // testMainDB creates a real SQLite main database with the required tables // (Webhook, Target, Setting, User, etc.) for integration tests. func testMainDB(t *testing.T) *gorm.DB { t.Helper() dbPath := filepath.Join(t.TempDir(), "main-test.db") dsn := fmt.Sprintf("file:%s?cache=shared&mode=rwc", dbPath) sqlDB, err := sql.Open("sqlite", dsn) require.NoError(t, err) t.Cleanup(func() { sqlDB.Close() }) db, err := gorm.Open(sqlite.Dialector{Conn: sqlDB}, &gorm.Config{}) require.NoError(t, err) require.NoError(t, db.AutoMigrate( &database.Webhook{}, &database.Target{}, &database.User{}, &database.Setting{}, )) return db } // testDatabase wraps a *gorm.DB into a *database.Database for the engine. func testDatabase(t *testing.T, db *gorm.DB) *database.Database { t.Helper() return database.NewTestDatabase(db) } // testDBManager creates a WebhookDBManager backed by a temp directory. // Register per-webhook databases by calling seedWebhookDB. func testDBManager(t *testing.T) *database.WebhookDBManager { t.Helper() dataDir := t.TempDir() return database.NewTestWebhookDBManager(dataDir) } // seedWebhookDB creates a per-webhook database and registers it in the manager. // Returns the webhookDB and the webhookID. func seedWebhookDB(t *testing.T, mgr *database.WebhookDBManager, webhookID string) *gorm.DB { t.Helper() db, err := mgr.GetDB(webhookID) require.NoError(t, err) return db } // testEngineWithDB builds an Engine with a real database and dbManager. func testEngineWithDB(t *testing.T, mainDB *gorm.DB, dbMgr *database.WebhookDBManager) *Engine { t.Helper() return &Engine{ database: testDatabase(t, mainDB), dbManager: dbMgr, log: slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})), client: &http.Client{Timeout: 5 * time.Second}, deliveryCh: make(chan DeliveryTask, deliveryChannelSize), retryCh: make(chan DeliveryTask, retryChannelSize), workers: 2, } } // --- processNewTask Tests --- func TestProcessNewTask_InlineBody(t *testing.T) { t.Parallel() mainDB := testMainDB(t) dbMgr := testDBManager(t) webhookID := uuid.New().String() webhookDB := seedWebhookDB(t, dbMgr, webhookID) var received atomic.Bool ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { received.Store(true) assert.Equal(t, "application/json", r.Header.Get("Content-Type")) w.WriteHeader(http.StatusOK) fmt.Fprint(w, `{"ok":true}`) })) defer ts.Close() e := testEngineWithDB(t, mainDB, dbMgr) targetID := uuid.New().String() // Seed event in per-webhook DB event := database.Event{ WebhookID: webhookID, EntrypointID: uuid.New().String(), Method: "POST", Headers: `{"Content-Type":["application/json"]}`, Body: `{"hello":"world"}`, ContentType: "application/json", } require.NoError(t, webhookDB.Create(&event).Error) // Seed delivery in per-webhook DB delivery := database.Delivery{ EventID: event.ID, TargetID: targetID, Status: database.DeliveryStatusPending, } require.NoError(t, webhookDB.Create(&delivery).Error) bodyStr := event.Body task := DeliveryTask{ DeliveryID: delivery.ID, EventID: event.ID, WebhookID: webhookID, TargetID: targetID, TargetName: "test-target", TargetType: database.TargetTypeHTTP, TargetConfig: newHTTPTargetConfig(ts.URL), MaxRetries: 0, Method: event.Method, Headers: event.Headers, ContentType: event.ContentType, Body: &bodyStr, AttemptNum: 1, } e.processNewTask(context.TODO(), &task) assert.True(t, received.Load(), "HTTP target should have received request") var updated database.Delivery require.NoError(t, webhookDB.First(&updated, "id = ?", delivery.ID).Error) assert.Equal(t, database.DeliveryStatusDelivered, updated.Status) } func TestProcessNewTask_LargeBody_FetchFromDB(t *testing.T) { t.Parallel() mainDB := testMainDB(t) dbMgr := testDBManager(t) webhookID := uuid.New().String() webhookDB := seedWebhookDB(t, dbMgr, webhookID) largeBody := strings.Repeat("x", MaxInlineBodySize+100) var receivedBody string ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) if err != nil { http.Error(w, "read error", http.StatusInternalServerError) return } receivedBody = string(body) w.WriteHeader(http.StatusOK) })) defer ts.Close() e := testEngineWithDB(t, mainDB, dbMgr) targetID := uuid.New().String() // Seed event with large body event := database.Event{ WebhookID: webhookID, EntrypointID: uuid.New().String(), Method: "POST", Headers: `{}`, Body: largeBody, ContentType: "text/plain", } require.NoError(t, webhookDB.Create(&event).Error) delivery := database.Delivery{ EventID: event.ID, TargetID: targetID, Status: database.DeliveryStatusPending, } require.NoError(t, webhookDB.Create(&delivery).Error) // Body is nil — engine should fetch from DB task := DeliveryTask{ DeliveryID: delivery.ID, EventID: event.ID, WebhookID: webhookID, TargetID: targetID, TargetName: "test-large-body", TargetType: database.TargetTypeHTTP, TargetConfig: newHTTPTargetConfig(ts.URL), MaxRetries: 0, Method: event.Method, Headers: event.Headers, ContentType: event.ContentType, Body: nil, // Large body — must be fetched from DB AttemptNum: 1, } e.processNewTask(context.TODO(), &task) assert.Equal(t, largeBody, receivedBody, "engine should fetch large body from DB") var updated database.Delivery require.NoError(t, webhookDB.First(&updated, "id = ?", delivery.ID).Error) assert.Equal(t, database.DeliveryStatusDelivered, updated.Status) } func TestProcessNewTask_InvalidWebhookID(t *testing.T) { t.Parallel() mainDB := testMainDB(t) dbMgr := testDBManager(t) e := testEngineWithDB(t, mainDB, dbMgr) // Use a webhook ID that has no database // GetDB will create it lazily in the real impl, but the event won't exist task := DeliveryTask{ DeliveryID: uuid.New().String(), EventID: uuid.New().String(), WebhookID: uuid.New().String(), TargetID: uuid.New().String(), TargetName: "test", TargetType: database.TargetTypeHTTP, TargetConfig: newHTTPTargetConfig("http://localhost:9999"), MaxRetries: 0, Body: nil, // Will try to fetch from DB — event won't be found AttemptNum: 1, } // Should not panic — error is logged e.processNewTask(context.TODO(), &task) } // --- processRetryTask Tests --- func TestProcessRetryTask_SuccessfulRetry(t *testing.T) { t.Parallel() mainDB := testMainDB(t) dbMgr := testDBManager(t) webhookID := uuid.New().String() webhookDB := seedWebhookDB(t, dbMgr, webhookID) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) })) defer ts.Close() e := testEngineWithDB(t, mainDB, dbMgr) targetID := uuid.New().String() event := database.Event{ WebhookID: webhookID, EntrypointID: uuid.New().String(), Method: "POST", Headers: `{}`, Body: `{"retry":"test"}`, ContentType: "application/json", } require.NoError(t, webhookDB.Create(&event).Error) // Create delivery in retrying status (simulates a prior failure) delivery := database.Delivery{ EventID: event.ID, TargetID: targetID, Status: database.DeliveryStatusRetrying, } require.NoError(t, webhookDB.Create(&delivery).Error) bodyStr := event.Body task := DeliveryTask{ DeliveryID: delivery.ID, EventID: event.ID, WebhookID: webhookID, TargetID: targetID, TargetName: "retry-target", TargetType: database.TargetTypeHTTP, TargetConfig: newHTTPTargetConfig(ts.URL), MaxRetries: 5, Method: event.Method, Headers: event.Headers, ContentType: event.ContentType, Body: &bodyStr, AttemptNum: 2, } e.processRetryTask(context.TODO(), &task) var updated database.Delivery require.NoError(t, webhookDB.First(&updated, "id = ?", delivery.ID).Error) assert.Equal(t, database.DeliveryStatusDelivered, updated.Status) } func TestProcessRetryTask_SkipsNonRetryingDelivery(t *testing.T) { t.Parallel() mainDB := testMainDB(t) dbMgr := testDBManager(t) webhookID := uuid.New().String() webhookDB := seedWebhookDB(t, dbMgr, webhookID) // No HTTP server — if the delivery is processed it will fail, // so we can verify it was skipped. e := testEngineWithDB(t, mainDB, dbMgr) targetID := uuid.New().String() event := database.Event{ WebhookID: webhookID, EntrypointID: uuid.New().String(), Method: "POST", Headers: `{}`, Body: `{"skip":"test"}`, ContentType: "application/json", } require.NoError(t, webhookDB.Create(&event).Error) // Delivery is already delivered — processRetryTask should skip it delivery := database.Delivery{ EventID: event.ID, TargetID: targetID, Status: database.DeliveryStatusDelivered, } require.NoError(t, webhookDB.Create(&delivery).Error) bodyStr := event.Body task := DeliveryTask{ DeliveryID: delivery.ID, EventID: event.ID, WebhookID: webhookID, TargetID: targetID, TargetName: "skip-target", TargetType: database.TargetTypeHTTP, TargetConfig: newHTTPTargetConfig("http://localhost:1"), MaxRetries: 5, Method: event.Method, Headers: event.Headers, ContentType: event.ContentType, Body: &bodyStr, AttemptNum: 2, } e.processRetryTask(context.TODO(), &task) // Status should remain delivered (was not changed) var updated database.Delivery require.NoError(t, webhookDB.First(&updated, "id = ?", delivery.ID).Error) assert.Equal(t, database.DeliveryStatusDelivered, updated.Status, "processRetryTask should skip delivery that is no longer retrying") } func TestProcessRetryTask_LargeBody_FetchFromDB(t *testing.T) { t.Parallel() mainDB := testMainDB(t) dbMgr := testDBManager(t) webhookID := uuid.New().String() webhookDB := seedWebhookDB(t, dbMgr, webhookID) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) })) defer ts.Close() e := testEngineWithDB(t, mainDB, dbMgr) targetID := uuid.New().String() largeBody := strings.Repeat("z", MaxInlineBodySize+50) event := database.Event{ WebhookID: webhookID, EntrypointID: uuid.New().String(), Method: "POST", Headers: `{}`, Body: largeBody, ContentType: "text/plain", } require.NoError(t, webhookDB.Create(&event).Error) delivery := database.Delivery{ EventID: event.ID, TargetID: targetID, Status: database.DeliveryStatusRetrying, } require.NoError(t, webhookDB.Create(&delivery).Error) task := DeliveryTask{ DeliveryID: delivery.ID, EventID: event.ID, WebhookID: webhookID, TargetID: targetID, TargetName: "retry-large", TargetType: database.TargetTypeHTTP, TargetConfig: newHTTPTargetConfig(ts.URL), MaxRetries: 5, Method: event.Method, Headers: event.Headers, ContentType: event.ContentType, Body: nil, // Large body — fetch from DB AttemptNum: 2, } e.processRetryTask(context.TODO(), &task) var updated database.Delivery require.NoError(t, webhookDB.First(&updated, "id = ?", delivery.ID).Error) assert.Equal(t, database.DeliveryStatusDelivered, updated.Status) } // --- Worker Lifecycle Tests --- func TestWorkerLifecycle_StartStop(t *testing.T) { t.Parallel() mainDB := testMainDB(t) dbMgr := testDBManager(t) e := testEngineWithDB(t, mainDB, dbMgr) // Start the engine e.start() // Verify workers are running by sending a task through the channel webhookID := uuid.New().String() webhookDB := seedWebhookDB(t, dbMgr, webhookID) event := database.Event{ WebhookID: webhookID, EntrypointID: uuid.New().String(), Method: "POST", Headers: `{}`, Body: `{"lifecycle":"test"}`, ContentType: "application/json", } require.NoError(t, webhookDB.Create(&event).Error) delivery := database.Delivery{ EventID: event.ID, TargetID: uuid.New().String(), Status: database.DeliveryStatusPending, } require.NoError(t, webhookDB.Create(&delivery).Error) bodyStr := event.Body task := DeliveryTask{ DeliveryID: delivery.ID, EventID: event.ID, WebhookID: webhookID, TargetID: delivery.TargetID, TargetName: "lifecycle-test", TargetType: database.TargetTypeLog, TargetConfig: "", MaxRetries: 0, Method: event.Method, Headers: event.Headers, ContentType: event.ContentType, Body: &bodyStr, AttemptNum: 1, } e.Notify([]DeliveryTask{task}) // Wait for the worker to process the task require.Eventually(t, func() bool { var d database.Delivery if err := webhookDB.First(&d, "id = ?", delivery.ID).Error; err != nil { return false } return d.Status == database.DeliveryStatusDelivered }, 5*time.Second, 50*time.Millisecond, "worker should process the delivery task") // Stop the engine cleanly e.stop() } func TestWorkerLifecycle_ProcessesRetryChannel(t *testing.T) { t.Parallel() mainDB := testMainDB(t) dbMgr := testDBManager(t) e := testEngineWithDB(t, mainDB, dbMgr) webhookID := uuid.New().String() webhookDB := seedWebhookDB(t, dbMgr, webhookID) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) })) defer ts.Close() event := database.Event{ WebhookID: webhookID, EntrypointID: uuid.New().String(), Method: "POST", Headers: `{}`, Body: `{"retry-chan":"test"}`, ContentType: "application/json", } require.NoError(t, webhookDB.Create(&event).Error) targetID := uuid.New().String() delivery := database.Delivery{ EventID: event.ID, TargetID: targetID, Status: database.DeliveryStatusRetrying, } require.NoError(t, webhookDB.Create(&delivery).Error) // Start the engine e.start() // Send task directly to retry channel bodyStr := event.Body task := DeliveryTask{ DeliveryID: delivery.ID, EventID: event.ID, WebhookID: webhookID, TargetID: targetID, TargetName: "retry-chan-test", TargetType: database.TargetTypeHTTP, TargetConfig: newHTTPTargetConfig(ts.URL), MaxRetries: 5, Method: event.Method, Headers: event.Headers, ContentType: event.ContentType, Body: &bodyStr, AttemptNum: 2, } e.retryCh <- task require.Eventually(t, func() bool { var d database.Delivery if err := webhookDB.First(&d, "id = ?", delivery.ID).Error; err != nil { return false } return d.Status == database.DeliveryStatusDelivered }, 5*time.Second, 50*time.Millisecond, "worker should process task from retry channel") e.stop() } // --- processDelivery: unknown target type --- func TestProcessDelivery_UnknownTargetType(t *testing.T) { t.Parallel() mainDB := testMainDB(t) dbMgr := testDBManager(t) webhookID := uuid.New().String() webhookDB := seedWebhookDB(t, dbMgr, webhookID) e := testEngineWithDB(t, mainDB, dbMgr) event := database.Event{ WebhookID: webhookID, EntrypointID: uuid.New().String(), Method: "POST", Headers: `{}`, Body: `{"unknown":"type"}`, ContentType: "application/json", } require.NoError(t, webhookDB.Create(&event).Error) delivery := database.Delivery{ EventID: event.ID, TargetID: uuid.New().String(), Status: database.DeliveryStatusPending, } require.NoError(t, webhookDB.Create(&delivery).Error) d := &database.Delivery{ EventID: event.ID, TargetID: delivery.TargetID, Status: database.DeliveryStatusPending, Event: event, Target: database.Target{ Name: "unknown", Type: database.TargetType("unknown"), }, } d.ID = delivery.ID task := &DeliveryTask{ DeliveryID: delivery.ID, TargetType: database.TargetType("unknown"), } e.processDelivery(context.TODO(), webhookDB, d, task) var updated database.Delivery require.NoError(t, webhookDB.First(&updated, "id = ?", delivery.ID).Error) assert.Equal(t, database.DeliveryStatusFailed, updated.Status, "unknown target type should result in failed status") } // --- Recovery Tests --- func TestRecoverPendingDeliveries(t *testing.T) { t.Parallel() mainDB := testMainDB(t) dbMgr := testDBManager(t) webhookID := uuid.New().String() webhookDB := seedWebhookDB(t, dbMgr, webhookID) e := testEngineWithDB(t, mainDB, dbMgr) targetID := uuid.New().String() // Create a target in the main DB target := database.Target{ WebhookID: webhookID, Name: "recovery-target", Type: database.TargetTypeLog, Active: true, Config: "", MaxRetries: 0, } target.ID = targetID require.NoError(t, mainDB.Create(&target).Error) // Create pending deliveries in the per-webhook DB events := make([]database.Event, 3) deliveries := make([]database.Delivery, 3) for i := 0; i < 3; i++ { events[i] = database.Event{ WebhookID: webhookID, EntrypointID: uuid.New().String(), Method: "POST", Headers: `{}`, Body: fmt.Sprintf(`{"recovery":%d}`, i), ContentType: "application/json", } require.NoError(t, webhookDB.Create(&events[i]).Error) deliveries[i] = database.Delivery{ EventID: events[i].ID, TargetID: targetID, Status: database.DeliveryStatusPending, } require.NoError(t, webhookDB.Create(&deliveries[i]).Error) } // Run recovery — should send tasks to the delivery channel e.recoverPendingDeliveries(context.Background(), webhookDB, webhookID) // Verify tasks were sent to the delivery channel for i := 0; i < 3; i++ { select { case task := <-e.deliveryCh: assert.Equal(t, targetID, task.TargetID) assert.Equal(t, database.TargetTypeLog, task.TargetType) assert.Equal(t, 1, task.AttemptNum) case <-time.After(2 * time.Second): t.Fatalf("expected task %d on delivery channel", i) } } } func TestRecoverWebhookDeliveries_RetryingDeliveries(t *testing.T) { t.Parallel() mainDB := testMainDB(t) dbMgr := testDBManager(t) webhookID := uuid.New().String() webhookDB := seedWebhookDB(t, dbMgr, webhookID) e := testEngineWithDB(t, mainDB, dbMgr) targetID := uuid.New().String() // Create target in main DB target := database.Target{ WebhookID: webhookID, Name: "retry-recovery", Type: database.TargetTypeHTTP, Active: true, Config: newHTTPTargetConfig("http://example.com/hook"), MaxRetries: 5, } target.ID = targetID require.NoError(t, mainDB.Create(&target).Error) // Create a retrying delivery with a prior result event := database.Event{ WebhookID: webhookID, EntrypointID: uuid.New().String(), Method: "POST", Headers: `{}`, Body: `{"retry-recovery":"test"}`, ContentType: "application/json", } require.NoError(t, webhookDB.Create(&event).Error) delivery := database.Delivery{ EventID: event.ID, TargetID: targetID, Status: database.DeliveryStatusRetrying, } require.NoError(t, webhookDB.Create(&delivery).Error) // Create a delivery result (simulates a prior failed attempt) result := database.DeliveryResult{ DeliveryID: delivery.ID, AttemptNum: 1, Success: false, StatusCode: 500, Error: "server error", } require.NoError(t, webhookDB.Create(&result).Error) // Create a webhook record in the main DB so recoverInFlight can find it webhook := database.Webhook{ UserID: uuid.New().String(), Name: "test-webhook", } webhook.ID = webhookID require.NoError(t, mainDB.Create(&webhook).Error) // Run recovery — retrying deliveries get timers scheduled e.recoverWebhookDeliveries(context.Background(), webhookID) // The delivery timer fires into the retry channel. Since the last result // was just created, the remaining backoff should be ~1s (2^0=1s for // attempt 1). We'll wait a bit and check if a task appears. select { case task := <-e.retryCh: assert.Equal(t, delivery.ID, task.DeliveryID) assert.Equal(t, targetID, task.TargetID) assert.Equal(t, 2, task.AttemptNum) case <-time.After(5 * time.Second): t.Fatal("expected retry task on retry channel from recovery") } } // --- recoverInFlight Tests --- func TestRecoverInFlight_NoWebhooks(t *testing.T) { t.Parallel() mainDB := testMainDB(t) dbMgr := testDBManager(t) e := testEngineWithDB(t, mainDB, dbMgr) // Should not panic with no webhooks e.recoverInFlight(context.Background()) } func TestRecoverInFlight_WithPendingDeliveries(t *testing.T) { t.Parallel() mainDB := testMainDB(t) dbMgr := testDBManager(t) webhookID := uuid.New().String() webhookDB := seedWebhookDB(t, dbMgr, webhookID) e := testEngineWithDB(t, mainDB, dbMgr) targetID := uuid.New().String() // Create webhook in main DB webhook := database.Webhook{ UserID: uuid.New().String(), Name: "recover-test", } webhook.ID = webhookID require.NoError(t, mainDB.Create(&webhook).Error) // Create target in main DB target := database.Target{ WebhookID: webhookID, Name: "recover-target", Type: database.TargetTypeLog, Active: true, MaxRetries: 0, } target.ID = targetID require.NoError(t, mainDB.Create(&target).Error) // Create pending delivery event := database.Event{ WebhookID: webhookID, EntrypointID: uuid.New().String(), Method: "POST", Headers: `{}`, Body: `{"recover":"inflight"}`, ContentType: "application/json", } require.NoError(t, webhookDB.Create(&event).Error) delivery := database.Delivery{ EventID: event.ID, TargetID: targetID, Status: database.DeliveryStatusPending, } require.NoError(t, webhookDB.Create(&delivery).Error) // Run recovery e.recoverInFlight(context.Background()) // Should have pushed a task to the delivery channel select { case task := <-e.deliveryCh: assert.Equal(t, delivery.ID, task.DeliveryID) assert.Equal(t, database.TargetTypeLog, task.TargetType) case <-time.After(2 * time.Second): t.Fatal("expected task on delivery channel from recoverInFlight") } } // --- HTTP Config with custom headers --- func TestDeliverHTTP_CustomTargetHeaders(t *testing.T) { t.Parallel() mainDB := testMainDB(t) dbMgr := testDBManager(t) webhookID := uuid.New().String() webhookDB := seedWebhookDB(t, dbMgr, webhookID) var receivedAuth string ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { receivedAuth = r.Header.Get("Authorization") w.WriteHeader(http.StatusOK) })) defer ts.Close() cfg := HTTPTargetConfig{ URL: ts.URL, Headers: map[string]string{"Authorization": "Bearer secret-token"}, } cfgJSON, err := json.Marshal(cfg) require.NoError(t, err) e := testEngineWithDB(t, mainDB, dbMgr) targetID := uuid.New().String() event := database.Event{ WebhookID: webhookID, EntrypointID: uuid.New().String(), Method: "POST", Headers: `{}`, Body: `{"auth":"test"}`, ContentType: "application/json", } require.NoError(t, webhookDB.Create(&event).Error) delivery := database.Delivery{ EventID: event.ID, TargetID: targetID, Status: database.DeliveryStatusPending, } require.NoError(t, webhookDB.Create(&delivery).Error) bodyStr := event.Body task := DeliveryTask{ DeliveryID: delivery.ID, EventID: event.ID, WebhookID: webhookID, TargetID: targetID, TargetName: "auth-target", TargetType: database.TargetTypeHTTP, TargetConfig: string(cfgJSON), MaxRetries: 0, Method: event.Method, Headers: event.Headers, ContentType: event.ContentType, Body: &bodyStr, AttemptNum: 1, } e.processNewTask(context.TODO(), &task) assert.Equal(t, "Bearer secret-token", receivedAuth) } // --- HTTP delivery with custom timeout --- func TestDeliverHTTP_TargetTimeout(t *testing.T) { t.Parallel() db := testWebhookDB(t) e := testEngine(t, 1) // Server that sleeps longer than the target timeout ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { time.Sleep(2 * time.Second) w.WriteHeader(http.StatusOK) })) defer ts.Close() cfg := HTTPTargetConfig{ URL: ts.URL, Timeout: 1, // 1 second timeout — shorter than server sleep } cfgJSON, err := json.Marshal(cfg) require.NoError(t, err) targetID := uuid.New().String() event := seedEvent(t, db, `{"timeout":"test"}`) delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending) task := &DeliveryTask{ DeliveryID: delivery.ID, EventID: event.ID, WebhookID: event.WebhookID, TargetID: targetID, TargetName: "timeout-target", TargetType: database.TargetTypeHTTP, TargetConfig: string(cfgJSON), MaxRetries: 0, AttemptNum: 1, } d := &database.Delivery{ EventID: event.ID, TargetID: targetID, Status: database.DeliveryStatusPending, Event: event, Target: database.Target{ Name: "timeout-target", Type: database.TargetTypeHTTP, Config: string(cfgJSON), }, } d.ID = delivery.ID e.deliverHTTP(context.TODO(), db, d, task) // Should fail due to timeout var updated database.Delivery require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error) assert.Equal(t, database.DeliveryStatusFailed, updated.Status) var result database.DeliveryResult require.NoError(t, db.Where("delivery_id = ?", delivery.ID).First(&result).Error) assert.False(t, result.Success) assert.NotEmpty(t, result.Error, "should have error message for timeout") } // --- HTTP request with invalid config --- func TestDeliverHTTP_InvalidConfig(t *testing.T) { t.Parallel() db := testWebhookDB(t) e := testEngine(t, 1) targetID := uuid.New().String() event := seedEvent(t, db, `{"config":"invalid"}`) delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending) task := &DeliveryTask{ DeliveryID: delivery.ID, EventID: event.ID, WebhookID: event.WebhookID, TargetID: targetID, TargetName: "bad-config", TargetType: database.TargetTypeHTTP, TargetConfig: `not-json`, MaxRetries: 0, AttemptNum: 1, } d := &database.Delivery{ EventID: event.ID, TargetID: targetID, Status: database.DeliveryStatusPending, Event: event, Target: database.Target{ Name: "bad-config", Type: database.TargetTypeHTTP, Config: `not-json`, }, } d.ID = delivery.ID e.deliverHTTP(context.TODO(), db, d, task) var updated database.Delivery require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error) assert.Equal(t, database.DeliveryStatusFailed, updated.Status) } // --- Notify batching --- func TestNotify_MultipleTasks(t *testing.T) { t.Parallel() e := testEngine(t, 1) tasks := make([]DeliveryTask, 5) for i := range tasks { tasks[i] = DeliveryTask{ DeliveryID: fmt.Sprintf("task-%d", i), } } e.Notify(tasks) // All tasks should be in the channel for i := 0; i < 5; i++ { select { case task := <-e.deliveryCh: assert.Equal(t, fmt.Sprintf("task-%d", i), task.DeliveryID) case <-time.After(time.Second): t.Fatalf("expected task %d on delivery channel", i) } } }