// Package delivery manages asynchronous event delivery
// to configured targets.
package delivery

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"strings"
	"sync"
	"time"

	"go.uber.org/fx"
	"gorm.io/gorm"
	"sneak.berlin/go/webhooker/internal/database"
	"sneak.berlin/go/webhooker/internal/logger"
)

const (
	// deliveryChannelSize is the buffer size for the delivery
	// channel. New Tasks from the webhook handler are sent
	// here. Workers drain this channel. Sized large enough
	// that the webhook handler should never block under
	// normal load.
	deliveryChannelSize = 10000

	// retryChannelSize is the buffer size for the retry
	// channel. Timer-fired retries are sent here for
	// processing by workers.
	retryChannelSize = 10000

	// defaultWorkers is the number of worker goroutines in
	// the delivery engine pool. At most this many deliveries
	// are in-flight at any time, preventing goroutine
	// explosions regardless of queue depth.
	defaultWorkers = 10

	// retrySweepInterval is how often the periodic retry
	// sweep runs.
	retrySweepInterval = 60 * time.Second

	// MaxInlineBodySize is the maximum event body size that
	// will be carried inline in a Task through the channel.
	// Bodies at or above this size are left nil and fetched
	// from the per-webhook database on demand.
	MaxInlineBodySize = 16 * 1024

	// httpClientTimeout is the timeout for outbound HTTP
	// requests.
	httpClientTimeout = 30 * time.Second

	// maxBodyLog is the maximum response body length to
	// store in DeliveryResult.
	maxBodyLog = 4096

	// maxBackoffShift caps the exponential backoff shift to
	// avoid integer overflow in the
	//
	// NOTE(review): THIS FILE IS CORRUPTED HERE. In the
	// reviewed copy, the comment above runs straight into
	// "1<= httpSuccessMin". The text between a "<" and a
	// later ">" appears to have been stripped, as if it were
	// an HTML tag. The lost span includes at least:
	//   - the maxBackoffShift constant itself;
	//   - the closing of this const block;
	//   - the opening of the function whose tail follows.
	// That tail records attempt 1 and marks the delivery
	// delivered or failed, with no retry. Identifiers used
	// below but not declared in this file (Engine, Task,
	// HTTPTargetConfig, SlackTargetConfig, CircuitBreaker,
	// httpSuccessMin/httpSuccessMax, errEmptyTargetConfig,
	// errMissingTargetURL, errMissingWebhookURL, ...) were
	// presumably declared in the lost span or elsewhere in
	// the package. Restore from version control. The fragment
	// line below is kept verbatim and will not compile as-is.
	1<= httpSuccessMin && statusCode < httpSuccessMax

	errMsg := ""
	if reqErr != nil {
		errMsg = reqErr.Error()
	}

	e.recordResult(
		webhookDB, d, 1, success, statusCode,
		respBody, errMsg, duration,
	)

	if success {
		e.updateDeliveryStatus(
			webhookDB, d, database.DeliveryStatusDelivered,
		)
	} else {
		e.updateDeliveryStatus(
			webhookDB, d, database.DeliveryStatusFailed,
		)
	}
}

// deliverHTTPWithRetry performs one HTTP delivery attempt
// for task, guarded by the circuit breaker of the task's
// target.
//
// If the breaker refuses traffic, circuitBreakerBlock
// reschedules the task and no attempt is made or recorded.
// Otherwise the attempt is recorded under task.AttemptNum.
// On success the breaker is told and the delivery is
// marked delivered. On failure (transport error or a status
// outside [httpSuccessMin, httpSuccessMax)) the breaker is
// told and handleHTTPRetry decides whether to retry.
func (e *Engine) deliverHTTPWithRetry(
	ctx context.Context,
	webhookDB *gorm.DB,
	d *database.Delivery,
	task *Task,
	cfg *HTTPTargetConfig,
) {
	cb := e.getCircuitBreaker(task.TargetID)

	if e.circuitBreakerBlock(
		webhookDB, d, task, cb,
	) {
		return
	}

	attemptNum := task.AttemptNum

	statusCode, respBody, duration, reqErr := e.doHTTPRequest(ctx, cfg, &d.Event)

	success := reqErr == nil &&
		statusCode >= httpSuccessMin &&
		statusCode < httpSuccessMax

	errMsg := ""
	if reqErr != nil {
		errMsg = reqErr.Error()
	}

	e.recordResult(
		webhookDB, d, attemptNum, success, statusCode,
		respBody, errMsg, duration,
	)

	if success {
		cb.RecordSuccess()
		e.updateDeliveryStatus(
			webhookDB, d, database.DeliveryStatusDelivered,
		)

		return
	}

	cb.RecordFailure()
	e.handleHTTPRetry(webhookDB, d, task, attemptNum)
}

