feat: add retry with exponential backoff for notification delivery (#87)
All checks were successful
check / check (push) Successful in 37s

## Summary

Notifications were fire-and-forget: if Slack, Mattermost, or ntfy was temporarily down, changes were silently lost. This adds automatic retry with exponential backoff and jitter to all notification endpoints.

## Changes

### New file: `internal/notify/retry.go`
- `RetryConfig` struct with configurable max retries, base delay, max delay
- `backoff()` computes delay as `BaseDelay * 2^attempt`, capped at `MaxDelay`, with ±25% jitter
- `deliverWithRetry()` wraps any send function with the retry loop
- Defaults: 3 retries (4 total attempts), 1s base delay, 10s max delay
- Context-aware: respects cancellation during retry sleep
- Injectable `sleepFn` for test determinism

### Modified: `internal/notify/notify.go`
- Added `retryConfig` and `sleepFn` fields to `Service`
- Updated `dispatchNtfy`, `dispatchSlack`, `dispatchMattermost` to wrap sends in `deliverWithRetry`
- Structured logging: warns on each retry, logs error only after all retries exhausted, logs info on success after retry

### Modified: `internal/notify/export_test.go`
- Added test helpers: `SetRetryConfig`, `SetSleepFunc`, `DeliverWithRetry`, `BackoffDuration`

### New file: `internal/notify/retry_test.go`
- Backoff calculation tests (exponential increase, max cap with jitter)
- `deliverWithRetry` unit tests: first-attempt success, transient failure recovery, exhausted retries, context cancellation
- Integration tests via `SendNotification`: transient failure retries, all-endpoints retry independently, permanent failure exhausts retries

## Verification
- `make fmt` 
- `make check` (format + lint + tests + build) 
- `docker build .` 
- All existing tests continue to pass unchanged
- No DNS client mocking — notification tests use `httptest` servers

closes #62

Co-authored-by: clawbot <clawbot@noreply.git.eeqj.de>
Reviewed-on: #87
Co-authored-by: clawbot <clawbot@noreply.example.org>
Co-committed-by: clawbot <clawbot@noreply.example.org>
This commit was merged in pull request #87.
This commit is contained in:
2026-03-22 07:14:59 +01:00
committed by Jeffrey Paul
parent f788037bfb
commit 23f115053b
4 changed files with 693 additions and 12 deletions

View File

@@ -0,0 +1,493 @@
package notify_test
import (
"context"
"errors"
"net/http"
"net/http/httptest"
"net/url"
"sync"
"sync/atomic"
"testing"
"time"
"sneak.berlin/go/dnswatcher/internal/notify"
)
// Static test errors shared across the retry tests. Declared once at
// package level (rather than inline errors.New calls) so that wrapped
// errors can be matched with errors.Is and to satisfy the err113 linter.
var (
	errTransient = errors.New("transient failure")
	errPermanent = errors.New("permanent failure")
	errFail      = errors.New("fail")
)
// instantSleep is a sleepFn replacement that never blocks: it returns
// a buffered channel that already holds one value, so the retry loop's
// receive completes immediately and tests incur no real delay.
// (Note: the channel is buffered and pre-filled, not closed.)
func instantSleep(_ time.Duration) <-chan time.Time {
	ch := make(chan time.Time, 1)
	ch <- time.Now()
	return ch
}
// ── backoff calculation ───────────────────────────────────

// TestBackoffDurationIncreases verifies that successive backoff
// durations actually grow. Even with ±25% jitter, the delays for
// consecutive attempts cannot overlap: the minimum of attempt n+1
// (0.75 * base * 2^(n+1) = 1.5 * base * 2^n) exceeds the maximum of
// attempt n (1.25 * base * 2^n), so a strict increase of the real
// computed values is guaranteed. (The previous version compared
// no-jitter midpoints against each other, which only tested the
// test's own arithmetic, never the BackoffDuration output.)
func TestBackoffDurationIncreases(t *testing.T) {
	t.Parallel()
	cfg := notify.RetryConfig{
		MaxRetries: 5,
		BaseDelay:  1 * time.Second,
		// High enough that no attempt checked below is capped
		// (attempt 3 peaks at 8s + 25% = 10s).
		MaxDelay: 30 * time.Second,
	}
	prev := time.Duration(0)
	for attempt := range 4 {
		d := cfg.BackoffDuration(attempt)
		if d <= 0 {
			t.Fatalf(
				"attempt %d: backoff must be positive, got %v",
				attempt, d,
			)
		}
		if d <= prev {
			t.Fatalf(
				"backoff should strictly increase: attempt %d got %v, previous %v",
				attempt, d, prev,
			)
		}
		prev = d
	}
}
// TestBackoffDurationCappedAtMax checks that a very large attempt
// number is bounded by MaxDelay (plus the jitter allowance) rather
// than growing exponentially without limit.
func TestBackoffDurationCappedAtMax(t *testing.T) {
	t.Parallel()
	cfg := notify.RetryConfig{
		MaxRetries: 5,
		BaseDelay:  1 * time.Second,
		MaxDelay:   5 * time.Second,
	}
	// Uncapped, attempt 10 would be 2^10 = 1024s.
	got := cfg.BackoffDuration(10)
	// The 5s cap plus +25% jitter allows at most 6.25s; a
	// millisecond of slack covers rounding.
	limit := 5*time.Second + 5*time.Second/4 + time.Millisecond
	if got > limit {
		t.Errorf("backoff %v exceeds max+jitter %v", got, limit)
	}
}
// ── deliverWithRetry ──────────────────────────────────────

// A send function that succeeds immediately must be invoked exactly
// once and DeliverWithRetry must return nil.
func TestDeliverWithRetrySucceedsFirstAttempt(t *testing.T) {
	t.Parallel()
	svc := notify.NewTestService(http.DefaultTransport)
	svc.SetSleepFunc(instantSleep)
	var count atomic.Int32
	send := func(_ context.Context) error {
		count.Add(1)
		return nil
	}
	if err := svc.DeliverWithRetry(context.Background(), "test", send); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if got := count.Load(); got != 1 {
		t.Errorf("expected 1 call, got %d", got)
	}
}
// A send that fails twice and then succeeds must be retried until it
// succeeds: three total calls and a nil error.
func TestDeliverWithRetryRetriesOnFailure(t *testing.T) {
	t.Parallel()
	svc := notify.NewTestService(http.DefaultTransport)
	svc.SetSleepFunc(instantSleep)
	svc.SetRetryConfig(notify.RetryConfig{
		MaxRetries: 3,
		BaseDelay:  time.Millisecond,
		MaxDelay:   10 * time.Millisecond,
	})
	var count atomic.Int32
	send := func(_ context.Context) error {
		// First two attempts fail transiently.
		if count.Add(1) <= 2 {
			return errTransient
		}
		return nil
	}
	if err := svc.DeliverWithRetry(context.Background(), "test", send); err != nil {
		t.Fatalf("expected success after retries: %v", err)
	}
	if got := count.Load(); got != 3 {
		t.Errorf("expected 3 calls, got %d", got)
	}
}
// When every attempt fails, the final error must wrap the underlying
// failure and the send function must run 1 + MaxRetries times.
func TestDeliverWithRetryExhaustsAttempts(t *testing.T) {
	t.Parallel()
	svc := notify.NewTestService(http.DefaultTransport)
	svc.SetSleepFunc(instantSleep)
	svc.SetRetryConfig(notify.RetryConfig{
		MaxRetries: 2,
		BaseDelay:  time.Millisecond,
		MaxDelay:   10 * time.Millisecond,
	})
	var count atomic.Int32
	send := func(_ context.Context) error {
		count.Add(1)
		return errPermanent
	}
	err := svc.DeliverWithRetry(context.Background(), "test", send)
	switch {
	case err == nil:
		t.Fatal("expected error when all retries exhausted")
	case !errors.Is(err, errPermanent):
		t.Errorf("expected permanent failure, got: %v", err)
	}
	// One initial attempt plus two retries.
	if got := count.Load(); got != 3 {
		t.Errorf("expected 3 calls, got %d", got)
	}
}
// Cancelling the context while the retry loop is sleeping must
// unblock DeliverWithRetry and surface context.Canceled.
func TestDeliverWithRetryRespectsContextCancellation(
	t *testing.T,
) {
	t.Parallel()
	svc := notify.NewTestService(http.DefaultTransport)
	svc.SetRetryConfig(notify.RetryConfig{
		MaxRetries: 5,
		BaseDelay:  time.Millisecond,
		MaxDelay:   10 * time.Millisecond,
	})
	// Install a sleep channel that never fires, so context
	// cancellation is the only way the retry loop can exit.
	svc.SetSleepFunc(func(_ time.Duration) <-chan time.Time {
		return make(chan time.Time)
	})
	ctx, cancel := context.WithCancel(context.Background())
	result := make(chan error, 1)
	go func() {
		result <- svc.DeliverWithRetry(ctx, "test",
			func(_ context.Context) error { return errFail })
	}()
	// Give the goroutine time to fail once and enter the retry
	// sleep, then cancel.
	time.Sleep(50 * time.Millisecond)
	cancel()
	select {
	case err := <-result:
		if !errors.Is(err, context.Canceled) {
			t.Errorf("expected context.Canceled, got: %v", err)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("deliverWithRetry did not return after cancel")
	}
}
// ── integration: SendNotification with retry ──────────────
func TestSendNotificationRetriesTransientFailure(
t *testing.T,
) {
t.Parallel()
var (
mu sync.Mutex
attempts int
)
srv := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, _ *http.Request) {
mu.Lock()
attempts++
n := attempts
mu.Unlock()
if n <= 2 {
w.WriteHeader(
http.StatusInternalServerError,
)
return
}
w.WriteHeader(http.StatusOK)
}),
)
defer srv.Close()
svc := newRetryTestService(srv.URL, "ntfy")
svc.SendNotification(
context.Background(),
"Retry Test", "body", "warning",
)
waitForCondition(t, func() bool {
mu.Lock()
defer mu.Unlock()
return attempts >= 3
})
}
// newRetryTestService creates a test service wired for fast retries:
// instant sleep, millisecond-scale backoff, and the given URL assigned
// to the named endpoint ("ntfy", "slack", or "mattermost").
//
// rawURL must parse (httptest server URLs always do) and endpoint must
// be one of the known names; anything else is a test-authoring bug and
// panics instead of being silently ignored as before.
func newRetryTestService(
	rawURL, endpoint string,
) *notify.Service {
	svc := notify.NewTestService(http.DefaultTransport)
	svc.SetSleepFunc(instantSleep)
	svc.SetRetryConfig(notify.RetryConfig{
		MaxRetries: 3,
		BaseDelay:  time.Millisecond,
		MaxDelay:   10 * time.Millisecond,
	})
	u, err := url.Parse(rawURL)
	if err != nil {
		panic("newRetryTestService: invalid URL " + rawURL + ": " + err.Error())
	}
	switch endpoint {
	case "ntfy":
		svc.SetNtfyURL(u)
	case "slack":
		svc.SetSlackWebhookURL(u)
	case "mattermost":
		svc.SetMattermostWebhookURL(u)
	default:
		panic("newRetryTestService: unknown endpoint " + endpoint)
	}
	return svc
}
// All three endpoints must retry independently: each backing server
// fails its first request, and every endpoint must still end up
// delivering successfully.
func TestSendNotificationAllEndpointsRetrySetup(
	t *testing.T,
) {
	t.Parallel()
	result := newEndpointRetryResult()
	ntfySrv, slackSrv, mmSrv := newRetryServers(result)
	defer ntfySrv.Close()
	defer slackSrv.Close()
	defer mmSrv.Close()
	svc := buildAllEndpointRetryService(ntfySrv.URL, slackSrv.URL, mmSrv.URL)
	svc.SendNotification(context.Background(), "Multi-Retry", "testing", "error")
	assertAllEndpointsRetried(t, result)
}
// endpointRetryResult tracks per-endpoint retry state. All fields are
// guarded by mu because each httptest server invokes its handler on a
// separate goroutine.
type endpointRetryResult struct {
	mu            sync.Mutex
	ntfyAttempts  int  // total requests seen by the ntfy server
	slackAttempts int  // total requests seen by the slack server
	mmAttempts    int  // total requests seen by the mattermost server
	ntfyOK        bool // ntfy eventually returned 200
	slackOK       bool // slack eventually returned 200
	mmOK          bool // mattermost eventually returned 200
}

