// Package delivery implements the background engine that pushes received
// webhook events to their configured targets, with bounded-concurrency
// workers, timer-based retries, and crash recovery from the database.
package delivery

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"sync"
	"time"

	"go.uber.org/fx"
	"gorm.io/gorm"

	"sneak.berlin/go/webhooker/internal/database"
	"sneak.berlin/go/webhooker/internal/logger"
)

const (
	// deliveryChannelSize is the buffer size for the delivery channel.
	// New DeliveryTasks from the webhook handler are sent here. Workers
	// drain this channel. Sized large enough that the webhook handler
	// should never block under normal load.
	deliveryChannelSize = 10000

	// retryChannelSize is the buffer size for the retry channel.
	// Timer-fired retries are sent here for processing by workers.
	retryChannelSize = 10000

	// defaultWorkers is the number of worker goroutines in the delivery
	// engine pool. At most this many deliveries are in-flight at any
	// time, preventing goroutine explosions regardless of queue depth.
	defaultWorkers = 10

	// retrySweepInterval is how often the periodic retry sweep runs.
	// The sweep scans all per-webhook databases for "orphaned" retrying
	// deliveries — ones whose in-memory timer was dropped because the
	// retry channel was full. This is the DB-mediated fallback path.
	retrySweepInterval = 60 * time.Second

	// MaxInlineBodySize is the maximum event body size that will be carried
	// inline in a DeliveryTask through the channel. Bodies at or above this
	// size are left nil and fetched from the per-webhook database on demand.
	// This keeps channel buffer memory bounded under high traffic.
	MaxInlineBodySize = 16 * 1024

	// httpClientTimeout is the timeout for outbound HTTP requests.
	httpClientTimeout = 30 * time.Second

	// maxBodyLog is the maximum response body length to store in DeliveryResult.
	maxBodyLog = 4096
)

// DeliveryTask contains everything needed to deliver an event to a single
// target. In the ≤16KB happy path, Body is non-nil and the engine delivers
// without touching any database — it trusts that the webhook handler wrote
// the records correctly.
// Only after a delivery attempt (success or failure)
// does the engine write to the DB to record the result.
//
// When Body is nil (payload ≥ MaxInlineBodySize), the engine fetches the
// body from the per-webhook database using EventID before delivering.
type DeliveryTask struct {
	DeliveryID string // ID of the Delivery record (for recording results)
	EventID    string // Event ID (for DB lookup if body is nil)
	WebhookID  string // Webhook ID (for per-webhook DB access)

	// Target info (from main DB, included at notification time)
	TargetID     string
	TargetName   string
	TargetType   database.TargetType
	TargetConfig string // JSON config (URL, headers, etc.)
	MaxRetries   int

	// Event data (inline for ≤16KB bodies)
	Method      string
	Headers     string // JSON
	ContentType string
	Body        *string // nil if body ≥ MaxInlineBodySize; fetch from DB by EventID

	// AttemptNum tracks the delivery attempt number. Set to 1 for the
	// initial delivery and incremented for each retry. This avoids a DB
	// query to count prior results in the hot path.
	AttemptNum int
}

// Notifier is the interface for notifying the delivery engine about new
// deliveries. Implemented by Engine and injected into handlers.
type Notifier interface {
	Notify(tasks []DeliveryTask)
}

// HTTPTargetConfig holds configuration for http and retry target types.
type HTTPTargetConfig struct {
	URL     string            `json:"url"`
	Headers map[string]string `json:"headers,omitempty"`
	Timeout int               `json:"timeout,omitempty"` // seconds, 0 = default
}

// EngineParams are the fx dependencies for the delivery engine.
//
//nolint:revive // EngineParams is a standard fx naming convention
type EngineParams struct {
	fx.In

	DB        *database.Database
	DBManager *database.WebhookDBManager
	Logger    *logger.Logger
}

