// Package delivery implements the background engine that delivers webhook
// events to their configured targets.
package delivery

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"sync"
	"time"

	"go.uber.org/fx"
	"gorm.io/gorm"
	"sneak.berlin/go/webhooker/internal/database"
	"sneak.berlin/go/webhooker/internal/logger"
)

const (
	// notifyChannelSize is the buffer size for the delivery notification
	// channel. Notify never blocks: when the buffer is full the
	// notification is dropped and a warning is logged, so the buffer is
	// sized generously to make that rare.
	notifyChannelSize = 1000

	// retryChannelSize is the buffer size for the retry channel. Timer-fired
	// retries are sent here for processing by the engine goroutine.
	retryChannelSize = 1000

	// MaxInlineBodySize is the maximum event body size, in bytes, that will
	// be carried inline in a Notification through the channel. Bodies at or
	// above this size are left nil and fetched from the per-webhook database
	// on demand. This keeps channel buffer memory bounded under high traffic.
	MaxInlineBodySize = 16 * 1024

	// httpClientTimeout is the timeout for outbound HTTP requests made with
	// the engine's default client.
	httpClientTimeout = 30 * time.Second

	// maxBodyLog is the maximum number of response body bytes to read and
	// store in a DeliveryResult.
	maxBodyLog = 4096
)

// Notification carries event data through the delivery notification channel.
// The Body field is a pointer: non-nil for payloads under MaxInlineBodySize
// (16 KB), nil for larger payloads. When nil, the engine fetches the body
// from the per-webhook database using EventID. This keeps channel buffer
// memory bounded regardless of payload sizes during high traffic.
type Notification struct {
	WebhookID   string
	EventID     string
	Method      string
	Headers     string // NOTE(review): presumably the serialized request headers — confirm against the producer
	ContentType string
	Body        *string // nil if body >= MaxInlineBodySize; fetch from DB by EventID
}

// Notifier is the interface for notifying the delivery engine about new
// deliveries. Implemented by Engine and injected into handlers.
type Notifier interface {
	// Notify hands a notification to the engine.
	Notify(n Notification)
}

// retryRequest carries the information needed to retry a specific delivery.
// Sent from timer goroutines to the engine's retry channel.
type retryRequest struct {
	webhookID  string // webhook that owns the delivery
	deliveryID string // delivery to attempt again
}

// HTTPTargetConfig holds configuration for http and retry target types.
// It is decoded from the target's JSON config string.
type HTTPTargetConfig struct {
	// URL is the destination of the outbound request; it must be non-empty.
	URL string `json:"url"`
	// Headers are optional per-target headers.
	Headers map[string]string `json:"headers,omitempty"`
	// Timeout is a per-target request timeout.
	Timeout int `json:"timeout,omitempty"` // seconds, 0 = default
}

// EngineParams are the fx dependencies for the delivery engine.
//
//nolint:revive // EngineParams is a standard fx naming convention
type EngineParams struct {
	fx.In

	DB        *database.Database
	DBManager *database.WebhookDBManager
	Logger    *logger.Logger
}

// Engine processes queued deliveries in the background using an
// event-driven architecture. New deliveries are signaled via a buffered
// channel from the webhook handler and processed immediately. Failed
// deliveries that need retry are scheduled via Go timers with exponential
// backoff — each timer fires into a separate retry channel when the
// backoff period expires. The database stores delivery status for crash
// recovery only; on startup the engine scans for interrupted deliveries
// and re-queues them.
type Engine struct {
	database  *database.Database         // main database; queried for webhook IDs during recovery
	dbManager *database.WebhookDBManager // opens the per-webhook databases
	log       *slog.Logger
	client    *http.Client       // default outbound client, built with httpClientTimeout
	cancel    context.CancelFunc // cancels the run loop's context; set by start
	wg        sync.WaitGroup     // tracks the run goroutine so stop can wait for it
	notifyCh  chan Notification  // new-delivery notifications, fed by Notify
	retryCh   chan retryRequest  // timer-fired retries, consumed by run
}

