Files
webhooker/internal/delivery/engine_integration_test.go
clawbot afe88c601a
All checks were successful
check / check (push) Successful in 5s
refactor: use pinned golangci-lint Docker image for linting (#55)
Closes [issue #50](#50)

## Summary

Refactors the Dockerfile to use a separate lint stage with a pinned golangci-lint Docker image, following the pattern used by [sneak/pixa](https://git.eeqj.de/sneak/pixa). This replaces the previous approach of installing golangci-lint via curl in the builder stage.

## Changes

### Dockerfile
- **New `lint` stage** using `golangci/golangci-lint:v2.11.3` (Debian-based, pinned by sha256 digest) as a separate build stage
- **Builder stage** depends on lint via `COPY --from=lint /src/go.sum /dev/null` — build won't proceed unless linting passes
- **Go bumped** from 1.24 to 1.26.1 (`golang:1.26.1-bookworm`, pinned by sha256)
- **golangci-lint bumped** from v1.64.8 to v2.11.3
- All three Docker images (golangci-lint, golang, alpine) pinned by sha256 digest
- Debian-based golangci-lint image used (not Alpine) because mattn/go-sqlite3 CGO does not compile on musl (off64_t)

### Linter Config (.golangci.yml)
- Migrated from v1 to v2 format (`version: "2"` added)
- Removed linters no longer available in v2: `gofmt` (handled by `make fmt-check`), `gosimple` (merged into `staticcheck`), `typecheck` (always-on in v2)
- Same set of linters enabled — no rules weakened

### Code Fixes (all lint issues from v2 upgrade)
- Added package comments to all packages
- Added doc comments to all exported types, functions, and methods
- Fixed unchecked errors flagged by `errcheck` (sqlDB.Close, os.Setenv in tests, resp.Body.Close, fmt.Fprint)
- Fixed unused parameters flagged by `revive` (renamed to `_`)
- Fixed `gosec` G120 warnings: added `http.MaxBytesReader` before `r.ParseForm()` calls
- Fixed `staticcheck` QF1012: replaced `WriteString(fmt.Sprintf(...))` with `fmt.Fprintf`
- Fixed `staticcheck` QF1003: converted if/else chain to tagged switch
- Renamed `DeliveryTask` → `Task` to avoid package stutter (`delivery.Task` instead of `delivery.DeliveryTask`)
- Renamed shadowed builtin `max` parameter to `upperBound` in `cryptoRandInt`
- Used `t.Setenv` instead of `os.Setenv` in tests (auto-restores)

### README.md
- Updated version requirements: Go 1.26+, golangci-lint v2.11+
- Updated Dockerfile description in project structure

## Verification

`docker build .` passes cleanly — formatting check, linting, all tests, and build all succeed.

Co-authored-by: clawbot <clawbot@noreply.git.eeqj.de>
Reviewed-on: #55
Co-authored-by: clawbot <clawbot@noreply.example.org>
Co-committed-by: clawbot <clawbot@noreply.example.org>
2026-03-25 02:16:38 +01:00

1117 lines
20 KiB
Go

package delivery_test
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
_ "modernc.org/sqlite"
"sneak.berlin/go/webhooker/internal/database"
"sneak.berlin/go/webhooker/internal/delivery"
)
// iSetup holds common integration test dependencies.
type iSetup struct {
MainDB *gorm.DB
DBMgr *database.WebhookDBManager
WebhookID string
WebhookDB *gorm.DB
Engine *delivery.Engine
}
func newISetup(t *testing.T) iSetup {
t.Helper()
mainDB := iMainDB(t)
dbMgr := iDBManager(t)
wID := uuid.New().String()
wDB := iSeedWebhookDB(t, dbMgr, wID)
return iSetup{
MainDB: mainDB,
DBMgr: dbMgr,
WebhookID: wID,
WebhookDB: wDB,
Engine: delivery.NewTestEngineWithDB(
database.NewTestDatabase(mainDB),
dbMgr,
slog.New(slog.NewTextHandler(
os.Stderr,
&slog.HandlerOptions{
Level: slog.LevelDebug,
},
)),
&http.Client{Timeout: 5 * time.Second},
2,
),
}
}
func iMainDB(t *testing.T) *gorm.DB {
t.Helper()
dbPath := filepath.Join(
t.TempDir(), "main-test.db",
)
dsn := fmt.Sprintf(
"file:%s?cache=shared&mode=rwc", dbPath,
)
sqlDB, err := sql.Open("sqlite", dsn)
require.NoError(t, err)
t.Cleanup(func() { _ = sqlDB.Close() })
db, err := gorm.Open(
sqlite.Dialector{Conn: sqlDB}, &gorm.Config{},
)
require.NoError(t, err)
require.NoError(t, db.AutoMigrate(
&database.Webhook{},
&database.Target{},
&database.User{},
&database.Setting{},
))
return db
}
func iDBManager(
t *testing.T,
) *database.WebhookDBManager {
t.Helper()
return database.NewTestWebhookDBManager(t.TempDir())
}
func iSeedWebhookDB(
t *testing.T,
mgr *database.WebhookDBManager,
webhookID string,
) *gorm.DB {
t.Helper()
db, err := mgr.GetDB(webhookID)
require.NoError(t, err)
return db
}
func iHTTPConfig(url string) string {
cfg := delivery.HTTPTargetConfig{URL: url}
data, err := json.Marshal(cfg)
if err != nil {
panic("failed to marshal HTTPTargetConfig")
}
return string(data)
}
func iWebhookDB(t *testing.T) *gorm.DB {
t.Helper()
dbPath := filepath.Join(
t.TempDir(), "events-test.db",
)
dsn := fmt.Sprintf(
"file:%s?cache=shared&mode=rwc", dbPath,
)
sqlDB, err := sql.Open("sqlite", dsn)
require.NoError(t, err)
t.Cleanup(func() { _ = sqlDB.Close() })
db, err := gorm.Open(
sqlite.Dialector{Conn: sqlDB}, &gorm.Config{},
)
require.NoError(t, err)
require.NoError(t, db.AutoMigrate(
&database.Event{},
&database.Delivery{},
&database.DeliveryResult{},
))
return db
}
func iEngine(
t *testing.T, workers int,
) *delivery.Engine {
t.Helper()
return delivery.NewTestEngine(
slog.New(slog.NewTextHandler(
os.Stderr,
&slog.HandlerOptions{Level: slog.LevelDebug},
)),
&http.Client{Timeout: 5 * time.Second},
workers,
)
}
// iSeedEvent creates a test event in the database.
func iSeedEvent(
t *testing.T,
db *gorm.DB,
webhookID, body string,
) database.Event {
t.Helper()
event := database.Event{
WebhookID: webhookID,
EntrypointID: uuid.New().String(),
Method: "POST",
Headers: `{}`,
Body: body,
ContentType: "application/json",
}
require.NoError(t, db.Create(&event).Error)
return event
}
// iSeedDelivery creates a test delivery record.
func iSeedDelivery(
t *testing.T,
db *gorm.DB,
eventID, targetID string,
status database.DeliveryStatus,
) database.Delivery {
t.Helper()
d := database.Delivery{
EventID: eventID,
TargetID: targetID,
Status: status,
}
require.NoError(t, db.Create(&d).Error)
return d
}
// iTask builds a delivery.Task for integration tests.
func iTask(
d database.Delivery,
event database.Event,
webhookID, targetID, name, config string,
maxRetries, attemptNum int,
body *string,
) delivery.Task {
return delivery.Task{
DeliveryID: d.ID,
EventID: event.ID,
WebhookID: webhookID,
TargetID: targetID,
TargetName: name,
TargetType: database.TargetTypeHTTP,
TargetConfig: config,
MaxRetries: maxRetries,
Method: event.Method,
Headers: event.Headers,
ContentType: event.ContentType,
Body: body,
AttemptNum: attemptNum,
}
}
// iAssertStatus checks the delivery status.
func iAssertStatus(
t *testing.T,
db *gorm.DB,
deliveryID string,
expected database.DeliveryStatus,
) {
t.Helper()
var updated database.Delivery
require.NoError(t, db.First(
&updated, "id = ?", deliveryID,
).Error)
assert.Equal(t, expected, updated.Status)
}
// --- processNewTask Tests ---
func TestProcessNewTask_InlineBody(t *testing.T) {
t.Parallel()
s := newISetup(t)
var received atomic.Bool
ts := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, _ *http.Request) {
received.Store(true)
w.WriteHeader(http.StatusOK)
},
),
)
defer ts.Close()
event := iSeedEvent(
t, s.WebhookDB, s.WebhookID,
`{"hello":"world"}`,
)
targetID := uuid.New().String()
d := iSeedDelivery(
t, s.WebhookDB, event.ID, targetID,
database.DeliveryStatusPending,
)
bodyStr := event.Body
cfg := iHTTPConfig(ts.URL)
task := iTask(
d, event, s.WebhookID, targetID,
"test-target", cfg, 0, 1, &bodyStr,
)
s.Engine.ExportProcessNewTask(
context.TODO(), &task,
)
assert.True(t, received.Load())
iAssertStatus(t, s.WebhookDB, d.ID,
database.DeliveryStatusDelivered,
)
}
func TestProcessNewTask_LargeBody_FetchFromDB(
t *testing.T,
) {
t.Parallel()
s := newISetup(t)
largeBody := strings.Repeat(
"x", delivery.MaxInlineBodySize+100,
)
var receivedBody string
ts := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
receivedBody = string(body)
w.WriteHeader(http.StatusOK)
},
),
)
defer ts.Close()
event := iSeedEvent(
t, s.WebhookDB, s.WebhookID, largeBody,
)
targetID := uuid.New().String()
d := iSeedDelivery(
t, s.WebhookDB, event.ID, targetID,
database.DeliveryStatusPending,
)
cfg := iHTTPConfig(ts.URL)
task := iTask(
d, event, s.WebhookID, targetID,
"test-large", cfg, 0, 1, nil,
)
s.Engine.ExportProcessNewTask(
context.TODO(), &task,
)
assert.Equal(t, largeBody, receivedBody)
iAssertStatus(t, s.WebhookDB, d.ID,
database.DeliveryStatusDelivered,
)
}
func TestProcessNewTask_InvalidWebhookID(t *testing.T) {
t.Parallel()
s := newISetup(t)
task := delivery.Task{
DeliveryID: uuid.New().String(),
EventID: uuid.New().String(),
WebhookID: uuid.New().String(),
TargetID: uuid.New().String(),
TargetName: "test",
TargetType: database.TargetTypeHTTP,
TargetConfig: iHTTPConfig("http://localhost:9999"),
MaxRetries: 0,
Body: nil,
AttemptNum: 1,
}
s.Engine.ExportProcessNewTask(
context.TODO(), &task,
)
}
// --- processRetryTask Tests ---
func TestProcessRetryTask_SuccessfulRetry(t *testing.T) {
t.Parallel()
s := newISetup(t)
ts := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
},
),
)
defer ts.Close()
event := iSeedEvent(
t, s.WebhookDB, s.WebhookID,
`{"retry":"test"}`,
)
targetID := uuid.New().String()
d := iSeedDelivery(
t, s.WebhookDB, event.ID, targetID,
database.DeliveryStatusRetrying,
)
bodyStr := event.Body
cfg := iHTTPConfig(ts.URL)
task := iTask(
d, event, s.WebhookID, targetID,
"retry-target", cfg, 5, 2, &bodyStr,
)
s.Engine.ExportProcessRetryTask(
context.TODO(), &task,
)
iAssertStatus(t, s.WebhookDB, d.ID,
database.DeliveryStatusDelivered,
)
}
func TestProcessRetryTask_SkipsNonRetryingDelivery(
t *testing.T,
) {
t.Parallel()
s := newISetup(t)
event := iSeedEvent(
t, s.WebhookDB, s.WebhookID,
`{"skip":"test"}`,
)
targetID := uuid.New().String()
d := iSeedDelivery(
t, s.WebhookDB, event.ID, targetID,
database.DeliveryStatusDelivered,
)
bodyStr := event.Body
cfg := iHTTPConfig("http://localhost:1")
task := iTask(
d, event, s.WebhookID, targetID,
"skip-target", cfg, 5, 2, &bodyStr,
)
s.Engine.ExportProcessRetryTask(
context.TODO(), &task,
)
iAssertStatus(t, s.WebhookDB, d.ID,
database.DeliveryStatusDelivered,
)
}
func TestProcessRetryTask_LargeBody_FetchFromDB(
t *testing.T,
) {
t.Parallel()
s := newISetup(t)
ts := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
},
),
)
defer ts.Close()
largeBody := strings.Repeat(
"z", delivery.MaxInlineBodySize+50,
)
event := iSeedEvent(
t, s.WebhookDB, s.WebhookID, largeBody,
)
targetID := uuid.New().String()
d := iSeedDelivery(
t, s.WebhookDB, event.ID, targetID,
database.DeliveryStatusRetrying,
)
cfg := iHTTPConfig(ts.URL)
task := iTask(
d, event, s.WebhookID, targetID,
"retry-large", cfg, 5, 2, nil,
)
s.Engine.ExportProcessRetryTask(
context.TODO(), &task,
)
iAssertStatus(t, s.WebhookDB, d.ID,
database.DeliveryStatusDelivered,
)
}
// --- Worker Lifecycle Tests ---
func TestWorkerLifecycle_StartStop(t *testing.T) {
t.Parallel()
s := newISetup(t)
s.Engine.ExportStart(context.Background())
event := iSeedEvent(
t, s.WebhookDB, s.WebhookID,
`{"lifecycle":"test"}`,
)
targetID := uuid.New().String()
d := iSeedDelivery(
t, s.WebhookDB, event.ID, targetID,
database.DeliveryStatusPending,
)
bodyStr := event.Body
task := iTask(
d, event, s.WebhookID, targetID,
"lifecycle-test", "",
0, 1, &bodyStr,
)
task.TargetType = database.TargetTypeLog
s.Engine.Notify([]delivery.Task{task})
iWaitForStatus(
t, s.WebhookDB, d.ID,
database.DeliveryStatusDelivered,
)
s.Engine.ExportStop()
}
// iWaitForStatus polls until the delivery reaches the
// expected status.
func iWaitForStatus(
t *testing.T,
db *gorm.DB,
deliveryID string,
expected database.DeliveryStatus,
) {
t.Helper()
require.Eventually(t, func() bool {
var d database.Delivery
err := db.First(
&d, "id = ?", deliveryID,
).Error
if err != nil {
return false
}
return d.Status == expected
}, 5*time.Second, 50*time.Millisecond)
}
func TestWorkerLifecycle_ProcessesRetryChannel(
t *testing.T,
) {
t.Parallel()
s := newISetup(t)
ts := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
},
),
)
defer ts.Close()
event := iSeedEvent(
t, s.WebhookDB, s.WebhookID,
`{"retry-chan":"test"}`,
)
targetID := uuid.New().String()
d := iSeedDelivery(
t, s.WebhookDB, event.ID, targetID,
database.DeliveryStatusRetrying,
)
s.Engine.ExportStart(context.Background())
bodyStr := event.Body
cfg := iHTTPConfig(ts.URL)
task := iTask(
d, event, s.WebhookID, targetID,
"retry-chan-test", cfg, 5, 2, &bodyStr,
)
s.Engine.ExportRetryCh() <- task
iWaitForStatus(
t, s.WebhookDB, d.ID,
database.DeliveryStatusDelivered,
)
s.Engine.ExportStop()
}
// --- processDelivery: unknown target type ---
func TestProcessDelivery_UnknownTargetType(
t *testing.T,
) {
t.Parallel()
s := newISetup(t)
event := iSeedEvent(
t, s.WebhookDB, s.WebhookID,
`{"unknown":"type"}`,
)
del := iSeedDelivery(
t, s.WebhookDB, event.ID,
uuid.New().String(),
database.DeliveryStatusPending,
)
d := &database.Delivery{
EventID: event.ID,
TargetID: del.TargetID,
Status: database.DeliveryStatusPending,
Event: event,
Target: database.Target{
Name: "unknown",
Type: database.TargetType("unknown"),
},
}
d.ID = del.ID
task := &delivery.Task{
DeliveryID: del.ID,
TargetType: database.TargetType("unknown"),
}
s.Engine.ExportProcessDelivery(
context.TODO(), s.WebhookDB, d, task,
)
iAssertStatus(t, s.WebhookDB, del.ID,
database.DeliveryStatusFailed,
)
}
// --- Recovery Tests ---
func TestRecoverPendingDeliveries(t *testing.T) {
t.Parallel()
s := newISetup(t)
targetID := uuid.New().String()
iCreateTarget(t, s.MainDB, targetID,
s.WebhookID, "recovery-target",
database.TargetTypeLog, "", 0,
)
iSeedPendingDeliveries(
t, s.WebhookDB, s.WebhookID, targetID, 3,
)
s.Engine.ExportRecoverPendingDeliveries(
context.Background(), s.WebhookDB,
s.WebhookID,
)
for i := range 3 {
select {
case task := <-s.Engine.ExportDeliveryCh():
assert.Equal(t, targetID, task.TargetID)
assert.Equal(t,
database.TargetTypeLog,
task.TargetType,
)
case <-time.After(2 * time.Second):
t.Fatalf("expected task %d", i)
}
}
}
// iCreateTarget creates a target in the main DB.
func iCreateTarget(
t *testing.T,
mainDB *gorm.DB,
targetID, webhookID, name string,
targetType database.TargetType,
config string,
maxRetries int,
) {
t.Helper()
target := database.Target{
WebhookID: webhookID,
Name: name,
Type: targetType,
Active: true,
Config: config,
MaxRetries: maxRetries,
}
target.ID = targetID
require.NoError(t, mainDB.Create(&target).Error)
}
// iSeedPendingDeliveries creates n pending deliveries.
func iSeedPendingDeliveries(
t *testing.T,
webhookDB *gorm.DB,
webhookID, targetID string,
n int,
) {
t.Helper()
for i := range n {
event := iSeedEvent(
t, webhookDB, webhookID,
fmt.Sprintf(`{"recovery":%d}`, i),
)
iSeedDelivery(
t, webhookDB, event.ID, targetID,
database.DeliveryStatusPending,
)
}
}
func TestRecoverWebhookDeliveries_RetryingDeliveries(
t *testing.T,
) {
t.Parallel()
s := newISetup(t)
targetID := uuid.New().String()
iCreateTarget(t, s.MainDB, targetID,
s.WebhookID, "retry-recovery",
database.TargetTypeHTTP,
iHTTPConfig("http://example.com/hook"), 5,
)
event := iSeedEvent(
t, s.WebhookDB, s.WebhookID,
`{"retry-recovery":"test"}`,
)
d := iSeedDelivery(
t, s.WebhookDB, event.ID, targetID,
database.DeliveryStatusRetrying,
)
iSeedFailedResult(t, s.WebhookDB, d.ID)
iCreateWebhook(
t, s.MainDB, s.WebhookID, "test-webhook",
)
s.Engine.ExportRecoverWebhookDeliveries(
context.Background(), s.WebhookID,
)
select {
case task := <-s.Engine.ExportRetryCh():
assert.Equal(t, d.ID, task.DeliveryID)
assert.Equal(t, targetID, task.TargetID)
assert.Equal(t, 2, task.AttemptNum)
case <-time.After(5 * time.Second):
t.Fatal("expected retry task from recovery")
}
}
// iSeedFailedResult creates a failed delivery result.
func iSeedFailedResult(
t *testing.T,
db *gorm.DB,
deliveryID string,
) {
t.Helper()
result := database.DeliveryResult{
DeliveryID: deliveryID,
AttemptNum: 1,
Success: false,
StatusCode: 500,
Error: "server error",
}
require.NoError(t, db.Create(&result).Error)
}
// iCreateWebhook creates a webhook record in main DB.
func iCreateWebhook(
t *testing.T,
mainDB *gorm.DB,
webhookID, name string,
) {
t.Helper()
webhook := database.Webhook{
UserID: uuid.New().String(),
Name: name,
}
webhook.ID = webhookID
require.NoError(t, mainDB.Create(&webhook).Error)
}
// --- recoverInFlight Tests ---
func TestRecoverInFlight_NoWebhooks(t *testing.T) {
t.Parallel()
s := newISetup(t)
s.Engine.ExportRecoverInFlight(
context.Background(),
)
}
func TestRecoverInFlight_WithPendingDeliveries(
t *testing.T,
) {
t.Parallel()
s := newISetup(t)
targetID := uuid.New().String()
iCreateWebhook(
t, s.MainDB, s.WebhookID, "recover-test",
)
iCreateTarget(t, s.MainDB, targetID,
s.WebhookID, "recover-target",
database.TargetTypeLog, "", 0,
)
event := iSeedEvent(
t, s.WebhookDB, s.WebhookID,
`{"recover":"inflight"}`,
)
d := iSeedDelivery(
t, s.WebhookDB, event.ID, targetID,
database.DeliveryStatusPending,
)
s.Engine.ExportRecoverInFlight(
context.Background(),
)
select {
case task := <-s.Engine.ExportDeliveryCh():
assert.Equal(t, d.ID, task.DeliveryID)
assert.Equal(t,
database.TargetTypeLog, task.TargetType,
)
case <-time.After(2 * time.Second):
t.Fatal("expected task from recoverInFlight")
}
}
// --- HTTP Config with custom headers ---
func TestDeliverHTTP_CustomTargetHeaders(t *testing.T) {
t.Parallel()
s := newISetup(t)
var receivedAuth string
ts := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
receivedAuth = r.Header.Get(
"Authorization",
)
w.WriteHeader(http.StatusOK)
},
),
)
defer ts.Close()
cfg := delivery.HTTPTargetConfig{
URL: ts.URL,
Headers: map[string]string{
"Authorization": "Bearer secret-token",
},
}
cfgJSON, err := json.Marshal(cfg)
require.NoError(t, err)
event := iSeedEvent(
t, s.WebhookDB, s.WebhookID,
`{"auth":"test"}`,
)
targetID := uuid.New().String()
d := iSeedDelivery(
t, s.WebhookDB, event.ID, targetID,
database.DeliveryStatusPending,
)
bodyStr := event.Body
task := iTask(
d, event, s.WebhookID, targetID,
"auth-target", string(cfgJSON),
0, 1, &bodyStr,
)
s.Engine.ExportProcessNewTask(
context.TODO(), &task,
)
assert.Equal(t,
"Bearer secret-token", receivedAuth,
)
}
// --- HTTP delivery with custom timeout ---
func TestDeliverHTTP_TargetTimeout(t *testing.T) {
t.Parallel()
db := iWebhookDB(t)
e := iEngine(t, 1)
ts := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, _ *http.Request) {
time.Sleep(2 * time.Second)
w.WriteHeader(http.StatusOK)
},
),
)
defer ts.Close()
cfg := delivery.HTTPTargetConfig{
URL: ts.URL,
Timeout: 1,
}
cfgJSON, err := json.Marshal(cfg)
require.NoError(t, err)
event, del := iSeedEventAndDelivery(
t, db, `{"timeout":"test"}`,
string(cfgJSON),
)
task, d := iHTTPTaskAndDelivery(
event, del, "timeout-target",
string(cfgJSON), 0, 1,
)
e.ExportDeliverHTTP(context.TODO(), db, d, task)
iAssertStatus(t, db, del.ID,
database.DeliveryStatusFailed,
)
iAssertResultFailed(t, db, del.ID)
}
// iSeedEventAndDelivery creates event + delivery
// for standalone tests.
func iSeedEventAndDelivery(
t *testing.T,
db *gorm.DB,
body, _ string,
) (database.Event, database.Delivery) {
t.Helper()
event := database.Event{
WebhookID: uuid.New().String(),
EntrypointID: uuid.New().String(),
Method: "POST",
Headers: `{"Content-Type":["application/json"]}`,
Body: body,
ContentType: "application/json",
}
require.NoError(t, db.Create(&event).Error)
d := database.Delivery{
EventID: event.ID,
TargetID: uuid.New().String(),
Status: database.DeliveryStatusPending,
}
require.NoError(t, db.Create(&d).Error)
return event, d
}
// iHTTPTaskAndDelivery builds a task/delivery pair for
// standalone HTTP tests.
func iHTTPTaskAndDelivery(
event database.Event,
del database.Delivery,
name, config string,
maxRetries, attemptNum int,
) (*delivery.Task, *database.Delivery) {
task := &delivery.Task{
DeliveryID: del.ID,
EventID: event.ID,
WebhookID: event.WebhookID,
TargetID: del.TargetID,
TargetName: name,
TargetType: database.TargetTypeHTTP,
TargetConfig: config,
MaxRetries: maxRetries,
AttemptNum: attemptNum,
}
d := &database.Delivery{
EventID: event.ID,
TargetID: del.TargetID,
Status: database.DeliveryStatusPending,
Event: event,
Target: database.Target{
Name: name,
Type: database.TargetTypeHTTP,
Config: config,
},
}
d.ID = del.ID
return task, d
}
// iAssertResultFailed checks that a failed delivery
// result exists.
func iAssertResultFailed(
t *testing.T,
db *gorm.DB,
deliveryID string,
) {
t.Helper()
var result database.DeliveryResult
require.NoError(t, db.Where(
"delivery_id = ?", deliveryID,
).First(&result).Error)
assert.False(t, result.Success)
assert.NotEmpty(t, result.Error)
}
// --- HTTP request with invalid config ---
func TestDeliverHTTP_InvalidConfig(t *testing.T) {
t.Parallel()
db := iWebhookDB(t)
e := iEngine(t, 1)
event, del := iSeedEventAndDelivery(
t, db, `{"config":"invalid"}`, "",
)
task, d := iHTTPTaskAndDelivery(
event, del, "bad-config", `not-json`, 0, 1,
)
e.ExportDeliverHTTP(context.TODO(), db, d, task)
iAssertStatus(t, db, del.ID,
database.DeliveryStatusFailed,
)
}
// --- Notify batching ---
func TestNotify_MultipleTasks(t *testing.T) {
t.Parallel()
e := iEngine(t, 1)
tasks := make([]delivery.Task, 5)
for i := range tasks {
tasks[i] = delivery.Task{
DeliveryID: fmt.Sprintf("task-%d", i),
}
}
e.Notify(tasks)
for i := range 5 {
select {
case task := <-e.ExportDeliveryCh():
assert.Equal(t,
fmt.Sprintf("task-%d", i),
task.DeliveryID,
)
case <-time.After(time.Second):
t.Fatalf("expected task %d", i)
}
}
}