// circuitBreakerBlock reports whether cb currently refuses
// traffic. When it returns true, it has already done the
// following, and the caller must skip the attempt:
//   - logged the skip;
//   - marked the delivery retrying;
//   - rescheduled an unmodified copy of task (same
//     AttemptNum, so the skipped attempt is not counted)
//     after the breaker's remaining cooldown.
func (e *Engine) circuitBreakerBlock(
	webhookDB *gorm.DB,
	d *database.Delivery,
	task *Task,
	cb *CircuitBreaker,
) bool {
	if cb.Allow() {
		return false
	}

	remaining := cb.CooldownRemaining()

	e.log.Info(
		"circuit breaker open, skipping delivery",
		"target_id", task.TargetID,
		"target_name", task.TargetName,
		"delivery_id", d.ID,
		"cooldown_remaining", remaining,
	)

	e.updateDeliveryStatus(
		webhookDB, d, database.DeliveryStatusRetrying,
	)

	retryTask := *task
	e.scheduleRetry(retryTask, remaining)

	return true
}

// handleHTTPRetry runs after a failed HTTP attempt
// numbered attemptNum. Once attemptNum reaches the target's
// MaxRetries, the delivery is marked failed for good.
// Otherwise it is marked retrying, and a copy of task with
// AttemptNum incremented is scheduled after
// calcBackoff(attemptNum).
func (e *Engine) handleHTTPRetry(
	webhookDB *gorm.DB,
	d *database.Delivery,
	task *Task,
	attemptNum int,
) {
	if attemptNum >= d.Target.MaxRetries {
		e.updateDeliveryStatus(
			webhookDB, d, database.DeliveryStatusFailed,
		)

		return
	}

	e.updateDeliveryStatus(
		webhookDB, d, database.DeliveryStatusRetrying,
	)

	backoff := calcBackoff(attemptNum)

	retryTask := *task
	retryTask.AttemptNum = attemptNum + 1

	e.scheduleRetry(retryTask, backoff)
}

// getCircuitBreaker returns the breaker for targetID,
// creating it on first use. The fast path is a plain Load.
// On a miss, LoadOrStore ensures that racing callers all
// receive whichever breaker was stored first.
func (e *Engine) getCircuitBreaker(
	targetID string,
) *CircuitBreaker {
	if val, ok := e.circuitBreakers.Load(targetID); ok {
		cb, _ := val.(*CircuitBreaker)

		return cb
	}

	fresh := NewCircuitBreaker()

	actual, _ := e.circuitBreakers.LoadOrStore(
		targetID, fresh,
	)

	cb, _ := actual.(*CircuitBreaker)

	return cb
}

// deliverDatabase handles database-type targets. It does no
// outbound I/O here. It records a successful attempt 1
// (status 0, empty body, zero duration) and marks the
// delivery delivered. Presumably the event is already
// persisted by the time this runs; confirm with the caller.
func (e *Engine) deliverDatabase(
	webhookDB *gorm.DB,
	d *database.Delivery,
) {
	e.recordResult(
		webhookDB, d, 1, true, 0, "", "", 0,
	)
	e.updateDeliveryStatus(
		webhookDB, d, database.DeliveryStatusDelivered,
	)
}

