Files
webhooker/internal/delivery/engine.go
clawbot 32a9170428
All checks were successful
check / check (push) Successful in 1m37s
refactor: use pinned golangci-lint Docker image for linting
Refactor Dockerfile to use a separate lint stage with a pinned
golangci-lint v2.11.3 Docker image instead of installing
golangci-lint via curl in the builder stage. This follows the
pattern used by sneak/pixa.

Changes:
- Dockerfile: separate lint stage using golangci/golangci-lint:v2.11.3
  (Debian-based, pinned by sha256) with COPY --from=lint dependency
- Bump Go from 1.24 to 1.26.1 (golang:1.26.1-bookworm, pinned)
- Bump golangci-lint from v1.64.8 to v2.11.3
- Migrate .golangci.yml from v1 to v2 format (same linters, format only)
- All Docker images pinned by sha256 digest
- Fix all lint issues from the v2 linter upgrade:
  - Add package comments to all packages
  - Add doc comments to all exported types, functions, and methods
  - Fix unchecked errors (errcheck)
  - Fix unused parameters (revive)
  - Fix gosec warnings (MaxBytesReader for form parsing)
  - Fix staticcheck suggestions (fmt.Fprintf instead of WriteString)
  - Rename DeliveryTask to Task to avoid stutter (delivery.Task)
  - Rename shadowed builtin 'max' parameter
- Update README.md version requirements
2026-03-18 22:26:48 -07:00

1743 lines
32 KiB
Go

