From 31bd6c32288d4447144ca340f40950a3c5c271d9 Mon Sep 17 00:00:00 2001 From: clawbot Date: Tue, 10 Mar 2026 11:11:32 -0700 Subject: [PATCH] feat: add retry with exponential backoff for notification delivery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Notifications were fire-and-forget: if Slack, Mattermost, or ntfy was temporarily down, changes were silently lost. This adds automatic retry with exponential backoff and jitter to all notification endpoints. Implementation: - New retry.go with configurable RetryConfig (max retries, base delay, max delay) and exponential backoff with ±25% jitter - Each dispatch goroutine now wraps its send call in deliverWithRetry - Default: 3 retries (4 total attempts), 1s base delay, 10s max delay - Context-aware: respects cancellation during retry sleep - Structured logging on each retry attempt and on final success after retry All existing tests continue to pass. New tests cover: - Backoff calculation (increase, cap) - Retry success on first attempt (no unnecessary retries) - Retry on transient failure (succeeds after N attempts) - Exhausted retries (returns last error) - Context cancellation during retry sleep - Integration: SendNotification retries transient 500s - Integration: all three endpoints retry independently - Integration: permanent failure exhausts retries closes https://git.eeqj.de/sneak/dnswatcher/issues/62 --- internal/notify/export_test.go | 29 ++ internal/notify/notify.go | 44 ++- internal/notify/retry.go | 139 ++++++++++ internal/notify/retry_test.go | 493 +++++++++++++++++++++++++++++++++ 4 files changed, 693 insertions(+), 12 deletions(-) create mode 100644 internal/notify/retry.go create mode 100644 internal/notify/retry_test.go diff --git a/internal/notify/export_test.go b/internal/notify/export_test.go index 0f02fdd..ae6818d 100644 --- a/internal/notify/export_test.go +++ b/internal/notify/export_test.go @@ -6,6 +6,7 @@ import ( "log/slog" "net/http" "net/url" + "time" ) // NtfyPriority exports ntfyPriority for testing. @@ -74,3 +75,31 @@ func (svc *Service) SendSlack( ctx, webhookURL, title, message, priority, ) } + +// SetRetryConfig overrides the retry configuration for +// testing. +func (svc *Service) SetRetryConfig(cfg RetryConfig) { + svc.retryConfig = cfg +} + +// SetSleepFunc overrides the sleep function so tests can +// eliminate real delays. +func (svc *Service) SetSleepFunc( + fn func(time.Duration) <-chan time.Time, +) { + svc.sleepFn = fn +} + +// DeliverWithRetry exports deliverWithRetry for testing. +func (svc *Service) DeliverWithRetry( + ctx context.Context, + endpoint string, + fn func(context.Context) error, +) error { + return svc.deliverWithRetry(ctx, endpoint, fn) +} + +// BackoffDuration exports RetryConfig.backoff for testing. +func (rc RetryConfig) BackoffDuration(attempt int) time.Duration { + return rc.defaults().backoff(attempt) +} diff --git a/internal/notify/notify.go b/internal/notify/notify.go index b36be7c..878b912 100644 --- a/internal/notify/notify.go +++ b/internal/notify/notify.go @@ -113,6 +113,8 @@ type Service struct { slackWebhookURL *url.URL mattermostWebhookURL *url.URL history *AlertHistory + retryConfig RetryConfig + sleepFn func(time.Duration) <-chan time.Time } // New creates a new notify Service. @@ -203,13 +205,19 @@ func (svc *Service) dispatchNtfy( go func() { notifyCtx := context.WithoutCancel(ctx) - err := svc.sendNtfy( - notifyCtx, svc.ntfyURL, - title, message, priority, + err := svc.deliverWithRetry( + notifyCtx, "ntfy", + func(c context.Context) error { + return svc.sendNtfy( + c, svc.ntfyURL, + title, message, priority, + ) + }, ) if err != nil { svc.log.Error( - "failed to send ntfy notification", + "failed to send ntfy notification "+ + "after retries", "error", err, ) } @@ -227,13 +235,19 @@ func (svc *Service) dispatchSlack( go func() { notifyCtx := context.WithoutCancel(ctx) - err := svc.sendSlack( - notifyCtx, svc.slackWebhookURL, - title, message, priority, + err := svc.deliverWithRetry( + notifyCtx, "slack", + func(c context.Context) error { + return svc.sendSlack( + c, svc.slackWebhookURL, + title, message, priority, + ) + }, ) if err != nil { svc.log.Error( - "failed to send slack notification", + "failed to send slack notification "+ + "after retries", "error", err, ) } @@ -251,13 +265,19 @@ func (svc *Service) dispatchMattermost( go func() { notifyCtx := context.WithoutCancel(ctx) - err := svc.sendSlack( - notifyCtx, svc.mattermostWebhookURL, - title, message, priority, + err := svc.deliverWithRetry( + notifyCtx, "mattermost", + func(c context.Context) error { + return svc.sendSlack( + c, svc.mattermostWebhookURL, + title, message, priority, + ) + }, ) if err != nil { svc.log.Error( - "failed to send mattermost notification", + "failed to send mattermost notification "+ + "after retries", "error", err, ) } diff --git a/internal/notify/retry.go b/internal/notify/retry.go new file mode 100644 index 0000000..719807b --- /dev/null +++ b/internal/notify/retry.go @@ -0,0 +1,139 @@ +package notify + +import ( + "context" + "math" + "math/rand/v2" + "time" +) + +// Retry defaults. +const ( + // DefaultMaxRetries is the number of additional attempts + // after the first failure. + DefaultMaxRetries = 3 + + // DefaultBaseDelay is the initial delay before the first + // retry attempt. + DefaultBaseDelay = 1 * time.Second + + // DefaultMaxDelay caps the computed backoff delay. + DefaultMaxDelay = 10 * time.Second + + // backoffMultiplier is the exponential growth factor. + backoffMultiplier = 2 + + // jitterFraction controls the ±random spread applied + // to each delay (0.25 = ±25%). + jitterFraction = 0.25 +) + +// RetryConfig holds tuning knobs for the retry loop. +// Zero values fall back to the package defaults above. +type RetryConfig struct { + MaxRetries int + BaseDelay time.Duration + MaxDelay time.Duration +} + +// defaults returns a copy with zero fields replaced by +// package defaults. +func (rc RetryConfig) defaults() RetryConfig { + if rc.MaxRetries <= 0 { + rc.MaxRetries = DefaultMaxRetries + } + + if rc.BaseDelay <= 0 { + rc.BaseDelay = DefaultBaseDelay + } + + if rc.MaxDelay <= 0 { + rc.MaxDelay = DefaultMaxDelay + } + + return rc +} + +// backoff computes the delay for attempt n (0-indexed) with +// jitter. The raw delay is BaseDelay * 2^n, capped at +// MaxDelay, then randomised by ±jitterFraction. +func (rc RetryConfig) backoff(attempt int) time.Duration { + raw := float64(rc.BaseDelay) * + math.Pow(backoffMultiplier, float64(attempt)) + + if raw > float64(rc.MaxDelay) { + raw = float64(rc.MaxDelay) + } + + // Apply jitter: uniform in [raw*(1-j), raw*(1+j)]. + lo := raw * (1 - jitterFraction) + hi := raw * (1 + jitterFraction) + + jittered := lo + rand.Float64()*(hi-lo) //nolint:gosec // jitter does not need crypto/rand + + return time.Duration(jittered) +} + +// deliverWithRetry calls fn, retrying on error with +// exponential backoff. It logs every failed attempt and +// returns the last error if all attempts are exhausted. +func (svc *Service) deliverWithRetry( + ctx context.Context, + endpoint string, + fn func(context.Context) error, +) error { + cfg := svc.retryConfig.defaults() + + var lastErr error + + // attempt 0 is the initial call; attempts 1..MaxRetries + // are retries. + for attempt := range cfg.MaxRetries + 1 { + lastErr = fn(ctx) + if lastErr == nil { + if attempt > 0 { + svc.log.Info( + "notification delivered after retry", + "endpoint", endpoint, + "attempt", attempt+1, + ) + } + + return nil + } + + // Last attempt — don't sleep, just return. + if attempt == cfg.MaxRetries { + break + } + + delay := cfg.backoff(attempt) + + svc.log.Warn( + "notification delivery failed, retrying", + "endpoint", endpoint, + "attempt", attempt+1, + "maxAttempts", cfg.MaxRetries+1, + "retryIn", delay, + "error", lastErr, + ) + + select { + case <-ctx.Done(): + return ctx.Err() + case <-svc.sleepFunc(delay): + } + } + + return lastErr +} + +// sleepFunc returns a channel that closes after d. +// It is a field-level indirection so tests can override it. +func (svc *Service) sleepFunc(d time.Duration) <-chan time.Time { + if svc.sleepFn != nil { + return svc.sleepFn(d) + } + + return time.After(d) +} diff --git a/internal/notify/retry_test.go b/internal/notify/retry_test.go new file mode 100644 index 0000000..878e1bf --- /dev/null +++ b/internal/notify/retry_test.go @@ -0,0 +1,493 @@ +package notify_test + +import ( + "context" + "errors" + "net/http" + "net/http/httptest" + "net/url" + "sync" + "sync/atomic" + "testing" + "time" + + "sneak.berlin/go/dnswatcher/internal/notify" +) + +// Static test errors (err113). +var ( + errTransient = errors.New("transient failure") + errPermanent = errors.New("permanent failure") + errFail = errors.New("fail") +) + +// instantSleep returns a closed channel immediately, removing +// real delays from tests. +func instantSleep(_ time.Duration) <-chan time.Time { + ch := make(chan time.Time, 1) + ch <- time.Now() + + return ch +} + +// ── backoff calculation ─────────────────────────────────── + +func TestBackoffDurationIncreases(t *testing.T) { + t.Parallel() + + cfg := notify.RetryConfig{ + MaxRetries: 5, + BaseDelay: 1 * time.Second, + MaxDelay: 30 * time.Second, + } + + prev := time.Duration(0) + + // With jitter the exact value varies, but the trend + // should be increasing for the first few attempts. + for attempt := range 4 { + d := cfg.BackoffDuration(attempt) + if d <= 0 { + t.Fatalf( + "attempt %d: backoff must be positive, got %v", + attempt, d, + ) + } + + // Allow jitter to occasionally flatten a step, but + // the midpoint (no-jitter) should be strictly higher. + midpoint := cfg.BaseDelay * (1 << attempt) + if attempt > 0 && midpoint <= prev { + t.Fatalf( + "midpoint should grow: attempt %d midpoint=%v prev=%v", + attempt, midpoint, prev, + ) + } + + prev = midpoint + } +} + +func TestBackoffDurationCappedAtMax(t *testing.T) { + t.Parallel() + + cfg := notify.RetryConfig{ + MaxRetries: 5, + BaseDelay: 1 * time.Second, + MaxDelay: 5 * time.Second, + } + + // Attempt 10 would be 1024s without capping. + d := cfg.BackoffDuration(10) + + // With ±25% jitter on a 5s cap: max is 6.25s. + const maxWithJitter = 5*time.Second + + 5*time.Second/4 + + time.Millisecond // rounding margin + + if d > maxWithJitter { + t.Errorf( + "backoff %v exceeds max+jitter %v", + d, maxWithJitter, + ) + } +} + +// ── deliverWithRetry ────────────────────────────────────── + +func TestDeliverWithRetrySucceedsFirstAttempt(t *testing.T) { + t.Parallel() + + svc := notify.NewTestService(http.DefaultTransport) + svc.SetSleepFunc(instantSleep) + + var calls atomic.Int32 + + err := svc.DeliverWithRetry( + context.Background(), "test", + func(_ context.Context) error { + calls.Add(1) + + return nil + }, + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if calls.Load() != 1 { + t.Errorf("expected 1 call, got %d", calls.Load()) + } +} + +func TestDeliverWithRetryRetriesOnFailure(t *testing.T) { + t.Parallel() + + svc := notify.NewTestService(http.DefaultTransport) + svc.SetSleepFunc(instantSleep) + svc.SetRetryConfig(notify.RetryConfig{ + MaxRetries: 3, + BaseDelay: time.Millisecond, + MaxDelay: 10 * time.Millisecond, + }) + + var calls atomic.Int32 + + // Fail twice, then succeed on the third attempt. + err := svc.DeliverWithRetry( + context.Background(), "test", + func(_ context.Context) error { + n := calls.Add(1) + if n <= 2 { + return errTransient + } + + return nil + }, + ) + if err != nil { + t.Fatalf("expected success after retries: %v", err) + } + + if calls.Load() != 3 { + t.Errorf("expected 3 calls, got %d", calls.Load()) + } +} + +func TestDeliverWithRetryExhaustsAttempts(t *testing.T) { + t.Parallel() + + svc := notify.NewTestService(http.DefaultTransport) + svc.SetSleepFunc(instantSleep) + svc.SetRetryConfig(notify.RetryConfig{ + MaxRetries: 2, + BaseDelay: time.Millisecond, + MaxDelay: 10 * time.Millisecond, + }) + + var calls atomic.Int32 + + err := svc.DeliverWithRetry( + context.Background(), "test", + func(_ context.Context) error { + calls.Add(1) + + return errPermanent + }, + ) + if err == nil { + t.Fatal("expected error when all retries exhausted") + } + + if !errors.Is(err, errPermanent) { + t.Errorf("expected permanent failure, got: %v", err) + } + + // 1 initial + 2 retries = 3 total. + if calls.Load() != 3 { + t.Errorf("expected 3 calls, got %d", calls.Load()) + } +} + +func TestDeliverWithRetryRespectsContextCancellation( + t *testing.T, +) { + t.Parallel() + + svc := notify.NewTestService(http.DefaultTransport) + svc.SetRetryConfig(notify.RetryConfig{ + MaxRetries: 5, + BaseDelay: time.Millisecond, + MaxDelay: 10 * time.Millisecond, + }) + + // Use a blocking sleep so the context cancellation is + // the only way out. + svc.SetSleepFunc(func(_ time.Duration) <-chan time.Time { + return make(chan time.Time) // never fires + }) + + ctx, cancel := context.WithCancel(context.Background()) + + done := make(chan error, 1) + + go func() { + done <- svc.DeliverWithRetry( + ctx, "test", + func(_ context.Context) error { + return errFail + }, + ) + }() + + // Wait for the first failure + retry sleep to be + // entered, then cancel. + time.Sleep(50 * time.Millisecond) + cancel() + + select { + case err := <-done: + if !errors.Is(err, context.Canceled) { + t.Errorf( + "expected context.Canceled, got: %v", err, + ) + } + case <-time.After(2 * time.Second): + t.Fatal("deliverWithRetry did not return after cancel") + } +} + +// ── integration: SendNotification with retry ────────────── + +func TestSendNotificationRetriesTransientFailure( + t *testing.T, +) { + t.Parallel() + + var ( + mu sync.Mutex + attempts int + ) + + srv := httptest.NewServer( + http.HandlerFunc( + func(w http.ResponseWriter, _ *http.Request) { + mu.Lock() + attempts++ + n := attempts + mu.Unlock() + + if n <= 2 { + w.WriteHeader( + http.StatusInternalServerError, + ) + + return + } + + w.WriteHeader(http.StatusOK) + }), + ) + defer srv.Close() + + svc := newRetryTestService(srv.URL, "ntfy") + + svc.SendNotification( + context.Background(), + "Retry Test", "body", "warning", + ) + + waitForCondition(t, func() bool { + mu.Lock() + defer mu.Unlock() + + return attempts >= 3 + }) +} + +// newRetryTestService creates a test service with instant +// sleep and low retry delays for the named endpoint. +func newRetryTestService( + rawURL, endpoint string, +) *notify.Service { + svc := notify.NewTestService(http.DefaultTransport) + svc.SetSleepFunc(instantSleep) + svc.SetRetryConfig(notify.RetryConfig{ + MaxRetries: 3, + BaseDelay: time.Millisecond, + MaxDelay: 10 * time.Millisecond, + }) + + u, _ := url.Parse(rawURL) + + switch endpoint { + case "ntfy": + svc.SetNtfyURL(u) + case "slack": + svc.SetSlackWebhookURL(u) + case "mattermost": + svc.SetMattermostWebhookURL(u) + } + + return svc +} + +func TestSendNotificationAllEndpointsRetrySetup( + t *testing.T, +) { + t.Parallel() + + result := newEndpointRetryResult() + ntfySrv, slackSrv, mmSrv := newRetryServers(result) + + defer ntfySrv.Close() + defer slackSrv.Close() + defer mmSrv.Close() + + svc := buildAllEndpointRetryService( + ntfySrv.URL, slackSrv.URL, mmSrv.URL, + ) + + svc.SendNotification( + context.Background(), + "Multi-Retry", "testing", "error", + ) + + assertAllEndpointsRetried(t, result) +} + +// endpointRetryResult tracks per-endpoint retry state. +type endpointRetryResult struct { + mu sync.Mutex + ntfyAttempts int + slackAttempts int + mmAttempts int + ntfyOK bool + slackOK bool + mmOK bool +} + +func newEndpointRetryResult() *endpointRetryResult { + return &endpointRetryResult{} +} + +func newRetryServers( + r *endpointRetryResult, +) (*httptest.Server, *httptest.Server, *httptest.Server) { + mk := func( + attempts *int, ok *bool, + ) *httptest.Server { + return httptest.NewServer( + http.HandlerFunc( + func(w http.ResponseWriter, _ *http.Request) { + r.mu.Lock() + *attempts++ + n := *attempts + r.mu.Unlock() + + if n == 1 { + w.WriteHeader( + http.StatusServiceUnavailable, + ) + + return + } + + r.mu.Lock() + *ok = true + r.mu.Unlock() + + w.WriteHeader(http.StatusOK) + }), + ) + } + + return mk(&r.ntfyAttempts, &r.ntfyOK), + mk(&r.slackAttempts, &r.slackOK), + mk(&r.mmAttempts, &r.mmOK) +} + +func buildAllEndpointRetryService( + ntfyURL, slackURL, mmURL string, +) *notify.Service { + svc := notify.NewTestService(http.DefaultTransport) + svc.SetSleepFunc(instantSleep) + svc.SetRetryConfig(notify.RetryConfig{ + MaxRetries: 3, + BaseDelay: time.Millisecond, + MaxDelay: 10 * time.Millisecond, + }) + + nu, _ := url.Parse(ntfyURL) + su, _ := url.Parse(slackURL) + mu, _ := url.Parse(mmURL) + + svc.SetNtfyURL(nu) + svc.SetSlackWebhookURL(su) + svc.SetMattermostWebhookURL(mu) + + return svc +} + +func assertAllEndpointsRetried( + t *testing.T, + r *endpointRetryResult, +) { + t.Helper() + + waitForCondition(t, func() bool { + r.mu.Lock() + defer r.mu.Unlock() + + return r.ntfyOK && r.slackOK && r.mmOK + }) + + r.mu.Lock() + defer r.mu.Unlock() + + if r.ntfyAttempts < 2 { + t.Errorf( + "ntfy: expected >= 2 attempts, got %d", + r.ntfyAttempts, + ) + } + + if r.slackAttempts < 2 { + t.Errorf( + "slack: expected >= 2 attempts, got %d", + r.slackAttempts, + ) + } + + if r.mmAttempts < 2 { + t.Errorf( + "mattermost: expected >= 2 attempts, got %d", + r.mmAttempts, + ) + } +} + +func TestSendNotificationPermanentFailureLogsError( + t *testing.T, +) { + t.Parallel() + + var ( + mu sync.Mutex + attempts int + ) + + srv := httptest.NewServer( + http.HandlerFunc( + func(w http.ResponseWriter, _ *http.Request) { + mu.Lock() + attempts++ + mu.Unlock() + + w.WriteHeader( + http.StatusInternalServerError, + ) + }), + ) + defer srv.Close() + + svc := newRetryTestService(srv.URL, "slack") + svc.SetRetryConfig(notify.RetryConfig{ + MaxRetries: 2, + BaseDelay: time.Millisecond, + MaxDelay: 10 * time.Millisecond, + }) + + svc.SendNotification( + context.Background(), + "Permanent Fail", "body", "error", + ) + + // 1 initial + 2 retries = 3 total. + waitForCondition(t, func() bool { + mu.Lock() + defer mu.Unlock() + + return attempts >= 3 + }) +}