// deliverLog handles log-type targets. It logs the event's
// metadata (method, content type and body length, not the
// body itself) at Info level. It then records a successful
// attempt 1 and marks the delivery delivered.
func (e *Engine) deliverLog(
	webhookDB *gorm.DB,
	d *database.Delivery,
) {
	e.log.Info(
		"webhook event delivered to log target",
		"delivery_id", d.ID,
		"event_id", d.EventID,
		"target_id", d.TargetID,
		"target_name", d.Target.Name,
		"method", d.Event.Method,
		"content_type", d.Event.ContentType,
		"body_length", len(d.Event.Body),
	)

	e.recordResult(
		webhookDB, d, 1, true, 0, "", "", 0,
	)
	e.updateDeliveryStatus(
		webhookDB, d, database.DeliveryStatusDelivered,
	)
}

// deliverSlack renders the event with FormatSlackMessage.
// It posts the result as {"text": msg} to the target's
// Slack webhook URL via sendSlackRequest.
//
// A config-parse or marshal error is recorded as a failed
// attempt 1 and marks the delivery failed. No Slack path
// in this file schedules a retry or consults a circuit
// breaker, unlike deliverHTTPWithRetry.
func (e *Engine) deliverSlack(
	ctx context.Context,
	webhookDB *gorm.DB,
	d *database.Delivery,
) {
	cfg, err := e.parseSlackConfig(d.Target.Config)
	if err != nil {
		e.log.Error(
			"invalid Slack target config",
			"target_id", d.TargetID,
			"error", err,
		)
		e.recordResult(
			webhookDB, d, 1, false, 0, "", err.Error(), 0,
		)
		e.updateDeliveryStatus(
			webhookDB, d, database.DeliveryStatusFailed,
		)

		return
	}

	msg := FormatSlackMessage(&d.Event)

	payload, err := json.Marshal(
		map[string]string{"text": msg},
	)
	if err != nil {
		e.log.Error(
			"failed to marshal Slack payload",
			"target_id", d.TargetID,
			"error", err,
		)
		e.recordResult(
			webhookDB, d, 1, false, 0, "", err.Error(), 0,
		)
		e.updateDeliveryStatus(
			webhookDB, d, database.DeliveryStatusFailed,
		)

		return
	}

	e.sendSlackRequest(
		ctx, webhookDB, d, cfg, payload,
	)
}

// sendSlackRequest POSTs payload as JSON to cfg.WebhookURL
// using the engine's default client (executeRequest).
// Request-construction and transport errors are recorded
// through failSlackDelivery. Otherwise the response goes to
// handleSlackResponse, and its body is closed on return.
func (e *Engine) sendSlackRequest(
	ctx context.Context,
	webhookDB *gorm.DB,
	d *database.Delivery,
	cfg *SlackTargetConfig,
	payload []byte,
) {
	start := time.Now()

	req, err := http.NewRequestWithContext(
		ctx,
		http.MethodPost,
		cfg.WebhookURL,
		bytes.NewReader(payload),
	)
	if err != nil {
		e.failSlackDelivery(
			webhookDB, d, err.Error(), 0,
		)

		return
	}

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("User-Agent", "webhooker/1.0")

	resp, doErr := e.executeRequest(req)
	durationMs := time.Since(start).Milliseconds()

	if doErr != nil {
		errStr := fmt.Errorf(
			"sending request: %w", doErr,
		).Error()
		e.failSlackDelivery(
			webhookDB, d, errStr, durationMs,
		)

		return
	}

	defer func() { _ = resp.Body.Close() }()

	e.handleSlackResponse(
		webhookDB, d, resp, durationMs,
	)
}

// failSlackDelivery records a failed attempt 1 carrying
// errMsg and durationMs, then marks the delivery failed.
func (e *Engine) failSlackDelivery(
	webhookDB *gorm.DB,
	d *database.Delivery,
	errMsg string,
	durationMs int64,
) {
	e.recordResult(
		webhookDB, d, 1, false, 0, "", errMsg, durationMs,
	)
	e.updateDeliveryStatus(
		webhookDB, d, database.DeliveryStatusFailed,
	)
}

