All checks were successful
check / check (push) Successful in 1m48s
Add unit tests for internal/delivery/ package covering:

Circuit breaker tests (circuit_breaker_test.go):
- Closed state allows deliveries
- Failure counting below threshold
- Open transition after threshold failures
- Cooldown blocks during cooldown period
- Half-open transition after cooldown expires
- Probe success closes circuit
- Probe failure reopens circuit
- Success resets failure counter
- Concurrent access safety (race-safe)
- CooldownRemaining for all states
- CircuitState String() output

Engine tests (engine_test.go):
- Non-blocking Notify when channel is full
- HTTP target success and failure delivery
- Database target immediate success
- Log target immediate success
- Retry target success with circuit breaker
- Max retries exhausted marks delivery failed
- Retry scheduling on failure
- Exponential backoff duration verification
- Backoff cap at shift 30
- Body pointer semantics (inline <16KB, nil >=16KB)
- Worker pool bounded concurrency
- Circuit breaker blocks delivery attempts
- Circuit breaker per-target creation
- HTTP config parsing (valid, empty, missing URL)
- scheduleRetry sends to retry channel
- scheduleRetry drops when channel full
- Header forwarding (forwardable vs hop-by-hop)
- processDelivery routing to correct handler
- Truncate helper function

All tests use real SQLite databases and httptest servers.
All tests pass with -race flag.
896 lines
25 KiB
Go
896 lines
25 KiB
Go
package delivery
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"gorm.io/driver/sqlite"
|
|
"gorm.io/gorm"
|
|
_ "modernc.org/sqlite"
|
|
"sneak.berlin/go/webhooker/internal/database"
|
|
)
|
|
|
|
// testWebhookDB creates a real SQLite per-webhook database in a temp dir
|
|
// and runs the event-tier migrations (Event, Delivery, DeliveryResult).
|
|
func testWebhookDB(t *testing.T) *gorm.DB {
|
|
t.Helper()
|
|
dbPath := filepath.Join(t.TempDir(), "events-test.db")
|
|
dsn := fmt.Sprintf("file:%s?cache=shared&mode=rwc", dbPath)
|
|
|
|
sqlDB, err := sql.Open("sqlite", dsn)
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() { sqlDB.Close() })
|
|
|
|
db, err := gorm.Open(sqlite.Dialector{Conn: sqlDB}, &gorm.Config{})
|
|
require.NoError(t, err)
|
|
|
|
require.NoError(t, db.AutoMigrate(
|
|
&database.Event{},
|
|
&database.Delivery{},
|
|
&database.DeliveryResult{},
|
|
))
|
|
|
|
return db
|
|
}
|
|
|
|
// testEngine builds an Engine with custom settings for testing. It does
|
|
// NOT call start() — callers control lifecycle for deterministic tests.
|
|
func testEngine(t *testing.T, workers int) *Engine {
|
|
t.Helper()
|
|
return &Engine{
|
|
log: slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})),
|
|
client: &http.Client{Timeout: 5 * time.Second},
|
|
deliveryCh: make(chan DeliveryTask, deliveryChannelSize),
|
|
retryCh: make(chan DeliveryTask, retryChannelSize),
|
|
workers: workers,
|
|
}
|
|
}
|
|
|
|
// newHTTPTargetConfig returns a JSON config for an HTTP/retry target
|
|
// pointing at the given URL.
|
|
func newHTTPTargetConfig(url string) string {
|
|
cfg := HTTPTargetConfig{URL: url}
|
|
data, err := json.Marshal(cfg)
|
|
if err != nil {
|
|
panic("failed to marshal HTTPTargetConfig: " + err.Error())
|
|
}
|
|
return string(data)
|
|
}
|
|
|
|
// seedEvent inserts an event into the per-webhook DB and returns it.
|
|
func seedEvent(t *testing.T, db *gorm.DB, body string) database.Event {
|
|
t.Helper()
|
|
event := database.Event{
|
|
WebhookID: uuid.New().String(),
|
|
EntrypointID: uuid.New().String(),
|
|
Method: "POST",
|
|
Headers: `{"Content-Type":["application/json"]}`,
|
|
Body: body,
|
|
ContentType: "application/json",
|
|
}
|
|
require.NoError(t, db.Create(&event).Error)
|
|
return event
|
|
}
|
|
|
|
// seedDelivery inserts a delivery for an event + target and returns it.
|
|
func seedDelivery(t *testing.T, db *gorm.DB, eventID, targetID string, status database.DeliveryStatus) database.Delivery {
|
|
t.Helper()
|
|
d := database.Delivery{
|
|
EventID: eventID,
|
|
TargetID: targetID,
|
|
Status: status,
|
|
}
|
|
require.NoError(t, db.Create(&d).Error)
|
|
return d
|
|
}
|
|
|
|
// --- Tests ---
|
|
|
|
func TestNotify_NonBlocking(t *testing.T) {
|
|
t.Parallel()
|
|
e := testEngine(t, 1)
|
|
|
|
// Fill the delivery channel to capacity
|
|
for i := 0; i < deliveryChannelSize; i++ {
|
|
e.deliveryCh <- DeliveryTask{DeliveryID: fmt.Sprintf("fill-%d", i)}
|
|
}
|
|
|
|
// Notify should NOT block even though channel is full
|
|
done := make(chan struct{})
|
|
go func() {
|
|
e.Notify([]DeliveryTask{
|
|
{DeliveryID: "overflow-1"},
|
|
{DeliveryID: "overflow-2"},
|
|
})
|
|
close(done)
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
// success: Notify returned without blocking
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("Notify blocked when delivery channel was full")
|
|
}
|
|
}
|
|
|
|
func TestDeliverHTTP_Success(t *testing.T) {
|
|
t.Parallel()
|
|
db := testWebhookDB(t)
|
|
|
|
var received atomic.Bool
|
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
received.Store(true)
|
|
w.WriteHeader(http.StatusOK)
|
|
fmt.Fprint(w, `{"ok":true}`)
|
|
}))
|
|
defer ts.Close()
|
|
|
|
e := testEngine(t, 1)
|
|
|
|
event := seedEvent(t, db, `{"hello":"world"}`)
|
|
delivery := seedDelivery(t, db, event.ID, uuid.New().String(), database.DeliveryStatusPending)
|
|
|
|
d := &database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: delivery.TargetID,
|
|
Status: database.DeliveryStatusPending,
|
|
Event: event,
|
|
Target: database.Target{
|
|
Name: "test-http",
|
|
Type: database.TargetTypeHTTP,
|
|
Config: newHTTPTargetConfig(ts.URL),
|
|
},
|
|
}
|
|
d.ID = delivery.ID
|
|
|
|
e.deliverHTTP(context.TODO(), db, d)
|
|
|
|
assert.True(t, received.Load(), "HTTP target should have received request")
|
|
|
|
// Check DB: delivery should be delivered
|
|
var updated database.Delivery
|
|
require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
|
|
assert.Equal(t, database.DeliveryStatusDelivered, updated.Status)
|
|
|
|
// Check that a result was recorded
|
|
var result database.DeliveryResult
|
|
require.NoError(t, db.Where("delivery_id = ?", delivery.ID).First(&result).Error)
|
|
assert.True(t, result.Success)
|
|
assert.Equal(t, http.StatusOK, result.StatusCode)
|
|
}
|
|
|
|
func TestDeliverHTTP_Failure(t *testing.T) {
|
|
t.Parallel()
|
|
db := testWebhookDB(t)
|
|
|
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
fmt.Fprint(w, "internal error")
|
|
}))
|
|
defer ts.Close()
|
|
|
|
e := testEngine(t, 1)
|
|
|
|
event := seedEvent(t, db, `{"test":true}`)
|
|
delivery := seedDelivery(t, db, event.ID, uuid.New().String(), database.DeliveryStatusPending)
|
|
|
|
d := &database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: delivery.TargetID,
|
|
Status: database.DeliveryStatusPending,
|
|
Event: event,
|
|
Target: database.Target{
|
|
Name: "test-http-fail",
|
|
Type: database.TargetTypeHTTP,
|
|
Config: newHTTPTargetConfig(ts.URL),
|
|
},
|
|
}
|
|
d.ID = delivery.ID
|
|
|
|
e.deliverHTTP(context.TODO(), db, d)
|
|
|
|
// HTTP (fire-and-forget) marks as failed on non-2xx
|
|
var updated database.Delivery
|
|
require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
|
|
assert.Equal(t, database.DeliveryStatusFailed, updated.Status)
|
|
|
|
var result database.DeliveryResult
|
|
require.NoError(t, db.Where("delivery_id = ?", delivery.ID).First(&result).Error)
|
|
assert.False(t, result.Success)
|
|
assert.Equal(t, http.StatusInternalServerError, result.StatusCode)
|
|
}
|
|
|
|
func TestDeliverDatabase_ImmediateSuccess(t *testing.T) {
|
|
t.Parallel()
|
|
db := testWebhookDB(t)
|
|
e := testEngine(t, 1)
|
|
|
|
event := seedEvent(t, db, `{"db":"target"}`)
|
|
delivery := seedDelivery(t, db, event.ID, uuid.New().String(), database.DeliveryStatusPending)
|
|
|
|
d := &database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: delivery.TargetID,
|
|
Status: database.DeliveryStatusPending,
|
|
Event: event,
|
|
Target: database.Target{
|
|
Name: "test-db",
|
|
Type: database.TargetTypeDatabase,
|
|
},
|
|
}
|
|
d.ID = delivery.ID
|
|
|
|
e.deliverDatabase(db, d)
|
|
|
|
var updated database.Delivery
|
|
require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
|
|
assert.Equal(t, database.DeliveryStatusDelivered, updated.Status,
|
|
"database target should immediately succeed")
|
|
|
|
var result database.DeliveryResult
|
|
require.NoError(t, db.Where("delivery_id = ?", delivery.ID).First(&result).Error)
|
|
assert.True(t, result.Success)
|
|
assert.Equal(t, 0, result.StatusCode, "database target should not have an HTTP status code")
|
|
}
|
|
|
|
func TestDeliverLog_ImmediateSuccess(t *testing.T) {
|
|
t.Parallel()
|
|
db := testWebhookDB(t)
|
|
e := testEngine(t, 1)
|
|
|
|
event := seedEvent(t, db, `{"log":"target"}`)
|
|
delivery := seedDelivery(t, db, event.ID, uuid.New().String(), database.DeliveryStatusPending)
|
|
|
|
d := &database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: delivery.TargetID,
|
|
Status: database.DeliveryStatusPending,
|
|
Event: event,
|
|
Target: database.Target{
|
|
Name: "test-log",
|
|
Type: database.TargetTypeLog,
|
|
},
|
|
}
|
|
d.ID = delivery.ID
|
|
|
|
e.deliverLog(db, d)
|
|
|
|
var updated database.Delivery
|
|
require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
|
|
assert.Equal(t, database.DeliveryStatusDelivered, updated.Status,
|
|
"log target should immediately succeed")
|
|
|
|
var result database.DeliveryResult
|
|
require.NoError(t, db.Where("delivery_id = ?", delivery.ID).First(&result).Error)
|
|
assert.True(t, result.Success)
|
|
}
|
|
|
|
func TestDeliverRetry_Success(t *testing.T) {
|
|
t.Parallel()
|
|
db := testWebhookDB(t)
|
|
|
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer ts.Close()
|
|
|
|
e := testEngine(t, 1)
|
|
targetID := uuid.New().String()
|
|
|
|
event := seedEvent(t, db, `{"retry":"ok"}`)
|
|
delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending)
|
|
|
|
task := &DeliveryTask{
|
|
DeliveryID: delivery.ID,
|
|
EventID: event.ID,
|
|
WebhookID: event.WebhookID,
|
|
TargetID: targetID,
|
|
TargetName: "test-retry",
|
|
TargetType: database.TargetTypeRetry,
|
|
TargetConfig: newHTTPTargetConfig(ts.URL),
|
|
MaxRetries: 5,
|
|
AttemptNum: 1,
|
|
}
|
|
|
|
d := &database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: targetID,
|
|
Status: database.DeliveryStatusPending,
|
|
Event: event,
|
|
Target: database.Target{
|
|
Name: "test-retry",
|
|
Type: database.TargetTypeRetry,
|
|
Config: newHTTPTargetConfig(ts.URL),
|
|
MaxRetries: 5,
|
|
},
|
|
}
|
|
d.ID = delivery.ID
|
|
d.Target.ID = targetID
|
|
|
|
e.deliverRetry(context.TODO(), db, d, task)
|
|
|
|
var updated database.Delivery
|
|
require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
|
|
assert.Equal(t, database.DeliveryStatusDelivered, updated.Status)
|
|
|
|
// Circuit breaker should have recorded success
|
|
cb := e.getCircuitBreaker(targetID)
|
|
assert.Equal(t, CircuitClosed, cb.State())
|
|
}
|
|
|
|
func TestDeliverRetry_MaxRetriesExhausted(t *testing.T) {
|
|
t.Parallel()
|
|
db := testWebhookDB(t)
|
|
|
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
w.WriteHeader(http.StatusBadGateway)
|
|
}))
|
|
defer ts.Close()
|
|
|
|
e := testEngine(t, 1)
|
|
targetID := uuid.New().String()
|
|
|
|
event := seedEvent(t, db, `{"retry":"exhaust"}`)
|
|
delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusRetrying)
|
|
|
|
maxRetries := 3
|
|
task := &DeliveryTask{
|
|
DeliveryID: delivery.ID,
|
|
EventID: event.ID,
|
|
WebhookID: event.WebhookID,
|
|
TargetID: targetID,
|
|
TargetName: "test-retry-exhaust",
|
|
TargetType: database.TargetTypeRetry,
|
|
TargetConfig: newHTTPTargetConfig(ts.URL),
|
|
MaxRetries: maxRetries,
|
|
AttemptNum: maxRetries, // final attempt
|
|
}
|
|
|
|
d := &database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: targetID,
|
|
Status: database.DeliveryStatusRetrying,
|
|
Event: event,
|
|
Target: database.Target{
|
|
Name: "test-retry-exhaust",
|
|
Type: database.TargetTypeRetry,
|
|
Config: newHTTPTargetConfig(ts.URL),
|
|
MaxRetries: maxRetries,
|
|
},
|
|
}
|
|
d.ID = delivery.ID
|
|
d.Target.ID = targetID
|
|
|
|
e.deliverRetry(context.TODO(), db, d, task)
|
|
|
|
// After max retries exhausted, delivery should be failed
|
|
var updated database.Delivery
|
|
require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
|
|
assert.Equal(t, database.DeliveryStatusFailed, updated.Status,
|
|
"delivery should be failed after max retries exhausted")
|
|
}
|
|
|
|
func TestDeliverRetry_SchedulesRetryOnFailure(t *testing.T) {
|
|
t.Parallel()
|
|
db := testWebhookDB(t)
|
|
|
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
w.WriteHeader(http.StatusServiceUnavailable)
|
|
}))
|
|
defer ts.Close()
|
|
|
|
e := testEngine(t, 1)
|
|
targetID := uuid.New().String()
|
|
|
|
event := seedEvent(t, db, `{"retry":"schedule"}`)
|
|
delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending)
|
|
|
|
task := &DeliveryTask{
|
|
DeliveryID: delivery.ID,
|
|
EventID: event.ID,
|
|
WebhookID: event.WebhookID,
|
|
TargetID: targetID,
|
|
TargetName: "test-retry-schedule",
|
|
TargetType: database.TargetTypeRetry,
|
|
TargetConfig: newHTTPTargetConfig(ts.URL),
|
|
MaxRetries: 5,
|
|
AttemptNum: 1,
|
|
}
|
|
|
|
d := &database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: targetID,
|
|
Status: database.DeliveryStatusPending,
|
|
Event: event,
|
|
Target: database.Target{
|
|
Name: "test-retry-schedule",
|
|
Type: database.TargetTypeRetry,
|
|
Config: newHTTPTargetConfig(ts.URL),
|
|
MaxRetries: 5,
|
|
},
|
|
}
|
|
d.ID = delivery.ID
|
|
d.Target.ID = targetID
|
|
|
|
e.deliverRetry(context.TODO(), db, d, task)
|
|
|
|
// Delivery should be in retrying status (not failed — retries remain)
|
|
var updated database.Delivery
|
|
require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
|
|
assert.Equal(t, database.DeliveryStatusRetrying, updated.Status,
|
|
"delivery should be retrying when retries remain")
|
|
|
|
// The timer should fire a task into the retry channel. Wait briefly
|
|
// for the timer (backoff for attempt 1 is 1s, but we're just verifying
|
|
// the status was set correctly and a result was recorded).
|
|
var result database.DeliveryResult
|
|
require.NoError(t, db.Where("delivery_id = ?", delivery.ID).First(&result).Error)
|
|
assert.False(t, result.Success)
|
|
assert.Equal(t, 1, result.AttemptNum)
|
|
}
|
|
|
|
func TestExponentialBackoff_Durations(t *testing.T) {
|
|
t.Parallel()
|
|
// The engine uses: backoff = 2^(attemptNum-1) seconds
|
|
// attempt 1 → shift=0 → 1s
|
|
// attempt 2 → shift=1 → 2s
|
|
// attempt 3 → shift=2 → 4s
|
|
// attempt 4 → shift=3 → 8s
|
|
// attempt 5 → shift=4 → 16s
|
|
|
|
expected := []time.Duration{
|
|
1 * time.Second,
|
|
2 * time.Second,
|
|
4 * time.Second,
|
|
8 * time.Second,
|
|
16 * time.Second,
|
|
}
|
|
|
|
for attemptNum := 1; attemptNum <= 5; attemptNum++ {
|
|
shift := attemptNum - 1
|
|
if shift > 30 {
|
|
shift = 30
|
|
}
|
|
backoff := time.Duration(1<<uint(shift)) * time.Second //nolint:gosec // bounded above
|
|
assert.Equal(t, expected[attemptNum-1], backoff,
|
|
"backoff for attempt %d should be %v", attemptNum, expected[attemptNum-1])
|
|
}
|
|
}
|
|
|
|
func TestExponentialBackoff_CappedAt30(t *testing.T) {
|
|
t.Parallel()
|
|
// Verify shift is capped at 30 to avoid overflow
|
|
attemptNum := 50
|
|
shift := attemptNum - 1
|
|
if shift > 30 {
|
|
shift = 30
|
|
}
|
|
backoff := time.Duration(1<<uint(shift)) * time.Second //nolint:gosec // bounded above
|
|
assert.Equal(t, time.Duration(1<<30)*time.Second, backoff,
|
|
"backoff shift should be capped at 30")
|
|
}
|
|
|
|
func TestBodyPointer_SmallBodyInline(t *testing.T) {
|
|
t.Parallel()
|
|
// Body under MaxInlineBodySize should be included inline
|
|
smallBody := `{"small": true}`
|
|
assert.Less(t, len(smallBody), MaxInlineBodySize)
|
|
|
|
var bodyPtr *string
|
|
if len(smallBody) < MaxInlineBodySize {
|
|
bodyPtr = &smallBody
|
|
}
|
|
|
|
require.NotNil(t, bodyPtr, "small body should be inline (non-nil)")
|
|
assert.Equal(t, smallBody, *bodyPtr)
|
|
}
|
|
|
|
func TestBodyPointer_LargeBodyNil(t *testing.T) {
|
|
t.Parallel()
|
|
// Body at or above MaxInlineBodySize should be nil
|
|
largeBody := strings.Repeat("x", MaxInlineBodySize)
|
|
assert.GreaterOrEqual(t, len(largeBody), MaxInlineBodySize)
|
|
|
|
var bodyPtr *string
|
|
if len(largeBody) < MaxInlineBodySize {
|
|
bodyPtr = &largeBody
|
|
}
|
|
|
|
assert.Nil(t, bodyPtr, "large body (≥16KB) should be nil")
|
|
}
|
|
|
|
func TestBodyPointer_ExactBoundary(t *testing.T) {
|
|
t.Parallel()
|
|
// Body of exactly MaxInlineBodySize should be nil (the check is <, not <=)
|
|
exactBody := strings.Repeat("y", MaxInlineBodySize)
|
|
assert.Equal(t, MaxInlineBodySize, len(exactBody))
|
|
|
|
var bodyPtr *string
|
|
if len(exactBody) < MaxInlineBodySize {
|
|
bodyPtr = &exactBody
|
|
}
|
|
|
|
assert.Nil(t, bodyPtr, "body at exactly MaxInlineBodySize should be nil")
|
|
}
|
|
|
|
func TestWorkerPool_BoundedConcurrency(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping concurrency test in short mode")
|
|
}
|
|
t.Parallel()
|
|
|
|
const numWorkers = 3
|
|
db := testWebhookDB(t)
|
|
|
|
// Track concurrent tasks
|
|
var (
|
|
mu sync.Mutex
|
|
concurrent int
|
|
maxSeen int
|
|
)
|
|
|
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
|
mu.Lock()
|
|
concurrent++
|
|
if concurrent > maxSeen {
|
|
maxSeen = concurrent
|
|
}
|
|
mu.Unlock()
|
|
|
|
time.Sleep(100 * time.Millisecond) // simulate slow target
|
|
|
|
mu.Lock()
|
|
concurrent--
|
|
mu.Unlock()
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer ts.Close()
|
|
|
|
e := testEngine(t, numWorkers)
|
|
// We need a minimal dbManager-like setup. Since processNewTask
|
|
// needs dbManager, we'll drive workers by sending tasks through
|
|
// the delivery channel and manually calling deliverHTTP instead.
|
|
// Instead, let's directly test the worker pool by creating tasks
|
|
// and processing them through the channel.
|
|
|
|
// Create tasks for more work than workers
|
|
const numTasks = 10
|
|
tasks := make([]database.Delivery, numTasks)
|
|
targetCfg := newHTTPTargetConfig(ts.URL)
|
|
|
|
for i := 0; i < numTasks; i++ {
|
|
event := seedEvent(t, db, fmt.Sprintf(`{"task":%d}`, i))
|
|
delivery := seedDelivery(t, db, event.ID, uuid.New().String(), database.DeliveryStatusPending)
|
|
tasks[i] = database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: delivery.TargetID,
|
|
Status: database.DeliveryStatusPending,
|
|
Event: event,
|
|
Target: database.Target{
|
|
Name: fmt.Sprintf("task-%d", i),
|
|
Type: database.TargetTypeHTTP,
|
|
Config: targetCfg,
|
|
},
|
|
}
|
|
tasks[i].ID = delivery.ID
|
|
}
|
|
|
|
// Process all tasks through a bounded pool of goroutines to simulate
|
|
// the engine's worker pool behavior
|
|
var wg sync.WaitGroup
|
|
taskCh := make(chan int, numTasks)
|
|
for i := 0; i < numTasks; i++ {
|
|
taskCh <- i
|
|
}
|
|
close(taskCh)
|
|
|
|
// Start exactly numWorkers goroutines
|
|
for w := 0; w < numWorkers; w++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for idx := range taskCh {
|
|
e.deliverHTTP(context.TODO(), db, &tasks[idx])
|
|
}
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
mu.Lock()
|
|
observedMax := maxSeen
|
|
mu.Unlock()
|
|
|
|
assert.LessOrEqual(t, observedMax, numWorkers,
|
|
"should never exceed %d concurrent deliveries, saw %d", numWorkers, observedMax)
|
|
|
|
// All deliveries should be completed
|
|
for i := 0; i < numTasks; i++ {
|
|
var d database.Delivery
|
|
require.NoError(t, db.First(&d, "id = ?", tasks[i].ID).Error)
|
|
assert.Equal(t, database.DeliveryStatusDelivered, d.Status,
|
|
"task %d should be delivered", i)
|
|
}
|
|
}
|
|
|
|
func TestDeliverRetry_CircuitBreakerBlocks(t *testing.T) {
|
|
t.Parallel()
|
|
db := testWebhookDB(t)
|
|
e := testEngine(t, 1)
|
|
targetID := uuid.New().String()
|
|
|
|
// Pre-trip the circuit breaker for this target
|
|
cb := e.getCircuitBreaker(targetID)
|
|
for i := 0; i < defaultFailureThreshold; i++ {
|
|
cb.RecordFailure()
|
|
}
|
|
require.Equal(t, CircuitOpen, cb.State())
|
|
|
|
event := seedEvent(t, db, `{"cb":"blocked"}`)
|
|
delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending)
|
|
|
|
task := &DeliveryTask{
|
|
DeliveryID: delivery.ID,
|
|
EventID: event.ID,
|
|
WebhookID: event.WebhookID,
|
|
TargetID: targetID,
|
|
TargetName: "test-cb-block",
|
|
TargetType: database.TargetTypeRetry,
|
|
TargetConfig: newHTTPTargetConfig("http://will-not-be-called.invalid"),
|
|
MaxRetries: 5,
|
|
AttemptNum: 1,
|
|
}
|
|
|
|
d := &database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: targetID,
|
|
Status: database.DeliveryStatusPending,
|
|
Event: event,
|
|
Target: database.Target{
|
|
Name: "test-cb-block",
|
|
Type: database.TargetTypeRetry,
|
|
Config: newHTTPTargetConfig("http://will-not-be-called.invalid"),
|
|
MaxRetries: 5,
|
|
},
|
|
}
|
|
d.ID = delivery.ID
|
|
d.Target.ID = targetID
|
|
|
|
e.deliverRetry(context.TODO(), db, d, task)
|
|
|
|
// Delivery should be retrying (circuit open, no attempt made)
|
|
var updated database.Delivery
|
|
require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
|
|
assert.Equal(t, database.DeliveryStatusRetrying, updated.Status,
|
|
"delivery should be retrying when circuit breaker is open")
|
|
|
|
// No delivery result should have been recorded (no attempt was made)
|
|
var resultCount int64
|
|
db.Model(&database.DeliveryResult{}).Where("delivery_id = ?", delivery.ID).Count(&resultCount)
|
|
assert.Equal(t, int64(0), resultCount,
|
|
"no delivery result should be recorded when circuit is open")
|
|
}
|
|
|
|
func TestGetCircuitBreaker_CreatesOnDemand(t *testing.T) {
|
|
t.Parallel()
|
|
e := testEngine(t, 1)
|
|
|
|
targetID := uuid.New().String()
|
|
cb1 := e.getCircuitBreaker(targetID)
|
|
require.NotNil(t, cb1)
|
|
assert.Equal(t, CircuitClosed, cb1.State())
|
|
|
|
// Same target should return the same circuit breaker
|
|
cb2 := e.getCircuitBreaker(targetID)
|
|
assert.Same(t, cb1, cb2, "same target ID should return the same circuit breaker")
|
|
|
|
// Different target should return a different circuit breaker
|
|
otherID := uuid.New().String()
|
|
cb3 := e.getCircuitBreaker(otherID)
|
|
assert.NotSame(t, cb1, cb3, "different target ID should return a different circuit breaker")
|
|
}
|
|
|
|
func TestParseHTTPConfig_Valid(t *testing.T) {
|
|
t.Parallel()
|
|
e := testEngine(t, 1)
|
|
|
|
cfg, err := e.parseHTTPConfig(`{"url":"https://example.com/hook","headers":{"X-Token":"secret"}}`)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, "https://example.com/hook", cfg.URL)
|
|
assert.Equal(t, "secret", cfg.Headers["X-Token"])
|
|
}
|
|
|
|
func TestParseHTTPConfig_Empty(t *testing.T) {
|
|
t.Parallel()
|
|
e := testEngine(t, 1)
|
|
|
|
_, err := e.parseHTTPConfig("")
|
|
assert.Error(t, err, "empty config should return error")
|
|
}
|
|
|
|
func TestParseHTTPConfig_MissingURL(t *testing.T) {
|
|
t.Parallel()
|
|
e := testEngine(t, 1)
|
|
|
|
_, err := e.parseHTTPConfig(`{"headers":{"X-Token":"secret"}}`)
|
|
assert.Error(t, err, "config without URL should return error")
|
|
}
|
|
|
|
func TestScheduleRetry_SendsToRetryChannel(t *testing.T) {
|
|
t.Parallel()
|
|
e := testEngine(t, 1)
|
|
|
|
task := DeliveryTask{
|
|
DeliveryID: uuid.New().String(),
|
|
EventID: uuid.New().String(),
|
|
WebhookID: uuid.New().String(),
|
|
TargetID: uuid.New().String(),
|
|
AttemptNum: 2,
|
|
}
|
|
|
|
e.scheduleRetry(task, 10*time.Millisecond)
|
|
|
|
// Wait for the timer to fire
|
|
select {
|
|
case received := <-e.retryCh:
|
|
assert.Equal(t, task.DeliveryID, received.DeliveryID)
|
|
assert.Equal(t, task.AttemptNum, received.AttemptNum)
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("retry task was not sent to retry channel within timeout")
|
|
}
|
|
}
|
|
|
|
func TestScheduleRetry_DropsWhenChannelFull(t *testing.T) {
|
|
t.Parallel()
|
|
e := &Engine{
|
|
log: slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})),
|
|
retryCh: make(chan DeliveryTask, 1), // tiny buffer
|
|
}
|
|
|
|
// Fill the retry channel
|
|
e.retryCh <- DeliveryTask{DeliveryID: "fill"}
|
|
|
|
task := DeliveryTask{
|
|
DeliveryID: "overflow",
|
|
AttemptNum: 2,
|
|
}
|
|
|
|
// Should not panic or block
|
|
e.scheduleRetry(task, 0)
|
|
|
|
// Give timer a moment to fire
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
// Only the original task should be in the channel
|
|
received := <-e.retryCh
|
|
assert.Equal(t, "fill", received.DeliveryID,
|
|
"only the original task should be in the channel (overflow was dropped)")
|
|
}
|
|
|
|
func TestIsForwardableHeader(t *testing.T) {
|
|
t.Parallel()
|
|
// Should forward
|
|
assert.True(t, isForwardableHeader("X-Custom-Header"))
|
|
assert.True(t, isForwardableHeader("Authorization"))
|
|
assert.True(t, isForwardableHeader("Accept"))
|
|
assert.True(t, isForwardableHeader("X-GitHub-Event"))
|
|
|
|
// Should NOT forward (hop-by-hop)
|
|
assert.False(t, isForwardableHeader("Host"))
|
|
assert.False(t, isForwardableHeader("Connection"))
|
|
assert.False(t, isForwardableHeader("Keep-Alive"))
|
|
assert.False(t, isForwardableHeader("Transfer-Encoding"))
|
|
assert.False(t, isForwardableHeader("Content-Length"))
|
|
}
|
|
|
|
func TestTruncate(t *testing.T) {
|
|
t.Parallel()
|
|
assert.Equal(t, "hello", truncate("hello", 10))
|
|
assert.Equal(t, "hello", truncate("hello", 5))
|
|
assert.Equal(t, "hel", truncate("hello", 3))
|
|
assert.Equal(t, "", truncate("", 5))
|
|
}
|
|
|
|
func TestDoHTTPRequest_ForwardsHeaders(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
var receivedHeaders http.Header
|
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
receivedHeaders = r.Header.Clone()
|
|
w.WriteHeader(http.StatusOK)
|
|
}))
|
|
defer ts.Close()
|
|
|
|
e := testEngine(t, 1)
|
|
cfg := &HTTPTargetConfig{
|
|
URL: ts.URL,
|
|
Headers: map[string]string{"X-Target-Auth": "bearer xyz"},
|
|
}
|
|
|
|
event := &database.Event{
|
|
Method: "POST",
|
|
Headers: `{"X-Custom":["value1"],"Content-Type":["application/json"]}`,
|
|
Body: `{"test":true}`,
|
|
ContentType: "application/json",
|
|
}
|
|
|
|
statusCode, _, _, err := e.doHTTPRequest(cfg, event)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, http.StatusOK, statusCode)
|
|
|
|
// Check forwarded headers
|
|
assert.Equal(t, "value1", receivedHeaders.Get("X-Custom"))
|
|
assert.Equal(t, "bearer xyz", receivedHeaders.Get("X-Target-Auth"))
|
|
assert.Equal(t, "application/json", receivedHeaders.Get("Content-Type"))
|
|
assert.Equal(t, "webhooker/1.0", receivedHeaders.Get("User-Agent"))
|
|
}
|
|
|
|
func TestProcessDelivery_RoutesToCorrectHandler(t *testing.T) {
|
|
t.Parallel()
|
|
db := testWebhookDB(t)
|
|
e := testEngine(t, 1)
|
|
|
|
tests := []struct {
|
|
name string
|
|
targetType database.TargetType
|
|
wantStatus database.DeliveryStatus
|
|
}{
|
|
{"database target", database.TargetTypeDatabase, database.DeliveryStatusDelivered},
|
|
{"log target", database.TargetTypeLog, database.DeliveryStatusDelivered},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
t.Parallel()
|
|
event := seedEvent(t, db, `{"routing":"test"}`)
|
|
delivery := seedDelivery(t, db, event.ID, uuid.New().String(), database.DeliveryStatusPending)
|
|
|
|
d := &database.Delivery{
|
|
EventID: event.ID,
|
|
TargetID: delivery.TargetID,
|
|
Status: database.DeliveryStatusPending,
|
|
Event: event,
|
|
Target: database.Target{
|
|
Name: "test-" + string(tt.targetType),
|
|
Type: tt.targetType,
|
|
},
|
|
}
|
|
d.ID = delivery.ID
|
|
|
|
task := &DeliveryTask{
|
|
DeliveryID: delivery.ID,
|
|
TargetType: tt.targetType,
|
|
}
|
|
|
|
e.processDelivery(context.TODO(), db, d, task)
|
|
|
|
var updated database.Delivery
|
|
require.NoError(t, db.First(&updated, "id = ?", delivery.ID).Error)
|
|
assert.Equal(t, tt.wantStatus, updated.Status)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestMaxInlineBodySize_Constant(t *testing.T) {
|
|
t.Parallel()
|
|
// Verify the constant is 16KB as documented
|
|
assert.Equal(t, 16*1024, MaxInlineBodySize,
|
|
"MaxInlineBodySize should be 16KB (16384 bytes)")
|
|
}
|