diff --git a/README.md b/README.md index 68144b3..f8056f5 100644 --- a/README.md +++ b/README.md @@ -295,7 +295,7 @@ events should be forwarded. | `id` | UUID | Primary key | | `webhook_id` | UUID | Foreign key → Webhook | | `name` | string | Human-readable name | -| `type` | TargetType | One of: `http`, `database`, `log` | +| `type` | TargetType | One of: `http`, `slack`, `database`, `log` | | `active` | boolean | Whether deliveries are enabled (default: true) | | `config` | JSON text | Type-specific configuration | | `max_retries` | integer | Maximum retry attempts for HTTP targets (0 = fire-and-forget, >0 = retries with backoff) | @@ -467,6 +467,16 @@ target simply marks the delivery as immediately successful. The per-webhook DB IS the dedicated event database — that's the whole point of the database target type. +The **Slack target type** sends webhook events as formatted messages to +any Slack-compatible incoming webhook URL (works with Slack, Mattermost, +and other compatible services). Each message includes event metadata +(HTTP method, content type, timestamp, body size) and the payload +pretty-printed in a code block. JSON payloads are automatically +formatted with indentation for readability; non-JSON payloads are shown +as raw text. Large payloads are truncated to keep messages reasonable. +Config stores `webhook_url` — the Slack/Mattermost incoming webhook +endpoint. + The database uses the [modernc.org/sqlite](https://pkg.go.dev/modernc.org/sqlite) driver at runtime, though CGO is required at build time due to the transitive @@ -605,8 +615,8 @@ fine — startup recovery rescans the database anyway). **Scope:** Circuit breakers only apply to **HTTP targets with `max_retries` > 0**. Fire-and-forget HTTP targets (`max_retries` == 0), -database targets (local operations), and log targets (stdout) do not use -circuit breakers. +Slack targets, database targets (local operations), and log +targets (stdout) do not use circuit breakers. When a circuit is open and a new delivery arrives, the engine marks the delivery as `retrying` and schedules a retry timer for after the @@ -880,6 +890,8 @@ linted, tested, and compiled. retries with exponential backoff when max_retries>0) - [x] Implement database target type (store events in per-webhook DB) - [x] Implement log target type (console output) +- [x] Implement Slack target type (Slack/Mattermost incoming webhook + notifications with pretty-printed payloads) - [x] Webhook management pages (list, create, edit, delete) - [x] Webhook request log viewer with pagination - [x] Entrypoint and target management UI diff --git a/internal/database/model_target.go b/internal/database/model_target.go index e9c4628..71b4d02 100644 --- a/internal/database/model_target.go +++ b/internal/database/model_target.go @@ -7,6 +7,7 @@ const ( TargetTypeHTTP TargetType = "http" TargetTypeDatabase TargetType = "database" TargetTypeLog TargetType = "log" + TargetTypeSlack TargetType = "slack" ) // Target represents a delivery target for a webhook diff --git a/internal/delivery/engine.go b/internal/delivery/engine.go index 2c7044d..78e692d 100644 --- a/internal/delivery/engine.go +++ b/internal/delivery/engine.go @@ -8,6 +8,7 @@ import ( "io" "log/slog" "net/http" + "strings" "sync" "time" @@ -97,6 +98,12 @@ type HTTPTargetConfig struct { Timeout int `json:"timeout,omitempty"` // seconds, 0 = default } +// SlackTargetConfig holds configuration for slack target types. +// Compatible with any Slack-format incoming webhook (Slack, Mattermost, etc.). +type SlackTargetConfig struct { + WebhookURL string `json:"webhook_url"` +} + // EngineParams are the fx dependencies for the delivery engine. // //nolint:revive // EngineParams is a standard fx naming convention @@ -836,6 +843,8 @@ func (e *Engine) processDelivery(ctx context.Context, webhookDB *gorm.DB, d *dat e.deliverDatabase(webhookDB, d) case database.TargetTypeLog: e.deliverLog(webhookDB, d) + case database.TargetTypeSlack: + e.deliverSlack(webhookDB, d) default: e.log.Error("unknown target type", "target_id", d.TargetID, @@ -982,6 +991,142 @@ func (e *Engine) deliverLog(webhookDB *gorm.DB, d *database.Delivery) { e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) } +// deliverSlack formats the webhook event as a human-readable Slack message +// and POSTs it to a Slack-compatible incoming webhook URL (works with Slack, +// Mattermost, and other compatible services). The message includes metadata +// (method, content type, timestamp, body size) and the payload pretty-printed +// in a code block if it is valid JSON. +func (e *Engine) deliverSlack(webhookDB *gorm.DB, d *database.Delivery) { + cfg, err := e.parseSlackConfig(d.Target.Config) + if err != nil { + e.log.Error("invalid Slack target config", + "target_id", d.TargetID, + "error", err, + ) + e.recordResult(webhookDB, d, 1, false, 0, "", err.Error(), 0) + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) + return + } + + msg := FormatSlackMessage(&d.Event) + + payload, err := json.Marshal(map[string]string{"text": msg}) + if err != nil { + e.log.Error("failed to marshal Slack payload", + "target_id", d.TargetID, + "error", err, + ) + e.recordResult(webhookDB, d, 1, false, 0, "", err.Error(), 0) + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) + return + } + + start := time.Now() + req, err := http.NewRequest(http.MethodPost, cfg.WebhookURL, bytes.NewReader(payload)) + if err != nil { + e.recordResult(webhookDB, d, 1, false, 0, "", err.Error(), 0) + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) + return + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "webhooker/1.0") + + resp, err := e.client.Do(req) + durationMs := time.Since(start).Milliseconds() + if err != nil { + e.recordResult(webhookDB, d, 1, false, 0, "", fmt.Errorf("sending request: %w", err).Error(), durationMs) + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) + return + } + defer resp.Body.Close() + + body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxBodyLog)) + if readErr != nil { + e.log.Error("failed to read Slack response body", "error", readErr) + } + respBody := string(body) + + success := resp.StatusCode >= 200 && resp.StatusCode < 300 + errMsg := "" + if !success { + errMsg = fmt.Sprintf("HTTP %d", resp.StatusCode) + } + + e.recordResult(webhookDB, d, 1, success, resp.StatusCode, respBody, errMsg, durationMs) + + if success { + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusDelivered) + } else { + e.updateDeliveryStatus(webhookDB, d, database.DeliveryStatusFailed) + } +} + +func (e *Engine) parseSlackConfig(configJSON string) (*SlackTargetConfig, error) { + if configJSON == "" { + return nil, fmt.Errorf("empty target config") + } + var cfg SlackTargetConfig + if err := json.Unmarshal([]byte(configJSON), &cfg); err != nil { + return nil, fmt.Errorf("parsing config JSON: %w", err) + } + if cfg.WebhookURL == "" { + return nil, fmt.Errorf("webhook_url is required") + } + return &cfg, nil +} + +// FormatSlackMessage builds a Slack-compatible message string from a webhook +// event. It includes metadata (method, content type, timestamp, body size) +// and pretty-prints the payload in a code block if it is valid JSON. +func FormatSlackMessage(event *database.Event) string { + var b strings.Builder + + b.WriteString("*Webhook Event Received*\n") + b.WriteString(fmt.Sprintf("*Method:* `%s`\n", event.Method)) + b.WriteString(fmt.Sprintf("*Content-Type:* `%s`\n", event.ContentType)) + b.WriteString(fmt.Sprintf("*Timestamp:* `%s`\n", event.CreatedAt.UTC().Format(time.RFC3339))) + b.WriteString(fmt.Sprintf("*Body Size:* %d bytes\n", len(event.Body))) + + if event.Body == "" { + b.WriteString("\n_(empty body)_\n") + return b.String() + } + + // Try to pretty-print as JSON + var parsed json.RawMessage + if json.Unmarshal([]byte(event.Body), &parsed) == nil { + var pretty bytes.Buffer + if json.Indent(&pretty, parsed, "", " ") == nil { + b.WriteString("\n```\n") + prettyStr := pretty.String() + // Truncate very large payloads to keep Slack messages reasonable + const maxPayloadDisplay = 3500 + if len(prettyStr) > maxPayloadDisplay { + b.WriteString(prettyStr[:maxPayloadDisplay]) + b.WriteString("\n... (truncated)") + } else { + b.WriteString(prettyStr) + } + b.WriteString("\n```\n") + return b.String() + } + } + + // Not JSON — show raw body in a plain code block + b.WriteString("\n```\n") + bodyStr := event.Body + const maxRawDisplay = 3500 + if len(bodyStr) > maxRawDisplay { + b.WriteString(bodyStr[:maxRawDisplay]) + b.WriteString("\n... (truncated)") + } else { + b.WriteString(bodyStr) + } + b.WriteString("\n```\n") + + return b.String() +} + // doHTTPRequest performs the outbound HTTP POST to a target URL. func (e *Engine) doHTTPRequest(cfg *HTTPTargetConfig, event *database.Event) (statusCode int, respBody string, durationMs int64, err error) { start := time.Now() diff --git a/internal/delivery/engine_test.go b/internal/delivery/engine_test.go index 3e5c481..ddffd74 100644 --- a/internal/delivery/engine_test.go +++ b/internal/delivery/engine_test.go @@ -5,6 +5,7 @@ import ( "database/sql" "encoding/json" "fmt" + "io" "log/slog" "net/http" "net/http/httptest" @@ -934,3 +935,284 @@ func TestMaxInlineBodySize_Constant(t *testing.T) { assert.Equal(t, 16*1024, MaxInlineBodySize, "MaxInlineBodySize should be 16KB (16384 bytes)") } + +func TestParseSlackConfig_Valid(t *testing.T) { + t.Parallel() + e := testEngine(t, 1) + + cfg, err := e.parseSlackConfig(`{"webhook_url":"https://hooks.slack.com/services/T00/B00/xxx"}`) + require.NoError(t, err) + assert.Equal(t, "https://hooks.slack.com/services/T00/B00/xxx", cfg.WebhookURL) +} + +func TestParseSlackConfig_Empty(t *testing.T) { + t.Parallel() + e := testEngine(t, 1) + + _, err := e.parseSlackConfig("") + assert.Error(t, err, "empty config should return error") +} + +func TestParseSlackConfig_MissingWebhookURL(t *testing.T) { + t.Parallel() + e := testEngine(t, 1) + + _, err := e.parseSlackConfig(`{"other":"field"}`) + assert.Error(t, err, "config without webhook_url should return error") +} + +func TestFormatSlackMessage_JSONBody(t *testing.T) { + t.Parallel() + + event := &database.Event{ + Method: "POST", + ContentType: "application/json", + Body: `{"action":"push","repo":"test/repo","ref":"refs/heads/main"}`, + } + event.CreatedAt = time.Date(2025, 1, 15, 10, 30, 0, 0, time.UTC) + + msg := FormatSlackMessage(event) + + assert.Contains(t, msg, "*Webhook Event Received*") + assert.Contains(t, msg, "`POST`") + assert.Contains(t, msg, "`application/json`") + assert.Contains(t, msg, "```") + assert.NotContains(t, msg, "```json") + // Pretty-printed JSON should have indentation + assert.Contains(t, msg, ` "action": "push"`) + assert.Contains(t, msg, ` "repo": "test/repo"`) +} + +func TestFormatSlackMessage_NonJSONBody(t *testing.T) { + t.Parallel() + + event := &database.Event{ + Method: "POST", + ContentType: "text/plain", + Body: "hello world plain text", + } + event.CreatedAt = time.Date(2025, 1, 15, 10, 30, 0, 0, time.UTC) + + msg := FormatSlackMessage(event) + + assert.Contains(t, msg, "*Webhook Event Received*") + assert.Contains(t, msg, "```\nhello world plain text\n```") + // Should NOT have ```json marker for non-JSON + assert.NotContains(t, msg, "```json") +} + +func TestFormatSlackMessage_EmptyBody(t *testing.T) { + t.Parallel() + + event := &database.Event{ + Method: "POST", + ContentType: "application/json", + Body: "", + } + event.CreatedAt = time.Date(2025, 1, 15, 10, 30, 0, 0, time.UTC) + + msg := FormatSlackMessage(event) + + assert.Contains(t, msg, "_(empty body)_") + assert.NotContains(t, msg, "```") +} + +func TestFormatSlackMessage_LargeJSONTruncated(t *testing.T) { + t.Parallel() + + // Build a large JSON body that will exceed 3500 chars when pretty-printed + largeObj := make(map[string]string) + for i := 0; i < 200; i++ { + largeObj[fmt.Sprintf("key_%03d", i)] = strings.Repeat("v", 20) + } + largeJSON, err := json.Marshal(largeObj) + require.NoError(t, err) + + event := &database.Event{ + Method: "POST", + ContentType: "application/json", + Body: string(largeJSON), + } + event.CreatedAt = time.Date(2025, 1, 15, 10, 30, 0, 0, time.UTC) + + msg := FormatSlackMessage(event) + + assert.Contains(t, msg, "... (truncated)") +} + +func TestDeliverSlack_Success(t *testing.T) { + t.Parallel() + db := testWebhookDB(t) + + var receivedBody string + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + bodyBytes, readErr := io.ReadAll(r.Body) + if readErr != nil { + http.Error(w, "read error", http.StatusInternalServerError) + return + } + receivedBody = string(bodyBytes) + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, "ok") + })) + defer ts.Close() + + e := testEngine(t, 1) + targetID := uuid.New().String() + slackCfg, err := json.Marshal(SlackTargetConfig{WebhookURL: ts.URL}) + require.NoError(t, err) + + event := seedEvent(t, db, `{"action":"test","data":"value"}`) + dlv := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending) + + d := &database.Delivery{ + EventID: event.ID, + TargetID: targetID, + Status: database.DeliveryStatusPending, + Event: event, + Target: database.Target{ + Name: "test-slack", + Type: database.TargetTypeSlack, + Config: string(slackCfg), + }, + } + d.ID = dlv.ID + + e.deliverSlack(db, d) + + // The delivery should be marked as delivered + var updated database.Delivery + require.NoError(t, db.First(&updated, "id = ?", dlv.ID).Error) + assert.Equal(t, database.DeliveryStatusDelivered, updated.Status) + + // Check that a result was recorded + var result database.DeliveryResult + require.NoError(t, db.Where("delivery_id = ?", dlv.ID).First(&result).Error) + assert.True(t, result.Success) + assert.Equal(t, http.StatusOK, result.StatusCode) + + // Verify the Slack payload contains the expected message + var slackPayload map[string]string + require.NoError(t, json.Unmarshal([]byte(receivedBody), &slackPayload)) + assert.Contains(t, slackPayload["text"], "*Webhook Event Received*") + assert.NotContains(t, slackPayload["text"], "**Webhook Event Received**") + assert.Contains(t, slackPayload["text"], "```") + assert.NotContains(t, slackPayload["text"], "```json") +} + +func TestDeliverSlack_Failure(t *testing.T) { + t.Parallel() + db := testWebhookDB(t) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusForbidden) + fmt.Fprint(w, "invalid_token") + })) + defer ts.Close() + + e := testEngine(t, 1) + targetID := uuid.New().String() + slackCfg, err := json.Marshal(SlackTargetConfig{WebhookURL: ts.URL}) + require.NoError(t, err) + + event := seedEvent(t, db, `{"test":true}`) + dlv := seedDelivery(t, db, event.ID, targetID, database.DeliveryStatusPending) + + d := &database.Delivery{ + EventID: event.ID, + TargetID: targetID, + Status: database.DeliveryStatusPending, + Event: event, + Target: database.Target{ + Name: "test-slack-fail", + Type: database.TargetTypeSlack, + Config: string(slackCfg), + }, + } + d.ID = dlv.ID + + e.deliverSlack(db, d) + + var updated database.Delivery + require.NoError(t, db.First(&updated, "id = ?", dlv.ID).Error) + assert.Equal(t, database.DeliveryStatusFailed, updated.Status) + + var result database.DeliveryResult + require.NoError(t, db.Where("delivery_id = ?", dlv.ID).First(&result).Error) + assert.False(t, result.Success) + assert.Equal(t, http.StatusForbidden, result.StatusCode) +} + +func TestDeliverSlack_InvalidConfig(t *testing.T) { + t.Parallel() + db := testWebhookDB(t) + e := testEngine(t, 1) + + event := seedEvent(t, db, `{"test":true}`) + dlv := seedDelivery(t, db, event.ID, uuid.New().String(), database.DeliveryStatusPending) + + d := &database.Delivery{ + EventID: event.ID, + TargetID: dlv.TargetID, + Status: database.DeliveryStatusPending, + Event: event, + Target: database.Target{ + Name: "test-slack-bad", + Type: database.TargetTypeSlack, + Config: `{"not_webhook_url":"missing"}`, + }, + } + d.ID = dlv.ID + + e.deliverSlack(db, d) + + var updated database.Delivery + require.NoError(t, db.First(&updated, "id = ?", dlv.ID).Error) + assert.Equal(t, database.DeliveryStatusFailed, updated.Status) +} + +func TestProcessDelivery_RoutesToSlack(t *testing.T) { + t.Parallel() + db := testWebhookDB(t) + + var received atomic.Bool + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + received.Store(true) + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + e := testEngine(t, 1) + slackCfg, err := json.Marshal(SlackTargetConfig{WebhookURL: ts.URL}) + require.NoError(t, err) + + event := seedEvent(t, db, `{"route":"slack"}`) + dlv := seedDelivery(t, db, event.ID, uuid.New().String(), database.DeliveryStatusPending) + + d := &database.Delivery{ + EventID: event.ID, + TargetID: dlv.TargetID, + Status: database.DeliveryStatusPending, + Event: event, + Target: database.Target{ + Name: "test-slack-route", + Type: database.TargetTypeSlack, + Config: string(slackCfg), + }, + } + d.ID = dlv.ID + + task := &DeliveryTask{ + DeliveryID: dlv.ID, + TargetType: database.TargetTypeSlack, + } + + e.processDelivery(context.TODO(), db, d, task) + + assert.True(t, received.Load(), "Slack target should have received the request") + + var updated database.Delivery + require.NoError(t, db.First(&updated, "id = ?", dlv.ID).Error) + assert.Equal(t, database.DeliveryStatusDelivered, updated.Status) +} diff --git a/internal/handlers/source_management.go b/internal/handlers/source_management.go index c211f07..66e6c27 100644 --- a/internal/handlers/source_management.go +++ b/internal/handlers/source_management.go @@ -520,14 +520,14 @@ func (h *Handlers) HandleTargetCreate() http.HandlerFunc { // Validate target type switch targetType { - case database.TargetTypeHTTP, database.TargetTypeDatabase, database.TargetTypeLog: + case database.TargetTypeHTTP, database.TargetTypeDatabase, database.TargetTypeLog, database.TargetTypeSlack: // valid default: http.Error(w, "Invalid target type", http.StatusBadRequest) return } - // Build config JSON for HTTP targets + // Build config JSON based on target type var configJSON string if targetType == database.TargetTypeHTTP { if url == "" { @@ -554,6 +554,20 @@ func (h *Handlers) HandleTargetCreate() http.HandlerFunc { return } configJSON = string(configBytes) + } else if targetType == database.TargetTypeSlack { + if url == "" { + http.Error(w, "Webhook URL is required for Slack targets", http.StatusBadRequest) + return + } + cfg := map[string]interface{}{ + "webhook_url": url, + } + configBytes, err := json.Marshal(cfg) + if err != nil { + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + configJSON = string(configBytes) } maxRetries := 0 // default: fire-and-forget (no retries) diff --git a/templates/source_detail.html b/templates/source_detail.html index 3aa9fa6..5f20957 100644 --- a/templates/source_detail.html +++ b/templates/source_detail.html @@ -97,17 +97,22 @@
- +
+
+ +

Slack or Mattermost incoming webhook URL. Payloads are pretty-printed in code blocks.

+