// handleSlackResponse reads at most maxBodyLog bytes of the
// Slack response body. A read error is only logged and does
// not affect the outcome. Success is decided purely by the
// status code being in [httpSuccessMin, httpSuccessMax).
// The result is recorded as attempt 1, with "HTTP <code>"
// as the error on failure, and the delivery is marked
// delivered or failed accordingly.
func (e *Engine) handleSlackResponse(
	webhookDB *gorm.DB,
	d *database.Delivery,
	resp *http.Response,
	durationMs int64,
) {
	body, readErr := io.ReadAll(
		io.LimitReader(resp.Body, maxBodyLog),
	)
	if readErr != nil {
		e.log.Error(
			"failed to read Slack response body",
			"error", readErr,
		)
	}

	respBody := string(body)

	success := resp.StatusCode >= httpSuccessMin &&
		resp.StatusCode < httpSuccessMax

	errMsg := ""
	if !success {
		errMsg = fmt.Sprintf("HTTP %d", resp.StatusCode)
	}

	e.recordResult(
		webhookDB, d, 1, success, resp.StatusCode,
		respBody, errMsg, durationMs,
	)

	if success {
		e.updateDeliveryStatus(
			webhookDB, d, database.DeliveryStatusDelivered,
		)
	} else {
		e.updateDeliveryStatus(
			webhookDB, d, database.DeliveryStatusFailed,
		)
	}
}

// parseSlackConfig decodes a Slack target's JSON config.
// It returns errEmptyTargetConfig for an empty string, a
// wrapped error for invalid JSON, and errMissingWebhookURL
// when WebhookURL is empty.
//
// NOTE(review): the only URL check here is non-emptiness.
// No scheme or host validation happens in this function,
// although the #nosec comment on executeRequest cites
// parseSlackConfig as validating the URL. Confirm that the
// SSRF protection really lives in the transport.
func (e *Engine) parseSlackConfig(
	configJSON string,
) (*SlackTargetConfig, error) {
	if configJSON == "" {
		return nil, errEmptyTargetConfig
	}

	var cfg SlackTargetConfig

	err := json.Unmarshal(
		[]byte(configJSON), &cfg,
	)
	if err != nil {
		return nil, fmt.Errorf(
			"parsing config JSON: %w", err,
		)
	}

	if cfg.WebhookURL == "" {
		return nil, errMissingWebhookURL
	}

	return &cfg, nil
}

// doHTTPRequest sends event.Body to cfg.URL and returns:
//   - the status code (0 if no response);
//   - up to maxBodyLog bytes of the response body;
//   - the elapsed time in milliseconds;
//   - any creation, transport or body-read error.
//
// Headers come from applyRequestHeaders. The client comes
// from clientForConfig (a per-target timeout, if set).
//
// NOTE(review): the method is always POST; event.Method is
// not used. Confirm this is intended. Also, the response
// body past maxBodyLog is not drained before Close, which
// can keep net/http from reusing the connection.
func (e *Engine) doHTTPRequest(
	ctx context.Context,
	cfg *HTTPTargetConfig,
	event *database.Event,
) (int, string, int64, error) {
	start := time.Now()

	req, reqErr := http.NewRequestWithContext(
		ctx,
		http.MethodPost,
		cfg.URL,
		bytes.NewReader([]byte(event.Body)),
	)
	if reqErr != nil {
		return 0, "", 0, fmt.Errorf(
			"creating request: %w", reqErr,
		)
	}

	applyRequestHeaders(req, event, cfg)

	client := e.clientForConfig(cfg)

	resp, doErr := executeHTTPRequest(client, req)
	dur := time.Since(start).Milliseconds()

	if doErr != nil {
		return 0, "", dur, fmt.Errorf(
			"sending request: %w", doErr,
		)
	}

	defer func() { _ = resp.Body.Close() }()

	body, readErr := io.ReadAll(
		io.LimitReader(resp.Body, maxBodyLog),
	)
	if readErr != nil {
		return resp.StatusCode, "", dur, fmt.Errorf(
			"reading response body: %w", readErr,
		)
	}

	return resp.StatusCode, string(body), dur, nil
}