// Engine processes queued deliveries in the background using a bounded
// worker pool architecture. New deliveries arrive as individual
// DeliveryTask values via a buffered delivery channel from the webhook
// handler.
// Failed deliveries that need retry are scheduled via Go timers
// with exponential backoff; each timer fires into a separate retry
// channel. A fixed number of worker goroutines drain both channels,
// ensuring at most N deliveries are in-flight at any time (N = number
// of workers). This prevents goroutine explosions when a circuit breaker
// is open for a long period and many retries queue up.
//
// In the happy path (body ≤ 16KB), a worker delivers without reading
// from any database — it only writes to record results. The database
// stores delivery status for crash recovery only; on startup the engine
// scans for interrupted deliveries and re-queues them into the channels.
type Engine struct {
	database  *database.Database        // main DB (webhooks, targets)
	dbManager *database.WebhookDBManager // per-webhook DB access
	log       *slog.Logger
	client    *http.Client // shared outbound client with global timeout
	cancel    context.CancelFunc
	wg        sync.WaitGroup // tracks workers + recovery + sweep goroutines

	deliveryCh chan DeliveryTask // new tasks from the webhook handler
	retryCh    chan DeliveryTask // tasks fired by backoff timers
	workers    int

	// circuitBreakers stores a *CircuitBreaker per target ID. Only used
	// for retry targets — HTTP, database, and log targets do not need
	// circuit breakers because they either fire once or are local ops.
	circuitBreakers sync.Map
}

// New creates and registers the delivery engine with the fx lifecycle.
// The engine starts its goroutines on fx OnStart and drains them on
// fx OnStop.
func New(lc fx.Lifecycle, params EngineParams) *Engine {
	e := &Engine{
		database:  params.DB,
		dbManager: params.DBManager,
		log:       params.Logger.Get(),
		client: &http.Client{
			Timeout: httpClientTimeout,
		},
		deliveryCh: make(chan DeliveryTask, deliveryChannelSize),
		retryCh:    make(chan DeliveryTask, retryChannelSize),
		workers:    defaultWorkers,
	}
	lc.Append(fx.Hook{
		OnStart: func(_ context.Context) error {
			e.start()
			return nil
		},
		OnStop: func(_ context.Context) error {
			e.stop()
			return nil
		},
	})
	return e
}

// start launches the worker pool, the startup recovery scan, and the
// periodic retry sweep. Every goroutine is registered with e.wg before
// launch and observes the context cancelled by stop().
func (e *Engine) start() {
	ctx, cancel := context.WithCancel(context.Background())
	e.cancel = cancel

	// Start the worker pool. These are the ONLY goroutines that
	// perform HTTP delivery. Bounded concurrency is guaranteed.
	for i := 0; i < e.workers; i++ {
		e.wg.Add(1)
		go e.worker(ctx)
	}

	// Start recovery scan in a separate goroutine. Recovered tasks
	// are sent into the delivery/retry channels and picked up by workers.
	e.wg.Add(1)
	go e.recoverPending(ctx)

	// Start the periodic retry sweep. This is the DB-mediated fallback
	// for retries whose timers were dropped due to channel overflow.
	e.wg.Add(1)
	go e.retrySweep(ctx)

	e.log.Info("delivery engine started", "workers", e.workers)
}

// stop cancels the engine context and blocks until all goroutines exit.
// NOTE(review): e.cancel is nil if start() never ran; fx guarantees
// OnStart precedes OnStop, so this holds in practice — confirm if stop
// is ever called outside the fx lifecycle.
func (e *Engine) stop() {
	e.log.Info("delivery engine stopping")
	e.cancel()
	e.wg.Wait()
	e.log.Info("delivery engine stopped")
}

// Notify signals the delivery engine that new deliveries are ready.
// Called by the webhook handler after creating delivery records. Each
// DeliveryTask carries all data needed for delivery in the ≤16KB case.
// Tasks are sent individually to the delivery channel. The call is
// non-blocking; if the channel is full, a warning is logged and the
// delivery will be recovered on the next engine restart.
func (e *Engine) Notify(tasks []DeliveryTask) {
	for i := range tasks {
		select {
		case e.deliveryCh <- tasks[i]:
		default:
			// Dropped here, but the Delivery row remains in pending
			// status, so the startup recovery scan will re-queue it.
			e.log.Warn("delivery channel full, task will be recovered on restart",
				"delivery_id", tasks[i].DeliveryID,
				"event_id", tasks[i].EventID,
			)
		}
	}
}