// New creates and registers the delivery engine with the fx lifecycle.
func New(lc fx.Lifecycle, params EngineParams) *Engine { e := &Engine{ database: params.DB, dbManager: params.DBManager, log: params.Logger.Get(), client: &http.Client{ Timeout: httpClientTimeout, }, notifyCh: make(chan Notification, notifyChannelSize), retryCh: make(chan retryRequest, retryChannelSize), } lc.Append(fx.Hook{ OnStart: func(_ context.Context) error { e.start() return nil }, OnStop: func(_ context.Context) error { e.stop() return nil }, }) return e } func (e *Engine) start() { ctx, cancel := context.WithCancel(context.Background()) e.cancel = cancel e.wg.Add(1) go e.run(ctx) e.log.Info("delivery engine started") } func (e *Engine) stop() { e.log.Info("delivery engine stopping") e.cancel() e.wg.Wait() e.log.Info("delivery engine stopped") } // Notify signals the delivery engine that new deliveries are available. // This is called by the webhook handler after creating delivery records. // The notification carries the event data inline (with body pointer // semantics for memory efficiency). The call is non-blocking; if the // channel is full, a warning is logged and the deliveries will be // recovered on the next engine restart. func (e *Engine) Notify(n Notification) { select { case e.notifyCh <- n: default: e.log.Warn("delivery notification channel full, deliveries will be recovered on restart", "webhook_id", n.WebhookID, "event_id", n.EventID, ) } } func (e *Engine) run(ctx context.Context) { defer e.wg.Done() // On startup, recover any pending or retrying deliveries that were // interrupted by an unexpected shutdown. Pending deliveries are // processed immediately; retrying deliveries get timers scheduled // for their remaining backoff. e.recoverInFlight(ctx) for { select { case <-ctx.Done(): return case n := <-e.notifyCh: e.processNotification(ctx, n) case req := <-e.retryCh: e.processRetryDelivery(ctx, req) } } } // recoverInFlight scans all webhooks on startup for deliveries that were // interrupted by an unexpected shutdown. 
// Pending deliveries are processed
// immediately; retrying deliveries get timers scheduled for their
// remaining backoff period.
//
// The list of webhook IDs comes from the main database; webhooks without a
// per-webhook database are skipped. A failure to list the IDs is logged and
// aborts recovery. Cancellation of ctx is checked before each webhook.
func (e *Engine) recoverInFlight(ctx context.Context) {
	var webhookIDs []string
	if err := e.database.DB().Model(&database.Webhook{}).Pluck("id", &webhookIDs).Error; err != nil {
		e.log.Error("failed to query webhook IDs for recovery", "error", err)
		return
	}

	for _, webhookID := range webhookIDs {
		// Non-blocking cancellation check so shutdown during a long
		// recovery pass is honored between webhooks.
		select {
		case <-ctx.Done():
			return
		default:
		}

		if !e.dbManager.DBExists(webhookID) {
			continue
		}

		e.recoverWebhookDeliveries(ctx, webhookID)
	}
}