// recordResult inserts one DeliveryResult row into
// webhookDB for delivery d. The response body is cut to
// maxBodyLog bytes. An insert failure is logged, not
// returned, so recording is best-effort.
func (e *Engine) recordResult(
	webhookDB *gorm.DB,
	d *database.Delivery,
	attemptNum int,
	success bool,
	statusCode int,
	respBody, errMsg string,
	durationMs int64,
) {
	result := &database.DeliveryResult{
		DeliveryID:   d.ID,
		AttemptNum:   attemptNum,
		Success:      success,
		StatusCode:   statusCode,
		ResponseBody: truncate(respBody, maxBodyLog),
		Error:        errMsg,
		Duration:     durationMs,
	}

	err := webhookDB.Create(result).Error
	if err != nil {
		e.log.Error(
			"failed to record delivery result",
			"delivery_id", d.ID,
			"error", err,
		)
	}
}

// updateDeliveryStatus sets the "status" column of d's row
// in webhookDB. Failures are logged, not returned.
func (e *Engine) updateDeliveryStatus(
	webhookDB *gorm.DB,
	d *database.Delivery,
	status database.DeliveryStatus,
) {
	err := webhookDB.Model(d).
		Update("status", status).Error
	if err != nil {
		e.log.Error(
			"failed to update delivery status",
			"delivery_id", d.ID,
			"status", status,
			"error", err,
		)
	}
}

// parseHTTPConfig decodes an HTTP target's JSON config.
// It returns errEmptyTargetConfig for an empty string, a
// wrapped error for invalid JSON, and errMissingTargetURL
// when URL is empty.
//
// NOTE(review): as with parseSlackConfig, the URL is only
// checked for non-emptiness here, despite the #nosec
// comment on executeHTTPRequest.
func (e *Engine) parseHTTPConfig(
	configJSON string,
) (*HTTPTargetConfig, error) {
	if configJSON == "" {
		return nil, errEmptyTargetConfig
	}

	var cfg HTTPTargetConfig

	err := json.Unmarshal(
		[]byte(configJSON), &cfg,
	)
	if err != nil {
		return nil, fmt.Errorf(
			"parsing config JSON: %w", err,
		)
	}

	if cfg.URL == "" {
		return nil, errMissingTargetURL
	}

	return &cfg, nil
}

// isForwardableHeader returns true if the header should
// be forwarded to targets. The name is canonicalized first.
// The following are not forwarded:
//   - connection/hop-by-hop headers;
//   - Host;
//   - Proxy-Authorization;
//   - Content-Length, which is recomputed for the new body.
// Everything else, including Content-Type, is forwarded.
func isForwardableHeader(name string) bool {
	switch http.CanonicalHeaderKey(name) {
	case "Host", "Connection", "Keep-Alive",
		"Transfer-Encoding", "Te", "Trailer",
		"Upgrade", "Proxy-Authorization",
		"Proxy-Connection", "Content-Length":
		return false
	default:
		return true
	}
}

// truncate returns s cut to at most maxLen bytes.
//
// NOTE(review): the cut is byte-based and can split a
// multi-byte UTF-8 character, leaving invalid UTF-8.
func truncate(s string, maxLen int) string {
	if len(s) <= maxLen {
		return s
	}

	return s[:maxLen]
}

// --- Helper functions ---

// buildEventFromTask rebuilds an Event from the fields a
// Task carries. Body is deliberately left unset;
// resolveEventBody is the helper that populates it.
func buildEventFromTask(task *Task) database.Event {
	event := database.Event{
		Method:      task.Method,
		Headers:     task.Headers,
		ContentType: task.ContentType,
	}
	event.ID = task.EventID
	event.WebhookID = task.WebhookID

	return event
}

