feat: implement core webhook engine, delivery system, and management UI (Phase 2)
All checks were successful
check / check (push) Successful in 1m49s
All checks were successful
check / check (push) Successful in 1m49s
- Webhook reception handler: look up entrypoint by UUID, verify active,
capture full HTTP request (method, headers, body, content-type), create
Event record, queue Delivery records for each active Target, return 200 OK.
Handles edge cases: unknown UUID → 404, inactive → 410, oversized → 413.
- Delivery engine (internal/delivery): fx-managed background goroutine that
polls for pending/retrying deliveries and dispatches to target type handlers.
Graceful shutdown via context cancellation.
- Target type implementations:
- HTTP: fire-and-forget POST with original headers forwarding
- Retry: exponential backoff (1s, 2s, 4s...) up to max_retries
- Database: immediate success (event already stored)
- Log: slog output with event details
- Webhook management pages with Tailwind CSS + Alpine.js:
- List (/sources): webhooks with entrypoint/target/event counts
- Create (/sources/new): form with auto-created default entrypoint
- Detail (/source/{id}): config, entrypoints, targets, recent events
- Edit (/source/{id}/edit): name, description, retention_days
- Delete (/source/{id}/delete): soft-delete with child records
- Add Entrypoint (/source/{id}/entrypoints): inline form
- Add Target (/source/{id}/targets): type-aware form
- Event Log (/source/{id}/logs): paginated with delivery status
- Updated README: marked completed items, updated naming conventions
table, added delivery engine to package layout and DI docs, updated
column names to reflect entity rename.
- Rebuilt Tailwind CSS for new template classes.
Part of: #15
This commit is contained in:
383
internal/delivery/engine.go
Normal file
383
internal/delivery/engine.go
Normal file
@@ -0,0 +1,383 @@
|
||||
package delivery
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/fx"
|
||||
"gorm.io/gorm"
|
||||
"sneak.berlin/go/webhooker/internal/database"
|
||||
"sneak.berlin/go/webhooker/internal/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
// pollInterval is how often the engine checks for pending deliveries.
|
||||
pollInterval = 2 * time.Second
|
||||
|
||||
// httpClientTimeout is the timeout for outbound HTTP requests.
|
||||
httpClientTimeout = 30 * time.Second
|
||||
|
||||
// maxBodyLog is the maximum response body length to store in DeliveryResult.
|
||||
maxBodyLog = 4096
|
||||
)
|
||||
|
||||
// HTTPTargetConfig holds configuration for http and retry target types.
|
||||
type HTTPTargetConfig struct {
|
||||
URL string `json:"url"`
|
||||
Headers map[string]string `json:"headers,omitempty"`
|
||||
Timeout int `json:"timeout,omitempty"` // seconds, 0 = default
|
||||
}
|
||||
|
||||
// EngineParams are the fx dependencies for the delivery engine.
|
||||
//
|
||||
//nolint:revive // EngineParams is a standard fx naming convention
|
||||
type EngineParams struct {
|
||||
fx.In
|
||||
DB *database.Database
|
||||
Logger *logger.Logger
|
||||
}
|
||||
|
||||
// Engine processes queued deliveries in the background.
|
||||
type Engine struct {
|
||||
db *gorm.DB
|
||||
log *slog.Logger
|
||||
client *http.Client
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// New creates and registers the delivery engine with the fx lifecycle.
|
||||
func New(lc fx.Lifecycle, params EngineParams) *Engine {
|
||||
e := &Engine{
|
||||
db: params.DB.DB(),
|
||||
log: params.Logger.Get(),
|
||||
client: &http.Client{
|
||||
Timeout: httpClientTimeout,
|
||||
},
|
||||
}
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(_ context.Context) error {
|
||||
e.start()
|
||||
return nil
|
||||
},
|
||||
OnStop: func(_ context.Context) error {
|
||||
e.stop()
|
||||
return nil
|
||||
},
|
||||
})
|
||||
|
||||
return e
|
||||
}
|
||||
|
||||
func (e *Engine) start() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
e.cancel = cancel
|
||||
e.wg.Add(1)
|
||||
go e.run(ctx)
|
||||
e.log.Info("delivery engine started")
|
||||
}
|
||||
|
||||
func (e *Engine) stop() {
|
||||
e.log.Info("delivery engine stopping")
|
||||
e.cancel()
|
||||
e.wg.Wait()
|
||||
e.log.Info("delivery engine stopped")
|
||||
}
|
||||
|
||||
func (e *Engine) run(ctx context.Context) {
|
||||
defer e.wg.Done()
|
||||
|
||||
ticker := time.NewTicker(pollInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
e.processPending(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) processPending(ctx context.Context) {
|
||||
var deliveries []database.Delivery
|
||||
result := e.db.
|
||||
Where("status IN ?", []database.DeliveryStatus{
|
||||
database.DeliveryStatusPending,
|
||||
database.DeliveryStatusRetrying,
|
||||
}).
|
||||
Preload("Target").
|
||||
Preload("Event").
|
||||
Find(&deliveries)
|
||||
|
||||
if result.Error != nil {
|
||||
e.log.Error("failed to query pending deliveries", "error", result.Error)
|
||||
return
|
||||
}
|
||||
|
||||
for i := range deliveries {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
e.processDelivery(ctx, &deliveries[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) processDelivery(ctx context.Context, d *database.Delivery) {
|
||||
switch d.Target.Type {
|
||||
case database.TargetTypeHTTP:
|
||||
e.deliverHTTP(ctx, d)
|
||||
case database.TargetTypeRetry:
|
||||
e.deliverRetry(ctx, d)
|
||||
case database.TargetTypeDatabase:
|
||||
e.deliverDatabase(d)
|
||||
case database.TargetTypeLog:
|
||||
e.deliverLog(d)
|
||||
default:
|
||||
e.log.Error("unknown target type",
|
||||
"target_id", d.TargetID,
|
||||
"type", d.Target.Type,
|
||||
)
|
||||
e.updateDeliveryStatus(d, database.DeliveryStatusFailed)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) deliverHTTP(_ context.Context, d *database.Delivery) {
|
||||
cfg, err := e.parseHTTPConfig(d.Target.Config)
|
||||
if err != nil {
|
||||
e.log.Error("invalid HTTP target config",
|
||||
"target_id", d.TargetID,
|
||||
"error", err,
|
||||
)
|
||||
e.recordResult(d, 1, false, 0, "", err.Error(), 0)
|
||||
e.updateDeliveryStatus(d, database.DeliveryStatusFailed)
|
||||
return
|
||||
}
|
||||
|
||||
statusCode, respBody, duration, err := e.doHTTPRequest(cfg, &d.Event)
|
||||
|
||||
success := err == nil && statusCode >= 200 && statusCode < 300
|
||||
errMsg := ""
|
||||
if err != nil {
|
||||
errMsg = err.Error()
|
||||
}
|
||||
|
||||
e.recordResult(d, 1, success, statusCode, respBody, errMsg, duration)
|
||||
|
||||
if success {
|
||||
e.updateDeliveryStatus(d, database.DeliveryStatusDelivered)
|
||||
} else {
|
||||
e.updateDeliveryStatus(d, database.DeliveryStatusFailed)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) deliverRetry(_ context.Context, d *database.Delivery) {
|
||||
cfg, err := e.parseHTTPConfig(d.Target.Config)
|
||||
if err != nil {
|
||||
e.log.Error("invalid retry target config",
|
||||
"target_id", d.TargetID,
|
||||
"error", err,
|
||||
)
|
||||
e.recordResult(d, 1, false, 0, "", err.Error(), 0)
|
||||
e.updateDeliveryStatus(d, database.DeliveryStatusFailed)
|
||||
return
|
||||
}
|
||||
|
||||
// Determine attempt number from existing results
|
||||
var resultCount int64
|
||||
e.db.Model(&database.DeliveryResult{}).Where("delivery_id = ?", d.ID).Count(&resultCount)
|
||||
attemptNum := int(resultCount) + 1
|
||||
|
||||
// Check if we should wait before retrying (exponential backoff)
|
||||
if attemptNum > 1 {
|
||||
var lastResult database.DeliveryResult
|
||||
lookupErr := e.db.Where("delivery_id = ?", d.ID).Order("created_at DESC").First(&lastResult).Error
|
||||
if lookupErr == nil {
|
||||
shift := attemptNum - 2
|
||||
if shift > 30 {
|
||||
shift = 30
|
||||
}
|
||||
backoff := time.Duration(1<<uint(shift)) * time.Second //nolint:gosec // bounded above
|
||||
nextAttempt := lastResult.CreatedAt.Add(backoff)
|
||||
if time.Now().UTC().Before(nextAttempt) {
|
||||
// Not time to retry yet
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
statusCode, respBody, duration, err := e.doHTTPRequest(cfg, &d.Event)
|
||||
|
||||
success := err == nil && statusCode >= 200 && statusCode < 300
|
||||
errMsg := ""
|
||||
if err != nil {
|
||||
errMsg = err.Error()
|
||||
}
|
||||
|
||||
e.recordResult(d, attemptNum, success, statusCode, respBody, errMsg, duration)
|
||||
|
||||
if success {
|
||||
e.updateDeliveryStatus(d, database.DeliveryStatusDelivered)
|
||||
return
|
||||
}
|
||||
|
||||
maxRetries := d.Target.MaxRetries
|
||||
if maxRetries <= 0 {
|
||||
maxRetries = 5 // default
|
||||
}
|
||||
|
||||
if attemptNum >= maxRetries {
|
||||
e.updateDeliveryStatus(d, database.DeliveryStatusFailed)
|
||||
} else {
|
||||
e.updateDeliveryStatus(d, database.DeliveryStatusRetrying)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) deliverDatabase(d *database.Delivery) {
|
||||
// The event is already stored in the database; mark as delivered.
|
||||
e.recordResult(d, 1, true, 0, "", "", 0)
|
||||
e.updateDeliveryStatus(d, database.DeliveryStatusDelivered)
|
||||
}
|
||||
|
||||
func (e *Engine) deliverLog(d *database.Delivery) {
|
||||
e.log.Info("webhook event delivered to log target",
|
||||
"delivery_id", d.ID,
|
||||
"event_id", d.EventID,
|
||||
"target_id", d.TargetID,
|
||||
"target_name", d.Target.Name,
|
||||
"method", d.Event.Method,
|
||||
"content_type", d.Event.ContentType,
|
||||
"body_length", len(d.Event.Body),
|
||||
)
|
||||
e.recordResult(d, 1, true, 0, "", "", 0)
|
||||
e.updateDeliveryStatus(d, database.DeliveryStatusDelivered)
|
||||
}
|
||||
|
||||
// doHTTPRequest performs the outbound HTTP POST to a target URL.
|
||||
func (e *Engine) doHTTPRequest(cfg *HTTPTargetConfig, event *database.Event) (statusCode int, respBody string, durationMs int64, err error) {
|
||||
start := time.Now()
|
||||
|
||||
req, err := http.NewRequest(http.MethodPost, cfg.URL, bytes.NewReader([]byte(event.Body)))
|
||||
if err != nil {
|
||||
return 0, "", 0, fmt.Errorf("creating request: %w", err)
|
||||
}
|
||||
|
||||
// Set content type from original event
|
||||
if event.ContentType != "" {
|
||||
req.Header.Set("Content-Type", event.ContentType)
|
||||
}
|
||||
|
||||
// Apply original headers (filtered)
|
||||
var originalHeaders map[string][]string
|
||||
if event.Headers != "" {
|
||||
if jsonErr := json.Unmarshal([]byte(event.Headers), &originalHeaders); jsonErr == nil {
|
||||
for k, vals := range originalHeaders {
|
||||
if isForwardableHeader(k) {
|
||||
for _, v := range vals {
|
||||
req.Header.Add(k, v)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Apply target-specific headers (override)
|
||||
for k, v := range cfg.Headers {
|
||||
req.Header.Set(k, v)
|
||||
}
|
||||
|
||||
req.Header.Set("User-Agent", "webhooker/1.0")
|
||||
|
||||
client := e.client
|
||||
if cfg.Timeout > 0 {
|
||||
client = &http.Client{Timeout: time.Duration(cfg.Timeout) * time.Second}
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
durationMs = time.Since(start).Milliseconds()
|
||||
if err != nil {
|
||||
return 0, "", durationMs, fmt.Errorf("sending request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxBodyLog))
|
||||
if readErr != nil {
|
||||
return resp.StatusCode, "", durationMs, fmt.Errorf("reading response body: %w", readErr)
|
||||
}
|
||||
|
||||
return resp.StatusCode, string(body), durationMs, nil
|
||||
}
|
||||
|
||||
func (e *Engine) recordResult(d *database.Delivery, attemptNum int, success bool, statusCode int, respBody, errMsg string, durationMs int64) {
|
||||
result := &database.DeliveryResult{
|
||||
DeliveryID: d.ID,
|
||||
AttemptNum: attemptNum,
|
||||
Success: success,
|
||||
StatusCode: statusCode,
|
||||
ResponseBody: truncate(respBody, maxBodyLog),
|
||||
Error: errMsg,
|
||||
Duration: durationMs,
|
||||
}
|
||||
|
||||
if err := e.db.Create(result).Error; err != nil {
|
||||
e.log.Error("failed to record delivery result",
|
||||
"delivery_id", d.ID,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) updateDeliveryStatus(d *database.Delivery, status database.DeliveryStatus) {
|
||||
if err := e.db.Model(d).Update("status", status).Error; err != nil {
|
||||
e.log.Error("failed to update delivery status",
|
||||
"delivery_id", d.ID,
|
||||
"status", status,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) parseHTTPConfig(configJSON string) (*HTTPTargetConfig, error) {
|
||||
if configJSON == "" {
|
||||
return nil, fmt.Errorf("empty target config")
|
||||
}
|
||||
var cfg HTTPTargetConfig
|
||||
if err := json.Unmarshal([]byte(configJSON), &cfg); err != nil {
|
||||
return nil, fmt.Errorf("parsing config JSON: %w", err)
|
||||
}
|
||||
if cfg.URL == "" {
|
||||
return nil, fmt.Errorf("target URL is required")
|
||||
}
|
||||
return &cfg, nil
|
||||
}
|
||||
|
||||
// isForwardableHeader returns true if the header should be forwarded to targets.
|
||||
// Hop-by-hop headers and internal headers are excluded.
|
||||
func isForwardableHeader(name string) bool {
|
||||
switch http.CanonicalHeaderKey(name) {
|
||||
case "Host", "Connection", "Keep-Alive", "Transfer-Encoding",
|
||||
"Te", "Trailer", "Upgrade", "Proxy-Authorization",
|
||||
"Proxy-Connection", "Content-Length":
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func truncate(s string, maxLen int) string {
|
||||
if len(s) <= maxLen {
|
||||
return s
|
||||
}
|
||||
return s[:maxLen]
|
||||
}
|
||||
Reference in New Issue
Block a user