// recoverWebhookDeliveries recovers pending and retrying deliveries for
// a single webhook. Pending deliveries are processed directly (loading
// event data from DB); retrying deliveries get timers scheduled based on
// the elapsed time since the last attempt.
func (e *Engine) recoverWebhookDeliveries(ctx context.Context, webhookID string) {
	webhookDB, err := e.dbManager.GetDB(webhookID)
	if err != nil {
		e.log.Error("failed to get webhook database for recovery",
			"webhook_id", webhookID,
			"error", err,
		)
		return
	}

	// Check for pending deliveries and process them immediately
	// NOTE(review): the result of this Count is not checked; if the query
	// fails, pendingCount stays 0 and pending recovery is silently skipped.
	var pendingCount int64
	webhookDB.Model(&database.Delivery{}).
		Where("status = ?", database.DeliveryStatusPending).
		Count(&pendingCount)

	if pendingCount > 0 {
		e.log.Info("recovering pending deliveries",
			"webhook_id", webhookID,
			"count", pendingCount,
		)
		e.processWebhookPendingDeliveries(ctx, webhookID)
	}

	// Schedule timers for retrying deliveries based on remaining backoff
	var retrying []database.Delivery
	if err := webhookDB.Where("status = ?", database.DeliveryStatusRetrying).
		Find(&retrying).Error; err != nil {
		e.log.Error("failed to query retrying deliveries for recovery",
			"webhook_id", webhookID,
			"error", err,
		)
		return
	}

	for i := range retrying {
		// Index into the slice so d points at the stored element, not a copy.
		d := &retrying[i]

		// The number of recorded results is used as the attempt count.
		var resultCount int64
		webhookDB.Model(&database.DeliveryResult{}).
			Where("delivery_id = ?", d.ID).
Count(&resultCount) attemptNum := int(resultCount) // Calculate remaining backoff from last attempt remaining := time.Duration(0) var lastResult database.DeliveryResult if err := webhookDB.Where("delivery_id = ?", d.ID). Order("created_at DESC"). First(&lastResult).Error; err == nil { shift := attemptNum - 1 if shift < 0 { shift = 0 } if shift > 30 { shift = 30 } backoff := time.Duration(1<= 200 && statusCode < 300 errMsg := "" if err != nil { errMsg = err.Error() } e.recordResult(webhookDB, d, 1, success, statusCode, respBody, errMsg, duration) if success { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) } else { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) } } func (e *Engine) deliverRetry(_ context.Context, webhookDB *gorm.DB, d *database.Delivery) { cfg, err := e.parseHTTPConfig(d.Target.Config) if err != nil { e.log.Error("invalid retry target config", "target_id", d.TargetID, "error", err, ) e.recordResult(webhookDB, d, 1, false, 0, "", err.Error(), 0) e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) return } // Determine attempt number from existing results (in per-webhook DB) var resultCount int64 webhookDB.Model(&database.DeliveryResult{}).Where("delivery_id = ?", d.ID).Count(&resultCount) attemptNum := int(resultCount) + 1 // Attempt delivery immediately — backoff is handled by the timer // that triggered this call, not by polling. 
statusCode, respBody, duration, err := e.doHTTPRequest(cfg, &d.Event) success := err == nil && statusCode >= 200 && statusCode < 300 errMsg := "" if err != nil { errMsg = err.Error() } e.recordResult(webhookDB, d, attemptNum, success, statusCode, respBody, errMsg, duration) if success { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) return } maxRetries := d.Target.MaxRetries if maxRetries <= 0 { maxRetries = 5 // default } if attemptNum >= maxRetries { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) } else { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusRetrying) // Schedule a timer for the next retry with exponential backoff. // The timer will fire and send a retryRequest to the engine's // retry channel, which triggers processRetryDelivery. shift := attemptNum - 1 if shift > 30 { shift = 30 } backoff := time.Duration(1< 0 { client = &http.Client{Timeout: time.Duration(cfg.Timeout) * time.Second} } resp, err := client.Do(req) durationMs = time.Since(start).Milliseconds() if err != nil { return 0, "", durationMs, fmt.Errorf("sending request: %w", err) } defer resp.Body.Close() body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxBodyLog)) if readErr != nil { return resp.StatusCode, "", durationMs, fmt.Errorf("reading response body: %w", readErr) } return resp.StatusCode, string(body), durationMs, nil } func (e *Engine) recordResult(webhookDB *gorm.DB, d *database.Delivery, attemptNum int, success bool, statusCode int, respBody, errMsg string, durationMs int64) { result := &database.DeliveryResult{ DeliveryID: d.ID, AttemptNum: attemptNum, Success: success, StatusCode: statusCode, ResponseBody: truncate(respBody, maxBodyLog), Error: errMsg, Duration: durationMs, } if err := webhookDB.Create(result).Error; err != nil { e.log.Error("failed to record delivery result", "delivery_id", d.ID, "error", err, ) } } func (e *Engine) updateDeliveryStatus(webhookDB *gorm.DB, d *database.Delivery, status 
database.DeliveryStatus) { if err := webhookDB.Model(d).Update("status", status).Error; err != nil { e.log.Error("failed to update delivery status", "delivery_id", d.ID, "status", status, "error", err, ) } } func (e *Engine) parseHTTPConfig(configJSON string) (*HTTPTargetConfig, error) { if configJSON == "" { return nil, fmt.Errorf("empty target config") } var cfg HTTPTargetConfig if err := json.Unmarshal([]byte(configJSON), &cfg); err != nil { return nil, fmt.Errorf("parsing config JSON: %w", err) } if cfg.URL == "" { return nil, fmt.Errorf("target URL is required") } return &cfg, nil } // isForwardableHeader returns true if the header should be forwarded to targets. // Hop-by-hop headers and internal headers are excluded. func isForwardableHeader(name string) bool { switch http.CanonicalHeaderKey(name) { case "Host", "Connection", "Keep-Alive", "Transfer-Encoding", "Te", "Trailer", "Upgrade", "Proxy-Authorization", "Proxy-Connection", "Content-Length": return false default: return true } } func truncate(s string, maxLen int) string { if len(s) <= maxLen { return s } return s[:maxLen] }