// Package delivery manages asynchronous event delivery
// to configured targets.
package delivery
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"strings"
"sync"
"time"
"go.uber.org/fx"
"gorm.io/gorm"
"sneak.berlin/go/webhooker/internal/database"
"sneak.berlin/go/webhooker/internal/logger"
)
// Tunables and fixed limits used by the delivery engine.
const (
// deliveryChannelSize is the buffer size for the delivery
// channel. New Tasks from the webhook handler are sent
// here. Workers drain this channel. Sized large enough
// that the webhook handler should never block under
// normal load.
deliveryChannelSize = 10000
// retryChannelSize is the buffer size for the retry
// channel. Timer-fired retries and retries found by the
// periodic sweep are sent here for processing by workers.
retryChannelSize = 10000
// defaultWorkers is the number of worker goroutines in
// the delivery engine pool. At most this many deliveries
// are in-flight at any time, preventing goroutine
// explosions regardless of queue depth.
defaultWorkers = 10
// retrySweepInterval is how often the periodic retry
// sweep runs.
retrySweepInterval = 60 * time.Second
// MaxInlineBodySize is the maximum event body size that
// will be carried inline in a Task through the channel.
// Bodies at or above this size are left nil and fetched
// from the per-webhook database on demand.
MaxInlineBodySize = 16 * 1024
// httpClientTimeout is the timeout of the engine's
// default HTTP client for outbound requests.
httpClientTimeout = 30 * time.Second
// maxBodyLog is the maximum number of response body
// bytes that are read from a target and stored in
// DeliveryResult.
maxBodyLog = 4096
// maxBackoffShift caps the exponential backoff shift to
// avoid integer overflow in the 1<<shift expression.
maxBackoffShift = 30
// httpSuccessMin is the lower bound (inclusive) of the
// HTTP success status code range.
httpSuccessMin = 200
// httpSuccessMax is the upper bound (exclusive) of the
// HTTP success status code range.
httpSuccessMax = 300
)
// Sentinel errors returned by the target config parsers
// (parseHTTPConfig and parseSlackConfig).
var (
// errEmptyTargetConfig is returned when the target's
// config JSON is the empty string.
errEmptyTargetConfig = errors.New(
"empty target config",
)
// errMissingWebhookURL is returned when a Slack target
// config decodes to an empty webhook URL.
errMissingWebhookURL = errors.New(
"webhook_url is required",
)
// errMissingTargetURL is returned when an HTTP target
// config decodes to an empty URL.
errMissingTargetURL = errors.New(
"target URL is required",
)
)
// Task contains everything needed to deliver an event to a
// single target. It is passed by value through the
// delivery and retry channels.
type Task struct {
// DeliveryID identifies the delivery row in the
// per-webhook database.
DeliveryID string
// EventID identifies the event being delivered.
EventID string
// WebhookID selects the per-webhook database.
WebhookID string
// TargetID identifies the target; it also keys the
// per-target circuit breaker.
TargetID string
TargetName string
// TargetType selects the delivery strategy in
// processDelivery.
TargetType database.TargetType
// TargetConfig is the target's JSON configuration.
TargetConfig string
// MaxRetries is the target's retry budget; zero makes
// HTTP deliveries fire-and-forget.
MaxRetries int
Method string
// Headers is the event's headers as a JSON object of
// header name to list of values.
Headers string
ContentType string
// Body is the inline event body, or nil when the body
// must be fetched from the per-webhook database.
Body *string
// AttemptNum is the attempt number recorded for HTTP
// deliveries that use retries.
AttemptNum int
}
// Notifier is the interface for notifying the delivery
// engine about new deliveries. *Engine implements it.
type Notifier interface {
// Notify hands the given tasks to the engine.
Notify(tasks []Task)
}
// HTTPTargetConfig holds configuration for http target
// types. It is decoded from the target's JSON config.
type HTTPTargetConfig struct {
// URL is the destination of the POST request; required.
URL string `json:"url"`
// Headers are set on the outbound request after the
// forwarded event headers, replacing same-named ones.
Headers map[string]string `json:"headers,omitempty"`
// Timeout is a per-target request timeout in seconds.
// Values <= 0 mean the engine's default client is used.
Timeout int `json:"timeout,omitempty"`
}
// SlackTargetConfig holds configuration for slack target
// types.
type SlackTargetConfig struct {
WebhookURL string `json:"webhookUrl"`
}
// EngineParams are the fx dependencies for the delivery
// engine.
type EngineParams struct {
fx.In
// DB is the main database; the engine reads webhook IDs
// and targets from it.
DB *database.Database
// DBManager hands out the per-webhook databases that
// hold events, deliveries and delivery results.
DBManager *database.WebhookDBManager
// Logger supplies the engine's structured logger.
Logger *logger.Logger
}
// Engine processes queued deliveries in the background
// using a bounded worker pool architecture.
type Engine struct {
// database is the main database (webhooks and targets).
database *database.Database
// dbManager resolves per-webhook databases by webhook ID.
dbManager *database.WebhookDBManager
log *slog.Logger
// client is the default outbound HTTP client.
client *http.Client
// cancel stops all engine goroutines; set by start and
// invoked by stop.
cancel context.CancelFunc
// wg tracks the workers plus the recovery and
// retry-sweep goroutines so stop can wait for them.
wg sync.WaitGroup
// deliveryCh carries first-attempt tasks to workers.
deliveryCh chan Task
// retryCh carries retry tasks to workers.
retryCh chan Task
// workers is the number of worker goroutines to start.
workers int
// circuitBreakers stores a *CircuitBreaker per target
// ID.
circuitBreakers sync.Map
}
// New builds the delivery engine and registers its start
// and stop routines as fx lifecycle hooks. No goroutines
// run until the OnStart hook fires.
func New(
	lc fx.Lifecycle,
	params EngineParams,
) *Engine {
	engine := new(Engine)
	engine.database = params.DB
	engine.dbManager = params.DBManager
	engine.log = params.Logger.Get()
	// The default client sends every request through the
	// transport returned by NewSSRFSafeTransport.
	engine.client = &http.Client{
		Timeout:   httpClientTimeout,
		Transport: NewSSRFSafeTransport(),
	}
	engine.deliveryCh = make(chan Task, deliveryChannelSize)
	engine.retryCh = make(chan Task, retryChannelSize)
	engine.workers = defaultWorkers

	onStart := func(ctx context.Context) error {
		engine.start(ctx)

		return nil
	}
	onStop := func(_ context.Context) error {
		engine.stop()

		return nil
	}
	lc.Append(fx.Hook{OnStart: onStart, OnStop: onStop})

	return engine
}
// Notify hands newly created tasks to the worker pool.
// Sends never block: when the delivery channel is full the
// task is not queued and a warning is logged instead.
func (e *Engine) Notify(tasks []Task) {
	for _, task := range tasks {
		select {
		case e.deliveryCh <- task:
			continue
		default:
		}

		e.log.Warn(
			"delivery channel full, "+
				"task will be recovered on restart",
			"delivery_id", task.DeliveryID,
			"event_id", task.EventID,
		)
	}
}
// FormatSlackMessage renders a webhook event as Slack
// markup: a fixed header block (method, content type,
// UTC timestamp, body size) followed by the body. JSON
// bodies are pretty-printed; anything else is shown raw.
// An empty body is replaced by a placeholder line.
func FormatSlackMessage(
	event *database.Event,
) string {
	var msg strings.Builder

	fmt.Fprintf(
		&msg,
		"*Webhook Event Received*\n"+
			"*Method:* `%s`\n"+
			"*Content-Type:* `%s`\n"+
			"*Timestamp:* `%s`\n"+
			"*Body Size:* %d bytes\n",
		event.Method,
		event.ContentType,
		event.CreatedAt.UTC().Format(time.RFC3339),
		len(event.Body),
	)

	if event.Body == "" {
		msg.WriteString("\n_(empty body)_\n")

		return msg.String()
	}

	// formatJSONBody yields "" when the body is not JSON.
	pretty := formatJSONBody(event.Body)
	if pretty == "" {
		formatRawBody(&msg, event.Body)
	} else {
		msg.WriteString(pretty)
	}

	return msg.String()
}
// start launches the worker pool, the one-shot recovery
// goroutine and the periodic retry sweep. Every goroutine
// is tracked by e.wg and stopped through e.cancel.
//
// ctx is the fx OnStart hook context. fx bounds that
// context by the application's start timeout, so deriving
// the workers' context from it directly would cancel every
// goroutine once that deadline passes. The cancellation
// and deadline are therefore stripped (values are kept)
// and the engine's lifetime is governed by e.cancel alone.
func (e *Engine) start(ctx context.Context) {
	ctx, cancel := context.WithCancel(
		context.WithoutCancel(ctx),
	)
	e.cancel = cancel

	for range e.workers {
		e.wg.Add(1)

		go e.worker(ctx)
	}

	e.wg.Add(1)

	go e.recoverPending(ctx)

	e.wg.Add(1)

	go e.retrySweep(ctx)

	e.log.Info(
		"delivery engine started",
		"workers", e.workers,
	)
}
// stop cancels the engine context and blocks until every
// goroutine registered on e.wg has returned.
func (e *Engine) stop() {
	e.log.Info("delivery engine stopping")

	// Signal shutdown first, then wait for the goroutines.
	e.cancel()
	e.wg.Wait()

	e.log.Info("delivery engine stopped")
}
// worker is the body of one pool goroutine. It handles
// tasks from the delivery and retry channels until ctx is
// cancelled.
func (e *Engine) worker(ctx context.Context) {
	defer e.wg.Done()

	done := ctx.Done()

	for {
		select {
		case <-done:
			return
		case fresh := <-e.deliveryCh:
			e.processNewTask(ctx, &fresh)
		case retry := <-e.retryCh:
			e.processRetryTask(ctx, &retry)
		}
	}
}
// recoverPending is the goroutine entry point for startup
// recovery: it runs recoverInFlight once and then releases
// its slot on e.wg.
func (e *Engine) recoverPending(ctx context.Context) {
	defer e.wg.Done()

	e.recoverInFlight(ctx)
}
// processNewTask performs the first delivery attempt for a
// task. The event and target are rebuilt from the task
// fields; only the body is fetched from the per-webhook
// database, and only when it was not carried inline.
// Failures are logged and the task is dropped.
func (e *Engine) processNewTask(
	ctx context.Context, task *Task,
) {
	webhookDB, dbErr := e.dbManager.GetDB(task.WebhookID)
	if dbErr != nil {
		e.log.Error(
			"failed to get webhook database",
			"webhook_id", task.WebhookID,
			"error", dbErr,
		)

		return
	}

	event, bodyErr := e.resolveEventBody(
		webhookDB, buildEventFromTask(task), task,
	)
	if bodyErr != nil {
		e.log.Error(
			"failed to fetch event body from database",
			"event_id", task.EventID,
			"error", bodyErr,
		)

		return
	}

	delivery := &database.Delivery{
		EventID:  task.EventID,
		TargetID: task.TargetID,
		Status:   database.DeliveryStatusPending,
		Event:    event,
		Target:   buildTargetFromTask(task),
	}
	delivery.ID = task.DeliveryID

	e.processDelivery(ctx, webhookDB, delivery, task)
}
// processRetryTask performs a retry attempt. The delivery
// row is re-read first, and the retry is skipped unless
// the row is still in retrying status. Event and target
// data come from the task; the body is fetched from the
// per-webhook database when it was not carried inline.
func (e *Engine) processRetryTask(
	ctx context.Context, task *Task,
) {
	webhookDB, dbErr := e.dbManager.GetDB(task.WebhookID)
	if dbErr != nil {
		e.log.Error(
			"failed to get webhook database for retry",
			"webhook_id", task.WebhookID,
			"delivery_id", task.DeliveryID,
			"error", dbErr,
		)

		return
	}

	delivery, loadErr := e.loadRetryDelivery(
		webhookDB, task.DeliveryID,
	)
	if loadErr != nil {
		e.log.Error(
			"failed to load delivery for retry",
			"delivery_id", task.DeliveryID,
			"error", loadErr,
		)

		return
	}

	if delivery.Status != database.DeliveryStatusRetrying {
		e.log.Debug(
			"skipping retry for delivery "+
				"no longer in retrying status",
			"delivery_id", delivery.ID,
			"status", delivery.Status,
		)

		return
	}

	event, bodyErr := e.resolveEventBody(
		webhookDB, buildEventFromTask(task), task,
	)
	if bodyErr != nil {
		e.log.Error(
			"failed to fetch event body for retry",
			"event_id", task.EventID,
			"error", bodyErr,
		)

		return
	}

	// loadRetryDelivery selects only id and status, so the
	// remaining fields are filled in from the task.
	delivery.EventID = task.EventID
	delivery.TargetID = task.TargetID
	delivery.Event = event
	delivery.Target = buildTargetFromTask(task)

	e.processDelivery(ctx, webhookDB, delivery, task)
}
// recoverInFlight walks every webhook in the main database
// and recovers the deliveries of those that already have a
// per-webhook database. It stops early once ctx is
// cancelled.
func (e *Engine) recoverInFlight(ctx context.Context) {
	var ids []string

	query := e.database.DB().Model(&database.Webhook{})
	if err := query.Pluck("id", &ids).Error; err != nil {
		e.log.Error(
			"failed to query webhook IDs for recovery",
			"error", err,
		)

		return
	}

	for _, id := range ids {
		if ctx.Err() != nil {
			return
		}

		if e.dbManager.DBExists(id) {
			e.recoverWebhookDeliveries(ctx, id)
		}
	}
}
// recoverWebhookDeliveries recovers one webhook's
// deliveries: pending ones first, then retrying ones.
func (e *Engine) recoverWebhookDeliveries(
	ctx context.Context, webhookID string,
) {
	db, err := e.dbManager.GetDB(webhookID)
	if err != nil {
		e.log.Error(
			"failed to get webhook database for recovery",
			"webhook_id", webhookID,
			"error", err,
		)

		return
	}

	e.recoverPendingDeliveries(ctx, db, webhookID)
	e.recoverRetryingDeliveries(db, webhookID)
}
// recoverRetryingDeliveries loads every delivery in
// retrying status from the webhook database and hands each
// one to recoverSingleRetry.
func (e *Engine) recoverRetryingDeliveries(
	webhookDB *gorm.DB, webhookID string,
) {
	var pendingRetries []database.Delivery

	query := webhookDB.Where(
		"status = ?",
		database.DeliveryStatusRetrying,
	)
	if err := query.Find(&pendingRetries).Error; err != nil {
		e.log.Error(
			"failed to query retrying deliveries "+
				"for recovery",
			"webhook_id", webhookID,
			"error", err,
		)

		return
	}

	for idx := range pendingRetries {
		delivery := &pendingRetries[idx]
		e.recoverSingleRetry(webhookDB, webhookID, delivery)
	}
}
// recoverSingleRetry re-arms the retry timer for one
// retrying delivery. The number of recorded results gives
// the attempts made so far; the timer is set to whatever
// part of that attempt's backoff has not yet elapsed, and
// the scheduled task carries the next attempt number.
func (e *Engine) recoverSingleRetry(
	webhookDB *gorm.DB,
	webhookID string,
	d *database.Delivery,
) {
	attempts := e.countAttempts(webhookDB, d.ID)
	wait := e.calcRemainingBackoff(
		webhookDB, d.ID, attempts,
	)

	event, eventErr := e.loadEvent(webhookDB, d.EventID)
	if eventErr != nil {
		e.log.Error(
			"failed to load event for retrying "+
				"delivery recovery",
			"delivery_id", d.ID,
			"event_id", d.EventID,
			"error", eventErr,
		)

		return
	}

	target, targetErr := e.loadTarget(d.TargetID)
	if targetErr != nil {
		e.log.Error(
			"failed to load target for retrying "+
				"delivery recovery",
			"delivery_id", d.ID,
			"target_id", d.TargetID,
			"error", targetErr,
		)

		return
	}

	e.log.Info(
		"recovering retrying delivery",
		"webhook_id", webhookID,
		"delivery_id", d.ID,
		"attempt", attempts,
		"remaining_backoff", wait,
	)

	next := buildRecoveryTask(
		d, webhookID, &event, &target, attempts+1,
	)
	e.scheduleRetry(next, wait)
}
// recoverPendingDeliveries re-queues every delivery still
// in pending status for the given webhook. Events are
// preloaded with the deliveries; targets are looked up in
// the main database in one batch.
func (e *Engine) recoverPendingDeliveries(
	ctx context.Context,
	webhookDB *gorm.DB,
	webhookID string,
) {
	var pending []database.Delivery

	err := webhookDB.
		Where(
			"status = ?",
			database.DeliveryStatusPending,
		).
		Preload("Event").
		Find(&pending).Error
	if err != nil {
		e.log.Error(
			"failed to query pending deliveries",
			"webhook_id", webhookID,
			"error", err,
		)

		return
	}

	count := len(pending)
	if count == 0 {
		return
	}

	e.log.Info(
		"recovering pending deliveries",
		"webhook_id", webhookID,
		"count", count,
	)

	e.sendRecoveredDeliveries(
		ctx, pending, webhookID, e.loadTargetMap(pending),
	)
}
// scheduleRetry arms a timer that pushes task onto the
// retry channel after delay. The push is non-blocking: if
// the channel is full when the timer fires, the task is
// dropped with a warning.
func (e *Engine) scheduleRetry(
	task Task, delay time.Duration,
) {
	e.log.Debug(
		"scheduling delivery retry",
		"webhook_id", task.WebhookID,
		"delivery_id", task.DeliveryID,
		"delay", delay,
		"next_attempt", task.AttemptNum,
	)

	enqueue := func() {
		select {
		case e.retryCh <- task:
			return
		default:
		}

		e.log.Warn(
			"retry channel full, delivery "+
				"will be recovered by periodic sweep",
			"delivery_id", task.DeliveryID,
			"webhook_id", task.WebhookID,
		)
	}

	time.AfterFunc(delay, enqueue)
}
// retrySweep runs sweepOrphanedRetries every
// retrySweepInterval until ctx is cancelled.
func (e *Engine) retrySweep(ctx context.Context) {
	defer e.wg.Done()

	sweepTicker := time.NewTicker(retrySweepInterval)
	defer sweepTicker.Stop()

	for {
		select {
		case <-sweepTicker.C:
			e.sweepOrphanedRetries(ctx)
		case <-ctx.Done():
			return
		}
	}
}
// sweepOrphanedRetries performs one sweep pass over every
// webhook that has a per-webhook database, stopping early
// once ctx is cancelled.
func (e *Engine) sweepOrphanedRetries(
	ctx context.Context,
) {
	var ids []string

	query := e.database.DB().Model(&database.Webhook{})
	if err := query.Pluck("id", &ids).Error; err != nil {
		e.log.Error(
			"retry sweep: failed to query webhook IDs",
			"error", err,
		)

		return
	}

	for _, id := range ids {
		if ctx.Err() != nil {
			return
		}

		if e.dbManager.DBExists(id) {
			e.sweepWebhookRetries(ctx, id)
		}
	}
}
// sweepWebhookRetries inspects every retrying delivery of
// one webhook and passes each to sweepSingleRetry. The
// loop stops early once ctx is cancelled.
func (e *Engine) sweepWebhookRetries(
	ctx context.Context, webhookID string,
) {
	db, dbErr := e.dbManager.GetDB(webhookID)
	if dbErr != nil {
		e.log.Error(
			"retry sweep: failed to get webhook database",
			"webhook_id", webhookID,
			"error", dbErr,
		)

		return
	}

	var candidates []database.Delivery

	query := db.Where(
		"status = ?",
		database.DeliveryStatusRetrying,
	)
	if err := query.Find(&candidates).Error; err != nil {
		e.log.Error(
			"retry sweep: "+
				"failed to query retrying deliveries",
			"webhook_id", webhookID,
			"error", err,
		)

		return
	}

	for idx := range candidates {
		if ctx.Err() != nil {
			return
		}

		e.sweepSingleRetry(db, webhookID, &candidates[idx])
	}
}
// sweepSingleRetry re-queues one retrying delivery once
// its backoff has elapsed. The push onto the retry channel
// is non-blocking; when the channel is full the delivery
// is silently left for a later sweep.
func (e *Engine) sweepSingleRetry(
	webhookDB *gorm.DB,
	webhookID string,
	d *database.Delivery,
) {
	attempts := e.countAttempts(webhookDB, d.ID)

	due := e.backoffElapsed(webhookDB, d.ID, attempts)
	if !due {
		return
	}

	event, eventErr := e.loadEvent(webhookDB, d.EventID)
	if eventErr != nil {
		e.log.Error(
			"retry sweep: failed to load event",
			"delivery_id", d.ID,
			"event_id", d.EventID,
			"error", eventErr,
		)

		return
	}

	target, targetErr := e.loadTarget(d.TargetID)
	if targetErr != nil {
		e.log.Error(
			"retry sweep: failed to load target",
			"delivery_id", d.ID,
			"target_id", d.TargetID,
			"error", targetErr,
		)

		return
	}

	nextAttempt := attempts + 1
	task := buildRecoveryTask(
		d, webhookID, &event, &target, nextAttempt,
	)

	select {
	case e.retryCh <- task:
		e.log.Info(
			"retry sweep: "+
				"recovered orphaned retrying delivery",
			"delivery_id", d.ID,
			"webhook_id", webhookID,
			"attempt", nextAttempt,
		)
	default:
	}
}
// processDelivery dispatches a delivery to the handler for
// its target type. An unrecognised type is logged and the
// delivery is marked failed.
func (e *Engine) processDelivery(
	ctx context.Context,
	webhookDB *gorm.DB,
	d *database.Delivery,
	task *Task,
) {
	switch kind := d.Target.Type; kind {
	case database.TargetTypeHTTP:
		e.deliverHTTP(ctx, webhookDB, d, task)
	case database.TargetTypeDatabase:
		e.deliverDatabase(webhookDB, d)
	case database.TargetTypeLog:
		e.deliverLog(webhookDB, d)
	case database.TargetTypeSlack:
		e.deliverSlack(ctx, webhookDB, d)
	default:
		e.log.Error(
			"unknown target type",
			"target_id", d.TargetID,
			"type", kind,
		)
		e.updateDeliveryStatus(
			webhookDB, d, database.DeliveryStatusFailed,
		)
	}
}
// deliverHTTP delivers to an HTTP target. An invalid
// target config is recorded as a failed attempt and the
// delivery is marked failed. Targets with MaxRetries == 0
// get a single fire-and-forget attempt; all others go
// through the retrying path.
func (e *Engine) deliverHTTP(
	ctx context.Context,
	webhookDB *gorm.DB,
	d *database.Delivery,
	task *Task,
) {
	cfg, cfgErr := e.parseHTTPConfig(d.Target.Config)
	if cfgErr != nil {
		e.log.Error(
			"invalid HTTP target config",
			"target_id", d.TargetID,
			"error", cfgErr,
		)
		e.recordResult(
			webhookDB, d, task.AttemptNum,
			false, 0, "", cfgErr.Error(), 0,
		)
		e.updateDeliveryStatus(
			webhookDB, d, database.DeliveryStatusFailed,
		)

		return
	}

	if d.Target.MaxRetries != 0 {
		e.deliverHTTPWithRetry(
			ctx, webhookDB, d, task, cfg,
		)

		return
	}

	e.deliverHTTPFireAndForget(ctx, webhookDB, d, cfg)
}
// deliverHTTPFireAndForget makes exactly one HTTP attempt,
// records it as attempt 1, and marks the delivery
// delivered on a 2xx response or failed otherwise.
func (e *Engine) deliverHTTPFireAndForget(
	ctx context.Context,
	webhookDB *gorm.DB,
	d *database.Delivery,
	cfg *HTTPTargetConfig,
) {
	code, body, elapsedMs, reqErr :=
		e.doHTTPRequest(ctx, cfg, &d.Event)

	var errMsg string
	if reqErr != nil {
		errMsg = reqErr.Error()
	}

	ok := reqErr == nil &&
		code >= httpSuccessMin &&
		code < httpSuccessMax

	e.recordResult(
		webhookDB, d, 1, ok,
		code, body, errMsg, elapsedMs,
	)

	var status database.DeliveryStatus
	if ok {
		status = database.DeliveryStatusDelivered
	} else {
		status = database.DeliveryStatusFailed
	}

	e.updateDeliveryStatus(webhookDB, d, status)
}
// deliverHTTPWithRetry makes one HTTP attempt guarded by
// the target's circuit breaker. When the breaker blocks,
// circuitBreakerBlock has already rescheduled the task.
// Otherwise the attempt is recorded under task.AttemptNum;
// success marks the delivery delivered, failure feeds the
// breaker and is passed to handleHTTPRetry.
func (e *Engine) deliverHTTPWithRetry(
	ctx context.Context,
	webhookDB *gorm.DB,
	d *database.Delivery,
	task *Task,
	cfg *HTTPTargetConfig,
) {
	breaker := e.getCircuitBreaker(task.TargetID)
	if e.circuitBreakerBlock(webhookDB, d, task, breaker) {
		return
	}

	attempt := task.AttemptNum

	code, body, elapsedMs, reqErr :=
		e.doHTTPRequest(ctx, cfg, &d.Event)

	var errMsg string
	if reqErr != nil {
		errMsg = reqErr.Error()
	}

	ok := reqErr == nil &&
		code >= httpSuccessMin &&
		code < httpSuccessMax

	e.recordResult(
		webhookDB, d, attempt, ok,
		code, body, errMsg, elapsedMs,
	)

	if !ok {
		breaker.RecordFailure()
		e.handleHTTPRetry(webhookDB, d, task, attempt)

		return
	}

	breaker.RecordSuccess()
	e.updateDeliveryStatus(
		webhookDB, d,
		database.DeliveryStatusDelivered,
	)
}
// circuitBreakerBlock reports whether the breaker refuses
// the delivery. When it does, the delivery is put into
// retrying status and an unchanged copy of the task is
// rescheduled for when the breaker's cooldown ends.
func (e *Engine) circuitBreakerBlock(
	webhookDB *gorm.DB,
	d *database.Delivery,
	task *Task,
	cb *CircuitBreaker,
) bool {
	if cb.Allow() {
		return false
	}

	cooldown := cb.CooldownRemaining()

	e.log.Info(
		"circuit breaker open, skipping delivery",
		"target_id", task.TargetID,
		"target_name", task.TargetName,
		"delivery_id", d.ID,
		"cooldown_remaining", cooldown,
	)
	e.updateDeliveryStatus(
		webhookDB, d,
		database.DeliveryStatusRetrying,
	)

	// AttemptNum is deliberately not advanced here.
	e.scheduleRetry(*task, cooldown)

	return true
}
// handleHTTPRetry decides what follows a failed attempt.
// While attemptNum is below the target's MaxRetries the
// delivery is marked retrying and the next attempt is
// scheduled after calcBackoff(attemptNum); otherwise the
// delivery is marked failed.
func (e *Engine) handleHTTPRetry(
	webhookDB *gorm.DB,
	d *database.Delivery,
	task *Task,
	attemptNum int,
) {
	if attemptNum < d.Target.MaxRetries {
		e.updateDeliveryStatus(
			webhookDB, d, database.DeliveryStatusRetrying,
		)

		next := *task
		next.AttemptNum = attemptNum + 1
		e.scheduleRetry(next, calcBackoff(attemptNum))

		return
	}

	e.updateDeliveryStatus(
		webhookDB, d,
		database.DeliveryStatusFailed,
	)
}
// getCircuitBreaker returns the circuit breaker for
// targetID, creating and storing one on first use.
func (e *Engine) getCircuitBreaker(
	targetID string,
) *CircuitBreaker {
	stored, found := e.circuitBreakers.Load(targetID)
	if !found {
		// LoadOrStore keeps whichever breaker was stored
		// first if another goroutine got there before us.
		stored, _ = e.circuitBreakers.LoadOrStore(
			targetID, NewCircuitBreaker(),
		)
	}

	breaker, _ := stored.(*CircuitBreaker)

	return breaker
}
// deliverDatabase handles database targets. It performs no
// outbound work: a successful attempt 1 is recorded and
// the delivery is marked delivered.
func (e *Engine) deliverDatabase(
	webhookDB *gorm.DB, d *database.Delivery,
) {
	const (
		attempt = 1
		success = true
	)

	e.recordResult(
		webhookDB, d, attempt, success, 0, "", "", 0,
	)
	e.updateDeliveryStatus(
		webhookDB, d, database.DeliveryStatusDelivered,
	)
}
// deliverLog handles log targets: it writes a summary of
// the event to the engine logger, records a successful
// attempt 1 and marks the delivery delivered.
func (e *Engine) deliverLog(
	webhookDB *gorm.DB, d *database.Delivery,
) {
	event := &d.Event

	e.log.Info(
		"webhook event delivered to log target",
		"delivery_id", d.ID,
		"event_id", d.EventID,
		"target_id", d.TargetID,
		"target_name", d.Target.Name,
		"method", event.Method,
		"content_type", event.ContentType,
		"body_length", len(event.Body),
	)

	e.recordResult(
		webhookDB, d, 1, true, 0, "", "", 0,
	)
	e.updateDeliveryStatus(
		webhookDB, d, database.DeliveryStatusDelivered,
	)
}
// deliverSlack delivers an event to a Slack target. The
// event is rendered with FormatSlackMessage, wrapped in a
// {"text": ...} JSON payload and sent to the configured
// webhook URL. A bad config or a marshal failure is
// recorded as a failed attempt 1.
func (e *Engine) deliverSlack(
	ctx context.Context,
	webhookDB *gorm.DB,
	d *database.Delivery,
) {
	cfg, cfgErr := e.parseSlackConfig(d.Target.Config)
	if cfgErr != nil {
		e.log.Error(
			"invalid Slack target config",
			"target_id", d.TargetID,
			"error", cfgErr,
		)
		e.failSlackDelivery(
			webhookDB, d, cfgErr.Error(), 0,
		)

		return
	}

	text := FormatSlackMessage(&d.Event)

	payload, marshalErr := json.Marshal(
		map[string]string{"text": text},
	)
	if marshalErr != nil {
		e.log.Error(
			"failed to marshal Slack payload",
			"target_id", d.TargetID,
			"error", marshalErr,
		)
		e.failSlackDelivery(
			webhookDB, d, marshalErr.Error(), 0,
		)

		return
	}

	e.sendSlackRequest(ctx, webhookDB, d, cfg, payload)
}
// sendSlackRequest POSTs the JSON payload to the Slack
// webhook URL with the engine's default client and hands
// the response to handleSlackResponse. Failures to build
// or send the request are recorded as failed deliveries.
func (e *Engine) sendSlackRequest(
	ctx context.Context,
	webhookDB *gorm.DB,
	d *database.Delivery,
	cfg *SlackTargetConfig,
	payload []byte,
) {
	begin := time.Now()

	req, buildErr := http.NewRequestWithContext(
		ctx,
		http.MethodPost,
		cfg.WebhookURL,
		bytes.NewReader(payload),
	)
	if buildErr != nil {
		e.failSlackDelivery(
			webhookDB, d, buildErr.Error(), 0,
		)

		return
	}

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("User-Agent", "webhooker/1.0")

	resp, sendErr := e.executeRequest(req)
	elapsedMs := time.Since(begin).Milliseconds()

	if sendErr != nil {
		e.failSlackDelivery(
			webhookDB, d,
			"sending request: "+sendErr.Error(),
			elapsedMs,
		)

		return
	}

	defer func() { _ = resp.Body.Close() }()

	e.handleSlackResponse(webhookDB, d, resp, elapsedMs)
}
// failSlackDelivery records a failed attempt 1 with the
// given error message and duration, then marks the
// delivery failed.
func (e *Engine) failSlackDelivery(
	webhookDB *gorm.DB,
	d *database.Delivery,
	errMsg string,
	durationMs int64,
) {
	const (
		attempt    = 1
		statusCode = 0
		respBody   = ""
	)

	e.recordResult(
		webhookDB, d, attempt,
		false, statusCode, respBody, errMsg, durationMs,
	)
	e.updateDeliveryStatus(
		webhookDB, d,
		database.DeliveryStatusFailed,
	)
}
// handleSlackResponse records the outcome of a Slack
// request. At most maxBodyLog bytes of the response body
// are read; a read error is logged and whatever was read
// is still recorded. A 2xx status marks the delivery
// delivered, anything else marks it failed with an
// "HTTP <code>" error message.
func (e *Engine) handleSlackResponse(
	webhookDB *gorm.DB,
	d *database.Delivery,
	resp *http.Response,
	durationMs int64,
) {
	limited := io.LimitReader(resp.Body, maxBodyLog)

	raw, readErr := io.ReadAll(limited)
	if readErr != nil {
		e.log.Error(
			"failed to read Slack response body",
			"error", readErr,
		)
	}

	code := resp.StatusCode
	ok := code >= httpSuccessMin && code < httpSuccessMax

	var (
		errMsg string
		status database.DeliveryStatus
	)

	if ok {
		status = database.DeliveryStatusDelivered
	} else {
		errMsg = fmt.Sprintf("HTTP %d", code)
		status = database.DeliveryStatusFailed
	}

	e.recordResult(
		webhookDB, d, 1, ok,
		code, string(raw), errMsg, durationMs,
	)
	e.updateDeliveryStatus(webhookDB, d, status)
}
// parseSlackConfig decodes a Slack target's JSON config.
// It returns errEmptyTargetConfig for an empty string, a
// wrapped decode error for malformed JSON, and
// errMissingWebhookURL when no webhook URL is present.
func (e *Engine) parseSlackConfig(
	configJSON string,
) (*SlackTargetConfig, error) {
	if len(configJSON) == 0 {
		return nil, errEmptyTargetConfig
	}

	cfg := new(SlackTargetConfig)

	decodeErr := json.Unmarshal([]byte(configJSON), cfg)
	switch {
	case decodeErr != nil:
		return nil, fmt.Errorf(
			"parsing config JSON: %w", decodeErr,
		)
	case cfg.WebhookURL == "":
		return nil, errMissingWebhookURL
	}

	return cfg, nil
}
// doHTTPRequest POSTs the event body to cfg.URL and
// returns the status code, up to maxBodyLog bytes of the
// response body, the elapsed time in milliseconds, and any
// error.
//
// A non-2xx response is not an error here; callers judge
// success from the status code. The duration is 0 when the
// request could not even be constructed. If reading the
// response body fails, the status code is still returned
// alongside the error.
func (e *Engine) doHTTPRequest(
	ctx context.Context,
	cfg *HTTPTargetConfig,
	event *database.Event,
) (int, string, int64, error) {
	start := time.Now()

	// strings.NewReader reads the body in place; converting
	// to []byte first would copy the whole payload on every
	// attempt. net/http derives ContentLength and GetBody
	// from *strings.Reader just as it does for
	// *bytes.Reader.
	req, reqErr := http.NewRequestWithContext(
		ctx,
		http.MethodPost,
		cfg.URL,
		strings.NewReader(event.Body),
	)
	if reqErr != nil {
		return 0, "", 0, fmt.Errorf(
			"creating request: %w", reqErr,
		)
	}

	applyRequestHeaders(req, event, cfg)

	client := e.clientForConfig(cfg)

	resp, doErr := executeHTTPRequest(client, req)
	dur := time.Since(start).Milliseconds()

	if doErr != nil {
		return 0, "", dur, fmt.Errorf(
			"sending request: %w", doErr,
		)
	}

	defer func() { _ = resp.Body.Close() }()

	// Only the first maxBodyLog bytes are kept.
	body, readErr := io.ReadAll(
		io.LimitReader(resp.Body, maxBodyLog),
	)
	if readErr != nil {
		return resp.StatusCode, "", dur,
			fmt.Errorf(
				"reading response body: %w", readErr,
			)
	}

	return resp.StatusCode, string(body), dur, nil
}
// recordResult inserts one DeliveryResult row describing a
// single attempt. The response body is cut to maxBodyLog
// bytes. An insert failure is logged and not returned.
func (e *Engine) recordResult(
	webhookDB *gorm.DB,
	d *database.Delivery,
	attemptNum int,
	success bool,
	statusCode int,
	respBody, errMsg string,
	durationMs int64,
) {
	row := database.DeliveryResult{
		DeliveryID:   d.ID,
		AttemptNum:   attemptNum,
		Success:      success,
		StatusCode:   statusCode,
		ResponseBody: truncate(respBody, maxBodyLog),
		Error:        errMsg,
		Duration:     durationMs,
	}

	if err := webhookDB.Create(&row).Error; err != nil {
		e.log.Error(
			"failed to record delivery result",
			"delivery_id", d.ID,
			"error", err,
		)
	}
}
// updateDeliveryStatus writes the new status to the
// delivery's row. An update failure is logged and not
// returned.
func (e *Engine) updateDeliveryStatus(
	webhookDB *gorm.DB,
	d *database.Delivery,
	status database.DeliveryStatus,
) {
	outcome := webhookDB.Model(d).Update("status", status)
	if outcome.Error == nil {
		return
	}

	e.log.Error(
		"failed to update delivery status",
		"delivery_id", d.ID,
		"status", status,
		"error", outcome.Error,
	)
}
// parseHTTPConfig decodes an HTTP target's JSON config.
// It returns errEmptyTargetConfig for an empty string, a
// wrapped decode error for malformed JSON, and
// errMissingTargetURL when no URL is present.
func (e *Engine) parseHTTPConfig(
	configJSON string,
) (*HTTPTargetConfig, error) {
	if len(configJSON) == 0 {
		return nil, errEmptyTargetConfig
	}

	cfg := new(HTTPTargetConfig)

	decodeErr := json.Unmarshal([]byte(configJSON), cfg)
	switch {
	case decodeErr != nil:
		return nil, fmt.Errorf(
			"parsing config JSON: %w", decodeErr,
		)
	case cfg.URL == "":
		return nil, errMissingTargetURL
	}

	return cfg, nil
}
// isForwardableHeader reports whether a header from the
// original webhook request may be copied onto the outbound
// request. The name is canonicalised first, so matching is
// case-insensitive. Host, Content-Length and the
// connection and proxy related headers listed below are
// not forwarded.
func isForwardableHeader(name string) bool {
	forward := true

	switch http.CanonicalHeaderKey(name) {
	case
		"Host",
		"Content-Length",
		"Connection",
		"Keep-Alive",
		"Transfer-Encoding",
		"Te",
		"Trailer",
		"Upgrade",
		"Proxy-Authorization",
		"Proxy-Connection":
		forward = false
	}

	return forward
}
// truncate returns s limited to at most maxLen bytes.
//
// The cut never lands inside a multi-byte UTF-8 sequence:
// if byte maxLen is a continuation byte the cut moves back
// to the start of that rune, so valid UTF-8 input always
// yields valid UTF-8 output (possibly a few bytes shorter
// than maxLen). A maxLen of zero or less yields "".
func truncate(s string, maxLen int) string {
	if maxLen <= 0 {
		return ""
	}

	if len(s) <= maxLen {
		return s
	}

	// A UTF-8 rune has at most three continuation bytes
	// (10xxxxxx), so three steps back always reach a rune
	// start in valid text. The bound keeps binary data from
	// being shortened indefinitely.
	const maxContinuation = 3

	cut := maxLen
	for i := 0; i < maxContinuation && cut > 0 &&
		s[cut]&0xC0 == 0x80; i++ {
		cut--
	}

	return s[:cut]
}
// --- Helper functions ---
// buildEventFromTask reconstructs an Event from the fields
// carried in the task. The body is not set here.
func buildEventFromTask(task *Task) database.Event {
	var ev database.Event

	ev.ID = task.EventID
	ev.WebhookID = task.WebhookID
	ev.Method = task.Method
	ev.Headers = task.Headers
	ev.ContentType = task.ContentType

	return ev
}
// buildTargetFromTask reconstructs a Target from the
// fields carried in the task.
func buildTargetFromTask(task *Task) database.Target {
	var tgt database.Target

	tgt.ID = task.TargetID
	tgt.Name = task.TargetName
	tgt.Type = task.TargetType
	tgt.Config = task.TargetConfig
	tgt.MaxRetries = task.MaxRetries

	return tgt
}
// resolveEventBody returns event with its Body filled in.
// An inline body on the task is used as is; otherwise only
// the body column is loaded from the per-webhook database.
// On a load error the event is returned without a body
// together with the wrapped error.
func (e *Engine) resolveEventBody(
	webhookDB *gorm.DB,
	event database.Event,
	task *Task,
) (database.Event, error) {
	if inline := task.Body; inline != nil {
		event.Body = *inline

		return event, nil
	}

	var stored database.Event

	query := webhookDB.Select("body")
	if err := query.
		First(&stored, "id = ?", task.EventID).
		Error; err != nil {
		return event, fmt.Errorf(
			"fetching event body: %w", err,
		)
	}

	event.Body = stored.Body

	return event, nil
}
// loadRetryDelivery loads a delivery by ID, selecting only
// its id and status columns.
func (e *Engine) loadRetryDelivery(
	webhookDB *gorm.DB, deliveryID string,
) (*database.Delivery, error) {
	delivery := new(database.Delivery)

	query := webhookDB.Select("id", "status")
	if err := query.
		First(delivery, "id = ?", deliveryID).
		Error; err != nil {
		return nil, fmt.Errorf(
			"loading delivery: %w", err,
		)
	}

	return delivery, nil
}
// countAttempts returns how many DeliveryResult rows exist
// for the delivery, i.e. how many attempts have been
// recorded so far.
//
// If the count query fails the error is logged and 0 is
// returned, so callers treat the delivery as having no
// recorded attempts.
func (e *Engine) countAttempts(
	webhookDB *gorm.DB, deliveryID string,
) int {
	var resultCount int64

	err := webhookDB.Model(&database.DeliveryResult{}).
		Where("delivery_id = ?", deliveryID).
		Count(&resultCount).Error
	if err != nil {
		e.log.Error(
			"failed to count delivery attempts",
			"delivery_id", deliveryID,
			"error", err,
		)

		return 0
	}

	return int(resultCount)
}
// loadEvent loads the full event row with the given ID
// from the per-webhook database.
func (e *Engine) loadEvent(
	webhookDB *gorm.DB, eventID string,
) (database.Event, error) {
	var ev database.Event

	found := webhookDB.First(&ev, "id = ?", eventID)
	if found.Error != nil {
		return ev, fmt.Errorf(
			"loading event: %w", found.Error,
		)
	}

	return ev, nil
}
// loadTarget loads the target with the given ID from the
// main database.
func (e *Engine) loadTarget(
	targetID string,
) (database.Target, error) {
	var tgt database.Target

	found := e.database.DB().First(&tgt, "id = ?", targetID)
	if found.Error != nil {
		return tgt, fmt.Errorf(
			"loading target: %w", found.Error,
		)
	}

	return tgt, nil
}
// calcBackoff returns the delay that follows attempt
// number attemptNum: 1s after the first attempt, doubling
// with each further attempt. The exponent is clamped to
// the range [0, maxBackoffShift].
func calcBackoff(attemptNum int) time.Duration {
	exp := attemptNum - 1

	switch {
	case exp < 0:
		exp = 0
	case exp > maxBackoffShift:
		exp = maxBackoffShift
	}

	return time.Second << uint(exp)
}
// calcRemainingBackoff returns how much of the backoff for
// attemptNum is still outstanding, measured from the
// creation time of the delivery's most recent result. It
// returns 0 when the backoff has already elapsed or when
// the latest result cannot be loaded.
func (e *Engine) calcRemainingBackoff(
	webhookDB *gorm.DB,
	deliveryID string,
	attemptNum int,
) time.Duration {
	var latest database.DeliveryResult

	query := webhookDB.
		Where("delivery_id = ?", deliveryID).
		Order("created_at DESC")
	if err := query.First(&latest).Error; err != nil {
		return 0
	}

	left := calcBackoff(attemptNum) -
		time.Since(latest.CreatedAt)
	if left < 0 {
		return 0
	}

	return left
}
// backoffElapsed reports whether the backoff for
// attemptNum has fully passed since the delivery's most
// recent result was created. It reports true when the
// latest result cannot be loaded.
func (e *Engine) backoffElapsed(
	webhookDB *gorm.DB,
	deliveryID string,
	attemptNum int,
) bool {
	var latest database.DeliveryResult

	query := webhookDB.
		Where("delivery_id = ?", deliveryID).
		Order("created_at DESC")
	if err := query.First(&latest).Error; err != nil {
		return true
	}

	waited := time.Since(latest.CreatedAt)

	return waited >= calcBackoff(attemptNum)
}
// buildRecoveryTask assembles a Task from rows loaded
// during recovery or a retry sweep. The event body is
// carried inline only when it is smaller than
// MaxInlineBodySize; otherwise Body stays nil.
func buildRecoveryTask(
	d *database.Delivery,
	webhookID string,
	event *database.Event,
	target *database.Target,
	attemptNum int,
) Task {
	task := Task{
		DeliveryID:   d.ID,
		EventID:      d.EventID,
		WebhookID:    webhookID,
		TargetID:     target.ID,
		TargetName:   target.Name,
		TargetType:   target.Type,
		TargetConfig: target.Config,
		MaxRetries:   target.MaxRetries,
		Method:       event.Method,
		Headers:      event.Headers,
		ContentType:  event.ContentType,
		AttemptNum:   attemptNum,
	}

	if len(event.Body) < MaxInlineBodySize {
		inline := event.Body
		task.Body = &inline
	}

	return task
}
// loadTargetMap loads the distinct targets referenced by
// the deliveries from the main database in one query and
// returns them keyed by target ID. It returns nil when the
// query fails.
func (e *Engine) loadTargetMap(
	deliveries []database.Delivery,
) map[string]database.Target {
	known := make(map[string]struct{}, len(deliveries))
	ids := make([]string, 0, len(deliveries))

	for i := range deliveries {
		id := deliveries[i].TargetID
		if _, dup := known[id]; dup {
			continue
		}

		known[id] = struct{}{}
		ids = append(ids, id)
	}

	var targets []database.Target

	query := e.database.DB().Where("id IN ?", ids)
	if err := query.Find(&targets).Error; err != nil {
		e.log.Error(
			"failed to load targets from main DB",
			"error", err,
		)

		return nil
	}

	byID := make(map[string]database.Target, len(targets))
	for i := range targets {
		byID[targets[i].ID] = targets[i]
	}

	return byID
}
// sendRecoveredDeliveries queues recovered pending
// deliveries as attempt 1. Deliveries whose target is not
// in targetMap are logged and skipped. The loop stops when
// ctx is cancelled or as soon as the delivery channel is
// full.
func (e *Engine) sendRecoveredDeliveries(
	ctx context.Context,
	deliveries []database.Delivery,
	webhookID string,
	targetMap map[string]database.Target,
) {
	for i := range deliveries {
		if ctx.Err() != nil {
			return
		}

		d := &deliveries[i]

		target, found := targetMap[d.TargetID]
		if !found {
			e.log.Error(
				"target not found for delivery",
				"delivery_id", d.ID,
				"target_id", d.TargetID,
			)

			continue
		}

		task := buildRecoveryTask(
			d, webhookID, &d.Event, &target, 1,
		)

		select {
		case e.deliveryCh <- task:
		default:
			e.log.Warn(
				"delivery channel full during "+
					"recovery, remaining deliveries "+
					"will be recovered on next restart",
				"delivery_id", d.ID,
			)

			return
		}
	}
}
func formatJSONBody(body string) string {
var parsed json.RawMessage
if json.Unmarshal([]byte(body), &parsed) != nil {
return ""
}
var pretty bytes.Buffer
if json.Indent(&pretty, parsed, "", " ") != nil {
return ""
}
var b strings.Builder
b.WriteString("\n```\n")
prettyStr := pretty.String()
const maxPayloadDisplay = 3500
if len(prettyStr) > maxPayloadDisplay {
b.WriteString(prettyStr[:maxPayloadDisplay])
b.WriteString("\n... (truncated)")
} else {
b.WriteString(prettyStr)
}
b.WriteString("\n```\n")
return b.String()
}
func formatRawBody(b *strings.Builder, body string) {
b.WriteString("\n```\n")
const maxRawDisplay = 3500
if len(body) > maxRawDisplay {
b.WriteString(body[:maxRawDisplay])
b.WriteString("\n... (truncated)")
} else {
b.WriteString(body)
}
b.WriteString("\n```\n")
}
// applyRequestHeaders populates the outbound request's
// headers. Later steps override earlier ones:
//
//  1. forwardable headers from the original event
//     (event.Headers, a JSON object of name to values);
//  2. Content-Type from event.ContentType, when non-empty;
//  3. headers configured on the target;
//  4. the fixed webhooker User-Agent.
//
// Content-Type is applied with Set after the forwarded
// headers. Applying it before them would leave the request
// with two Content-Type values whenever the original
// headers also carry one, because forwarded values are
// appended with Add.
//
// Event headers that fail to decode are ignored.
func applyRequestHeaders(
	req *http.Request,
	event *database.Event,
	cfg *HTTPTargetConfig,
) {
	if event.Headers != "" {
		var originalHeaders map[string][]string

		jsonErr := json.Unmarshal(
			[]byte(event.Headers),
			&originalHeaders,
		)
		if jsonErr == nil {
			for k, vals := range originalHeaders {
				if !isForwardableHeader(k) {
					continue
				}

				for _, v := range vals {
					req.Header.Add(k, v)
				}
			}
		}
	}

	if event.ContentType != "" {
		req.Header.Set(
			"Content-Type", event.ContentType,
		)
	}

	for k, v := range cfg.Headers {
		req.Header.Set(k, v)
	}

	req.Header.Set("User-Agent", "webhooker/1.0")
}
// clientForConfig returns the HTTP client to use for a
// target. Without a per-target timeout this is the
// engine's default client. With one, it is a shallow copy
// of the default client with only the timeout replaced
// (cfg.Timeout is in seconds).
//
// Copying the default client keeps its Transport. A fresh
// http.Client with no Transport would fall back to
// http.DefaultTransport, so any target that sets a timeout
// would bypass the engine's SSRF-safe transport.
func (e *Engine) clientForConfig(
	cfg *HTTPTargetConfig,
) *http.Client {
	if cfg.Timeout <= 0 {
		return e.client
	}

	client := *e.client
	client.Timeout = time.Duration(cfg.Timeout) * time.Second

	return &client
}
// executeRequest sends an HTTP request using the engine's
// default client. It is the send path for Slack
// deliveries. The call is isolated in this wrapper so the
// gosec suppression below sits on a single line; the
// suppression's rationale is that the URL has passed the
// config parser and the request goes through the engine
// client's transport.
func (e *Engine) executeRequest(
req *http.Request,
) (*http.Response, error) {
return e.client.Do(req) //#nosec G704 -- URL validated by parseSlackConfig and SSRF-safe transport
}
// executeHTTPRequest sends an HTTP request using the
// provided client. It is the send path for HTTP target
// deliveries, where the client is the one chosen by
// clientForConfig. The call is isolated in this wrapper so
// the gosec suppression below sits on a single line.
// NOTE(review): the suppression relies on the supplied
// client using the SSRF-safe transport — confirm for every
// client passed in.
func executeHTTPRequest(
client *http.Client, req *http.Request,
) (*http.Response, error) {
return client.Do(req) //#nosec G704 -- URL validated by parseHTTPConfig and SSRF-safe transport
}