Store the *database.Database wrapper instead of calling .DB() eagerly at construction time. The GORM *gorm.DB is only available after the database's OnStart hook runs, but the engine constructor runs during fx resolution (before OnStart). Accessing .DB() lazily via the wrapper avoids the nil pointer panic.
383 lines
9.6 KiB
Go
383 lines
9.6 KiB
Go
package delivery
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/fx"
|
|
"sneak.berlin/go/webhooker/internal/database"
|
|
"sneak.berlin/go/webhooker/internal/logger"
|
|
)
|
|
|
|
const (
|
|
// pollInterval is how often the engine checks for pending deliveries.
|
|
pollInterval = 2 * time.Second
|
|
|
|
// httpClientTimeout is the timeout for outbound HTTP requests.
|
|
httpClientTimeout = 30 * time.Second
|
|
|
|
// maxBodyLog is the maximum response body length to store in DeliveryResult.
|
|
maxBodyLog = 4096
|
|
)
|
|
|
|
// HTTPTargetConfig holds configuration for http and retry target types.
|
|
type HTTPTargetConfig struct {
|
|
URL string `json:"url"`
|
|
Headers map[string]string `json:"headers,omitempty"`
|
|
Timeout int `json:"timeout,omitempty"` // seconds, 0 = default
|
|
}
|
|
|
|
// EngineParams are the fx dependencies for the delivery engine.
|
|
//
|
|
//nolint:revive // EngineParams is a standard fx naming convention
|
|
type EngineParams struct {
|
|
fx.In
|
|
DB *database.Database
|
|
Logger *logger.Logger
|
|
}
|
|
|
|
// Engine processes queued deliveries in the background.
|
|
type Engine struct {
|
|
database *database.Database
|
|
log *slog.Logger
|
|
client *http.Client
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// New creates and registers the delivery engine with the fx lifecycle.
|
|
func New(lc fx.Lifecycle, params EngineParams) *Engine {
|
|
e := &Engine{
|
|
database: params.DB,
|
|
log: params.Logger.Get(),
|
|
client: &http.Client{
|
|
Timeout: httpClientTimeout,
|
|
},
|
|
}
|
|
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(_ context.Context) error {
|
|
e.start()
|
|
return nil
|
|
},
|
|
OnStop: func(_ context.Context) error {
|
|
e.stop()
|
|
return nil
|
|
},
|
|
})
|
|
|
|
return e
|
|
}
|
|
|
|
func (e *Engine) start() {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
e.cancel = cancel
|
|
e.wg.Add(1)
|
|
go e.run(ctx)
|
|
e.log.Info("delivery engine started")
|
|
}
|
|
|
|
func (e *Engine) stop() {
|
|
e.log.Info("delivery engine stopping")
|
|
e.cancel()
|
|
e.wg.Wait()
|
|
e.log.Info("delivery engine stopped")
|
|
}
|
|
|
|
func (e *Engine) run(ctx context.Context) {
|
|
defer e.wg.Done()
|
|
|
|
ticker := time.NewTicker(pollInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
e.processPending(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Engine) processPending(ctx context.Context) {
|
|
var deliveries []database.Delivery
|
|
result := e.database.DB().
|
|
Where("status IN ?", []database.DeliveryStatus{
|
|
database.DeliveryStatusPending,
|
|
database.DeliveryStatusRetrying,
|
|
}).
|
|
Preload("Target").
|
|
Preload("Event").
|
|
Find(&deliveries)
|
|
|
|
if result.Error != nil {
|
|
e.log.Error("failed to query pending deliveries", "error", result.Error)
|
|
return
|
|
}
|
|
|
|
for i := range deliveries {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
e.processDelivery(ctx, &deliveries[i])
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Engine) processDelivery(ctx context.Context, d *database.Delivery) {
|
|
switch d.Target.Type {
|
|
case database.TargetTypeHTTP:
|
|
e.deliverHTTP(ctx, d)
|
|
case database.TargetTypeRetry:
|
|
e.deliverRetry(ctx, d)
|
|
case database.TargetTypeDatabase:
|
|
e.deliverDatabase(d)
|
|
case database.TargetTypeLog:
|
|
e.deliverLog(d)
|
|
default:
|
|
e.log.Error("unknown target type",
|
|
"target_id", d.TargetID,
|
|
"type", d.Target.Type,
|
|
)
|
|
e.updateDeliveryStatus(d, database.DeliveryStatusFailed)
|
|
}
|
|
}
|
|
|
|
func (e *Engine) deliverHTTP(_ context.Context, d *database.Delivery) {
|
|
cfg, err := e.parseHTTPConfig(d.Target.Config)
|
|
if err != nil {
|
|
e.log.Error("invalid HTTP target config",
|
|
"target_id", d.TargetID,
|
|
"error", err,
|
|
)
|
|
e.recordResult(d, 1, false, 0, "", err.Error(), 0)
|
|
e.updateDeliveryStatus(d, database.DeliveryStatusFailed)
|
|
return
|
|
}
|
|
|
|
statusCode, respBody, duration, err := e.doHTTPRequest(cfg, &d.Event)
|
|
|
|
success := err == nil && statusCode >= 200 && statusCode < 300
|
|
errMsg := ""
|
|
if err != nil {
|
|
errMsg = err.Error()
|
|
}
|
|
|
|
e.recordResult(d, 1, success, statusCode, respBody, errMsg, duration)
|
|
|
|
if success {
|
|
e.updateDeliveryStatus(d, database.DeliveryStatusDelivered)
|
|
} else {
|
|
e.updateDeliveryStatus(d, database.DeliveryStatusFailed)
|
|
}
|
|
}
|
|
|
|
func (e *Engine) deliverRetry(_ context.Context, d *database.Delivery) {
|
|
cfg, err := e.parseHTTPConfig(d.Target.Config)
|
|
if err != nil {
|
|
e.log.Error("invalid retry target config",
|
|
"target_id", d.TargetID,
|
|
"error", err,
|
|
)
|
|
e.recordResult(d, 1, false, 0, "", err.Error(), 0)
|
|
e.updateDeliveryStatus(d, database.DeliveryStatusFailed)
|
|
return
|
|
}
|
|
|
|
// Determine attempt number from existing results
|
|
var resultCount int64
|
|
e.database.DB().Model(&database.DeliveryResult{}).Where("delivery_id = ?", d.ID).Count(&resultCount)
|
|
attemptNum := int(resultCount) + 1
|
|
|
|
// Check if we should wait before retrying (exponential backoff)
|
|
if attemptNum > 1 {
|
|
var lastResult database.DeliveryResult
|
|
lookupErr := e.database.DB().Where("delivery_id = ?", d.ID).Order("created_at DESC").First(&lastResult).Error
|
|
if lookupErr == nil {
|
|
shift := attemptNum - 2
|
|
if shift > 30 {
|
|
shift = 30
|
|
}
|
|
backoff := time.Duration(1<<uint(shift)) * time.Second //nolint:gosec // bounded above
|
|
nextAttempt := lastResult.CreatedAt.Add(backoff)
|
|
if time.Now().UTC().Before(nextAttempt) {
|
|
// Not time to retry yet
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
statusCode, respBody, duration, err := e.doHTTPRequest(cfg, &d.Event)
|
|
|
|
success := err == nil && statusCode >= 200 && statusCode < 300
|
|
errMsg := ""
|
|
if err != nil {
|
|
errMsg = err.Error()
|
|
}
|
|
|
|
e.recordResult(d, attemptNum, success, statusCode, respBody, errMsg, duration)
|
|
|
|
if success {
|
|
e.updateDeliveryStatus(d, database.DeliveryStatusDelivered)
|
|
return
|
|
}
|
|
|
|
maxRetries := d.Target.MaxRetries
|
|
if maxRetries <= 0 {
|
|
maxRetries = 5 // default
|
|
}
|
|
|
|
if attemptNum >= maxRetries {
|
|
e.updateDeliveryStatus(d, database.DeliveryStatusFailed)
|
|
} else {
|
|
e.updateDeliveryStatus(d, database.DeliveryStatusRetrying)
|
|
}
|
|
}
|
|
|
|
func (e *Engine) deliverDatabase(d *database.Delivery) {
|
|
// The event is already stored in the database; mark as delivered.
|
|
e.recordResult(d, 1, true, 0, "", "", 0)
|
|
e.updateDeliveryStatus(d, database.DeliveryStatusDelivered)
|
|
}
|
|
|
|
func (e *Engine) deliverLog(d *database.Delivery) {
|
|
e.log.Info("webhook event delivered to log target",
|
|
"delivery_id", d.ID,
|
|
"event_id", d.EventID,
|
|
"target_id", d.TargetID,
|
|
"target_name", d.Target.Name,
|
|
"method", d.Event.Method,
|
|
"content_type", d.Event.ContentType,
|
|
"body_length", len(d.Event.Body),
|
|
)
|
|
e.recordResult(d, 1, true, 0, "", "", 0)
|
|
e.updateDeliveryStatus(d, database.DeliveryStatusDelivered)
|
|
}
|
|
|
|
// doHTTPRequest performs the outbound HTTP POST to a target URL.
|
|
func (e *Engine) doHTTPRequest(cfg *HTTPTargetConfig, event *database.Event) (statusCode int, respBody string, durationMs int64, err error) {
|
|
start := time.Now()
|
|
|
|
req, err := http.NewRequest(http.MethodPost, cfg.URL, bytes.NewReader([]byte(event.Body)))
|
|
if err != nil {
|
|
return 0, "", 0, fmt.Errorf("creating request: %w", err)
|
|
}
|
|
|
|
// Set content type from original event
|
|
if event.ContentType != "" {
|
|
req.Header.Set("Content-Type", event.ContentType)
|
|
}
|
|
|
|
// Apply original headers (filtered)
|
|
var originalHeaders map[string][]string
|
|
if event.Headers != "" {
|
|
if jsonErr := json.Unmarshal([]byte(event.Headers), &originalHeaders); jsonErr == nil {
|
|
for k, vals := range originalHeaders {
|
|
if isForwardableHeader(k) {
|
|
for _, v := range vals {
|
|
req.Header.Add(k, v)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Apply target-specific headers (override)
|
|
for k, v := range cfg.Headers {
|
|
req.Header.Set(k, v)
|
|
}
|
|
|
|
req.Header.Set("User-Agent", "webhooker/1.0")
|
|
|
|
client := e.client
|
|
if cfg.Timeout > 0 {
|
|
client = &http.Client{Timeout: time.Duration(cfg.Timeout) * time.Second}
|
|
}
|
|
|
|
resp, err := client.Do(req)
|
|
durationMs = time.Since(start).Milliseconds()
|
|
if err != nil {
|
|
return 0, "", durationMs, fmt.Errorf("sending request: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxBodyLog))
|
|
if readErr != nil {
|
|
return resp.StatusCode, "", durationMs, fmt.Errorf("reading response body: %w", readErr)
|
|
}
|
|
|
|
return resp.StatusCode, string(body), durationMs, nil
|
|
}
|
|
|
|
func (e *Engine) recordResult(d *database.Delivery, attemptNum int, success bool, statusCode int, respBody, errMsg string, durationMs int64) {
|
|
result := &database.DeliveryResult{
|
|
DeliveryID: d.ID,
|
|
AttemptNum: attemptNum,
|
|
Success: success,
|
|
StatusCode: statusCode,
|
|
ResponseBody: truncate(respBody, maxBodyLog),
|
|
Error: errMsg,
|
|
Duration: durationMs,
|
|
}
|
|
|
|
if err := e.database.DB().Create(result).Error; err != nil {
|
|
e.log.Error("failed to record delivery result",
|
|
"delivery_id", d.ID,
|
|
"error", err,
|
|
)
|
|
}
|
|
}
|
|
|
|
func (e *Engine) updateDeliveryStatus(d *database.Delivery, status database.DeliveryStatus) {
|
|
if err := e.database.DB().Model(d).Update("status", status).Error; err != nil {
|
|
e.log.Error("failed to update delivery status",
|
|
"delivery_id", d.ID,
|
|
"status", status,
|
|
"error", err,
|
|
)
|
|
}
|
|
}
|
|
|
|
func (e *Engine) parseHTTPConfig(configJSON string) (*HTTPTargetConfig, error) {
|
|
if configJSON == "" {
|
|
return nil, fmt.Errorf("empty target config")
|
|
}
|
|
var cfg HTTPTargetConfig
|
|
if err := json.Unmarshal([]byte(configJSON), &cfg); err != nil {
|
|
return nil, fmt.Errorf("parsing config JSON: %w", err)
|
|
}
|
|
if cfg.URL == "" {
|
|
return nil, fmt.Errorf("target URL is required")
|
|
}
|
|
return &cfg, nil
|
|
}
|
|
|
|
// isForwardableHeader returns true if the header should be forwarded to targets.
|
|
// Hop-by-hop headers and internal headers are excluded.
|
|
func isForwardableHeader(name string) bool {
|
|
switch http.CanonicalHeaderKey(name) {
|
|
case "Host", "Connection", "Keep-Alive", "Transfer-Encoding",
|
|
"Te", "Trailer", "Upgrade", "Proxy-Authorization",
|
|
"Proxy-Connection", "Content-Length":
|
|
return false
|
|
default:
|
|
return true
|
|
}
|
|
}
|
|
|
|
func truncate(s string, maxLen int) string {
|
|
if len(s) <= maxLen {
|
|
return s
|
|
}
|
|
return s[:maxLen]
|
|
}
|