package delivery import ( "bytes" "context" "encoding/json" "fmt" "io" "log/slog" "net/http" "sync" "time" "go.uber.org/fx" "gorm.io/gorm" "sneak.berlin/go/webhooker/internal/database" "sneak.berlin/go/webhooker/internal/logger" ) const ( // notifyChannelSize is the buffer size for the delivery notification channel. // Sized large enough that the webhook handler should never block. notifyChannelSize = 1000 // retryChannelSize is the buffer size for the retry channel. Timer-fired // retries are sent here for processing by the engine goroutine. retryChannelSize = 1000 // MaxInlineBodySize is the maximum event body size that will be carried // inline in a DeliveryTask through the channel. Bodies at or above this // size are left nil and fetched from the per-webhook database on demand. // This keeps channel buffer memory bounded under high traffic. MaxInlineBodySize = 16 * 1024 // httpClientTimeout is the timeout for outbound HTTP requests. httpClientTimeout = 30 * time.Second // maxBodyLog is the maximum response body length to store in DeliveryResult. maxBodyLog = 4096 ) // DeliveryTask contains everything needed to deliver an event to a single // target. In the ≤16KB happy path, Body is non-nil and the engine delivers // without touching any database — it trusts that the webhook handler wrote // the records correctly. Only after a delivery attempt (success or failure) // does the engine write to the DB to record the result. // // When Body is nil (payload ≥ MaxInlineBodySize), the engine fetches the // body from the per-webhook database using EventID before delivering. type DeliveryTask struct { DeliveryID string // ID of the Delivery record (for recording results) EventID string // Event ID (for DB lookup if body is nil) WebhookID string // Webhook ID (for per-webhook DB access) // Target info (from main DB, included at notification time) TargetID string TargetName string TargetType database.TargetType TargetConfig string // JSON config (URL, headers, etc.) 
MaxRetries int // Event data (inline for ≤16KB bodies) Method string Headers string // JSON ContentType string Body *string // nil if body ≥ MaxInlineBodySize; fetch from DB by EventID // AttemptNum tracks the delivery attempt number. Set to 1 for the // initial delivery and incremented for each retry. This avoids a DB // query to count prior results in the hot path. AttemptNum int } // Notifier is the interface for notifying the delivery engine about new // deliveries. Implemented by Engine and injected into handlers. type Notifier interface { Notify(tasks []DeliveryTask) } // HTTPTargetConfig holds configuration for http and retry target types. type HTTPTargetConfig struct { URL string `json:"url"` Headers map[string]string `json:"headers,omitempty"` Timeout int `json:"timeout,omitempty"` // seconds, 0 = default } // EngineParams are the fx dependencies for the delivery engine. // //nolint:revive // EngineParams is a standard fx naming convention type EngineParams struct { fx.In DB *database.Database DBManager *database.WebhookDBManager Logger *logger.Logger } // Engine processes queued deliveries in the background using an // event-driven architecture. New deliveries arrive as self-contained // DeliveryTask slices via a buffered channel from the webhook handler. // In the happy path (body ≤ 16KB), the engine delivers without reading // from any database — it only writes to record results. Failed deliveries // that need retry are scheduled via Go timers with exponential backoff; // each timer fires into a separate retry channel carrying the full // DeliveryTask so retries also avoid unnecessary DB reads. The database // stores delivery status for crash recovery only; on startup the engine // scans for interrupted deliveries and re-queues them. // // All targets for a single event are delivered in parallel — each // DeliveryTask is dispatched in its own goroutine for maximum fan-out // speed. 
Retry targets are protected by a per-target circuit breaker // that stops hammering a down target after consecutive failures. type Engine struct { database *database.Database dbManager *database.WebhookDBManager log *slog.Logger client *http.Client cancel context.CancelFunc wg sync.WaitGroup notifyCh chan []DeliveryTask retryCh chan DeliveryTask // circuitBreakers stores a *CircuitBreaker per target ID. Only used // for retry targets — HTTP, database, and log targets do not need // circuit breakers because they either fire once or are local ops. circuitBreakers sync.Map } // New creates and registers the delivery engine with the fx lifecycle. func New(lc fx.Lifecycle, params EngineParams) *Engine { e := &Engine{ database: params.DB, dbManager: params.DBManager, log: params.Logger.Get(), client: &http.Client{ Timeout: httpClientTimeout, }, notifyCh: make(chan []DeliveryTask, notifyChannelSize), retryCh: make(chan DeliveryTask, retryChannelSize), } lc.Append(fx.Hook{ OnStart: func(_ context.Context) error { e.start() return nil }, OnStop: func(_ context.Context) error { e.stop() return nil }, }) return e } func (e *Engine) start() { ctx, cancel := context.WithCancel(context.Background()) e.cancel = cancel e.wg.Add(1) go e.run(ctx) e.log.Info("delivery engine started") } func (e *Engine) stop() { e.log.Info("delivery engine stopping") e.cancel() e.wg.Wait() e.log.Info("delivery engine stopped") } // Notify signals the delivery engine that new deliveries are ready. // Called by the webhook handler after creating delivery records. Each // DeliveryTask carries all data needed for delivery in the ≤16KB case. // The call is non-blocking; if the channel is full, a warning is logged // and the deliveries will be recovered on the next engine restart. 
func (e *Engine) Notify(tasks []DeliveryTask) { select { case e.notifyCh <- tasks: default: e.log.Warn("delivery notification channel full, deliveries will be recovered on restart", "task_count", len(tasks), ) } } func (e *Engine) run(ctx context.Context) { defer e.wg.Done() // On startup, recover any pending or retrying deliveries that were // interrupted by an unexpected shutdown. Pending deliveries are // processed immediately; retrying deliveries get timers scheduled // for their remaining backoff. e.recoverInFlight(ctx) for { select { case <-ctx.Done(): return case tasks := <-e.notifyCh: e.processDeliveryTasks(ctx, tasks) case task := <-e.retryCh: e.processRetryTask(ctx, task) } } } // recoverInFlight scans all webhooks on startup for deliveries that were // interrupted by an unexpected shutdown. Pending deliveries are processed // immediately; retrying deliveries get timers scheduled for their // remaining backoff period. func (e *Engine) recoverInFlight(ctx context.Context) { var webhookIDs []string if err := e.database.DB().Model(&database.Webhook{}).Pluck("id", &webhookIDs).Error; err != nil { e.log.Error("failed to query webhook IDs for recovery", "error", err) return } for _, webhookID := range webhookIDs { select { case <-ctx.Done(): return default: } if !e.dbManager.DBExists(webhookID) { continue } e.recoverWebhookDeliveries(ctx, webhookID) } } // recoverWebhookDeliveries recovers pending and retrying deliveries for // a single webhook. This is the recovery path — it reads everything from // the database since there are no in-memory notifications available after // a restart. func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string) { webhookDB, err := e.dbManager.GetDB(webhookID) if err != nil { e.log.Error("failed to get webhook database for recovery", "webhook_id", webhookID, "error", err, ) return } // Check for pending deliveries and process them immediately var pendingCount int64 webhookDB.Model(&database.Delivery{}). 
Where("status = ?", database.DeliveryStatusPending). Count(&pendingCount) if pendingCount > 0 { e.log.Info("recovering pending deliveries", "webhook_id", webhookID, "count", pendingCount, ) e.processWebhookPendingDeliveries(ctx, webhookID) } // Schedule timers for retrying deliveries based on remaining backoff var retrying []database.Delivery if err := webhookDB.Where("status = ?", database.DeliveryStatusRetrying). Find(&retrying).Error; err != nil { e.log.Error("failed to query retrying deliveries for recovery", "webhook_id", webhookID, "error", err, ) return } for i := range retrying { d := &retrying[i] var resultCount int64 webhookDB.Model(&database.DeliveryResult{}). Where("delivery_id = ?", d.ID). Count(&resultCount) attemptNum := int(resultCount) // Load event for this delivery var event database.Event if err := webhookDB.First(&event, "id = ?", d.EventID).Error; err != nil { e.log.Error("failed to load event for retrying delivery recovery", "delivery_id", d.ID, "event_id", d.EventID, "error", err, ) continue } // Load target from main DB var target database.Target if err := e.database.DB().First(&target, "id = ?", d.TargetID).Error; err != nil { e.log.Error("failed to load target for retrying delivery recovery", "delivery_id", d.ID, "target_id", d.TargetID, "error", err, ) continue } // Calculate remaining backoff from last attempt remaining := time.Duration(0) var lastResult database.DeliveryResult if err := webhookDB.Where("delivery_id = ?", d.ID). Order("created_at DESC"). First(&lastResult).Error; err == nil { shift := attemptNum - 1 if shift < 0 { shift = 0 } if shift > 30 { shift = 30 } backoff := time.Duration(1< 16KB), the body is // fetched once and shared across all goroutines in the batch. 
func (e *Engine) processDeliveryTasks(ctx context.Context, tasks []DeliveryTask) { if len(tasks) == 0 { return } // All tasks in a batch share the same webhook ID webhookID := tasks[0].WebhookID webhookDB, err := e.dbManager.GetDB(webhookID) if err != nil { e.log.Error("failed to get webhook database", "webhook_id", webhookID, "error", err, ) return } // For the large-body case, pre-fetch the event body once before // fanning out so all goroutines share the same data. var fetchedBody *string if tasks[0].Body == nil { var dbEvent database.Event if err := webhookDB.Select("body"). First(&dbEvent, "id = ?", tasks[0].EventID).Error; err != nil { e.log.Error("failed to fetch event body from database", "event_id", tasks[0].EventID, "error", err, ) return } fetchedBody = &dbEvent.Body } // Fan out: spin up a goroutine per task for parallel delivery. // Each goroutine is independent (fire-and-forget) and records its // own result. No need to wait for all goroutines to finish. for i := range tasks { select { case <-ctx.Done(): return default: } task := tasks[i] // copy for goroutine closure safety go func() { e.deliverTask(ctx, webhookDB, &task, fetchedBody) }() } } // deliverTask prepares and executes a single delivery task. Called from // a dedicated goroutine for parallel fan-out. 
func (e *Engine) deliverTask(ctx context.Context, webhookDB *gorm.DB, task *DeliveryTask, fetchedBody *string) { // Build Event from task data event := database.Event{ Method: task.Method, Headers: task.Headers, ContentType: task.ContentType, } event.ID = task.EventID event.WebhookID = task.WebhookID switch { case task.Body != nil: event.Body = *task.Body case fetchedBody != nil: event.Body = *fetchedBody default: e.log.Error("no body available for delivery task", "delivery_id", task.DeliveryID, "event_id", task.EventID, ) return } // Build Target from task data (no main DB query needed) target := database.Target{ Name: task.TargetName, Type: task.TargetType, Config: task.TargetConfig, MaxRetries: task.MaxRetries, } target.ID = task.TargetID // Build Delivery struct for the processing chain d := &database.Delivery{ EventID: task.EventID, TargetID: task.TargetID, Status: database.DeliveryStatusPending, Event: event, Target: target, } d.ID = task.DeliveryID e.processDelivery(ctx, webhookDB, d, task) } // processRetryTask handles a single delivery task fired by a retry timer. // The task carries all data needed for delivery (same as the initial // notification). The only DB read is a status check to verify the delivery // hasn't been cancelled or resolved while the timer was pending. func (e *Engine) processRetryTask(ctx context.Context, task DeliveryTask) { webhookDB, err := e.dbManager.GetDB(task.WebhookID) if err != nil { e.log.Error("failed to get webhook database for retry", "webhook_id", task.WebhookID, "delivery_id", task.DeliveryID, "error", err, ) return } // Verify delivery is still in retrying status (may have been // cancelled or manually resolved while the timer was pending) var d database.Delivery if err := webhookDB.Select("id", "status"). 
First(&d, "id = ?", task.DeliveryID).Error; err != nil { e.log.Error("failed to load delivery for retry", "delivery_id", task.DeliveryID, "error", err, ) return } if d.Status != database.DeliveryStatusRetrying { e.log.Debug("skipping retry for delivery no longer in retrying status", "delivery_id", d.ID, "status", d.Status, ) return } // Build Event from task data event := database.Event{ Method: task.Method, Headers: task.Headers, ContentType: task.ContentType, } event.ID = task.EventID event.WebhookID = task.WebhookID if task.Body != nil { event.Body = *task.Body } else { // Large body: fetch from per-webhook DB var dbEvent database.Event if err := webhookDB.Select("body"). First(&dbEvent, "id = ?", task.EventID).Error; err != nil { e.log.Error("failed to fetch event body for retry", "event_id", task.EventID, "error", err, ) return } event.Body = dbEvent.Body } // Build Target from task data target := database.Target{ Name: task.TargetName, Type: task.TargetType, Config: task.TargetConfig, MaxRetries: task.MaxRetries, } target.ID = task.TargetID // Populate the delivery with event and target for processing d.EventID = task.EventID d.TargetID = task.TargetID d.Event = event d.Target = target e.processDelivery(ctx, webhookDB, &d, &task) } // processWebhookPendingDeliveries queries a single webhook's database for // all pending deliveries and processes them. Used for crash recovery where // we don't have in-memory notifications — everything is loaded from the DB. func (e *Engine) processWebhookPendingDeliveries(ctx context.Context, webhookID string) { webhookDB, err := e.dbManager.GetDB(webhookID) if err != nil { e.log.Error("failed to get webhook database", "webhook_id", webhookID, "error", err, ) return } var deliveries []database.Delivery result := webhookDB. Where("status = ?", database.DeliveryStatusPending). Preload("Event"). 
Find(&deliveries) if result.Error != nil { e.log.Error("failed to query pending deliveries", "webhook_id", webhookID, "error", result.Error, ) return } if len(deliveries) == 0 { return } // Collect unique target IDs and load targets from the main DB seen := make(map[string]bool) targetIDs := make([]string, 0, len(deliveries)) for _, d := range deliveries { if !seen[d.TargetID] { targetIDs = append(targetIDs, d.TargetID) seen[d.TargetID] = true } } var targets []database.Target if err := e.database.DB().Where("id IN ?", targetIDs).Find(&targets).Error; err != nil { e.log.Error("failed to load targets from main DB", "error", err) return } targetMap := make(map[string]database.Target, len(targets)) for _, t := range targets { targetMap[t.ID] = t } // Fan out recovered deliveries in parallel — same as the normal // delivery path, each task gets its own goroutine. for i := range deliveries { select { case <-ctx.Done(): return default: } target, ok := targetMap[deliveries[i].TargetID] if !ok { e.log.Error("target not found for delivery", "delivery_id", deliveries[i].ID, "target_id", deliveries[i].TargetID, ) continue } deliveries[i].Target = target // Build task from DB data for the recovery path bodyStr := deliveries[i].Event.Body task := DeliveryTask{ DeliveryID: deliveries[i].ID, EventID: deliveries[i].EventID, WebhookID: webhookID, TargetID: target.ID, TargetName: target.Name, TargetType: target.Type, TargetConfig: target.Config, MaxRetries: target.MaxRetries, Method: deliveries[i].Event.Method, Headers: deliveries[i].Event.Headers, ContentType: deliveries[i].Event.ContentType, Body: &bodyStr, AttemptNum: 1, } d := deliveries[i] // copy for goroutine closure safety go func() { e.processDelivery(ctx, webhookDB, &d, &task) }() } } // scheduleRetry creates a Go timer that fires after the given delay and // sends the full DeliveryTask to the engine's retry channel. 
The task // carries all data needed for the retry attempt, so when it fires, the // engine can deliver without reading event or target data from the DB. func (e *Engine) scheduleRetry(task DeliveryTask, delay time.Duration) { e.log.Debug("scheduling delivery retry", "webhook_id", task.WebhookID, "delivery_id", task.DeliveryID, "delay", delay, "next_attempt", task.AttemptNum, ) time.AfterFunc(delay, func() { select { case e.retryCh <- task: default: e.log.Warn("retry channel full, delivery will be recovered on restart", "delivery_id", task.DeliveryID, ) } }) } func (e *Engine) processDelivery(ctx context.Context, webhookDB *gorm.DB, d *database.Delivery, task *DeliveryTask) { switch d.Target.Type { case database.TargetTypeHTTP: e.deliverHTTP(ctx, webhookDB, d) case database.TargetTypeRetry: e.deliverRetry(ctx, webhookDB, d, task) case database.TargetTypeDatabase: e.deliverDatabase(webhookDB, d) case database.TargetTypeLog: e.deliverLog(webhookDB, d) default: e.log.Error("unknown target type", "target_id", d.TargetID, "type", d.Target.Type, ) e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) } } func (e *Engine) deliverHTTP(_ context.Context, webhookDB *gorm.DB, d *database.Delivery) { cfg, err := e.parseHTTPConfig(d.Target.Config) if err != nil { e.log.Error("invalid HTTP target config", "target_id", d.TargetID, "error", err, ) e.recordResult(webhookDB, d, 1, false, 0, "", err.Error(), 0) e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) return } statusCode, respBody, duration, err := e.doHTTPRequest(cfg, &d.Event) success := err == nil && statusCode >= 200 && statusCode < 300 errMsg := "" if err != nil { errMsg = err.Error() } e.recordResult(webhookDB, d, 1, success, statusCode, respBody, errMsg, duration) if success { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) } else { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) } } func (e *Engine) deliverRetry(_ context.Context, webhookDB 
*gorm.DB, d *database.Delivery, task *DeliveryTask) { cfg, err := e.parseHTTPConfig(d.Target.Config) if err != nil { e.log.Error("invalid retry target config", "target_id", d.TargetID, "error", err, ) e.recordResult(webhookDB, d, task.AttemptNum, false, 0, "", err.Error(), 0) e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) return } // Check the circuit breaker for this target before attempting delivery. cb := e.getCircuitBreaker(task.TargetID) if !cb.Allow() { // Circuit is open — skip delivery, mark as retrying, and // schedule a retry for after the cooldown expires. remaining := cb.CooldownRemaining() e.log.Info("circuit breaker open, skipping delivery", "target_id", task.TargetID, "target_name", task.TargetName, "delivery_id", d.ID, "cooldown_remaining", remaining, ) e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusRetrying) retryTask := *task // Don't increment AttemptNum — this wasn't a real attempt e.scheduleRetry(retryTask, remaining) return } attemptNum := task.AttemptNum // Attempt delivery immediately — backoff is handled by the timer // that triggered this call, not by polling. statusCode, respBody, duration, err := e.doHTTPRequest(cfg, &d.Event) success := err == nil && statusCode >= 200 && statusCode < 300 errMsg := "" if err != nil { errMsg = err.Error() } e.recordResult(webhookDB, d, attemptNum, success, statusCode, respBody, errMsg, duration) if success { cb.RecordSuccess() e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) return } // Delivery failed — record failure in circuit breaker cb.RecordFailure() maxRetries := d.Target.MaxRetries if maxRetries <= 0 { maxRetries = 5 // default } if attemptNum >= maxRetries { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) } else { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusRetrying) // Schedule a timer for the next retry with exponential backoff. 
// The timer fires a DeliveryTask into the retry channel carrying // all data needed for the next attempt. shift := attemptNum - 1 if shift > 30 { shift = 30 } backoff := time.Duration(1< 0 { client = &http.Client{Timeout: time.Duration(cfg.Timeout) * time.Second} } resp, err := client.Do(req) durationMs = time.Since(start).Milliseconds() if err != nil { return 0, "", durationMs, fmt.Errorf("sending request: %w", err) } defer resp.Body.Close() body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxBodyLog)) if readErr != nil { return resp.StatusCode, "", durationMs, fmt.Errorf("reading response body: %w", readErr) } return resp.StatusCode, string(body), durationMs, nil } func (e *Engine) recordResult(webhookDB *gorm.DB, d *database.Delivery, attemptNum int, success bool, statusCode int, respBody, errMsg string, durationMs int64) { result := &database.DeliveryResult{ DeliveryID: d.ID, AttemptNum: attemptNum, Success: success, StatusCode: statusCode, ResponseBody: truncate(respBody, maxBodyLog), Error: errMsg, Duration: durationMs, } if err := webhookDB.Create(result).Error; err != nil { e.log.Error("failed to record delivery result", "delivery_id", d.ID, "error", err, ) } } func (e *Engine) updateDeliveryStatus(webhookDB *gorm.DB, d *database.Delivery, status database.DeliveryStatus) { if err := webhookDB.Model(d).Update("status", status).Error; err != nil { e.log.Error("failed to update delivery status", "delivery_id", d.ID, "status", status, "error", err, ) } } func (e *Engine) parseHTTPConfig(configJSON string) (*HTTPTargetConfig, error) { if configJSON == "" { return nil, fmt.Errorf("empty target config") } var cfg HTTPTargetConfig if err := json.Unmarshal([]byte(configJSON), &cfg); err != nil { return nil, fmt.Errorf("parsing config JSON: %w", err) } if cfg.URL == "" { return nil, fmt.Errorf("target URL is required") } return &cfg, nil } // isForwardableHeader returns true if the header should be forwarded to targets. 
// Hop-by-hop headers and internal headers are excluded.
func isForwardableHeader(name string) bool {
	// Canonicalize first so the check is case-insensitive
	// ("content-length" and "Content-Length" are the same header).
	switch http.CanonicalHeaderKey(name) {
	case "Host", "Connection", "Keep-Alive", "Transfer-Encoding", "Te",
		"Trailer", "Upgrade", "Proxy-Authorization", "Proxy-Connection",
		"Content-Length":
		return false
	}
	return true
}

// truncate returns s limited to at most maxLen bytes. If the byte limit
// would split a multi-byte UTF-8 sequence, the cut is moved back to the
// previous rune boundary so the result is never invalid UTF-8 (the
// truncated string is stored in DeliveryResult.ResponseBody).
func truncate(s string, maxLen int) string {
	if len(s) <= maxLen {
		return s
	}
	cut := maxLen
	// s[cut] is the first excluded byte; while it is a UTF-8
	// continuation byte (10xxxxxx), the rune straddles the cut — back
	// up until the cut lands on a rune boundary.
	for cut > 0 && s[cut]&0xC0 == 0x80 {
		cut--
	}
	return s[:cut]
}