All checks were successful
check / check (push) Successful in 4s
Add comprehensive test coverage for three previously-untested packages: delivery (37% → 75%): - processNewTask with inline and large (DB-fetched) bodies - processRetryTask success, skip non-retrying, large body fetch - Worker lifecycle start/stop, retry channel processing - processDelivery unknown target type handling - recoverPendingDeliveries, recoverWebhookDeliveries, recoverInFlight - HTTP delivery with custom headers, timeout, invalid config - Notify batching middleware (0% → 70%): - Logging middleware status code capture and pass-through - LoggingResponseWriter delegation - CORS dev mode (allow-all) and prod mode (no-op) - RequireAuth redirect for unauthenticated, pass-through for authenticated - MetricsAuth basic auth validation - ipFromHostPort helper session (0% → 52%): - Get/Save round-trip with real cookie store - SetUser, GetUserID, GetUsername, IsAuthenticated - ClearUser removes all keys - Destroy invalidates session (MaxAge -1) - Session persistence across requests - Edge cases: overwrite user, wrong type, constants Test helpers added: - database.NewTestDatabase / NewTestWebhookDBManager for cross-package testing - session.NewForTest for middleware tests without fx lifecycle closes #28
1024 lines
27 KiB
Go
1024 lines
27 KiB
Go
package delivery
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"gorm.io/driver/sqlite"
|
|
"gorm.io/gorm"
|
|
_ "modernc.org/sqlite"
|
|
"sneak.berlin/go/webhooker/internal/database"
|
|
)
|
|
|
|
// testMainDB creates a real SQLite main database with the required tables
|
|
// (Webhook, Target, Setting, User, etc.) for integration tests.
|
|
func testMainDB(t *testing.T) *gorm.DB {
|
|
t.Helper()
|
|
dbPath := filepath.Join(t.TempDir(), "main-test.db")
|
|
dsn := fmt.Sprintf("file:%s?cache=shared&mode=rwc", dbPath)
|
|
|
|
sqlDB, err := sql.Open("sqlite", dsn)
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() { sqlDB.Close() })
|
|
|
|
db, err := gorm.Open(sqlite.Dialector{Conn: sqlDB}, &gorm.Config{})
|
|
require.NoError(t, err)
|
|
|
|
require.NoError(t, db.AutoMigrate(
|
|
&database.Webhook{},
|
|
&database.Target{},
|
|
&database.User{},
|
|
&database.Setting{},
|
|
))
|
|
|
|
return db
|
|
}
|
|
|
|
// testDatabase wraps a *gorm.DB into a *database.Database for the engine.
|
|
func testDatabase(t *testing.T, db *gorm.DB) *database.Database {
|
|
t.Helper()
|
|
return database.NewTestDatabase(db)
|
|
}
|
|
|
|
// testDBManager creates a WebhookDBManager backed by a temp directory.
|
|
// Register per-webhook databases by calling seedWebhookDB.
|
|
func testDBManager(t *testing.T) *database.WebhookDBManager {
|
|
t.Helper()
|
|
dataDir := t.TempDir()
|
|
return database.NewTestWebhookDBManager(dataDir)
|
|
}
|
|
|
|
// seedWebhookDB creates a per-webhook database and registers it in the manager.
|
|
// Returns the webhookDB and the webhookID.
|
|
func seedWebhookDB(t *testing.T, mgr *database.WebhookDBManager, webhookID string) *gorm.DB {
|
|
t.Helper()
|
|
db, err := mgr.GetDB(webhookID)
|
|
require.NoError(t, err)
|
|
return db
|
|
}
|
|
|
|
// testEngineWithDB builds an Engine with a real database and dbManager.
|
|
func testEngineWithDB(t *testing.T, mainDB *gorm.DB, dbMgr *database.WebhookDBManager) *Engine {
|
|
t.Helper()
|
|
return &Engine{
|
|
database: testDatabase(t, mainDB),
|
|
dbManager: dbMgr,
|
|
log: slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})),
|
|
client: &http.Client{Timeout: 5 * time.Second},
|
|
deliveryCh: make(chan DeliveryTask, deliveryChannelSize),
|
|
retryCh: make(chan DeliveryTask, retryChannelSize),
|
|
workers: 2,
|
|
}
|
|
}
|
|
|
|
// --- processNewTask Tests ---
|
|
|
|
func TestProcessNewTask_InlineBody(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
mainDB := testMainDB(t)
|
|
dbMgr := testDBManager(t)
|
|
webhookID := uuid.New().String()
|
|
webhookDB := seedWebhookDB(t, dbMgr, webhookID)
|
|
|
|
var received atomic.Bool
|
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
received.Store(true)
|
|
assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
|
|
w.WriteHeader(http.StatusOK)
|
|
fmt.Fprint(w, `{"ok":true}`)
|
|
}))
|
|
defer ts.Close()
|
|
|
|
e := testEngineWithDB(t, mainDB, dbMgr)
|
|
targetID := uuid.New().String()
|
|
|
|
// Seed event in per-webhook DB
|
|
event := database.Event{
|
|
WebhookID: webhookID,
|
|
EntrypointID: uuid.New().String(),
|
|
Method: "POST",
|
|
Headers: `{"Content-Type":["application/json"]}`,
|
|
Body: `{"hello":"world"}`,
|
|
ContentType: "application/json",
|
|
}
|
|
require.NoError(t, webhookDB.Create(&event).Error)
|
|
|
|
// Seed delivery in per-webhook DB
|
|
delivery := database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: targetID,
|
|
Status: database.DeliveryStatusPending,
|
|
}
|
|
require.NoError(t, webhookDB.Create(&delivery).Error)
|
|
|
|
bodyStr := event.Body
|
|
task := DeliveryTask{
|
|
DeliveryID: delivery.ID,
|
|
EventID: event.ID,
|
|
WebhookID: webhookID,
|
|
TargetID: targetID,
|
|
TargetName: "test-target",
|
|
TargetType: database.TargetTypeHTTP,
|
|
TargetConfig: newHTTPTargetConfig(ts.URL),
|
|
MaxRetries: 0,
|
|
Method: event.Method,
|
|
Headers: event.Headers,
|
|
ContentType: event.ContentType,
|
|
Body: &bodyStr,
|
|
AttemptNum: 1,
|
|
}
|
|
|
|
e.processNewTask(context.TODO(), &task)
|
|
|
|
assert.True(t, received.Load(), "HTTP target should have received request")
|
|
|
|
var updated database.Delivery
|
|
require.NoError(t, webhookDB.First(&updated, "id = ?", delivery.ID).Error)
|
|
assert.Equal(t, database.DeliveryStatusDelivered, updated.Status)
|
|
}
|
|
|
|
func TestProcessNewTask_LargeBody_FetchFromDB(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
mainDB := testMainDB(t)
|
|
dbMgr := testDBManager(t)
|
|
webhookID := uuid.New().String()
|
|
webhookDB := seedWebhookDB(t, dbMgr, webhookID)
|
|
|
|
largeBody := strings.Repeat("x", MaxInlineBodySize+100)
|
|
var receivedBody string
|
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
body, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
http.Error(w, "read error", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
receivedBody = string(body)
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer ts.Close()
|
|
|
|
e := testEngineWithDB(t, mainDB, dbMgr)
|
|
targetID := uuid.New().String()
|
|
|
|
// Seed event with large body
|
|
event := database.Event{
|
|
WebhookID: webhookID,
|
|
EntrypointID: uuid.New().String(),
|
|
Method: "POST",
|
|
Headers: `{}`,
|
|
Body: largeBody,
|
|
ContentType: "text/plain",
|
|
}
|
|
require.NoError(t, webhookDB.Create(&event).Error)
|
|
|
|
delivery := database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: targetID,
|
|
Status: database.DeliveryStatusPending,
|
|
}
|
|
require.NoError(t, webhookDB.Create(&delivery).Error)
|
|
|
|
// Body is nil — engine should fetch from DB
|
|
task := DeliveryTask{
|
|
DeliveryID: delivery.ID,
|
|
EventID: event.ID,
|
|
WebhookID: webhookID,
|
|
TargetID: targetID,
|
|
TargetName: "test-large-body",
|
|
TargetType: database.TargetTypeHTTP,
|
|
TargetConfig: newHTTPTargetConfig(ts.URL),
|
|
MaxRetries: 0,
|
|
Method: event.Method,
|
|
Headers: event.Headers,
|
|
ContentType: event.ContentType,
|
|
Body: nil, // Large body — must be fetched from DB
|
|
AttemptNum: 1,
|
|
}
|
|
|
|
e.processNewTask(context.TODO(), &task)
|
|
|
|
assert.Equal(t, largeBody, receivedBody, "engine should fetch large body from DB")
|
|
|
|
var updated database.Delivery
|
|
require.NoError(t, webhookDB.First(&updated, "id = ?", delivery.ID).Error)
|
|
assert.Equal(t, database.DeliveryStatusDelivered, updated.Status)
|
|
}
|
|
|
|
func TestProcessNewTask_InvalidWebhookID(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
mainDB := testMainDB(t)
|
|
dbMgr := testDBManager(t)
|
|
|
|
e := testEngineWithDB(t, mainDB, dbMgr)
|
|
|
|
// Use a webhook ID that has no database
|
|
// GetDB will create it lazily in the real impl, but the event won't exist
|
|
task := DeliveryTask{
|
|
DeliveryID: uuid.New().String(),
|
|
EventID: uuid.New().String(),
|
|
WebhookID: uuid.New().String(),
|
|
TargetID: uuid.New().String(),
|
|
TargetName: "test",
|
|
TargetType: database.TargetTypeHTTP,
|
|
TargetConfig: newHTTPTargetConfig("http://localhost:9999"),
|
|
MaxRetries: 0,
|
|
Body: nil, // Will try to fetch from DB — event won't be found
|
|
AttemptNum: 1,
|
|
}
|
|
|
|
// Should not panic — error is logged
|
|
e.processNewTask(context.TODO(), &task)
|
|
}
|
|
|
|
// --- processRetryTask Tests ---
|
|
|
|
func TestProcessRetryTask_SuccessfulRetry(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
mainDB := testMainDB(t)
|
|
dbMgr := testDBManager(t)
|
|
webhookID := uuid.New().String()
|
|
webhookDB := seedWebhookDB(t, dbMgr, webhookID)
|
|
|
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer ts.Close()
|
|
|
|
e := testEngineWithDB(t, mainDB, dbMgr)
|
|
targetID := uuid.New().String()
|
|
|
|
event := database.Event{
|
|
WebhookID: webhookID,
|
|
EntrypointID: uuid.New().String(),
|
|
Method: "POST",
|
|
Headers: `{}`,
|
|
Body: `{"retry":"test"}`,
|
|
ContentType: "application/json",
|
|
}
|
|
require.NoError(t, webhookDB.Create(&event).Error)
|
|
|
|
// Create delivery in retrying status (simulates a prior failure)
|
|
delivery := database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: targetID,
|
|
Status: database.DeliveryStatusRetrying,
|
|
}
|
|
require.NoError(t, webhookDB.Create(&delivery).Error)
|
|
|
|
bodyStr := event.Body
|
|
task := DeliveryTask{
|
|
DeliveryID: delivery.ID,
|
|
EventID: event.ID,
|
|
WebhookID: webhookID,
|
|
TargetID: targetID,
|
|
TargetName: "retry-target",
|
|
TargetType: database.TargetTypeHTTP,
|
|
TargetConfig: newHTTPTargetConfig(ts.URL),
|
|
MaxRetries: 5,
|
|
Method: event.Method,
|
|
Headers: event.Headers,
|
|
ContentType: event.ContentType,
|
|
Body: &bodyStr,
|
|
AttemptNum: 2,
|
|
}
|
|
|
|
e.processRetryTask(context.TODO(), &task)
|
|
|
|
var updated database.Delivery
|
|
require.NoError(t, webhookDB.First(&updated, "id = ?", delivery.ID).Error)
|
|
assert.Equal(t, database.DeliveryStatusDelivered, updated.Status)
|
|
}
|
|
|
|
func TestProcessRetryTask_SkipsNonRetryingDelivery(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
mainDB := testMainDB(t)
|
|
dbMgr := testDBManager(t)
|
|
webhookID := uuid.New().String()
|
|
webhookDB := seedWebhookDB(t, dbMgr, webhookID)
|
|
|
|
// No HTTP server — if the delivery is processed it will fail,
|
|
// so we can verify it was skipped.
|
|
e := testEngineWithDB(t, mainDB, dbMgr)
|
|
targetID := uuid.New().String()
|
|
|
|
event := database.Event{
|
|
WebhookID: webhookID,
|
|
EntrypointID: uuid.New().String(),
|
|
Method: "POST",
|
|
Headers: `{}`,
|
|
Body: `{"skip":"test"}`,
|
|
ContentType: "application/json",
|
|
}
|
|
require.NoError(t, webhookDB.Create(&event).Error)
|
|
|
|
// Delivery is already delivered — processRetryTask should skip it
|
|
delivery := database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: targetID,
|
|
Status: database.DeliveryStatusDelivered,
|
|
}
|
|
require.NoError(t, webhookDB.Create(&delivery).Error)
|
|
|
|
bodyStr := event.Body
|
|
task := DeliveryTask{
|
|
DeliveryID: delivery.ID,
|
|
EventID: event.ID,
|
|
WebhookID: webhookID,
|
|
TargetID: targetID,
|
|
TargetName: "skip-target",
|
|
TargetType: database.TargetTypeHTTP,
|
|
TargetConfig: newHTTPTargetConfig("http://localhost:1"),
|
|
MaxRetries: 5,
|
|
Method: event.Method,
|
|
Headers: event.Headers,
|
|
ContentType: event.ContentType,
|
|
Body: &bodyStr,
|
|
AttemptNum: 2,
|
|
}
|
|
|
|
e.processRetryTask(context.TODO(), &task)
|
|
|
|
// Status should remain delivered (was not changed)
|
|
var updated database.Delivery
|
|
require.NoError(t, webhookDB.First(&updated, "id = ?", delivery.ID).Error)
|
|
assert.Equal(t, database.DeliveryStatusDelivered, updated.Status,
|
|
"processRetryTask should skip delivery that is no longer retrying")
|
|
}
|
|
|
|
func TestProcessRetryTask_LargeBody_FetchFromDB(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
mainDB := testMainDB(t)
|
|
dbMgr := testDBManager(t)
|
|
webhookID := uuid.New().String()
|
|
webhookDB := seedWebhookDB(t, dbMgr, webhookID)
|
|
|
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer ts.Close()
|
|
|
|
e := testEngineWithDB(t, mainDB, dbMgr)
|
|
targetID := uuid.New().String()
|
|
|
|
largeBody := strings.Repeat("z", MaxInlineBodySize+50)
|
|
event := database.Event{
|
|
WebhookID: webhookID,
|
|
EntrypointID: uuid.New().String(),
|
|
Method: "POST",
|
|
Headers: `{}`,
|
|
Body: largeBody,
|
|
ContentType: "text/plain",
|
|
}
|
|
require.NoError(t, webhookDB.Create(&event).Error)
|
|
|
|
delivery := database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: targetID,
|
|
Status: database.DeliveryStatusRetrying,
|
|
}
|
|
require.NoError(t, webhookDB.Create(&delivery).Error)
|
|
|
|
task := DeliveryTask{
|
|
DeliveryID: delivery.ID,
|
|
EventID: event.ID,
|
|
WebhookID: webhookID,
|
|
TargetID: targetID,
|
|
TargetName: "retry-large",
|
|
TargetType: database.TargetTypeHTTP,
|
|
TargetConfig: newHTTPTargetConfig(ts.URL),
|
|
MaxRetries: 5,
|
|
Method: event.Method,
|
|
Headers: event.Headers,
|
|
ContentType: event.ContentType,
|
|
Body: nil, // Large body — fetch from DB
|
|
AttemptNum: 2,
|
|
}
|
|
|
|
e.processRetryTask(context.TODO(), &task)
|
|
|
|
var updated database.Delivery
|
|
require.NoError(t, webhookDB.First(&updated, "id = ?", delivery.ID).Error)
|
|
assert.Equal(t, database.DeliveryStatusDelivered, updated.Status)
|
|
}
|
|
|
|
// --- Worker Lifecycle Tests ---
|
|
|
|
func TestWorkerLifecycle_StartStop(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
mainDB := testMainDB(t)
|
|
dbMgr := testDBManager(t)
|
|
e := testEngineWithDB(t, mainDB, dbMgr)
|
|
|
|
// Start the engine
|
|
e.start()
|
|
|
|
// Verify workers are running by sending a task through the channel
|
|
webhookID := uuid.New().String()
|
|
webhookDB := seedWebhookDB(t, dbMgr, webhookID)
|
|
|
|
event := database.Event{
|
|
WebhookID: webhookID,
|
|
EntrypointID: uuid.New().String(),
|
|
Method: "POST",
|
|
Headers: `{}`,
|
|
Body: `{"lifecycle":"test"}`,
|
|
ContentType: "application/json",
|
|
}
|
|
require.NoError(t, webhookDB.Create(&event).Error)
|
|
|
|
delivery := database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: uuid.New().String(),
|
|
Status: database.DeliveryStatusPending,
|
|
}
|
|
require.NoError(t, webhookDB.Create(&delivery).Error)
|
|
|
|
bodyStr := event.Body
|
|
task := DeliveryTask{
|
|
DeliveryID: delivery.ID,
|
|
EventID: event.ID,
|
|
WebhookID: webhookID,
|
|
TargetID: delivery.TargetID,
|
|
TargetName: "lifecycle-test",
|
|
TargetType: database.TargetTypeLog,
|
|
TargetConfig: "",
|
|
MaxRetries: 0,
|
|
Method: event.Method,
|
|
Headers: event.Headers,
|
|
ContentType: event.ContentType,
|
|
Body: &bodyStr,
|
|
AttemptNum: 1,
|
|
}
|
|
|
|
e.Notify([]DeliveryTask{task})
|
|
|
|
// Wait for the worker to process the task
|
|
require.Eventually(t, func() bool {
|
|
var d database.Delivery
|
|
if err := webhookDB.First(&d, "id = ?", delivery.ID).Error; err != nil {
|
|
return false
|
|
}
|
|
return d.Status == database.DeliveryStatusDelivered
|
|
}, 5*time.Second, 50*time.Millisecond,
|
|
"worker should process the delivery task")
|
|
|
|
// Stop the engine cleanly
|
|
e.stop()
|
|
}
|
|
|
|
func TestWorkerLifecycle_ProcessesRetryChannel(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
mainDB := testMainDB(t)
|
|
dbMgr := testDBManager(t)
|
|
e := testEngineWithDB(t, mainDB, dbMgr)
|
|
|
|
webhookID := uuid.New().String()
|
|
webhookDB := seedWebhookDB(t, dbMgr, webhookID)
|
|
|
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer ts.Close()
|
|
|
|
event := database.Event{
|
|
WebhookID: webhookID,
|
|
EntrypointID: uuid.New().String(),
|
|
Method: "POST",
|
|
Headers: `{}`,
|
|
Body: `{"retry-chan":"test"}`,
|
|
ContentType: "application/json",
|
|
}
|
|
require.NoError(t, webhookDB.Create(&event).Error)
|
|
|
|
targetID := uuid.New().String()
|
|
delivery := database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: targetID,
|
|
Status: database.DeliveryStatusRetrying,
|
|
}
|
|
require.NoError(t, webhookDB.Create(&delivery).Error)
|
|
|
|
// Start the engine
|
|
e.start()
|
|
|
|
// Send task directly to retry channel
|
|
bodyStr := event.Body
|
|
task := DeliveryTask{
|
|
DeliveryID: delivery.ID,
|
|
EventID: event.ID,
|
|
WebhookID: webhookID,
|
|
TargetID: targetID,
|
|
TargetName: "retry-chan-test",
|
|
TargetType: database.TargetTypeHTTP,
|
|
TargetConfig: newHTTPTargetConfig(ts.URL),
|
|
MaxRetries: 5,
|
|
Method: event.Method,
|
|
Headers: event.Headers,
|
|
ContentType: event.ContentType,
|
|
Body: &bodyStr,
|
|
AttemptNum: 2,
|
|
}
|
|
|
|
e.retryCh <- task
|
|
|
|
require.Eventually(t, func() bool {
|
|
var d database.Delivery
|
|
if err := webhookDB.First(&d, "id = ?", delivery.ID).Error; err != nil {
|
|
return false
|
|
}
|
|
return d.Status == database.DeliveryStatusDelivered
|
|
}, 5*time.Second, 50*time.Millisecond,
|
|
"worker should process task from retry channel")
|
|
|
|
e.stop()
|
|
}
|
|
|
|
// --- processDelivery: unknown target type ---
|
|
|
|
func TestProcessDelivery_UnknownTargetType(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
mainDB := testMainDB(t)
|
|
dbMgr := testDBManager(t)
|
|
webhookID := uuid.New().String()
|
|
webhookDB := seedWebhookDB(t, dbMgr, webhookID)
|
|
|
|
e := testEngineWithDB(t, mainDB, dbMgr)
|
|
|
|
event := database.Event{
|
|
WebhookID: webhookID,
|
|
EntrypointID: uuid.New().String(),
|
|
Method: "POST",
|
|
Headers: `{}`,
|
|
Body: `{"unknown":"type"}`,
|
|
ContentType: "application/json",
|
|
}
|
|
require.NoError(t, webhookDB.Create(&event).Error)
|
|
|
|
delivery := database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: uuid.New().String(),
|
|
Status: database.DeliveryStatusPending,
|
|
}
|
|
require.NoError(t, webhookDB.Create(&delivery).Error)
|
|
|
|
d := &database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: delivery.TargetID,
|
|
Status: database.DeliveryStatusPending,
|
|
Event: event,
|
|
Target: database.Target{
|
|
Name: "unknown",
|
|
Type: database.TargetType("unknown"),
|
|
},
|
|
}
|
|
d.ID = delivery.ID
|
|
|
|
task := &DeliveryTask{
|
|
DeliveryID: delivery.ID,
|
|
TargetType: database.TargetType("unknown"),
|
|
}
|
|
|
|
e.processDelivery(context.TODO(), webhookDB, d, task)
|
|
|
|
var updated database.Delivery
|
|
require.NoError(t, webhookDB.First(&updated, "id = ?", delivery.ID).Error)
|
|
assert.Equal(t, database.DeliveryStatusFailed, updated.Status,
|
|
"unknown target type should result in failed status")
|
|
}
|
|
|
|
// --- Recovery Tests ---
|
|
|
|
func TestRecoverPendingDeliveries(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
mainDB := testMainDB(t)
|
|
dbMgr := testDBManager(t)
|
|
webhookID := uuid.New().String()
|
|
webhookDB := seedWebhookDB(t, dbMgr, webhookID)
|
|
|
|
e := testEngineWithDB(t, mainDB, dbMgr)
|
|
targetID := uuid.New().String()
|
|
|
|
// Create a target in the main DB
|
|
target := database.Target{
|
|
WebhookID: webhookID,
|
|
Name: "recovery-target",
|
|
Type: database.TargetTypeLog,
|
|
Active: true,
|
|
Config: "",
|
|
MaxRetries: 0,
|
|
}
|
|
target.ID = targetID
|
|
require.NoError(t, mainDB.Create(&target).Error)
|
|
|
|
// Create pending deliveries in the per-webhook DB
|
|
events := make([]database.Event, 3)
|
|
deliveries := make([]database.Delivery, 3)
|
|
for i := 0; i < 3; i++ {
|
|
events[i] = database.Event{
|
|
WebhookID: webhookID,
|
|
EntrypointID: uuid.New().String(),
|
|
Method: "POST",
|
|
Headers: `{}`,
|
|
Body: fmt.Sprintf(`{"recovery":%d}`, i),
|
|
ContentType: "application/json",
|
|
}
|
|
require.NoError(t, webhookDB.Create(&events[i]).Error)
|
|
|
|
deliveries[i] = database.Delivery{
|
|
EventID: events[i].ID,
|
|
TargetID: targetID,
|
|
Status: database.DeliveryStatusPending,
|
|
}
|
|
require.NoError(t, webhookDB.Create(&deliveries[i]).Error)
|
|
}
|
|
|
|
// Run recovery — should send tasks to the delivery channel
|
|
e.recoverPendingDeliveries(context.Background(), webhookDB, webhookID)
|
|
|
|
// Verify tasks were sent to the delivery channel
|
|
for i := 0; i < 3; i++ {
|
|
select {
|
|
case task := <-e.deliveryCh:
|
|
assert.Equal(t, targetID, task.TargetID)
|
|
assert.Equal(t, database.TargetTypeLog, task.TargetType)
|
|
assert.Equal(t, 1, task.AttemptNum)
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatalf("expected task %d on delivery channel", i)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestRecoverWebhookDeliveries_RetryingDeliveries(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
mainDB := testMainDB(t)
|
|
dbMgr := testDBManager(t)
|
|
webhookID := uuid.New().String()
|
|
webhookDB := seedWebhookDB(t, dbMgr, webhookID)
|
|
|
|
e := testEngineWithDB(t, mainDB, dbMgr)
|
|
targetID := uuid.New().String()
|
|
|
|
// Create target in main DB
|
|
target := database.Target{
|
|
WebhookID: webhookID,
|
|
Name: "retry-recovery",
|
|
Type: database.TargetTypeHTTP,
|
|
Active: true,
|
|
Config: newHTTPTargetConfig("http://example.com/hook"),
|
|
MaxRetries: 5,
|
|
}
|
|
target.ID = targetID
|
|
require.NoError(t, mainDB.Create(&target).Error)
|
|
|
|
// Create a retrying delivery with a prior result
|
|
event := database.Event{
|
|
WebhookID: webhookID,
|
|
EntrypointID: uuid.New().String(),
|
|
Method: "POST",
|
|
Headers: `{}`,
|
|
Body: `{"retry-recovery":"test"}`,
|
|
ContentType: "application/json",
|
|
}
|
|
require.NoError(t, webhookDB.Create(&event).Error)
|
|
|
|
delivery := database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: targetID,
|
|
Status: database.DeliveryStatusRetrying,
|
|
}
|
|
require.NoError(t, webhookDB.Create(&delivery).Error)
|
|
|
|
// Create a delivery result (simulates a prior failed attempt)
|
|
result := database.DeliveryResult{
|
|
DeliveryID: delivery.ID,
|
|
AttemptNum: 1,
|
|
Success: false,
|
|
StatusCode: 500,
|
|
Error: "server error",
|
|
}
|
|
require.NoError(t, webhookDB.Create(&result).Error)
|
|
|
|
// Create a webhook record in the main DB so recoverInFlight can find it
|
|
webhook := database.Webhook{
|
|
UserID: uuid.New().String(),
|
|
Name: "test-webhook",
|
|
}
|
|
webhook.ID = webhookID
|
|
require.NoError(t, mainDB.Create(&webhook).Error)
|
|
|
|
// Run recovery — retrying deliveries get timers scheduled
|
|
e.recoverWebhookDeliveries(context.Background(), webhookID)
|
|
|
|
// The delivery timer fires into the retry channel. Since the last result
|
|
// was just created, the remaining backoff should be ~1s (2^0=1s for
|
|
// attempt 1). We'll wait a bit and check if a task appears.
|
|
select {
|
|
case task := <-e.retryCh:
|
|
assert.Equal(t, delivery.ID, task.DeliveryID)
|
|
assert.Equal(t, targetID, task.TargetID)
|
|
assert.Equal(t, 2, task.AttemptNum)
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatal("expected retry task on retry channel from recovery")
|
|
}
|
|
}
|
|
|
|
// --- recoverInFlight Tests ---
|
|
|
|
func TestRecoverInFlight_NoWebhooks(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
mainDB := testMainDB(t)
|
|
dbMgr := testDBManager(t)
|
|
e := testEngineWithDB(t, mainDB, dbMgr)
|
|
|
|
// Should not panic with no webhooks
|
|
e.recoverInFlight(context.Background())
|
|
}
|
|
|
|
func TestRecoverInFlight_WithPendingDeliveries(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
mainDB := testMainDB(t)
|
|
dbMgr := testDBManager(t)
|
|
webhookID := uuid.New().String()
|
|
webhookDB := seedWebhookDB(t, dbMgr, webhookID)
|
|
|
|
e := testEngineWithDB(t, mainDB, dbMgr)
|
|
targetID := uuid.New().String()
|
|
|
|
// Create webhook in main DB
|
|
webhook := database.Webhook{
|
|
UserID: uuid.New().String(),
|
|
Name: "recover-test",
|
|
}
|
|
webhook.ID = webhookID
|
|
require.NoError(t, mainDB.Create(&webhook).Error)
|
|
|
|
// Create target in main DB
|
|
target := database.Target{
|
|
WebhookID: webhookID,
|
|
Name: "recover-target",
|
|
Type: database.TargetTypeLog,
|
|
Active: true,
|
|
MaxRetries: 0,
|
|
}
|
|
target.ID = targetID
|
|
require.NoError(t, mainDB.Create(&target).Error)
|
|
|
|
// Create pending delivery
|
|
event := database.Event{
|
|
WebhookID: webhookID,
|
|
EntrypointID: uuid.New().String(),
|
|
Method: "POST",
|
|
Headers: `{}`,
|
|
Body: `{"recover":"inflight"}`,
|
|
ContentType: "application/json",
|
|
}
|
|
require.NoError(t, webhookDB.Create(&event).Error)
|
|
|
|
delivery := database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: targetID,
|
|
Status: database.DeliveryStatusPending,
|
|
}
|
|
require.NoError(t, webhookDB.Create(&delivery).Error)
|
|
|
|
// Run recovery
|
|
e.recoverInFlight(context.Background())
|
|
|
|
// Should have pushed a task to the delivery channel
|
|
select {
|
|
case task := <-e.deliveryCh:
|
|
assert.Equal(t, delivery.ID, task.DeliveryID)
|
|
assert.Equal(t, database.TargetTypeLog, task.TargetType)
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("expected task on delivery channel from recoverInFlight")
|
|
}
|
|
}
|
|
|
|
// --- HTTP Config with custom headers ---
|
|
|
|
func TestDeliverHTTP_CustomTargetHeaders(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
mainDB := testMainDB(t)
|
|
dbMgr := testDBManager(t)
|
|
webhookID := uuid.New().String()
|
|
webhookDB := seedWebhookDB(t, dbMgr, webhookID)
|
|
|
|
var receivedAuth string
|
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
receivedAuth = r.Header.Get("Authorization")
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer ts.Close()
|
|
|
|
cfg := HTTPTargetConfig{
|
|
URL: ts.URL,
|
|
Headers: map[string]string{"Authorization": "Bearer secret-token"},
|
|
}
|
|
cfgJSON, err := json.Marshal(cfg)
|
|
require.NoError(t, err)
|
|
|
|
e := testEngineWithDB(t, mainDB, dbMgr)
|
|
targetID := uuid.New().String()
|
|
|
|
event := database.Event{
|
|
WebhookID: webhookID,
|
|
EntrypointID: uuid.New().String(),
|
|
Method: "POST",
|
|
Headers: `{}`,
|
|
Body: `{"auth":"test"}`,
|
|
ContentType: "application/json",
|
|
}
|
|
require.NoError(t, webhookDB.Create(&event).Error)
|
|
|
|
delivery := database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: targetID,
|
|
Status: database.DeliveryStatusPending,
|
|
}
|
|
require.NoError(t, webhookDB.Create(&delivery).Error)
|
|
|
|
bodyStr := event.Body
|
|
task := DeliveryTask{
|
|
DeliveryID: delivery.ID,
|
|
EventID: event.ID,
|
|
WebhookID: webhookID,
|
|
TargetID: targetID,
|
|
TargetName: "auth-target",
|
|
TargetType: database.TargetTypeHTTP,
|
|
TargetConfig: string(cfgJSON),
|
|
MaxRetries: 0,
|
|
Method: event.Method,
|
|
Headers: event.Headers,
|
|
ContentType: event.ContentType,
|
|
Body: &bodyStr,
|
|
AttemptNum: 1,
|
|
}
|
|
|
|
e.processNewTask(context.TODO(), &task)
|
|
|
|
assert.Equal(t, "Bearer secret-token", receivedAuth)
|
|
}
|
|
|
|
// --- HTTP delivery with custom timeout ---
|
|
|
|
func TestDeliverHTTP_TargetTimeout(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
db := testWebhookDB(t)
|
|
e := testEngine(t, 1)
|
|
|
|
// Server that sleeps longer than the target timeout
|
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
time.Sleep(2 * time.Second)
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer ts.Close()
|
|
|
|
cfg := HTTPTargetConfig{
|
|
URL: ts.URL,
|
|
Timeout: 1, // 1 second timeout — shorter than server sleep
|
|
}
|
|
cfgJSON, err := json.Marshal(cfg)
|
|
require.NoError(t, err)
|
|
|
|
targetID := uuid.New().String()
|
|
event := seedEvent(t, db, `{"timeout":"test"}`)
|
|
delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending)
|
|
|
|
task := &DeliveryTask{
|
|
DeliveryID: delivery.ID,
|
|
EventID: event.ID,
|
|
WebhookID: event.WebhookID,
|
|
TargetID: targetID,
|
|
TargetName: "timeout-target",
|
|
TargetType: database.TargetTypeHTTP,
|
|
TargetConfig: string(cfgJSON),
|
|
MaxRetries: 0,
|
|
AttemptNum: 1,
|
|
}
|
|
|
|
d := &database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: targetID,
|
|
Status: database.DeliveryStatusPending,
|
|
Event: event,
|
|
Target: database.Target{
|
|
Name: "timeout-target",
|
|
Type: database.TargetTypeHTTP,
|
|
Config: string(cfgJSON),
|
|
},
|
|
}
|
|
d.ID = delivery.ID
|
|
|
|
e.deliverHTTP(context.TODO(), db, d, task)
|
|
|
|
// Should fail due to timeout
|
|
var updated database.Delivery
|
|
require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
|
|
assert.Equal(t, database.DeliveryStatusFailed, updated.Status)
|
|
|
|
var result database.DeliveryResult
|
|
require.NoError(t, db.Where("delivery_id = ?", delivery.ID).First(&result).Error)
|
|
assert.False(t, result.Success)
|
|
assert.NotEmpty(t, result.Error, "should have error message for timeout")
|
|
}
|
|
|
|
// --- HTTP request with invalid config ---
|
|
|
|
func TestDeliverHTTP_InvalidConfig(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
db := testWebhookDB(t)
|
|
e := testEngine(t, 1)
|
|
|
|
targetID := uuid.New().String()
|
|
event := seedEvent(t, db, `{"config":"invalid"}`)
|
|
delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending)
|
|
|
|
task := &DeliveryTask{
|
|
DeliveryID: delivery.ID,
|
|
EventID: event.ID,
|
|
WebhookID: event.WebhookID,
|
|
TargetID: targetID,
|
|
TargetName: "bad-config",
|
|
TargetType: database.TargetTypeHTTP,
|
|
TargetConfig: `not-json`,
|
|
MaxRetries: 0,
|
|
AttemptNum: 1,
|
|
}
|
|
|
|
d := &database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: targetID,
|
|
Status: database.DeliveryStatusPending,
|
|
Event: event,
|
|
Target: database.Target{
|
|
Name: "bad-config",
|
|
Type: database.TargetTypeHTTP,
|
|
Config: `not-json`,
|
|
},
|
|
}
|
|
d.ID = delivery.ID
|
|
|
|
e.deliverHTTP(context.TODO(), db, d, task)
|
|
|
|
var updated database.Delivery
|
|
require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
|
|
assert.Equal(t, database.DeliveryStatusFailed, updated.Status)
|
|
}
|
|
|
|
// --- Notify batching ---
|
|
|
|
func TestNotify_MultipleTasks(t *testing.T) {
|
|
t.Parallel()
|
|
e := testEngine(t, 1)
|
|
|
|
tasks := make([]DeliveryTask, 5)
|
|
for i := range tasks {
|
|
tasks[i] = DeliveryTask{
|
|
DeliveryID: fmt.Sprintf("task-%d", i),
|
|
}
|
|
}
|
|
|
|
e.Notify(tasks)
|
|
|
|
// All tasks should be in the channel
|
|
for i := 0; i < 5; i++ {
|
|
select {
|
|
case task := <-e.deliveryCh:
|
|
assert.Equal(t, fmt.Sprintf("task-%d", i), task.DeliveryID)
|
|
case <-time.After(time.Second):
|
|
t.Fatalf("expected task %d on delivery channel", i)
|
|
}
|
|
}
|
|
}
|