refactor: use pinned golangci-lint Docker image for linting
All checks were successful
check / check (push) Successful in 1m41s
All checks were successful
check / check (push) Successful in 1m41s
Refactor Dockerfile to use a separate lint stage with a pinned golangci-lint v2.11.3 Docker image instead of installing golangci-lint via curl in the builder stage. This follows the pattern used by sneak/pixa. Changes: - Dockerfile: separate lint stage using golangci/golangci-lint:v2.11.3 (Debian-based, pinned by sha256) with COPY --from=lint dependency - Bump Go from 1.24 to 1.26.1 (golang:1.26.1-bookworm, pinned) - Bump golangci-lint from v1.64.8 to v2.11.3 - Migrate .golangci.yml from v1 to v2 format (same linters, format only) - All Docker images pinned by sha256 digest - Fix all lint issues from the v2 linter upgrade: - Add package comments to all packages - Add doc comments to all exported types, functions, and methods - Fix unchecked errors (errcheck) - Fix unused parameters (revive) - Fix gosec warnings (MaxBytesReader for form parsing) - Fix staticcheck suggestions (fmt.Fprintf instead of WriteString) - Rename DeliveryTask to Task to avoid stutter (delivery.Task) - Rename shadowed builtin 'max' parameter - Update README.md version requirements
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
// Package delivery manages asynchronous event delivery to configured targets.
|
||||
package delivery
|
||||
|
||||
import (
|
||||
@@ -20,7 +21,7 @@ import (
|
||||
|
||||
const (
|
||||
// deliveryChannelSize is the buffer size for the delivery channel.
|
||||
// New DeliveryTasks from the webhook handler are sent here. Workers
|
||||
// New Tasks from the webhook handler are sent here. Workers
|
||||
// drain this channel. Sized large enough that the webhook handler
|
||||
// should never block under normal load.
|
||||
deliveryChannelSize = 10000
|
||||
@@ -41,7 +42,7 @@ const (
|
||||
retrySweepInterval = 60 * time.Second
|
||||
|
||||
// MaxInlineBodySize is the maximum event body size that will be carried
|
||||
// inline in a DeliveryTask through the channel. Bodies at or above this
|
||||
// inline in a Task through the channel. Bodies at or above this
|
||||
// size are left nil and fetched from the per-webhook database on demand.
|
||||
// This keeps channel buffer memory bounded under high traffic.
|
||||
MaxInlineBodySize = 16 * 1024
|
||||
@@ -53,7 +54,7 @@ const (
|
||||
maxBodyLog = 4096
|
||||
)
|
||||
|
||||
// DeliveryTask contains everything needed to deliver an event to a single
|
||||
// Task contains everything needed to deliver an event to a single
|
||||
// target. In the ≤16KB happy path, Body is non-nil and the engine delivers
|
||||
// without touching any database — it trusts that the webhook handler wrote
|
||||
// the records correctly. Only after a delivery attempt (success or failure)
|
||||
@@ -61,7 +62,7 @@ const (
|
||||
//
|
||||
// When Body is nil (payload ≥ MaxInlineBodySize), the engine fetches the
|
||||
// body from the per-webhook database using EventID before delivering.
|
||||
type DeliveryTask struct {
|
||||
type Task struct {
|
||||
DeliveryID string // ID of the Delivery record (for recording results)
|
||||
EventID string // Event ID (for DB lookup if body is nil)
|
||||
WebhookID string // Webhook ID (for per-webhook DB access)
|
||||
@@ -88,7 +89,7 @@ type DeliveryTask struct {
|
||||
// Notifier is the interface for notifying the delivery engine about new
|
||||
// deliveries. Implemented by Engine and injected into handlers.
|
||||
type Notifier interface {
|
||||
Notify(tasks []DeliveryTask)
|
||||
Notify(tasks []Task)
|
||||
}
|
||||
|
||||
// HTTPTargetConfig holds configuration for http target types.
|
||||
@@ -116,7 +117,7 @@ type EngineParams struct {
|
||||
|
||||
// Engine processes queued deliveries in the background using a bounded
|
||||
// worker pool architecture. New deliveries arrive as individual
|
||||
// DeliveryTask values via a buffered delivery channel from the webhook
|
||||
// Task values via a buffered delivery channel from the webhook
|
||||
// handler. Failed deliveries that need retry are scheduled via Go timers
|
||||
// with exponential backoff; each timer fires into a separate retry
|
||||
// channel. A fixed number of worker goroutines drain both channels,
|
||||
@@ -135,8 +136,8 @@ type Engine struct {
|
||||
client *http.Client
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
deliveryCh chan DeliveryTask
|
||||
retryCh chan DeliveryTask
|
||||
deliveryCh chan Task
|
||||
retryCh chan Task
|
||||
workers int
|
||||
|
||||
// circuitBreakers stores a *CircuitBreaker per target ID. Only used
|
||||
@@ -156,8 +157,8 @@ func New(lc fx.Lifecycle, params EngineParams) *Engine {
|
||||
Timeout: httpClientTimeout,
|
||||
Transport: NewSSRFSafeTransport(),
|
||||
},
|
||||
deliveryCh: make(chan DeliveryTask, deliveryChannelSize),
|
||||
retryCh: make(chan DeliveryTask, retryChannelSize),
|
||||
deliveryCh: make(chan Task, deliveryChannelSize),
|
||||
retryCh: make(chan Task, retryChannelSize),
|
||||
workers: defaultWorkers,
|
||||
}
|
||||
|
||||
@@ -208,11 +209,11 @@ func (e *Engine) stop() {
|
||||
|
||||
// Notify signals the delivery engine that new deliveries are ready.
|
||||
// Called by the webhook handler after creating delivery records. Each
|
||||
// DeliveryTask carries all data needed for delivery in the ≤16KB case.
|
||||
// Task carries all data needed for delivery in the ≤16KB case.
|
||||
// Tasks are sent individually to the delivery channel. The call is
|
||||
// non-blocking; if the channel is full, a warning is logged and the
|
||||
// delivery will be recovered on the next engine restart.
|
||||
func (e *Engine) Notify(tasks []DeliveryTask) {
|
||||
func (e *Engine) Notify(tasks []Task) {
|
||||
for i := range tasks {
|
||||
select {
|
||||
case e.deliveryCh <- tasks[i]:
|
||||
@@ -255,7 +256,7 @@ func (e *Engine) recoverPending(ctx context.Context) {
|
||||
// channel. It builds the event and target context from the task's inline
|
||||
// data and executes the delivery. For large bodies (≥ MaxInlineBodySize),
|
||||
// the body is fetched from the per-webhook database on demand.
|
||||
func (e *Engine) processNewTask(ctx context.Context, task *DeliveryTask) {
|
||||
func (e *Engine) processNewTask(ctx context.Context, task *Task) {
|
||||
webhookDB, err := e.dbManager.GetDB(task.WebhookID)
|
||||
if err != nil {
|
||||
e.log.Error("failed to get webhook database",
|
||||
@@ -316,7 +317,7 @@ func (e *Engine) processNewTask(ctx context.Context, task *DeliveryTask) {
|
||||
// The task carries all data needed for delivery (same as the initial
|
||||
// notification). The only DB read is a status check to verify the delivery
|
||||
// hasn't been cancelled or resolved while the timer was pending.
|
||||
func (e *Engine) processRetryTask(ctx context.Context, task *DeliveryTask) {
|
||||
func (e *Engine) processRetryTask(ctx context.Context, task *Task) {
|
||||
webhookDB, err := e.dbManager.GetDB(task.WebhookID)
|
||||
if err != nil {
|
||||
e.log.Error("failed to get webhook database for retry",
|
||||
@@ -504,7 +505,7 @@ func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string)
|
||||
bodyPtr = &bodyStr
|
||||
}
|
||||
|
||||
task := DeliveryTask{
|
||||
task := Task{
|
||||
DeliveryID: d.ID,
|
||||
EventID: d.EventID,
|
||||
WebhookID: webhookID,
|
||||
@@ -604,7 +605,7 @@ func (e *Engine) recoverPendingDeliveries(ctx context.Context, webhookDB *gorm.D
|
||||
bodyPtr = &bodyStr
|
||||
}
|
||||
|
||||
task := DeliveryTask{
|
||||
task := Task{
|
||||
DeliveryID: deliveries[i].ID,
|
||||
EventID: deliveries[i].EventID,
|
||||
WebhookID: webhookID,
|
||||
@@ -632,7 +633,7 @@ func (e *Engine) recoverPendingDeliveries(ctx context.Context, webhookDB *gorm.D
|
||||
}
|
||||
|
||||
// scheduleRetry creates a Go timer that fires after the given delay and
|
||||
// sends the full DeliveryTask to the engine's retry channel. The task
|
||||
// sends the full Task to the engine's retry channel. The task
|
||||
// carries all data needed for the retry attempt, so when it fires, a
|
||||
// worker can deliver without reading event or target data from the DB.
|
||||
//
|
||||
@@ -640,7 +641,7 @@ func (e *Engine) recoverPendingDeliveries(ctx context.Context, webhookDB *gorm.D
|
||||
// dropped. The delivery remains in `retrying` status in the database
|
||||
// and will be picked up by the periodic retry sweep (DB-mediated
|
||||
// fallback path). No goroutines are blocked or re-armed.
|
||||
func (e *Engine) scheduleRetry(task DeliveryTask, delay time.Duration) {
|
||||
func (e *Engine) scheduleRetry(task Task, delay time.Duration) {
|
||||
e.log.Debug("scheduling delivery retry",
|
||||
"webhook_id", task.WebhookID,
|
||||
"delivery_id", task.DeliveryID,
|
||||
@@ -690,7 +691,7 @@ func (e *Engine) retrySweep(ctx context.Context) {
|
||||
|
||||
// sweepOrphanedRetries scans all webhooks for retrying deliveries whose
|
||||
// backoff period has elapsed. For each eligible delivery, it builds a
|
||||
// DeliveryTask and sends it to the retry channel. If the channel is
|
||||
// Task and sends it to the retry channel. If the channel is
|
||||
// still full, the delivery is skipped and will be retried on the next
|
||||
// sweep cycle.
|
||||
func (e *Engine) sweepOrphanedRetries(ctx context.Context) {
|
||||
@@ -805,7 +806,7 @@ func (e *Engine) sweepWebhookRetries(ctx context.Context, webhookID string) {
|
||||
bodyPtr = &bodyStr
|
||||
}
|
||||
|
||||
task := DeliveryTask{
|
||||
task := Task{
|
||||
DeliveryID: d.ID,
|
||||
EventID: d.EventID,
|
||||
WebhookID: webhookID,
|
||||
@@ -835,7 +836,7 @@ func (e *Engine) sweepWebhookRetries(ctx context.Context, webhookID string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) processDelivery(ctx context.Context, webhookDB *gorm.DB, d *database.Delivery, task *DeliveryTask) {
|
||||
func (e *Engine) processDelivery(ctx context.Context, webhookDB *gorm.DB, d *database.Delivery, task *Task) {
|
||||
switch d.Target.Type {
|
||||
case database.TargetTypeHTTP:
|
||||
e.deliverHTTP(ctx, webhookDB, d, task)
|
||||
@@ -854,7 +855,7 @@ func (e *Engine) processDelivery(ctx context.Context, webhookDB *gorm.DB, d *dat
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) deliverHTTP(_ context.Context, webhookDB *gorm.DB, d *database.Delivery, task *DeliveryTask) {
|
||||
func (e *Engine) deliverHTTP(_ context.Context, webhookDB *gorm.DB, d *database.Delivery, task *Task) {
|
||||
cfg, err := e.parseHTTPConfig(d.Target.Config)
|
||||
if err != nil {
|
||||
e.log.Error("invalid HTTP target config",
|
||||
@@ -940,7 +941,7 @@ func (e *Engine) deliverHTTP(_ context.Context, webhookDB *gorm.DB, d *database.
|
||||
e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusRetrying)
|
||||
|
||||
// Schedule a timer for the next retry with exponential backoff.
|
||||
// The timer fires a DeliveryTask into the retry channel carrying
|
||||
// The timer fires a Task into the retry channel carrying
|
||||
// all data needed for the next attempt.
|
||||
shift := attemptNum - 1
|
||||
if shift > 30 {
|
||||
@@ -1038,7 +1039,7 @@ func (e *Engine) deliverSlack(webhookDB *gorm.DB, d *database.Delivery) {
|
||||
e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxBodyLog))
|
||||
if readErr != nil {
|
||||
@@ -1082,10 +1083,10 @@ func FormatSlackMessage(event *database.Event) string {
|
||||
var b strings.Builder
|
||||
|
||||
b.WriteString("*Webhook Event Received*\n")
|
||||
b.WriteString(fmt.Sprintf("*Method:* `%s`\n", event.Method))
|
||||
b.WriteString(fmt.Sprintf("*Content-Type:* `%s`\n", event.ContentType))
|
||||
b.WriteString(fmt.Sprintf("*Timestamp:* `%s`\n", event.CreatedAt.UTC().Format(time.RFC3339)))
|
||||
b.WriteString(fmt.Sprintf("*Body Size:* %d bytes\n", len(event.Body)))
|
||||
fmt.Fprintf(&b, "*Method:* `%s`\n", event.Method)
|
||||
fmt.Fprintf(&b, "*Content-Type:* `%s`\n", event.ContentType)
|
||||
fmt.Fprintf(&b, "*Timestamp:* `%s`\n", event.CreatedAt.UTC().Format(time.RFC3339))
|
||||
fmt.Fprintf(&b, "*Body Size:* %d bytes\n", len(event.Body))
|
||||
|
||||
if event.Body == "" {
|
||||
b.WriteString("\n_(empty body)_\n")
|
||||
@@ -1172,7 +1173,7 @@ func (e *Engine) doHTTPRequest(cfg *HTTPTargetConfig, event *database.Event) (st
|
||||
if err != nil {
|
||||
return 0, "", durationMs, fmt.Errorf("sending request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxBodyLog))
|
||||
if readErr != nil {
|
||||
|
||||
@@ -34,7 +34,7 @@ func testMainDB(t *testing.T) *gorm.DB {
|
||||
|
||||
sqlDB, err := sql.Open("sqlite", dsn)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { sqlDB.Close() })
|
||||
t.Cleanup(func() { _ = sqlDB.Close() })
|
||||
|
||||
db, err := gorm.Open(sqlite.Dialector{Conn: sqlDB}, &gorm.Config{})
|
||||
require.NoError(t, err)
|
||||
@@ -80,8 +80,8 @@ func testEngineWithDB(t *testing.T, mainDB *gorm.DB, dbMgr *database.WebhookDBMa
|
||||
dbManager: dbMgr,
|
||||
log: slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})),
|
||||
client: &http.Client{Timeout: 5 * time.Second},
|
||||
deliveryCh: make(chan DeliveryTask, deliveryChannelSize),
|
||||
retryCh: make(chan DeliveryTask, retryChannelSize),
|
||||
deliveryCh: make(chan Task, deliveryChannelSize),
|
||||
retryCh: make(chan Task, retryChannelSize),
|
||||
workers: 2,
|
||||
}
|
||||
}
|
||||
@@ -101,7 +101,7 @@ func TestProcessNewTask_InlineBody(t *testing.T) {
|
||||
received.Store(true)
|
||||
assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprint(w, `{"ok":true}`)
|
||||
_, _ = fmt.Fprint(w, `{"ok":true}`)
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
@@ -128,7 +128,7 @@ func TestProcessNewTask_InlineBody(t *testing.T) {
|
||||
require.NoError(t, webhookDB.Create(&delivery).Error)
|
||||
|
||||
bodyStr := event.Body
|
||||
task := DeliveryTask{
|
||||
task := Task{
|
||||
DeliveryID: delivery.ID,
|
||||
EventID: event.ID,
|
||||
WebhookID: webhookID,
|
||||
@@ -196,7 +196,7 @@ func TestProcessNewTask_LargeBody_FetchFromDB(t *testing.T) {
|
||||
require.NoError(t, webhookDB.Create(&delivery).Error)
|
||||
|
||||
// Body is nil — engine should fetch from DB
|
||||
task := DeliveryTask{
|
||||
task := Task{
|
||||
DeliveryID: delivery.ID,
|
||||
EventID: event.ID,
|
||||
WebhookID: webhookID,
|
||||
@@ -231,7 +231,7 @@ func TestProcessNewTask_InvalidWebhookID(t *testing.T) {
|
||||
|
||||
// Use a webhook ID that has no database
|
||||
// GetDB will create it lazily in the real impl, but the event won't exist
|
||||
task := DeliveryTask{
|
||||
task := Task{
|
||||
DeliveryID: uuid.New().String(),
|
||||
EventID: uuid.New().String(),
|
||||
WebhookID: uuid.New().String(),
|
||||
@@ -285,7 +285,7 @@ func TestProcessRetryTask_SuccessfulRetry(t *testing.T) {
|
||||
require.NoError(t, webhookDB.Create(&delivery).Error)
|
||||
|
||||
bodyStr := event.Body
|
||||
task := DeliveryTask{
|
||||
task := Task{
|
||||
DeliveryID: delivery.ID,
|
||||
EventID: event.ID,
|
||||
WebhookID: webhookID,
|
||||
@@ -340,7 +340,7 @@ func TestProcessRetryTask_SkipsNonRetryingDelivery(t *testing.T) {
|
||||
require.NoError(t, webhookDB.Create(&delivery).Error)
|
||||
|
||||
bodyStr := event.Body
|
||||
task := DeliveryTask{
|
||||
task := Task{
|
||||
DeliveryID: delivery.ID,
|
||||
EventID: event.ID,
|
||||
WebhookID: webhookID,
|
||||
@@ -399,7 +399,7 @@ func TestProcessRetryTask_LargeBody_FetchFromDB(t *testing.T) {
|
||||
}
|
||||
require.NoError(t, webhookDB.Create(&delivery).Error)
|
||||
|
||||
task := DeliveryTask{
|
||||
task := Task{
|
||||
DeliveryID: delivery.ID,
|
||||
EventID: event.ID,
|
||||
WebhookID: webhookID,
|
||||
@@ -456,7 +456,7 @@ func TestWorkerLifecycle_StartStop(t *testing.T) {
|
||||
require.NoError(t, webhookDB.Create(&delivery).Error)
|
||||
|
||||
bodyStr := event.Body
|
||||
task := DeliveryTask{
|
||||
task := Task{
|
||||
DeliveryID: delivery.ID,
|
||||
EventID: event.ID,
|
||||
WebhookID: webhookID,
|
||||
@@ -472,7 +472,7 @@ func TestWorkerLifecycle_StartStop(t *testing.T) {
|
||||
AttemptNum: 1,
|
||||
}
|
||||
|
||||
e.Notify([]DeliveryTask{task})
|
||||
e.Notify([]Task{task})
|
||||
|
||||
// Wait for the worker to process the task
|
||||
require.Eventually(t, func() bool {
|
||||
@@ -526,7 +526,7 @@ func TestWorkerLifecycle_ProcessesRetryChannel(t *testing.T) {
|
||||
|
||||
// Send task directly to retry channel
|
||||
bodyStr := event.Body
|
||||
task := DeliveryTask{
|
||||
task := Task{
|
||||
DeliveryID: delivery.ID,
|
||||
EventID: event.ID,
|
||||
WebhookID: webhookID,
|
||||
@@ -597,7 +597,7 @@ func TestProcessDelivery_UnknownTargetType(t *testing.T) {
|
||||
}
|
||||
d.ID = delivery.ID
|
||||
|
||||
task := &DeliveryTask{
|
||||
task := &Task{
|
||||
DeliveryID: delivery.ID,
|
||||
TargetType: database.TargetType("unknown"),
|
||||
}
|
||||
@@ -867,7 +867,7 @@ func TestDeliverHTTP_CustomTargetHeaders(t *testing.T) {
|
||||
require.NoError(t, webhookDB.Create(&delivery).Error)
|
||||
|
||||
bodyStr := event.Body
|
||||
task := DeliveryTask{
|
||||
task := Task{
|
||||
DeliveryID: delivery.ID,
|
||||
EventID: event.ID,
|
||||
WebhookID: webhookID,
|
||||
@@ -914,7 +914,7 @@ func TestDeliverHTTP_TargetTimeout(t *testing.T) {
|
||||
event := seedEvent(t, db, `{"timeout":"test"}`)
|
||||
delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending)
|
||||
|
||||
task := &DeliveryTask{
|
||||
task := &Task{
|
||||
DeliveryID: delivery.ID,
|
||||
EventID: event.ID,
|
||||
WebhookID: event.WebhookID,
|
||||
@@ -964,7 +964,7 @@ func TestDeliverHTTP_InvalidConfig(t *testing.T) {
|
||||
event := seedEvent(t, db, `{"config":"invalid"}`)
|
||||
delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending)
|
||||
|
||||
task := &DeliveryTask{
|
||||
task := &Task{
|
||||
DeliveryID: delivery.ID,
|
||||
EventID: event.ID,
|
||||
WebhookID: event.WebhookID,
|
||||
@@ -1002,9 +1002,9 @@ func TestNotify_MultipleTasks(t *testing.T) {
|
||||
t.Parallel()
|
||||
e := testEngine(t, 1)
|
||||
|
||||
tasks := make([]DeliveryTask, 5)
|
||||
tasks := make([]Task, 5)
|
||||
for i := range tasks {
|
||||
tasks[i] = DeliveryTask{
|
||||
tasks[i] = Task{
|
||||
DeliveryID: fmt.Sprintf("task-%d", i),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ func testWebhookDB(t *testing.T) *gorm.DB {
|
||||
|
||||
sqlDB, err := sql.Open("sqlite", dsn)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { sqlDB.Close() })
|
||||
t.Cleanup(func() { _ = sqlDB.Close() })
|
||||
|
||||
db, err := gorm.Open(sqlite.Dialector{Conn: sqlDB}, &gorm.Config{})
|
||||
require.NoError(t, err)
|
||||
@@ -56,8 +56,8 @@ func testEngine(t *testing.T, workers int) *Engine {
|
||||
return &Engine{
|
||||
log: slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})),
|
||||
client: &http.Client{Timeout: 5 * time.Second},
|
||||
deliveryCh: make(chan DeliveryTask, deliveryChannelSize),
|
||||
retryCh: make(chan DeliveryTask, retryChannelSize),
|
||||
deliveryCh: make(chan Task, deliveryChannelSize),
|
||||
retryCh: make(chan Task, retryChannelSize),
|
||||
workers: workers,
|
||||
}
|
||||
}
|
||||
@@ -108,13 +108,13 @@ func TestNotify_NonBlocking(t *testing.T) {
|
||||
|
||||
// Fill the delivery channel to capacity
|
||||
for i := 0; i < deliveryChannelSize; i++ {
|
||||
e.deliveryCh <- DeliveryTask{DeliveryID: fmt.Sprintf("fill-%d", i)}
|
||||
e.deliveryCh <- Task{DeliveryID: fmt.Sprintf("fill-%d", i)}
|
||||
}
|
||||
|
||||
// Notify should NOT block even though channel is full
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
e.Notify([]DeliveryTask{
|
||||
e.Notify([]Task{
|
||||
{DeliveryID: "overflow-1"},
|
||||
{DeliveryID: "overflow-2"},
|
||||
})
|
||||
@@ -134,10 +134,10 @@ func TestDeliverHTTP_Success(t *testing.T) {
|
||||
db := testWebhookDB(t)
|
||||
|
||||
var received atomic.Bool
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
received.Store(true)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprint(w, `{"ok":true}`)
|
||||
_, _ = fmt.Fprint(w, `{"ok":true}`)
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
@@ -147,7 +147,7 @@ func TestDeliverHTTP_Success(t *testing.T) {
|
||||
event := seedEvent(t, db, `{"hello":"world"}`)
|
||||
delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending)
|
||||
|
||||
task := &DeliveryTask{
|
||||
task := &Task{
|
||||
DeliveryID: delivery.ID,
|
||||
EventID: event.ID,
|
||||
WebhookID: event.WebhookID,
|
||||
@@ -194,7 +194,7 @@ func TestDeliverHTTP_Failure(t *testing.T) {
|
||||
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
fmt.Fprint(w, "internal error")
|
||||
_, _ = fmt.Fprint(w, "internal error")
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
@@ -204,7 +204,7 @@ func TestDeliverHTTP_Failure(t *testing.T) {
|
||||
event := seedEvent(t, db, `{"test":true}`)
|
||||
delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending)
|
||||
|
||||
task := &DeliveryTask{
|
||||
task := &Task{
|
||||
DeliveryID: delivery.ID,
|
||||
EventID: event.ID,
|
||||
WebhookID: event.WebhookID,
|
||||
@@ -322,7 +322,7 @@ func TestDeliverHTTP_WithRetries_Success(t *testing.T) {
|
||||
event := seedEvent(t, db, `{"retry":"ok"}`)
|
||||
delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending)
|
||||
|
||||
task := &DeliveryTask{
|
||||
task := &Task{
|
||||
DeliveryID: delivery.ID,
|
||||
EventID: event.ID,
|
||||
WebhookID: event.WebhookID,
|
||||
@@ -376,7 +376,7 @@ func TestDeliverHTTP_MaxRetriesExhausted(t *testing.T) {
|
||||
delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusRetrying)
|
||||
|
||||
maxRetries := 3
|
||||
task := &DeliveryTask{
|
||||
task := &Task{
|
||||
DeliveryID: delivery.ID,
|
||||
EventID: event.ID,
|
||||
WebhookID: event.WebhookID,
|
||||
@@ -427,7 +427,7 @@ func TestDeliverHTTP_SchedulesRetryOnFailure(t *testing.T) {
|
||||
event := seedEvent(t, db, `{"retry":"schedule"}`)
|
||||
delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending)
|
||||
|
||||
task := &DeliveryTask{
|
||||
task := &Task{
|
||||
DeliveryID: delivery.ID,
|
||||
EventID: event.ID,
|
||||
WebhookID: event.WebhookID,
|
||||
@@ -494,8 +494,8 @@ func TestExponentialBackoff_Durations(t *testing.T) {
|
||||
shift = 30
|
||||
}
|
||||
backoff := time.Duration(1<<uint(shift)) * time.Second //nolint:gosec // bounded above
|
||||
assert.Equal(t, expected[attemptNum-1], backoff,
|
||||
"backoff for attempt %d should be %v", attemptNum, expected[attemptNum-1])
|
||||
assert.Equal(t, expected[attemptNum-1], backoff, //nolint:gosec // bounded by loop range
|
||||
"backoff for attempt %d should be %v", attemptNum, expected[attemptNum-1]) //nolint:gosec // bounded by loop range
|
||||
}
|
||||
}
|
||||
|
||||
@@ -618,10 +618,10 @@ func TestWorkerPool_BoundedConcurrency(t *testing.T) {
|
||||
tasks[i].ID = delivery.ID
|
||||
}
|
||||
|
||||
// Build DeliveryTask structs for each delivery (needed by deliverHTTP)
|
||||
deliveryTasks := make([]DeliveryTask, numTasks)
|
||||
// Build Task structs for each delivery (needed by deliverHTTP)
|
||||
deliveryTasks := make([]Task, numTasks)
|
||||
for i := 0; i < numTasks; i++ {
|
||||
deliveryTasks[i] = DeliveryTask{
|
||||
deliveryTasks[i] = Task{
|
||||
DeliveryID: tasks[i].ID,
|
||||
EventID: tasks[i].EventID,
|
||||
TargetID: tasks[i].TargetID,
|
||||
@@ -687,7 +687,7 @@ func TestDeliverHTTP_CircuitBreakerBlocks(t *testing.T) {
|
||||
event := seedEvent(t, db, `{"cb":"blocked"}`)
|
||||
delivery := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending)
|
||||
|
||||
task := &DeliveryTask{
|
||||
task := &Task{
|
||||
DeliveryID: delivery.ID,
|
||||
EventID: event.ID,
|
||||
WebhookID: event.WebhookID,
|
||||
@@ -778,7 +778,7 @@ func TestScheduleRetry_SendsToRetryChannel(t *testing.T) {
|
||||
t.Parallel()
|
||||
e := testEngine(t, 1)
|
||||
|
||||
task := DeliveryTask{
|
||||
task := Task{
|
||||
DeliveryID: uuid.New().String(),
|
||||
EventID: uuid.New().String(),
|
||||
WebhookID: uuid.New().String(),
|
||||
@@ -802,13 +802,13 @@ func TestScheduleRetry_DropsWhenChannelFull(t *testing.T) {
|
||||
t.Parallel()
|
||||
e := &Engine{
|
||||
log: slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})),
|
||||
retryCh: make(chan DeliveryTask, 1), // tiny buffer
|
||||
retryCh: make(chan Task, 1), // tiny buffer
|
||||
}
|
||||
|
||||
// Fill the retry channel
|
||||
e.retryCh <- DeliveryTask{DeliveryID: "fill"}
|
||||
e.retryCh <- Task{DeliveryID: "fill"}
|
||||
|
||||
task := DeliveryTask{
|
||||
task := Task{
|
||||
DeliveryID: "overflow",
|
||||
AttemptNum: 2,
|
||||
}
|
||||
@@ -915,7 +915,7 @@ func TestProcessDelivery_RoutesToCorrectHandler(t *testing.T) {
|
||||
}
|
||||
d.ID = delivery.ID
|
||||
|
||||
task := &DeliveryTask{
|
||||
task := &Task{
|
||||
DeliveryID: delivery.ID,
|
||||
TargetType: tt.targetType,
|
||||
}
|
||||
@@ -1054,7 +1054,7 @@ func TestDeliverSlack_Success(t *testing.T) {
|
||||
receivedBody = string(bodyBytes)
|
||||
assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprint(w, "ok")
|
||||
_, _ = fmt.Fprint(w, "ok")
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
@@ -1107,7 +1107,7 @@ func TestDeliverSlack_Failure(t *testing.T) {
|
||||
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(http.StatusForbidden)
|
||||
fmt.Fprint(w, "invalid_token")
|
||||
_, _ = fmt.Fprint(w, "invalid_token")
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
@@ -1203,7 +1203,7 @@ func TestProcessDelivery_RoutesToSlack(t *testing.T) {
|
||||
}
|
||||
d.ID = dlv.ID
|
||||
|
||||
task := &DeliveryTask{
|
||||
task := &Task{
|
||||
DeliveryID: dlv.ID,
|
||||
TargetType: database.TargetTypeSlack,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user