// buildTargetFromTask rebuilds a Target from the fields a
// Task carries.
func buildTargetFromTask(task *Task) database.Target {
	target := database.Target{
		Name:       task.TargetName,
		Type:       task.TargetType,
		Config:     task.TargetConfig,
		MaxRetries: task.MaxRetries,
	}
	target.ID = task.TargetID

	return target
}

// resolveEventBody returns event with its Body filled in.
// An inline task.Body is used when present. Otherwise only
// the body column is selected from webhookDB by
// task.EventID (see MaxInlineBodySize for when bodies are
// left out of a Task).
func (e *Engine) resolveEventBody(
	webhookDB *gorm.DB,
	event database.Event,
	task *Task,
) (database.Event, error) {
	if task.Body != nil {
		event.Body = *task.Body

		return event, nil
	}

	var dbEvent database.Event

	err := webhookDB.Select("body").
		First(&dbEvent, "id = ?", task.EventID).Error
	if err != nil {
		return event, fmt.Errorf(
			"fetching event body: %w", err,
		)
	}

	event.Body = dbEvent.Body

	return event, nil
}

// loadRetryDelivery loads only the id and status columns of
// the delivery with deliveryID from webhookDB.
func (e *Engine) loadRetryDelivery(
	webhookDB *gorm.DB,
	deliveryID string,
) (*database.Delivery, error) {
	var d database.Delivery

	err := webhookDB.Select("id", "status").
		First(&d, "id = ?", deliveryID).Error
	if err != nil {
		return nil, fmt.Errorf(
			"loading delivery: %w", err,
		)
	}

	return &d, nil
}

// countAttempts returns how many DeliveryResult rows exist
// for deliveryID.
//
// NOTE(review): the query error is discarded. A failed
// query presumably leaves resultCount at 0, which looks
// the same as "no attempts yet". Confirm this is
// acceptable to callers.
func (e *Engine) countAttempts(
	webhookDB *gorm.DB,
	deliveryID string,
) int {
	var resultCount int64

	webhookDB.Model(&database.DeliveryResult{}).
		Where("delivery_id = ?", deliveryID).
		Count(&resultCount)

	return int(resultCount)
}

// loadEvent loads the full Event row with eventID from
// webhookDB.
func (e *Engine) loadEvent(
	webhookDB *gorm.DB,
	eventID string,
) (database.Event, error) {
	var event database.Event

	err := webhookDB.
		First(&event, "id = ?", eventID).Error
	if err != nil {
		return event, fmt.Errorf(
			"loading event: %w", err,
		)
	}

	return event, nil
}

// loadTarget loads the Target with targetID. It reads from
// the main database (e.database.DB()), not a per-webhook
// database.
func (e *Engine) loadTarget(
	targetID string,
) (database.Target, error) {
	var target database.Target

	err := e.database.DB().
		First(&target, "id = ?", targetID).Error
	if err != nil {
		return target, fmt.Errorf(
			"loading target: %w", err,
		)
	}

	return target, nil
}

