package delivery import ( "bytes" "context" "encoding/json" "fmt" "io" "log/slog" "net/http" "sync" "time" "go.uber.org/fx" "gorm.io/gorm" "sneak.berlin/go/webhooker/internal/database" "sneak.berlin/go/webhooker/internal/logger" ) const ( // pollInterval is how often the engine checks for pending deliveries. pollInterval = 2 * time.Second // httpClientTimeout is the timeout for outbound HTTP requests. httpClientTimeout = 30 * time.Second // maxBodyLog is the maximum response body length to store in DeliveryResult. maxBodyLog = 4096 ) // HTTPTargetConfig holds configuration for http and retry target types. type HTTPTargetConfig struct { URL string `json:"url"` Headers map[string]string `json:"headers,omitempty"` Timeout int `json:"timeout,omitempty"` // seconds, 0 = default } // EngineParams are the fx dependencies for the delivery engine. // //nolint:revive // EngineParams is a standard fx naming convention type EngineParams struct { fx.In DB *database.Database DBManager *database.WebhookDBManager Logger *logger.Logger } // Engine processes queued deliveries in the background. // It iterates over all active webhooks and polls each webhook's // per-webhook database for pending deliveries. type Engine struct { database *database.Database dbManager *database.WebhookDBManager log *slog.Logger client *http.Client cancel context.CancelFunc wg sync.WaitGroup } // New creates and registers the delivery engine with the fx lifecycle. func New(lc fx.Lifecycle, params EngineParams) *Engine { e := &Engine{ database: params.DB, dbManager: params.DBManager, log: params.Logger.Get(), client: &http.Client{ Timeout: httpClientTimeout, }, } lc.Append(fx.Hook{ OnStart: func(_ context.Context) error { e.start() return nil }, OnStop: func(_ context.Context) error { e.stop() return nil }, }) return e } func (e *Engine) start() { ctx, cancel := context.WithCancel(context.Background()) e.cancel = cancel e.wg.Add(1) go e.run(ctx) e.log.Info("delivery engine started") } func (e *Engine) stop() { e.log.Info("delivery engine stopping") e.cancel() e.wg.Wait() e.log.Info("delivery engine stopped") } func (e *Engine) run(ctx context.Context) { defer e.wg.Done() ticker := time.NewTicker(pollInterval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: e.processPending(ctx) } } } // processPending iterates over all active webhooks and processes pending // deliveries from each webhook's per-webhook database. func (e *Engine) processPending(ctx context.Context) { // Get all active webhook IDs from the main application database var webhookIDs []string if err := e.database.DB().Model(&database.Webhook{}).Pluck("id", &webhookIDs).Error; err != nil { e.log.Error("failed to query webhook IDs", "error", err) return } for _, webhookID := range webhookIDs { select { case <-ctx.Done(): return default: // Only process webhooks that have an event database file if !e.dbManager.DBExists(webhookID) { continue } e.processWebhookDeliveries(ctx, webhookID) } } } // processWebhookDeliveries polls a single webhook's database for pending // deliveries and processes them. func (e *Engine) processWebhookDeliveries(ctx context.Context, webhookID string) { webhookDB, err := e.dbManager.GetDB(webhookID) if err != nil { e.log.Error("failed to get webhook database", "webhook_id", webhookID, "error", err, ) return } // Query pending and retrying deliveries from the per-webhook DB. // Preload Event (same DB) but NOT Target (Target is in the main DB). var deliveries []database.Delivery result := webhookDB. Where("status IN ?", []database.DeliveryStatus{ database.DeliveryStatusPending, database.DeliveryStatusRetrying, }). Preload("Event"). Find(&deliveries) if result.Error != nil { e.log.Error("failed to query pending deliveries", "webhook_id", webhookID, "error", result.Error, ) return } if len(deliveries) == 0 { return } // Collect unique target IDs and load targets from the main DB seen := make(map[string]bool) targetIDs := make([]string, 0, len(deliveries)) for _, d := range deliveries { if !seen[d.TargetID] { targetIDs = append(targetIDs, d.TargetID) seen[d.TargetID] = true } } var targets []database.Target if err := e.database.DB().Where("id IN ?", targetIDs).Find(&targets).Error; err != nil { e.log.Error("failed to load targets from main DB", "error", err) return } targetMap := make(map[string]database.Target, len(targets)) for _, t := range targets { targetMap[t.ID] = t } for i := range deliveries { select { case <-ctx.Done(): return default: target, ok := targetMap[deliveries[i].TargetID] if !ok { e.log.Error("target not found for delivery", "delivery_id", deliveries[i].ID, "target_id", deliveries[i].TargetID, ) continue } deliveries[i].Target = target e.processDelivery(ctx, webhookDB, &deliveries[i]) } } } func (e *Engine) processDelivery(ctx context.Context, webhookDB *gorm.DB, d *database.Delivery) { switch d.Target.Type { case database.TargetTypeHTTP: e.deliverHTTP(ctx, webhookDB, d) case database.TargetTypeRetry: e.deliverRetry(ctx, webhookDB, d) case database.TargetTypeDatabase: e.deliverDatabase(webhookDB, d) case database.TargetTypeLog: e.deliverLog(webhookDB, d) default: e.log.Error("unknown target type", "target_id", d.TargetID, "type", d.Target.Type, ) e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) } } func (e *Engine) deliverHTTP(_ context.Context, webhookDB *gorm.DB, d *database.Delivery) { cfg, err := e.parseHTTPConfig(d.Target.Config) if err != nil { e.log.Error("invalid HTTP target config", "target_id", d.TargetID, "error", err, ) e.recordResult(webhookDB, d, 1, false, 0, "", err.Error(), 0) e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) return } statusCode, respBody, duration, err := e.doHTTPRequest(cfg, &d.Event) success := err == nil && statusCode >= 200 && statusCode < 300 errMsg := "" if err != nil { errMsg = err.Error() } e.recordResult(webhookDB, d, 1, success, statusCode, respBody, errMsg, duration) if success { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) } else { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) } } func (e *Engine) deliverRetry(_ context.Context, webhookDB *gorm.DB, d *database.Delivery) { cfg, err := e.parseHTTPConfig(d.Target.Config) if err != nil { e.log.Error("invalid retry target config", "target_id", d.TargetID, "error", err, ) e.recordResult(webhookDB, d, 1, false, 0, "", err.Error(), 0) e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) return } // Determine attempt number from existing results (in per-webhook DB) var resultCount int64 webhookDB.Model(&database.DeliveryResult{}).Where("delivery_id = ?", d.ID).Count(&resultCount) attemptNum := int(resultCount) + 1 // Check if we should wait before retrying (exponential backoff) if attemptNum > 1 { var lastResult database.DeliveryResult lookupErr := webhookDB.Where("delivery_id = ?", d.ID).Order("created_at DESC").First(&lastResult).Error if lookupErr == nil { shift := attemptNum - 2 if shift > 30 { shift = 30 } backoff := time.Duration(1<= 200 && statusCode < 300 errMsg := "" if err != nil { errMsg = err.Error() } e.recordResult(webhookDB, d, attemptNum, success, statusCode, respBody, errMsg, duration) if success { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) return } maxRetries := d.Target.MaxRetries if maxRetries <= 0 { maxRetries = 5 // default } if attemptNum >= maxRetries { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) } else { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusRetrying) } } // deliverDatabase handles the database target type. Since events are already // stored in the per-webhook database (that's the whole point of per-webhook // databases), the database target simply marks the delivery as successful. // The per-webhook DB IS the dedicated event database for this webhook. func (e *Engine) deliverDatabase(webhookDB *gorm.DB, d *database.Delivery) { e.recordResult(webhookDB, d, 1, true, 0, "", "", 0) e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) } func (e *Engine) deliverLog(webhookDB *gorm.DB, d *database.Delivery) { e.log.Info("webhook event delivered to log target", "delivery_id", d.ID, "event_id", d.EventID, "target_id", d.TargetID, "target_name", d.Target.Name, "method", d.Event.Method, "content_type", d.Event.ContentType, "body_length", len(d.Event.Body), ) e.recordResult(webhookDB, d, 1, true, 0, "", "", 0) e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) } // doHTTPRequest performs the outbound HTTP POST to a target URL. func (e *Engine) doHTTPRequest(cfg *HTTPTargetConfig, event *database.Event) (statusCode int, respBody string, durationMs int64, err error) { start := time.Now() req, err := http.NewRequest(http.MethodPost, cfg.URL, bytes.NewReader([]byte(event.Body))) if err != nil { return 0, "", 0, fmt.Errorf("creating request: %w", err) } // Set content type from original event if event.ContentType != "" { req.Header.Set("Content-Type", event.ContentType) } // Apply original headers (filtered) var originalHeaders map[string][]string if event.Headers != "" { if jsonErr := json.Unmarshal([]byte(event.Headers), &originalHeaders); jsonErr == nil { for k, vals := range originalHeaders { if isForwardableHeader(k) { for _, v := range vals { req.Header.Add(k, v) } } } } } // Apply target-specific headers (override) for k, v := range cfg.Headers { req.Header.Set(k, v) } req.Header.Set("User-Agent", "webhooker/1.0") client := e.client if cfg.Timeout > 0 { client = &http.Client{Timeout: time.Duration(cfg.Timeout) * time.Second} } resp, err := client.Do(req) durationMs = time.Since(start).Milliseconds() if err != nil { return 0, "", durationMs, fmt.Errorf("sending request: %w", err) } defer resp.Body.Close() body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxBodyLog)) if readErr != nil { return resp.StatusCode, "", durationMs, fmt.Errorf("reading response body: %w", readErr) } return resp.StatusCode, string(body), durationMs, nil } func (e *Engine) recordResult(webhookDB *gorm.DB, d *database.Delivery, attemptNum int, success bool, statusCode int, respBody, errMsg string, durationMs int64) { result := &database.DeliveryResult{ DeliveryID: d.ID, AttemptNum: attemptNum, Success: success, StatusCode: statusCode, ResponseBody: truncate(respBody, maxBodyLog), Error: errMsg, Duration: durationMs, } if err := webhookDB.Create(result).Error; err != nil { e.log.Error("failed to record delivery result", "delivery_id", d.ID, "error", err, ) } } func (e *Engine) updateDeliveryStatus(webhookDB *gorm.DB, d *database.Delivery, status database.DeliveryStatus) { if err := webhookDB.Model(d).Update("status", status).Error; err != nil { e.log.Error("failed to update delivery status", "delivery_id", d.ID, "status", status, "error", err, ) } } func (e *Engine) parseHTTPConfig(configJSON string) (*HTTPTargetConfig, error) { if configJSON == "" { return nil, fmt.Errorf("empty target config") } var cfg HTTPTargetConfig if err := json.Unmarshal([]byte(configJSON), &cfg); err != nil { return nil, fmt.Errorf("parsing config JSON: %w", err) } if cfg.URL == "" { return nil, fmt.Errorf("target URL is required") } return &cfg, nil } // isForwardableHeader returns true if the header should be forwarded to targets. // Hop-by-hop headers and internal headers are excluded. func isForwardableHeader(name string) bool { switch http.CanonicalHeaderKey(name) { case "Host", "Connection", "Keep-Alive", "Transfer-Encoding", "Te", "Trailer", "Upgrade", "Proxy-Authorization", "Proxy-Connection", "Content-Length": return false default: return true } } func truncate(s string, maxLen int) string { if len(s) <= maxLen { return s } return s[:maxLen] }