package delivery
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/fx"
|
|
"gorm.io/gorm"
|
|
"sneak.berlin/go/webhooker/internal/database"
|
|
"sneak.berlin/go/webhooker/internal/logger"
|
|
)
|
|
|
|
const (
	// deliveryChannelSize is the buffer size for the delivery channel.
	// New DeliveryTasks from the webhook handler are sent here. Workers
	// drain this channel. Sized large enough that the webhook handler
	// should never block under normal load; on overflow the task is
	// dropped and recovered from the DB on the next restart.
	deliveryChannelSize = 10000

	// retryChannelSize is the buffer size for the retry channel.
	// Timer-fired retries are sent here for processing by workers.
	// On overflow the periodic retry sweep re-queues the delivery.
	retryChannelSize = 10000

	// defaultWorkers is the number of worker goroutines in the delivery
	// engine pool. At most this many deliveries are in-flight at any
	// time, preventing goroutine explosions regardless of queue depth.
	defaultWorkers = 10

	// retrySweepInterval is how often the periodic retry sweep runs.
	// The sweep scans all per-webhook databases for "orphaned" retrying
	// deliveries — ones whose in-memory timer was dropped because the
	// retry channel was full. This is the DB-mediated fallback path.
	retrySweepInterval = 60 * time.Second

	// MaxInlineBodySize is the maximum event body size that will be carried
	// inline in a DeliveryTask through the channel. Bodies at or above this
	// size are left nil and fetched from the per-webhook database on demand.
	// This keeps channel buffer memory bounded under high traffic.
	MaxInlineBodySize = 16 * 1024

	// httpClientTimeout is the timeout for outbound HTTP requests made by
	// the engine's shared client (per-target Timeout config overrides it).
	httpClientTimeout = 30 * time.Second

	// maxBodyLog is the maximum response body length (bytes) to read and
	// store in a DeliveryResult.
	maxBodyLog = 4096
)
|
|
|
|
// DeliveryTask contains everything needed to deliver an event to a single
// target. In the ≤16KB happy path, Body is non-nil and the engine delivers
// without touching any database — it trusts that the webhook handler wrote
// the records correctly. Only after a delivery attempt (success or failure)
// does the engine write to the DB to record the result.
//
// When Body is nil (payload ≥ MaxInlineBodySize), the engine fetches the
// body from the per-webhook database using EventID before delivering.
type DeliveryTask struct {
	DeliveryID string // ID of the Delivery record (for recording results)
	EventID    string // Event ID (for DB lookup if body is nil)
	WebhookID  string // Webhook ID (for per-webhook DB access)

	// Target info (from main DB, included at notification time so the
	// worker needs no main-DB read on the hot path)
	TargetID     string
	TargetName   string
	TargetType   database.TargetType
	TargetConfig string // JSON config (URL, headers, etc.)
	MaxRetries   int    // 0 means fire-and-forget (no retries, no breaker)

	// Event data (inline for ≤16KB bodies)
	Method      string
	Headers     string // JSON-encoded map of original request headers
	ContentType string
	Body        *string // nil if body ≥ MaxInlineBodySize; fetch from DB by EventID

	// AttemptNum tracks the delivery attempt number. Set to 1 for the
	// initial delivery and incremented for each retry. This avoids a DB
	// query to count prior results in the hot path.
	AttemptNum int
}
|
|
|
|
// Notifier is the interface for notifying the delivery engine about new
// deliveries. Implemented by Engine and injected into handlers so they
// can be tested without a running engine.
type Notifier interface {
	// Notify enqueues the given tasks for delivery. Implementations
	// must not block the caller.
	Notify(tasks []DeliveryTask)
}
|
|
|
|
// HTTPTargetConfig holds configuration for http and retry target types.
// It is decoded from the Target.Config JSON column by parseHTTPConfig.
type HTTPTargetConfig struct {
	URL     string            `json:"url"`               // destination URL (required)
	Headers map[string]string `json:"headers,omitempty"` // extra headers; override forwarded event headers
	Timeout int               `json:"timeout,omitempty"` // seconds, 0 = default (httpClientTimeout)
}
|
|
|
|
// EngineParams are the fx dependencies for the delivery engine.
//
//nolint:revive // EngineParams is a standard fx naming convention
type EngineParams struct {
	fx.In
	DB        *database.Database         // main database (webhooks, targets)
	DBManager *database.WebhookDBManager // per-webhook database handles
	Logger    *logger.Logger             // structured logger provider
}
|
|
|
|
// Engine processes queued deliveries in the background using a bounded
// worker pool architecture. New deliveries arrive as individual
// DeliveryTask values via a buffered delivery channel from the webhook
// handler. Failed deliveries that need retry are scheduled via Go timers
// with exponential backoff; each timer fires into a separate retry
// channel. A fixed number of worker goroutines drain both channels,
// ensuring at most N deliveries are in-flight at any time (N = number
// of workers). This prevents goroutine explosions when a circuit breaker
// is open for a long period and many retries queue up.
//
// In the happy path (body ≤ 16KB), a worker delivers without reading
// from any database — it only writes to record results. The database
// stores delivery status for crash recovery only; on startup the engine
// scans for interrupted deliveries and re-queues them into the channels.
type Engine struct {
	database  *database.Database         // main DB: webhooks and targets
	dbManager *database.WebhookDBManager // per-webhook event/delivery DBs
	log       *slog.Logger
	client    *http.Client       // shared outbound client (httpClientTimeout)
	cancel    context.CancelFunc // set in start(); stops all goroutines
	wg        sync.WaitGroup     // tracks workers, recovery scan, and sweep
	deliveryCh chan DeliveryTask // new tasks from the webhook handler
	retryCh    chan DeliveryTask // tasks fired by backoff timers
	workers    int               // size of the worker pool

	// circuitBreakers stores a *CircuitBreaker per target ID. Only used
	// for HTTP targets with MaxRetries > 0 — fire-and-forget HTTP targets
	// (MaxRetries == 0), database targets, and log targets do not need
	// circuit breakers because they either fire once or are local ops.
	circuitBreakers sync.Map
}
|
|
|
|
// New creates and registers the delivery engine with the fx lifecycle.
|
|
func New(lc fx.Lifecycle, params EngineParams) *Engine {
|
|
e := &Engine{
|
|
database: params.DB,
|
|
dbManager: params.DBManager,
|
|
log: params.Logger.Get(),
|
|
client: &http.Client{
|
|
Timeout: httpClientTimeout,
|
|
},
|
|
deliveryCh: make(chan DeliveryTask, deliveryChannelSize),
|
|
retryCh: make(chan DeliveryTask, retryChannelSize),
|
|
workers: defaultWorkers,
|
|
}
|
|
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(_ context.Context) error {
|
|
e.start()
|
|
return nil
|
|
},
|
|
OnStop: func(_ context.Context) error {
|
|
e.stop()
|
|
return nil
|
|
},
|
|
})
|
|
|
|
return e
|
|
}
|
|
|
|
func (e *Engine) start() {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
e.cancel = cancel
|
|
|
|
// Start the worker pool. These are the ONLY goroutines that
|
|
// perform HTTP delivery. Bounded concurrency is guaranteed.
|
|
for i := 0; i < e.workers; i++ {
|
|
e.wg.Add(1)
|
|
go e.worker(ctx)
|
|
}
|
|
|
|
// Start recovery scan in a separate goroutine. Recovered tasks
|
|
// are sent into the delivery/retry channels and picked up by workers.
|
|
e.wg.Add(1)
|
|
go e.recoverPending(ctx)
|
|
|
|
// Start the periodic retry sweep. This is the DB-mediated fallback
|
|
// for retries whose timers were dropped due to channel overflow.
|
|
e.wg.Add(1)
|
|
go e.retrySweep(ctx)
|
|
|
|
e.log.Info("delivery engine started", "workers", e.workers)
|
|
}
|
|
|
|
// stop shuts the engine down: it cancels the shared context (stopping
// workers, the recovery scan, and the retry sweep) and blocks until all
// tracked goroutines have exited. Called from the fx OnStop hook; it
// assumes start() ran first, since e.cancel is set there.
func (e *Engine) stop() {
	e.log.Info("delivery engine stopping")
	e.cancel()
	e.wg.Wait()
	e.log.Info("delivery engine stopped")
}
|
|
|
|
// Notify signals the delivery engine that new deliveries are ready.
|
|
// Called by the webhook handler after creating delivery records. Each
|
|
// DeliveryTask carries all data needed for delivery in the ≤16KB case.
|
|
// Tasks are sent individually to the delivery channel. The call is
|
|
// non-blocking; if the channel is full, a warning is logged and the
|
|
// delivery will be recovered on the next engine restart.
|
|
func (e *Engine) Notify(tasks []DeliveryTask) {
|
|
for i := range tasks {
|
|
select {
|
|
case e.deliveryCh <- tasks[i]:
|
|
default:
|
|
e.log.Warn("delivery channel full, task will be recovered on restart",
|
|
"delivery_id", tasks[i].DeliveryID,
|
|
"event_id", tasks[i].EventID,
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
// worker is the main loop for a worker goroutine. It selects from both
|
|
// the delivery channel (new tasks from the handler) and the retry channel
|
|
// (tasks from backoff timers). At most e.workers deliveries are in-flight
|
|
// at any time.
|
|
func (e *Engine) worker(ctx context.Context) {
|
|
defer e.wg.Done()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case task := <-e.deliveryCh:
|
|
e.processNewTask(ctx, &task)
|
|
case task := <-e.retryCh:
|
|
e.processRetryTask(ctx, &task)
|
|
}
|
|
}
|
|
}
|
|
|
|
// recoverPending runs on startup to recover any pending or retrying
// deliveries that were interrupted by an unexpected shutdown. Recovered
// tasks are sent into the delivery/retry channels for workers to pick up.
// It exists as a thin wrapper so the goroutine owns the wg.Done call
// regardless of how recoverInFlight returns.
func (e *Engine) recoverPending(ctx context.Context) {
	defer e.wg.Done()
	e.recoverInFlight(ctx)
}
|
|
|
|
// processNewTask handles a single new delivery task from the delivery
// channel. It builds the event and target context from the task's inline
// data and executes the delivery. For large bodies (≥ MaxInlineBodySize),
// the body is fetched from the per-webhook database on demand.
//
// On any error before delivery, the task is dropped without a status
// change; the delivery record stays pending and is re-queued by startup
// recovery.
func (e *Engine) processNewTask(ctx context.Context, task *DeliveryTask) {
	webhookDB, err := e.dbManager.GetDB(task.WebhookID)
	if err != nil {
		e.log.Error("failed to get webhook database",
			"webhook_id", task.WebhookID,
			"error", err,
		)
		return
	}

	// Build Event from task data. ID and WebhookID are assigned after
	// the literal — presumably embedded/base-model fields; confirm
	// against database.Event's definition.
	event := database.Event{
		Method:      task.Method,
		Headers:     task.Headers,
		ContentType: task.ContentType,
	}
	event.ID = task.EventID
	event.WebhookID = task.WebhookID

	if task.Body != nil {
		// Happy path: body travelled inline through the channel.
		event.Body = *task.Body
	} else {
		// Large body: fetch only the body column from the per-webhook DB.
		var dbEvent database.Event
		if err := webhookDB.Select("body").
			First(&dbEvent, "id = ?", task.EventID).Error; err != nil {
			e.log.Error("failed to fetch event body from database",
				"event_id", task.EventID,
				"error", err,
			)
			return
		}
		event.Body = dbEvent.Body
	}

	// Build Target from task data (no main DB query needed)
	target := database.Target{
		Name:       task.TargetName,
		Type:       task.TargetType,
		Config:     task.TargetConfig,
		MaxRetries: task.MaxRetries,
	}
	target.ID = task.TargetID

	// Build Delivery struct for the processing chain; it mirrors the
	// record the webhook handler already created under this ID.
	d := &database.Delivery{
		EventID:  task.EventID,
		TargetID: task.TargetID,
		Status:   database.DeliveryStatusPending,
		Event:    event,
		Target:   target,
	}
	d.ID = task.DeliveryID

	e.processDelivery(ctx, webhookDB, d, task)
}
|
|
|
|
// processRetryTask handles a single delivery task fired by a retry timer.
// The task carries all data needed for delivery (same as the initial
// notification). The only DB read is a status check to verify the delivery
// hasn't been cancelled or resolved while the timer was pending.
func (e *Engine) processRetryTask(ctx context.Context, task *DeliveryTask) {
	webhookDB, err := e.dbManager.GetDB(task.WebhookID)
	if err != nil {
		e.log.Error("failed to get webhook database for retry",
			"webhook_id", task.WebhookID,
			"delivery_id", task.DeliveryID,
			"error", err,
		)
		return
	}

	// Verify delivery is still in retrying status (may have been
	// cancelled or manually resolved while the timer was pending).
	// Only id and status are selected; the rest is filled from the task.
	var d database.Delivery
	if err := webhookDB.Select("id", "status").
		First(&d, "id = ?", task.DeliveryID).Error; err != nil {
		e.log.Error("failed to load delivery for retry",
			"delivery_id", task.DeliveryID,
			"error", err,
		)
		return
	}

	if d.Status != database.DeliveryStatusRetrying {
		// Someone else resolved it; skip silently at debug level.
		e.log.Debug("skipping retry for delivery no longer in retrying status",
			"delivery_id", d.ID,
			"status", d.Status,
		)
		return
	}

	// Build Event from task data (same construction as processNewTask).
	event := database.Event{
		Method:      task.Method,
		Headers:     task.Headers,
		ContentType: task.ContentType,
	}
	event.ID = task.EventID
	event.WebhookID = task.WebhookID

	if task.Body != nil {
		event.Body = *task.Body
	} else {
		// Large body: fetch from per-webhook DB on demand.
		var dbEvent database.Event
		if err := webhookDB.Select("body").
			First(&dbEvent, "id = ?", task.EventID).Error; err != nil {
			e.log.Error("failed to fetch event body for retry",
				"event_id", task.EventID,
				"error", err,
			)
			return
		}
		event.Body = dbEvent.Body
	}

	// Build Target from task data
	target := database.Target{
		Name:       task.TargetName,
		Type:       task.TargetType,
		Config:     task.TargetConfig,
		MaxRetries: task.MaxRetries,
	}
	target.ID = task.TargetID

	// Populate the delivery with event and target for processing.
	// d.ID and d.Status were loaded from the DB above.
	d.EventID = task.EventID
	d.TargetID = task.TargetID
	d.Event = event
	d.Target = target

	e.processDelivery(ctx, webhookDB, &d, task)
}
|
|
|
|
// recoverInFlight scans all webhooks on startup for deliveries that were
|
|
// interrupted by an unexpected shutdown. Pending deliveries are sent to
|
|
// the delivery channel; retrying deliveries get timers scheduled for
|
|
// their remaining backoff period.
|
|
func (e *Engine) recoverInFlight(ctx context.Context) {
|
|
var webhookIDs []string
|
|
if err := e.database.DB().Model(&database.Webhook{}).Pluck("id", &webhookIDs).Error; err != nil {
|
|
e.log.Error("failed to query webhook IDs for recovery", "error", err)
|
|
return
|
|
}
|
|
|
|
for _, webhookID := range webhookIDs {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
if !e.dbManager.DBExists(webhookID) {
|
|
continue
|
|
}
|
|
|
|
e.recoverWebhookDeliveries(ctx, webhookID)
|
|
}
|
|
}
|
|
|
|
// recoverWebhookDeliveries recovers pending and retrying deliveries for
// a single webhook. Pending deliveries are sent to the delivery channel;
// retrying deliveries get timers scheduled for their remaining backoff.
func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string) {
	webhookDB, err := e.dbManager.GetDB(webhookID)
	if err != nil {
		e.log.Error("failed to get webhook database for recovery",
			"webhook_id", webhookID,
			"error", err,
		)
		return
	}

	// Recover pending deliveries by sending them to the delivery channel
	e.recoverPendingDeliveries(ctx, webhookDB, webhookID)

	// Schedule timers for retrying deliveries based on remaining backoff
	var retrying []database.Delivery
	if err := webhookDB.Where("status = ?", database.DeliveryStatusRetrying).
		Find(&retrying).Error; err != nil {
		e.log.Error("failed to query retrying deliveries for recovery",
			"webhook_id", webhookID,
			"error", err,
		)
		return
	}

	for i := range retrying {
		d := &retrying[i]

		// Each prior attempt wrote one DeliveryResult row, so the count
		// is the number of attempts already made. The Count error is
		// ignored here — NOTE(review): on error resultCount stays 0,
		// which restarts backoff from the beginning; confirm intended.
		var resultCount int64
		webhookDB.Model(&database.DeliveryResult{}).
			Where("delivery_id = ?", d.ID).
			Count(&resultCount)
		attemptNum := int(resultCount)

		// Load event for this delivery (full row, including body)
		var event database.Event
		if err := webhookDB.First(&event, "id = ?", d.EventID).Error; err != nil {
			e.log.Error("failed to load event for retrying delivery recovery",
				"delivery_id", d.ID,
				"event_id", d.EventID,
				"error", err,
			)
			continue
		}

		// Load target from main DB
		var target database.Target
		if err := e.database.DB().First(&target, "id = ?", d.TargetID).Error; err != nil {
			e.log.Error("failed to load target for retrying delivery recovery",
				"delivery_id", d.ID,
				"target_id", d.TargetID,
				"error", err,
			)
			continue
		}

		// Calculate remaining backoff from last attempt. If no result
		// row exists, remaining stays 0 and the retry fires immediately.
		remaining := time.Duration(0)

		var lastResult database.DeliveryResult
		if err := webhookDB.Where("delivery_id = ?", d.ID).
			Order("created_at DESC").
			First(&lastResult).Error; err == nil {
			// Exponential backoff: 2^(attempt-1) seconds, shift clamped
			// to [0, 30] so the duration never overflows.
			shift := attemptNum - 1
			if shift < 0 {
				shift = 0
			}
			if shift > 30 {
				shift = 30
			}
			backoff := time.Duration(1<<uint(shift)) * time.Second //nolint:gosec // bounded above
			elapsed := time.Since(lastResult.CreatedAt)
			remaining = backoff - elapsed
			if remaining < 0 {
				remaining = 0
			}
		}

		// Build task from DB data. Use body pointer semantics: inline
		// for small bodies, nil for large ones (will be fetched on retry).
		var bodyPtr *string
		if len(event.Body) < MaxInlineBodySize {
			bodyStr := event.Body
			bodyPtr = &bodyStr
		}

		task := DeliveryTask{
			DeliveryID:   d.ID,
			EventID:      d.EventID,
			WebhookID:    webhookID,
			TargetID:     target.ID,
			TargetName:   target.Name,
			TargetType:   target.Type,
			TargetConfig: target.Config,
			MaxRetries:   target.MaxRetries,
			Method:       event.Method,
			Headers:      event.Headers,
			ContentType:  event.ContentType,
			Body:         bodyPtr,
			AttemptNum:   attemptNum + 1,
		}

		e.log.Info("recovering retrying delivery",
			"webhook_id", webhookID,
			"delivery_id", d.ID,
			"attempt", attemptNum,
			"remaining_backoff", remaining,
		)

		e.scheduleRetry(task, remaining)
	}
}
|
|
|
|
// recoverPendingDeliveries sends pending deliveries for a single webhook
// into the delivery channel. Used for crash recovery where we don't have
// in-memory notifications — everything is loaded from the DB and queued
// for workers to pick up. Stops early (returning) when the engine shuts
// down or the delivery channel fills; anything not queued stays pending
// in the DB and is recovered on the next restart.
func (e *Engine) recoverPendingDeliveries(ctx context.Context, webhookDB *gorm.DB, webhookID string) {
	var deliveries []database.Delivery
	result := webhookDB.
		Where("status = ?", database.DeliveryStatusPending).
		Preload("Event").
		Find(&deliveries)

	if result.Error != nil {
		e.log.Error("failed to query pending deliveries",
			"webhook_id", webhookID,
			"error", result.Error,
		)
		return
	}

	if len(deliveries) == 0 {
		return
	}

	e.log.Info("recovering pending deliveries",
		"webhook_id", webhookID,
		"count", len(deliveries),
	)

	// Collect unique target IDs and load targets from the main DB in
	// one query instead of one query per delivery.
	seen := make(map[string]bool)
	targetIDs := make([]string, 0, len(deliveries))
	for _, d := range deliveries {
		if !seen[d.TargetID] {
			targetIDs = append(targetIDs, d.TargetID)
			seen[d.TargetID] = true
		}
	}

	var targets []database.Target
	if err := e.database.DB().Where("id IN ?", targetIDs).Find(&targets).Error; err != nil {
		e.log.Error("failed to load targets from main DB", "error", err)
		return
	}

	targetMap := make(map[string]database.Target, len(targets))
	for _, t := range targets {
		targetMap[t.ID] = t
	}

	// Send recovered deliveries to the delivery channel for workers
	for i := range deliveries {
		// Abort promptly if the engine is shutting down.
		select {
		case <-ctx.Done():
			return
		default:
		}

		target, ok := targetMap[deliveries[i].TargetID]
		if !ok {
			// Target row deleted since the delivery was created;
			// nothing can be delivered.
			e.log.Error("target not found for delivery",
				"delivery_id", deliveries[i].ID,
				"target_id", deliveries[i].TargetID,
			)
			continue
		}

		// Build task from DB data: inline body only when small, same
		// pointer semantics as the hot path.
		var bodyPtr *string
		if len(deliveries[i].Event.Body) < MaxInlineBodySize {
			bodyStr := deliveries[i].Event.Body
			bodyPtr = &bodyStr
		}

		task := DeliveryTask{
			DeliveryID:   deliveries[i].ID,
			EventID:      deliveries[i].EventID,
			WebhookID:    webhookID,
			TargetID:     target.ID,
			TargetName:   target.Name,
			TargetType:   target.Type,
			TargetConfig: target.Config,
			MaxRetries:   target.MaxRetries,
			Method:       deliveries[i].Event.Method,
			Headers:      deliveries[i].Event.Headers,
			ContentType:  deliveries[i].Event.ContentType,
			Body:         bodyPtr,
			AttemptNum:   1,
		}

		select {
		case e.deliveryCh <- task:
		default:
			// Channel full: give up on the rest of this webhook's
			// backlog; it remains pending in the DB.
			e.log.Warn("delivery channel full during recovery, remaining deliveries will be recovered on next restart",
				"delivery_id", deliveries[i].ID,
			)
			return
		}
	}
}
|
|
|
|
// scheduleRetry creates a Go timer that fires after the given delay and
|
|
// sends the full DeliveryTask to the engine's retry channel. The task
|
|
// carries all data needed for the retry attempt, so when it fires, a
|
|
// worker can deliver without reading event or target data from the DB.
|
|
//
|
|
// If the retry channel is full when the timer fires, the timer is
|
|
// dropped. The delivery remains in `retrying` status in the database
|
|
// and will be picked up by the periodic retry sweep (DB-mediated
|
|
// fallback path). No goroutines are blocked or re-armed.
|
|
func (e *Engine) scheduleRetry(task DeliveryTask, delay time.Duration) {
|
|
e.log.Debug("scheduling delivery retry",
|
|
"webhook_id", task.WebhookID,
|
|
"delivery_id", task.DeliveryID,
|
|
"delay", delay,
|
|
"next_attempt", task.AttemptNum,
|
|
)
|
|
|
|
time.AfterFunc(delay, func() {
|
|
select {
|
|
case e.retryCh <- task:
|
|
default:
|
|
// Retry channel full — drop the timer. The delivery is
|
|
// already marked as `retrying` in the per-webhook DB, so
|
|
// the periodic retry sweep will pick it up. This is the
|
|
// DB-mediated fallback path: no blocked goroutines, no
|
|
// unbounded timer chains.
|
|
e.log.Warn("retry channel full, delivery will be recovered by periodic sweep",
|
|
"delivery_id", task.DeliveryID,
|
|
"webhook_id", task.WebhookID,
|
|
)
|
|
}
|
|
})
|
|
}
|
|
|
|
// retrySweep runs periodically to scan all per-webhook databases for
|
|
// "orphaned" retrying deliveries — ones whose in-memory retry timer was
|
|
// dropped because the retry channel was full. This is the DB-mediated
|
|
// fallback path that ensures no retries are permanently lost even under
|
|
// extreme backpressure.
|
|
//
|
|
// The sweep is also the same mechanism used on startup recovery, making
|
|
// the system resilient to both channel overflow and unexpected restarts.
|
|
func (e *Engine) retrySweep(ctx context.Context) {
|
|
defer e.wg.Done()
|
|
ticker := time.NewTicker(retrySweepInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
e.sweepOrphanedRetries(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
// sweepOrphanedRetries scans all webhooks for retrying deliveries whose
|
|
// backoff period has elapsed. For each eligible delivery, it builds a
|
|
// DeliveryTask and sends it to the retry channel. If the channel is
|
|
// still full, the delivery is skipped and will be retried on the next
|
|
// sweep cycle.
|
|
func (e *Engine) sweepOrphanedRetries(ctx context.Context) {
|
|
var webhookIDs []string
|
|
if err := e.database.DB().Model(&database.Webhook{}).Pluck("id", &webhookIDs).Error; err != nil {
|
|
e.log.Error("retry sweep: failed to query webhook IDs", "error", err)
|
|
return
|
|
}
|
|
|
|
for _, webhookID := range webhookIDs {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
if !e.dbManager.DBExists(webhookID) {
|
|
continue
|
|
}
|
|
|
|
e.sweepWebhookRetries(ctx, webhookID)
|
|
}
|
|
}
|
|
|
|
// sweepWebhookRetries scans a single webhook's database for retrying
// deliveries whose backoff period has elapsed and sends them to the
// retry channel. Deliveries still inside their backoff window are
// assumed to have a live in-memory timer and are skipped.
func (e *Engine) sweepWebhookRetries(ctx context.Context, webhookID string) {
	webhookDB, err := e.dbManager.GetDB(webhookID)
	if err != nil {
		e.log.Error("retry sweep: failed to get webhook database",
			"webhook_id", webhookID,
			"error", err,
		)
		return
	}

	var retrying []database.Delivery
	if err := webhookDB.Where("status = ?", database.DeliveryStatusRetrying).
		Find(&retrying).Error; err != nil {
		e.log.Error("retry sweep: failed to query retrying deliveries",
			"webhook_id", webhookID,
			"error", err,
		)
		return
	}

	for i := range retrying {
		// Abort mid-sweep on shutdown.
		select {
		case <-ctx.Done():
			return
		default:
		}

		d := &retrying[i]

		// Count prior attempts to determine backoff. NOTE(review): the
		// Count error is ignored; on failure resultCount stays 0 and
		// the delivery is treated as never attempted — confirm intended.
		var resultCount int64
		webhookDB.Model(&database.DeliveryResult{}).
			Where("delivery_id = ?", d.ID).
			Count(&resultCount)
		attemptNum := int(resultCount)

		// Check if the backoff period has elapsed since the last attempt.
		// If it hasn't, this delivery likely has an active in-memory
		// timer and is not orphaned — skip it.
		var lastResult database.DeliveryResult
		if err := webhookDB.Where("delivery_id = ?", d.ID).
			Order("created_at DESC").
			First(&lastResult).Error; err == nil {
			// Exponential backoff: 2^(attempt-1) seconds, shift clamped
			// to [0, 30] so the duration never overflows.
			shift := attemptNum - 1
			if shift < 0 {
				shift = 0
			}
			if shift > 30 {
				shift = 30
			}
			backoff := time.Duration(1<<uint(shift)) * time.Second //nolint:gosec // bounded above
			if time.Since(lastResult.CreatedAt) < backoff {
				continue // Backoff hasn't elapsed; likely has an active timer
			}
		}
		// If no result exists, the delivery was set to retrying but
		// never actually attempted — process immediately.

		// Load event for this delivery (full row, including body)
		var event database.Event
		if err := webhookDB.First(&event, "id = ?", d.EventID).Error; err != nil {
			e.log.Error("retry sweep: failed to load event",
				"delivery_id", d.ID,
				"event_id", d.EventID,
				"error", err,
			)
			continue
		}

		// Load target from main DB
		var target database.Target
		if err := e.database.DB().First(&target, "id = ?", d.TargetID).Error; err != nil {
			e.log.Error("retry sweep: failed to load target",
				"delivery_id", d.ID,
				"target_id", d.TargetID,
				"error", err,
			)
			continue
		}

		// Build task from DB data: inline body only when small, same
		// pointer semantics as the hot path.
		var bodyPtr *string
		if len(event.Body) < MaxInlineBodySize {
			bodyStr := event.Body
			bodyPtr = &bodyStr
		}

		task := DeliveryTask{
			DeliveryID:   d.ID,
			EventID:      d.EventID,
			WebhookID:    webhookID,
			TargetID:     target.ID,
			TargetName:   target.Name,
			TargetType:   target.Type,
			TargetConfig: target.Config,
			MaxRetries:   target.MaxRetries,
			Method:       event.Method,
			Headers:      event.Headers,
			ContentType:  event.ContentType,
			Body:         bodyPtr,
			AttemptNum:   attemptNum + 1,
		}

		// Try to send to retry channel; skip if still full
		select {
		case e.retryCh <- task:
			e.log.Info("retry sweep: recovered orphaned retrying delivery",
				"delivery_id", d.ID,
				"webhook_id", webhookID,
				"attempt", attemptNum+1,
			)
		default:
			// Channel still full — will try again on the next sweep
		}
	}
}
|
|
|
|
func (e *Engine) processDelivery(ctx context.Context, webhookDB *gorm.DB, d *database.Delivery, task *DeliveryTask) {
|
|
switch d.Target.Type {
|
|
case database.TargetTypeHTTP:
|
|
e.deliverHTTP(ctx, webhookDB, d, task)
|
|
case database.TargetTypeDatabase:
|
|
e.deliverDatabase(webhookDB, d)
|
|
case database.TargetTypeLog:
|
|
e.deliverLog(webhookDB, d)
|
|
default:
|
|
e.log.Error("unknown target type",
|
|
"target_id", d.TargetID,
|
|
"type", d.Target.Type,
|
|
)
|
|
e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed)
|
|
}
|
|
}
|
|
|
|
// deliverHTTP performs an HTTP delivery attempt for d. With
// MaxRetries == 0 it fires exactly once (no circuit breaker, no retry
// scheduling); otherwise it consults the per-target circuit breaker,
// records the attempt, and on failure either marks the delivery failed
// (attempts exhausted) or schedules an exponential-backoff retry.
// The context parameter is currently unused — the HTTP request uses the
// client timeout instead.
func (e *Engine) deliverHTTP(_ context.Context, webhookDB *gorm.DB, d *database.Delivery, task *DeliveryTask) {
	cfg, err := e.parseHTTPConfig(d.Target.Config)
	if err != nil {
		// Bad config can never succeed: record the failure and give up
		// immediately without retrying.
		e.log.Error("invalid HTTP target config",
			"target_id", d.TargetID,
			"error", err,
		)
		e.recordResult(webhookDB, d, task.AttemptNum, false, 0, "", err.Error(), 0)
		e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed)
		return
	}

	maxRetries := d.Target.MaxRetries

	// Fire-and-forget mode: max_retries == 0 means attempt once with no
	// circuit breaker and no retry scheduling.
	if maxRetries == 0 {
		statusCode, respBody, duration, reqErr := e.doHTTPRequest(cfg, &d.Event)

		// Success is any 2xx response with no transport error.
		success := reqErr == nil && statusCode >= 200 && statusCode < 300
		errMsg := ""
		if reqErr != nil {
			errMsg = reqErr.Error()
		}

		e.recordResult(webhookDB, d, 1, success, statusCode, respBody, errMsg, duration)

		if success {
			e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered)
		} else {
			e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed)
		}
		return
	}

	// Retry mode: max_retries > 0 — use circuit breaker and exponential backoff.

	// Check the circuit breaker for this target before attempting delivery.
	cb := e.getCircuitBreaker(task.TargetID)
	if !cb.Allow() {
		// Circuit is open — skip delivery, mark as retrying, and
		// schedule a retry for after the cooldown expires.
		remaining := cb.CooldownRemaining()
		e.log.Info("circuit breaker open, skipping delivery",
			"target_id", task.TargetID,
			"target_name", task.TargetName,
			"delivery_id", d.ID,
			"cooldown_remaining", remaining,
		)
		e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusRetrying)

		retryTask := *task
		// Don't increment AttemptNum — this wasn't a real attempt
		e.scheduleRetry(retryTask, remaining)
		return
	}

	attemptNum := task.AttemptNum

	// Attempt delivery immediately — backoff is handled by the timer
	// that triggered this call, not by polling.
	statusCode, respBody, duration, reqErr := e.doHTTPRequest(cfg, &d.Event)

	success := reqErr == nil && statusCode >= 200 && statusCode < 300
	errMsg := ""
	if reqErr != nil {
		errMsg = reqErr.Error()
	}

	e.recordResult(webhookDB, d, attemptNum, success, statusCode, respBody, errMsg, duration)

	if success {
		cb.RecordSuccess()
		e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered)
		return
	}

	// Delivery failed — record failure in circuit breaker
	cb.RecordFailure()

	if attemptNum >= maxRetries {
		// Out of attempts: terminal failure.
		e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed)
	} else {
		e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusRetrying)

		// Schedule a timer for the next retry with exponential backoff:
		// 2^(attempt-1) seconds, shift clamped at 30 to avoid overflow.
		// The timer fires a DeliveryTask into the retry channel carrying
		// all data needed for the next attempt.
		shift := attemptNum - 1
		if shift > 30 {
			shift = 30
		}
		backoff := time.Duration(1<<uint(shift)) * time.Second //nolint:gosec // bounded above

		retryTask := *task
		retryTask.AttemptNum = attemptNum + 1
		e.scheduleRetry(retryTask, backoff)
	}
}
|
|
|
|
// getCircuitBreaker returns the circuit breaker for the given target ID,
|
|
// creating one if it doesn't exist yet. Circuit breakers are in-memory
|
|
// only and reset on restart (startup recovery rescans the DB anyway).
|
|
func (e *Engine) getCircuitBreaker(targetID string) *CircuitBreaker {
|
|
if val, ok := e.circuitBreakers.Load(targetID); ok {
|
|
cb, _ := val.(*CircuitBreaker) //nolint:errcheck // type is guaranteed by LoadOrStore below
|
|
return cb
|
|
}
|
|
fresh := NewCircuitBreaker()
|
|
actual, _ := e.circuitBreakers.LoadOrStore(targetID, fresh)
|
|
cb, _ := actual.(*CircuitBreaker) //nolint:errcheck // we only store *CircuitBreaker values
|
|
return cb
|
|
}
|
|
|
|
// deliverDatabase handles the database target type. Since events are already
// stored in the per-webhook database (that's the whole point of per-webhook
// databases), the database target simply marks the delivery as successful.
// The per-webhook DB IS the dedicated event database for this webhook.
// A single result row (attempt 1, success, no status code or body) is
// recorded for audit parity with other target types.
func (e *Engine) deliverDatabase(webhookDB *gorm.DB, d *database.Delivery) {
	e.recordResult(webhookDB, d, 1, true, 0, "", "", 0)
	e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered)
}
|
|
|
|
// deliverLog handles the log target type: it emits a structured log line
// summarizing the event (body length only, never the body itself) and
// marks the delivery successful with a single recorded attempt. Log
// targets cannot fail, so no retry logic applies.
func (e *Engine) deliverLog(webhookDB *gorm.DB, d *database.Delivery) {
	e.log.Info("webhook event delivered to log target",
		"delivery_id", d.ID,
		"event_id", d.EventID,
		"target_id", d.TargetID,
		"target_name", d.Target.Name,
		"method", d.Event.Method,
		"content_type", d.Event.ContentType,
		"body_length", len(d.Event.Body),
	)
	e.recordResult(webhookDB, d, 1, true, 0, "", "", 0)
	e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered)
}
|
|
|
|
// doHTTPRequest performs the outbound HTTP POST to a target URL. It
// returns the response status code, the response body truncated to
// maxBodyLog bytes, the request duration in milliseconds, and any
// transport/build error. Headers are applied in order: Content-Type from
// the event, forwardable original event headers, then target-configured
// headers (which override), then the fixed User-Agent.
func (e *Engine) doHTTPRequest(cfg *HTTPTargetConfig, event *database.Event) (statusCode int, respBody string, durationMs int64, err error) {
	start := time.Now()

	req, err := http.NewRequest(http.MethodPost, cfg.URL, bytes.NewReader([]byte(event.Body)))
	if err != nil {
		return 0, "", 0, fmt.Errorf("creating request: %w", err)
	}

	// Set content type from original event
	if event.ContentType != "" {
		req.Header.Set("Content-Type", event.ContentType)
	}

	// Apply original headers (filtered). A malformed Headers JSON is
	// silently skipped — the request is still sent without them.
	var originalHeaders map[string][]string
	if event.Headers != "" {
		if jsonErr := json.Unmarshal([]byte(event.Headers), &originalHeaders); jsonErr == nil {
			for k, vals := range originalHeaders {
				if isForwardableHeader(k) {
					for _, v := range vals {
						req.Header.Add(k, v)
					}
				}
			}
		}
	}

	// Apply target-specific headers (override)
	for k, v := range cfg.Headers {
		req.Header.Set(k, v)
	}

	req.Header.Set("User-Agent", "webhooker/1.0")

	// Per-target timeout overrides the shared client's default; a fresh
	// client is built only in that case.
	client := e.client
	if cfg.Timeout > 0 {
		client = &http.Client{Timeout: time.Duration(cfg.Timeout) * time.Second}
	}

	resp, err := client.Do(req)
	durationMs = time.Since(start).Milliseconds()
	if err != nil {
		return 0, "", durationMs, fmt.Errorf("sending request: %w", err)
	}
	defer resp.Body.Close()

	// Read at most maxBodyLog bytes; larger bodies are truncated.
	body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxBodyLog))
	if readErr != nil {
		// Status code is still returned so the caller can classify
		// the attempt even though the body read failed.
		return resp.StatusCode, "", durationMs, fmt.Errorf("reading response body: %w", readErr)
	}

	return resp.StatusCode, string(body), durationMs, nil
}
|
|
|
|
func (e *Engine) recordResult(webhookDB *gorm.DB, d *database.Delivery, attemptNum int, success bool, statusCode int, respBody, errMsg string, durationMs int64) {
|
|
result := &database.DeliveryResult{
|
|
DeliveryID: d.ID,
|
|
AttemptNum: attemptNum,
|
|
Success: success,
|
|
StatusCode: statusCode,
|
|
ResponseBody: truncate(respBody, maxBodyLog),
|
|
Error: errMsg,
|
|
Duration: durationMs,
|
|
}
|
|
|
|
if err := webhookDB.Create(result).Error; err != nil {
|
|
e.log.Error("failed to record delivery result",
|
|
"delivery_id", d.ID,
|
|
"error", err,
|
|
)
|
|
}
|
|
}
|
|
|
|
func (e *Engine) updateDeliveryStatus(webhookDB *gorm.DB, d *database.Delivery, status database.DeliveryStatus) {
|
|
if err := webhookDB.Model(d).Update("status", status).Error; err != nil {
|
|
e.log.Error("failed to update delivery status",
|
|
"delivery_id", d.ID,
|
|
"status", status,
|
|
"error", err,
|
|
)
|
|
}
|
|
}
|
|
|
|
func (e *Engine) parseHTTPConfig(configJSON string) (*HTTPTargetConfig, error) {
|
|
if configJSON == "" {
|
|
return nil, fmt.Errorf("empty target config")
|
|
}
|
|
var cfg HTTPTargetConfig
|
|
if err := json.Unmarshal([]byte(configJSON), &cfg); err != nil {
|
|
return nil, fmt.Errorf("parsing config JSON: %w", err)
|
|
}
|
|
if cfg.URL == "" {
|
|
return nil, fmt.Errorf("target URL is required")
|
|
}
|
|
return &cfg, nil
|
|
}
|
|
|
|
// isForwardableHeader returns true if the header should be forwarded to
// targets. Hop-by-hop headers and internal headers are excluded. The
// comparison is case-insensitive via canonical header form.
func isForwardableHeader(name string) bool {
	canonical := http.CanonicalHeaderKey(name)
	blocked := canonical == "Host" ||
		canonical == "Connection" ||
		canonical == "Keep-Alive" ||
		canonical == "Transfer-Encoding" ||
		canonical == "Te" ||
		canonical == "Trailer" ||
		canonical == "Upgrade" ||
		canonical == "Proxy-Authorization" ||
		canonical == "Proxy-Connection" ||
		canonical == "Content-Length"
	return !blocked
}
|
|
|
|
// truncate returns s limited to at most maxLen bytes. Two fixes over a
// naive s[:maxLen]: a negative maxLen is clamped to 0 instead of
// panicking on the slice expression, and the cut is backed up past any
// UTF-8 continuation bytes so a multi-byte rune is never split (which
// would store invalid UTF-8 in the delivery-result log).
func truncate(s string, maxLen int) string {
	if maxLen < 0 {
		maxLen = 0
	}
	if len(s) <= maxLen {
		return s
	}
	// A byte of the form 10xxxxxx is a UTF-8 continuation byte; step
	// back until the cut lands on a rune boundary (or the start).
	for maxLen > 0 && s[maxLen]&0xC0 == 0x80 {
		maxLen--
	}
	return s[:maxLen]
}
|