// calcBackoff computes the delay before retrying after
// attempt attemptNum. The shift is attemptNum-1, floored
// at 0 and capped at maxBackoffShift.
func calcBackoff(attemptNum int) time.Duration {
	shift := max(attemptNum-1, 0)
	shift = min(shift, maxBackoffShift)

	// NOTE(review): THIS FILE IS CORRUPTED HERE. In the
	// reviewed copy the return expression runs from "1<"
	// straight into "= backoff }". The text between that "<"
	// and a later ">" was stripped. Lost:
	//   - the rest of this return expression (presumably a
	//     1<<shift based duration; TODO confirm the unit);
	//   - this function's closing brace;
	//   - at least the start of a following function that
	//     ends by comparing against "backoff".
	// Restore from version control. The line below is kept
	// verbatim and will not compile as-is.
	return time.Duration(1<= backoff
}

// buildRecoveryTask builds a Task that re-enqueues delivery
// d for target, starting at attemptNum. The event body is
// carried inline only when it is shorter than
// MaxInlineBodySize. Otherwise Body is nil and must be
// fetched later.
func buildRecoveryTask(
	d *database.Delivery,
	webhookID string,
	event *database.Event,
	target *database.Target,
	attemptNum int,
) Task {
	var bodyPtr *string

	if len(event.Body) < MaxInlineBodySize {
		// Copy into a fresh local so the Task does not
		// point into *event.
		bodyStr := event.Body
		bodyPtr = &bodyStr
	}

	return Task{
		DeliveryID:   d.ID,
		EventID:      d.EventID,
		WebhookID:    webhookID,
		TargetID:     target.ID,
		TargetName:   target.Name,
		TargetType:   target.Type,
		TargetConfig: target.Config,
		MaxRetries:   target.MaxRetries,
		Method:       event.Method,
		Headers:      event.Headers,
		ContentType:  event.ContentType,
		Body:         bodyPtr,
		AttemptNum:   attemptNum,
	}
}

// loadTargetMap loads, in a single main-DB query, every
// distinct Target referenced by deliveries, keyed by target
// ID. On a query error it logs the error and returns nil.
func (e *Engine) loadTargetMap(
	deliveries []database.Delivery,
) map[string]database.Target {
	// De-duplicate target IDs while preserving first-seen
	// order.
	seen := make(map[string]bool)
	targetIDs := make([]string, 0, len(deliveries))

	for _, d := range deliveries {
		if !seen[d.TargetID] {
			targetIDs = append(targetIDs, d.TargetID)
			seen[d.TargetID] = true
		}
	}

	var targets []database.Target

	err := e.database.DB().
		Where("id IN ?", targetIDs).
		Find(&targets).Error
	if err != nil {
		e.log.Error(
			"failed to load targets from main DB",
			"error", err,
		)

		return nil
	}

	targetMap := make(
		map[string]database.Target, len(targets),
	)
	for _, t := range targets {
		targetMap[t.ID] = t
	}

	return targetMap
}

// sendRecoveredDeliveries re-enqueues deliveries onto
// e.deliveryCh as fresh tasks starting at attempt 1.
//   - It stops early if ctx is cancelled.
//   - It skips (and logs) deliveries whose target is not
//     in targetMap.
//   - It never blocks: the first time the channel is full,
//     it logs a warning and abandons the rest.
func (e *Engine) sendRecoveredDeliveries(
	ctx context.Context,
	deliveries []database.Delivery,
	webhookID string,
	targetMap map[string]database.Target,
) {
	for i := range deliveries {
		select {
		case <-ctx.Done():
			return
		default:
		}

		target, ok := targetMap[deliveries[i].TargetID]
		if !ok {
			e.log.Error(
				"target not found for delivery",
				"delivery_id", deliveries[i].ID,
				"target_id", deliveries[i].TargetID,
			)

			continue
		}

		task := buildRecoveryTask(
			&deliveries[i], webhookID,
			&deliveries[i].Event, &target, 1,
		)

		select {
		case e.deliveryCh <- task:
		default:
			e.log.Warn(
				"delivery channel full during "+
					"recovery, remaining deliveries "+
					"will be recovered on next restart",
				"delivery_id", deliveries[i].ID,
			)

			return
		}
	}
}

// formatJSONBody pretty-prints body as indented JSON inside
// a ``` fence. The output is cut at 3500 bytes, with a
// "(truncated)" marker. It returns "" when body is not
// valid JSON or cannot be indented.
//
// NOTE(review): the indent string below reads as a single
// space. This file's whitespace was evidently collapsed, so
// if the original used more spaces, verify it against
// version control. The byte-based cut can also split a
// UTF-8 character.
func formatJSONBody(body string) string {
	var parsed json.RawMessage
	if json.Unmarshal([]byte(body), &parsed) != nil {
		return ""
	}

	var pretty bytes.Buffer
	if json.Indent(&pretty, parsed, "", " ") != nil {
		return ""
	}

	var b strings.Builder

	b.WriteString("\n```\n")

	prettyStr := pretty.String()

	const maxPayloadDisplay = 3500

	if len(prettyStr) > maxPayloadDisplay {
		b.WriteString(prettyStr[:maxPayloadDisplay])
		b.WriteString("\n... (truncated)")
	} else {
		b.WriteString(prettyStr)
	}

	b.WriteString("\n```\n")

	return b.String()
}

// formatRawBody appends body, verbatim, inside a ``` fence
// to b. It is cut at 3500 bytes with a "(truncated)" marker;
// the byte-based cut can split a UTF-8 character.
func formatRawBody(b *strings.Builder, body string) {
	b.WriteString("\n```\n")

	const maxRawDisplay = 3500

	if len(body) > maxRawDisplay {
		b.WriteString(body[:maxRawDisplay])
		b.WriteString("\n... (truncated)")
	} else {
		b.WriteString(body)
	}

	b.WriteString("\n```\n")
}

// applyRequestHeaders sets the outbound headers for an HTTP
// target delivery, in this order:
//  1. Content-Type from event.ContentType, if non-empty.
//  2. The stored original headers (event.Headers, JSON
//     map of name -> values) that pass
//     isForwardableHeader, added with Header.Add. If that
//     JSON does not decode, these headers are skipped
//     silently.
//  3. The target's configured cfg.Headers, which replace
//     any same-named header.
//  4. User-Agent, forced to "webhooker/1.0".
//
// NOTE(review): isForwardableHeader does not exclude
// Content-Type. If the stored headers contain it, Add in
// step 2 appends a second Content-Type value after the Set
// in step 1. This assumes event.Headers holds the original
// request headers including Content-Type; confirm.
func applyRequestHeaders(
	req *http.Request,
	event *database.Event,
	cfg *HTTPTargetConfig,
) {
	if event.ContentType != "" {
		req.Header.Set(
			"Content-Type", event.ContentType,
		)
	}

	var originalHeaders map[string][]string

	if event.Headers != "" {
		jsonErr := json.Unmarshal(
			[]byte(event.Headers), &originalHeaders,
		)
		if jsonErr == nil {
			for k, vals := range originalHeaders {
				if isForwardableHeader(k) {
					for _, v := range vals {
						req.Header.Add(k, v)
					}
				}
			}
		}
	}

	for k, v := range cfg.Headers {
		req.Header.Set(k, v)
	}

	req.Header.Set("User-Agent", "webhooker/1.0")
}

// clientForConfig returns the HTTP client for a target.
// When cfg.Timeout (in seconds) is positive, it builds a
// new client with that timeout. Otherwise it returns the
// engine's shared client.
//
// NOTE(review): the per-target client sets no Transport, so
// it uses http.DefaultTransport instead of whatever
// transport e.client carries. The comments on
// executeRequest and executeHTTPRequest rely on an
// "SSRF-safe transport". If that transport lives on
// e.client (its construction is not visible in this
// file), targets with a custom timeout bypass the SSRF
// guard. Confirm, and reuse e.client's Transport if so.
func (e *Engine) clientForConfig(
	cfg *HTTPTargetConfig,
) *http.Client {
	if cfg.Timeout > 0 {
		return &http.Client{
			Timeout: time.Duration(
				cfg.Timeout,
			) * time.Second,
		}
	}

	return e.client
}

// executeRequest sends an HTTP request using the engine's
// default client. URLs are validated by SSRF-safe
// transport and config parsers before reaching here.
func (e *Engine) executeRequest(
	req *http.Request,
) (*http.Response, error) {
	return e.client.Do(req) //#nosec G704 -- URL validated by parseSlackConfig and SSRF-safe transport
}

// executeHTTPRequest sends an HTTP request using the
// provided client. URLs are validated by config parsers
// and SSRF-safe transport before reaching here.
func executeHTTPRequest(
	client *http.Client,
	req *http.Request,
) (*http.Response, error) {
	return client.Do(req) //#nosec G704 -- URL validated by parseHTTPConfig and SSRF-safe transport
}