// worker is the main loop for a worker goroutine. It selects from both
// the delivery channel (new tasks from the handler) and the retry channel
// (tasks from backoff timers). At most e.workers deliveries are in-flight
// at any time.
func (e *Engine) worker(ctx context.Context) {
	defer e.wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case task := <-e.deliveryCh:
			e.processNewTask(ctx, &task)
		case task := <-e.retryCh:
			e.processRetryTask(ctx, &task)
		}
	}
}

// recoverPending runs on startup to recover any pending or retrying
// deliveries that were interrupted by an unexpected shutdown. Recovered
// tasks are sent into the delivery/retry channels for workers to pick up.
func (e *Engine) recoverPending(ctx context.Context) { defer e.wg.Done() e.recoverInFlight(ctx) } // processNewTask handles a single new delivery task from the delivery // channel. It builds the event and target context from the task's inline // data and executes the delivery. For large bodies (≥ MaxInlineBodySize), // the body is fetched from the per-webhook database on demand. func (e *Engine) processNewTask(ctx context.Context, task *DeliveryTask) { webhookDB, err := e.dbManager.GetDB(task.WebhookID) if err != nil { e.log.Error("failed to get webhook database", "webhook_id", task.WebhookID, "error", err, ) return } // Build Event from task data event := database.Event{ Method: task.Method, Headers: task.Headers, ContentType: task.ContentType, } event.ID = task.EventID event.WebhookID = task.WebhookID if task.Body != nil { event.Body = *task.Body } else { // Large body: fetch from per-webhook DB var dbEvent database.Event if err := webhookDB.Select("body"). First(&dbEvent, "id = ?", task.EventID).Error; err != nil { e.log.Error("failed to fetch event body from database", "event_id", task.EventID, "error", err, ) return } event.Body = dbEvent.Body } // Build Target from task data (no main DB query needed) target := database.Target{ Name: task.TargetName, Type: task.TargetType, Config: task.TargetConfig, MaxRetries: task.MaxRetries, } target.ID = task.TargetID // Build Delivery struct for the processing chain d := &database.Delivery{ EventID: task.EventID, TargetID: task.TargetID, Status: database.DeliveryStatusPending, Event: event, Target: target, } d.ID = task.DeliveryID e.processDelivery(ctx, webhookDB, d, task) } // processRetryTask handles a single delivery task fired by a retry timer. // The task carries all data needed for delivery (same as the initial // notification). The only DB read is a status check to verify the delivery // hasn't been cancelled or resolved while the timer was pending. 
func (e *Engine) processRetryTask(ctx context.Context, task *DeliveryTask) {
	webhookDB, err := e.dbManager.GetDB(task.WebhookID)
	if err != nil {
		e.log.Error("failed to get webhook database for retry",
			"webhook_id", task.WebhookID,
			"delivery_id", task.DeliveryID,
			"error", err,
		)
		return
	}

	// Verify delivery is still in retrying status (may have been
	// cancelled or manually resolved while the timer was pending)
	var d database.Delivery
	if err := webhookDB.Select("id", "status").
		First(&d, "id = ?", task.DeliveryID).Error; err != nil {
		e.log.Error("failed to load delivery for retry",
			"delivery_id", task.DeliveryID,
			"error", err,
		)
		return
	}
	if d.Status != database.DeliveryStatusRetrying {
		e.log.Debug("skipping retry for delivery no longer in retrying status",
			"delivery_id", d.ID,
			"status", d.Status,
		)
		return
	}

	// Build Event from task data; like the initial delivery, the body is
	// inline unless it exceeded MaxInlineBodySize.
	event := database.Event{
		Method:      task.Method,
		Headers:     task.Headers,
		ContentType: task.ContentType,
	}
	event.ID = task.EventID
	event.WebhookID = task.WebhookID
	if task.Body != nil {
		event.Body = *task.Body
	} else {
		// Large body: fetch from per-webhook DB
		var dbEvent database.Event
		if err := webhookDB.Select("body").
			First(&dbEvent, "id = ?", task.EventID).Error; err != nil {
			e.log.Error("failed to fetch event body for retry",
				"event_id", task.EventID,
				"error", err,
			)
			return
		}
		event.Body = dbEvent.Body
	}

	// Build Target from task data
	target := database.Target{
		Name:       task.TargetName,
		Type:       task.TargetType,
		Config:     task.TargetConfig,
		MaxRetries: task.MaxRetries,
	}
	target.ID = task.TargetID

	// Populate the delivery with event and target for processing.
	// NOTE: d was loaded with only "id" and "status" selected above;
	// the remaining fields are filled in from the task here.
	d.EventID = task.EventID
	d.TargetID = task.TargetID
	d.Event = event
	d.Target = target

	e.processDelivery(ctx, webhookDB, &d, task)
}