// newEndpointRetryResult returns an empty, ready-to-use tracker.
func newEndpointRetryResult() *endpointRetryResult {
	return &endpointRetryResult{}
}
// newRetryServers spins up three httptest servers (returned in ntfy,
// slack, mattermost order). Each server rejects its first request with
// 503 and serves 200 afterwards, recording attempt counts and eventual
// success into r.
func newRetryServers(
	r *endpointRetryResult,
) (*httptest.Server, *httptest.Server, *httptest.Server) {
	newServer := func(attempts *int, ok *bool) *httptest.Server {
		handler := func(w http.ResponseWriter, _ *http.Request) {
			r.mu.Lock()
			*attempts++
			first := *attempts == 1
			r.mu.Unlock()
			if first {
				w.WriteHeader(http.StatusServiceUnavailable)
				return
			}
			r.mu.Lock()
			*ok = true
			r.mu.Unlock()
			w.WriteHeader(http.StatusOK)
		}
		return httptest.NewServer(http.HandlerFunc(handler))
	}
	ntfy := newServer(&r.ntfyAttempts, &r.ntfyOK)
	slack := newServer(&r.slackAttempts, &r.slackOK)
	mm := newServer(&r.mmAttempts, &r.mmOK)
	return ntfy, slack, mm
}
// buildAllEndpointRetryService wires a test service with fast retry
// settings (instant sleep, millisecond backoff) and all three
// notification endpoints pointed at the given URLs.
//
// The URLs must parse (httptest server URLs always do); an invalid
// URL is a test-authoring bug and panics instead of being silently
// discarded as before. The previous `mu` variable name for a URL
// (which read like a mutex) is gone with the checked helper.
func buildAllEndpointRetryService(
	ntfyURL, slackURL, mmURL string,
) *notify.Service {
	svc := notify.NewTestService(http.DefaultTransport)
	svc.SetSleepFunc(instantSleep)
	svc.SetRetryConfig(notify.RetryConfig{
		MaxRetries: 3,
		BaseDelay:  time.Millisecond,
		MaxDelay:   10 * time.Millisecond,
	})
	// parse panics on malformed input so the test fails loudly at
	// the point of misuse rather than silently configuring nil URLs.
	parse := func(raw string) *url.URL {
		u, err := url.Parse(raw)
		if err != nil {
			panic("buildAllEndpointRetryService: invalid URL " + raw + ": " + err.Error())
		}
		return u
	}
	svc.SetNtfyURL(parse(ntfyURL))
	svc.SetSlackWebhookURL(parse(slackURL))
	svc.SetMattermostWebhookURL(parse(mmURL))
	return svc
}
// assertAllEndpointsRetried waits until every endpoint has reported a
// successful delivery, then verifies that each one needed at least two
// attempts (i.e. the retry path was actually exercised).
func assertAllEndpointsRetried(
	t *testing.T,
	r *endpointRetryResult,
) {
	t.Helper()
	waitForCondition(t, func() bool {
		r.mu.Lock()
		defer r.mu.Unlock()
		return r.ntfyOK && r.slackOK && r.mmOK
	})
	r.mu.Lock()
	defer r.mu.Unlock()
	checks := []struct {
		name     string
		attempts int
	}{
		{"ntfy", r.ntfyAttempts},
		{"slack", r.slackAttempts},
		{"mattermost", r.mmAttempts},
	}
	for _, c := range checks {
		if c.attempts < 2 {
			t.Errorf(
				"%s: expected >= 2 attempts, got %d",
				c.name, c.attempts,
			)
		}
	}
}
func TestSendNotificationPermanentFailureLogsError(
t *testing.T,
) {
t.Parallel()
var (
mu sync.Mutex
attempts int
)
srv := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, _ *http.Request) {
mu.Lock()
attempts++
mu.Unlock()
w.WriteHeader(
http.StatusInternalServerError,
)
}),
)
defer srv.Close()
svc := newRetryTestService(srv.URL, "slack")
svc.SetRetryConfig(notify.RetryConfig{
MaxRetries: 2,
BaseDelay: time.Millisecond,
MaxDelay: 10 * time.Millisecond,
})
svc.SendNotification(
context.Background(),
"Permanent Fail", "body", "error",
)
// 1 initial + 2 retries = 3 total.
waitForCondition(t, func() bool {
mu.Lock()
defer mu.Unlock()
return attempts >= 3
})
}