// recoverInFlight scans all webhooks on startup for deliveries that were
// interrupted by an unexpected shutdown. Pending deliveries are sent to
// the delivery channel; retrying deliveries get timers scheduled for
// their remaining backoff period.
func (e *Engine) recoverInFlight(ctx context.Context) {
	var webhookIDs []string
	if err := e.database.DB().Model(&database.Webhook{}).Pluck("id", &webhookIDs).Error; err != nil {
		e.log.Error("failed to query webhook IDs for recovery", "error", err)
		return
	}
	for _, webhookID := range webhookIDs {
		// Bail out promptly if the engine is shutting down mid-scan.
		select {
		case <-ctx.Done():
			return
		default:
		}
		// Skip webhooks whose per-webhook database does not exist yet.
		if !e.dbManager.DBExists(webhookID) {
			continue
		}
		e.recoverWebhookDeliveries(ctx, webhookID)
	}
}

// recoverWebhookDeliveries recovers pending and retrying deliveries for
// a single webhook. Pending deliveries are sent to the delivery channel;
// retrying deliveries get timers scheduled for their remaining backoff.
func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string) {
	webhookDB, err := e.dbManager.GetDB(webhookID)
	if err != nil {
		e.log.Error("failed to get webhook database for recovery",
			"webhook_id", webhookID,
			"error", err,
		)
		return
	}

	// Recover pending deliveries by sending them to the delivery channel
	e.recoverPendingDeliveries(ctx, webhookDB, webhookID)

	// Schedule timers for retrying deliveries based on remaining backoff
	var retrying []database.Delivery
	if err := webhookDB.Where("status = ?", database.DeliveryStatusRetrying).
		Find(&retrying).Error; err != nil {
		e.log.Error("failed to query retrying deliveries for recovery",
			"webhook_id", webhookID,
			"error", err,
		)
		return
	}
	for i := range retrying {
		d := &retrying[i]
		// Count prior attempts to seed AttemptNum for the recovered task.
		// NOTE(review): the Count error is ignored here — a failed count
		// leaves resultCount at 0; confirm this is intentional.
		var resultCount int64
		webhookDB.Model(&database.DeliveryResult{}).
			Where("delivery_id = ?", d.ID).
Count(&resultCount) attemptNum := int(resultCount) // Load event for this delivery var event database.Event if err := webhookDB.First(&event, "id = ?", d.EventID).Error; err != nil { e.log.Error("failed to load event for retrying delivery recovery", "delivery_id", d.ID, "event_id", d.EventID, "error", err, ) continue } // Load target from main DB var target database.Target if err := e.database.DB().First(&target, "id = ?", d.TargetID).Error; err != nil { e.log.Error("failed to load target for retrying delivery recovery", "delivery_id", d.ID, "target_id", d.TargetID, "error", err, ) continue } // Calculate remaining backoff from last attempt remaining := time.Duration(0) var lastResult database.DeliveryResult if err := webhookDB.Where("delivery_id = ?", d.ID). Order("created_at DESC"). First(&lastResult).Error; err == nil { shift := attemptNum - 1 if shift < 0 { shift = 0 } if shift > 30 { shift = 30 } backoff := time.Duration(1< 30 { shift = 30 } backoff := time.Duration(1<= 200 && statusCode < 300 errMsg := "" if err != nil { errMsg = err.Error() } e.recordResult(webhookDB, d, 1, success, statusCode, respBody, errMsg, duration) if success { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) } else { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) } } func (e *Engine) deliverRetry(_ context.Context, webhookDB *gorm.DB, d *database.Delivery, task *DeliveryTask) { cfg, err := e.parseHTTPConfig(d.Target.Config) if err != nil { e.log.Error("invalid retry target config", "target_id", d.TargetID, "error", err, ) e.recordResult(webhookDB, d, task.AttemptNum, false, 0, "", err.Error(), 0) e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) return } // Check the circuit breaker for this target before attempting delivery. cb := e.getCircuitBreaker(task.TargetID) if !cb.Allow() { // Circuit is open — skip delivery, mark as retrying, and // schedule a retry for after the cooldown expires. 
remaining := cb.CooldownRemaining() e.log.Info("circuit breaker open, skipping delivery", "target_id", task.TargetID, "target_name", task.TargetName, "delivery_id", d.ID, "cooldown_remaining", remaining, ) e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusRetrying) retryTask := *task // Don't increment AttemptNum — this wasn't a real attempt e.scheduleRetry(retryTask, remaining) return } attemptNum := task.AttemptNum // Attempt delivery immediately — backoff is handled by the timer // that triggered this call, not by polling. statusCode, respBody, duration, err := e.doHTTPRequest(cfg, &d.Event) success := err == nil && statusCode >= 200 && statusCode < 300 errMsg := "" if err != nil { errMsg = err.Error() } e.recordResult(webhookDB, d, attemptNum, success, statusCode, respBody, errMsg, duration) if success { cb.RecordSuccess() e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) return } // Delivery failed — record failure in circuit breaker cb.RecordFailure() maxRetries := d.Target.MaxRetries if maxRetries <= 0 { maxRetries = 5 // default } if attemptNum >= maxRetries { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) } else { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusRetrying) // Schedule a timer for the next retry with exponential backoff. // The timer fires a DeliveryTask into the retry channel carrying // all data needed for the next attempt. 
shift := attemptNum - 1 if shift > 30 { shift = 30 } backoff := time.Duration(1< 0 { client = &http.Client{Timeout: time.Duration(cfg.Timeout) * time.Second} } resp, err := client.Do(req) durationMs = time.Since(start).Milliseconds() if err != nil { return 0, "", durationMs, fmt.Errorf("sending request: %w", err) } defer resp.Body.Close() body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxBodyLog)) if readErr != nil { return resp.StatusCode, "", durationMs, fmt.Errorf("reading response body: %w", readErr) } return resp.StatusCode, string(body), durationMs, nil } func (e *Engine) recordResult(webhookDB *gorm.DB, d *database.Delivery, attemptNum int, success bool, statusCode int, respBody, errMsg string, durationMs int64) { result := &database.DeliveryResult{ DeliveryID: d.ID, AttemptNum: attemptNum, Success: success, StatusCode: statusCode, ResponseBody: truncate(respBody, maxBodyLog), Error: errMsg, Duration: durationMs, } if err := webhookDB.Create(result).Error; err != nil { e.log.Error("failed to record delivery result", "delivery_id", d.ID, "error", err, ) } } func (e *Engine) updateDeliveryStatus(webhookDB *gorm.DB, d *database.Delivery, status database.DeliveryStatus) { if err := webhookDB.Model(d).Update("status", status).Error; err != nil { e.log.Error("failed to update delivery status", "delivery_id", d.ID, "status", status, "error", err, ) } } func (e *Engine) parseHTTPConfig(configJSON string) (*HTTPTargetConfig, error) { if configJSON == "" { return nil, fmt.Errorf("empty target config") } var cfg HTTPTargetConfig if err := json.Unmarshal([]byte(configJSON), &cfg); err != nil { return nil, fmt.Errorf("parsing config JSON: %w", err) } if cfg.URL == "" { return nil, fmt.Errorf("target URL is required") } return &cfg, nil } // isForwardableHeader returns true if the header should be forwarded to targets. // Hop-by-hop headers and internal headers are excluded. 
// isForwardableHeader reports whether the named request header may be
// passed through to delivery targets. Hop-by-hop and transport-managed
// headers (per the canonicalized deny list below) are excluded.
func isForwardableHeader(name string) bool {
	switch http.CanonicalHeaderKey(name) {
	case "Host", "Connection", "Keep-Alive", "Transfer-Encoding", "Te",
		"Trailer", "Upgrade", "Proxy-Authorization", "Proxy-Connection",
		"Content-Length":
		return false
	}
	return true
}

// truncate returns s shortened to at most maxLen bytes. The cut is
// byte-based, so a multi-byte UTF-8 rune straddling the boundary may be
// split; acceptable for response-body logging.
func truncate(s string, maxLen int) string {
	if len(s) > maxLen {
		return s[:maxLen]
	}
	return s
}