feat: add retry with exponential backoff for notification delivery
All checks were successful
check / check (push) Successful in 42s
All checks were successful
check / check (push) Successful in 42s
Notifications were fire-and-forget: if Slack, Mattermost, or ntfy was temporarily down, changes were silently lost. This adds automatic retry with exponential backoff and jitter to all notification endpoints. Implementation: - New retry.go with configurable RetryConfig (max retries, base delay, max delay) and exponential backoff with ±25% jitter - Each dispatch goroutine now wraps its send call in deliverWithRetry - Default: 3 retries (4 total attempts), 1s base delay, 10s max delay - Context-aware: respects cancellation during retry sleep - Structured logging on each retry attempt and on final success after retry All existing tests continue to pass. New tests cover: - Backoff calculation (increase, cap) - Retry success on first attempt (no unnecessary retries) - Retry on transient failure (succeeds after N attempts) - Exhausted retries (returns last error) - Context cancellation during retry sleep - Integration: SendNotification retries transient 500s - Integration: all three endpoints retry independently - Integration: permanent failure exhausts retries closes #62
This commit is contained in:
493
internal/notify/retry_test.go
Normal file
493
internal/notify/retry_test.go
Normal file
@@ -0,0 +1,493 @@
|
||||
package notify_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"sneak.berlin/go/dnswatcher/internal/notify"
|
||||
)
|
||||
|
||||
// Static test errors (err113).
var (
	// errTransient simulates a recoverable failure: the retry
	// loop is expected to try again and eventually succeed.
	errTransient = errors.New("transient failure")
	// errPermanent simulates a failure that never clears: the
	// retry loop is expected to exhaust its attempts.
	errPermanent = errors.New("permanent failure")
	// errFail is a generic failure used where the error value
	// itself is irrelevant (e.g. the cancellation test).
	errFail = errors.New("fail")
)
|
||||
|
||||
// instantSleep returns a closed channel immediately, removing
|
||||
// real delays from tests.
|
||||
func instantSleep(_ time.Duration) <-chan time.Time {
|
||||
ch := make(chan time.Time, 1)
|
||||
ch <- time.Now()
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
// ── backoff calculation ───────────────────────────────────
|
||||
|
||||
func TestBackoffDurationIncreases(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cfg := notify.RetryConfig{
|
||||
MaxRetries: 5,
|
||||
BaseDelay: 1 * time.Second,
|
||||
MaxDelay: 30 * time.Second,
|
||||
}
|
||||
|
||||
prev := time.Duration(0)
|
||||
|
||||
// With jitter the exact value varies, but the trend
|
||||
// should be increasing for the first few attempts.
|
||||
for attempt := range 4 {
|
||||
d := cfg.BackoffDuration(attempt)
|
||||
if d <= 0 {
|
||||
t.Fatalf(
|
||||
"attempt %d: backoff must be positive, got %v",
|
||||
attempt, d,
|
||||
)
|
||||
}
|
||||
|
||||
// Allow jitter to occasionally flatten a step, but
|
||||
// the midpoint (no-jitter) should be strictly higher.
|
||||
midpoint := cfg.BaseDelay * (1 << attempt)
|
||||
if attempt > 0 && midpoint <= prev {
|
||||
t.Fatalf(
|
||||
"midpoint should grow: attempt %d midpoint=%v prev=%v",
|
||||
attempt, midpoint, prev,
|
||||
)
|
||||
}
|
||||
|
||||
prev = midpoint
|
||||
}
|
||||
}
|
||||
|
||||
func TestBackoffDurationCappedAtMax(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
cfg := notify.RetryConfig{
|
||||
MaxRetries: 5,
|
||||
BaseDelay: 1 * time.Second,
|
||||
MaxDelay: 5 * time.Second,
|
||||
}
|
||||
|
||||
// Attempt 10 would be 1024s without capping.
|
||||
d := cfg.BackoffDuration(10)
|
||||
|
||||
// With ±25% jitter on a 5s cap: max is 6.25s.
|
||||
const maxWithJitter = 5*time.Second +
|
||||
5*time.Second/4 +
|
||||
time.Millisecond // rounding margin
|
||||
|
||||
if d > maxWithJitter {
|
||||
t.Errorf(
|
||||
"backoff %v exceeds max+jitter %v",
|
||||
d, maxWithJitter,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// ── deliverWithRetry ──────────────────────────────────────
|
||||
|
||||
func TestDeliverWithRetrySucceedsFirstAttempt(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
svc := notify.NewTestService(http.DefaultTransport)
|
||||
svc.SetSleepFunc(instantSleep)
|
||||
|
||||
var calls atomic.Int32
|
||||
|
||||
err := svc.DeliverWithRetry(
|
||||
context.Background(), "test",
|
||||
func(_ context.Context) error {
|
||||
calls.Add(1)
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if calls.Load() != 1 {
|
||||
t.Errorf("expected 1 call, got %d", calls.Load())
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeliverWithRetryRetriesOnFailure(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
svc := notify.NewTestService(http.DefaultTransport)
|
||||
svc.SetSleepFunc(instantSleep)
|
||||
svc.SetRetryConfig(notify.RetryConfig{
|
||||
MaxRetries: 3,
|
||||
BaseDelay: time.Millisecond,
|
||||
MaxDelay: 10 * time.Millisecond,
|
||||
})
|
||||
|
||||
var calls atomic.Int32
|
||||
|
||||
// Fail twice, then succeed on the third attempt.
|
||||
err := svc.DeliverWithRetry(
|
||||
context.Background(), "test",
|
||||
func(_ context.Context) error {
|
||||
n := calls.Add(1)
|
||||
if n <= 2 {
|
||||
return errTransient
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("expected success after retries: %v", err)
|
||||
}
|
||||
|
||||
if calls.Load() != 3 {
|
||||
t.Errorf("expected 3 calls, got %d", calls.Load())
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeliverWithRetryExhaustsAttempts(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
svc := notify.NewTestService(http.DefaultTransport)
|
||||
svc.SetSleepFunc(instantSleep)
|
||||
svc.SetRetryConfig(notify.RetryConfig{
|
||||
MaxRetries: 2,
|
||||
BaseDelay: time.Millisecond,
|
||||
MaxDelay: 10 * time.Millisecond,
|
||||
})
|
||||
|
||||
var calls atomic.Int32
|
||||
|
||||
err := svc.DeliverWithRetry(
|
||||
context.Background(), "test",
|
||||
func(_ context.Context) error {
|
||||
calls.Add(1)
|
||||
|
||||
return errPermanent
|
||||
},
|
||||
)
|
||||
if err == nil {
|
||||
t.Fatal("expected error when all retries exhausted")
|
||||
}
|
||||
|
||||
if !errors.Is(err, errPermanent) {
|
||||
t.Errorf("expected permanent failure, got: %v", err)
|
||||
}
|
||||
|
||||
// 1 initial + 2 retries = 3 total.
|
||||
if calls.Load() != 3 {
|
||||
t.Errorf("expected 3 calls, got %d", calls.Load())
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeliverWithRetryRespectsContextCancellation(
|
||||
t *testing.T,
|
||||
) {
|
||||
t.Parallel()
|
||||
|
||||
svc := notify.NewTestService(http.DefaultTransport)
|
||||
svc.SetRetryConfig(notify.RetryConfig{
|
||||
MaxRetries: 5,
|
||||
BaseDelay: time.Millisecond,
|
||||
MaxDelay: 10 * time.Millisecond,
|
||||
})
|
||||
|
||||
// Use a blocking sleep so the context cancellation is
|
||||
// the only way out.
|
||||
svc.SetSleepFunc(func(_ time.Duration) <-chan time.Time {
|
||||
return make(chan time.Time) // never fires
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
done := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
done <- svc.DeliverWithRetry(
|
||||
ctx, "test",
|
||||
func(_ context.Context) error {
|
||||
return errFail
|
||||
},
|
||||
)
|
||||
}()
|
||||
|
||||
// Wait for the first failure + retry sleep to be
|
||||
// entered, then cancel.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
cancel()
|
||||
|
||||
select {
|
||||
case err := <-done:
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
t.Errorf(
|
||||
"expected context.Canceled, got: %v", err,
|
||||
)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("deliverWithRetry did not return after cancel")
|
||||
}
|
||||
}
|
||||
|
||||
// ── integration: SendNotification with retry ──────────────
|
||||
|
||||
func TestSendNotificationRetriesTransientFailure(
|
||||
t *testing.T,
|
||||
) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
mu sync.Mutex
|
||||
attempts int
|
||||
)
|
||||
|
||||
srv := httptest.NewServer(
|
||||
http.HandlerFunc(
|
||||
func(w http.ResponseWriter, _ *http.Request) {
|
||||
mu.Lock()
|
||||
attempts++
|
||||
n := attempts
|
||||
mu.Unlock()
|
||||
|
||||
if n <= 2 {
|
||||
w.WriteHeader(
|
||||
http.StatusInternalServerError,
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}),
|
||||
)
|
||||
defer srv.Close()
|
||||
|
||||
svc := newRetryTestService(srv.URL, "ntfy")
|
||||
|
||||
svc.SendNotification(
|
||||
context.Background(),
|
||||
"Retry Test", "body", "warning",
|
||||
)
|
||||
|
||||
waitForCondition(t, func() bool {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
return attempts >= 3
|
||||
})
|
||||
}
|
||||
|
||||
// newRetryTestService creates a test service with instant
|
||||
// sleep and low retry delays for the named endpoint.
|
||||
func newRetryTestService(
|
||||
rawURL, endpoint string,
|
||||
) *notify.Service {
|
||||
svc := notify.NewTestService(http.DefaultTransport)
|
||||
svc.SetSleepFunc(instantSleep)
|
||||
svc.SetRetryConfig(notify.RetryConfig{
|
||||
MaxRetries: 3,
|
||||
BaseDelay: time.Millisecond,
|
||||
MaxDelay: 10 * time.Millisecond,
|
||||
})
|
||||
|
||||
u, _ := url.Parse(rawURL)
|
||||
|
||||
switch endpoint {
|
||||
case "ntfy":
|
||||
svc.SetNtfyURL(u)
|
||||
case "slack":
|
||||
svc.SetSlackWebhookURL(u)
|
||||
case "mattermost":
|
||||
svc.SetMattermostWebhookURL(u)
|
||||
}
|
||||
|
||||
return svc
|
||||
}
|
||||
|
||||
func TestSendNotificationAllEndpointsRetrySetup(
|
||||
t *testing.T,
|
||||
) {
|
||||
t.Parallel()
|
||||
|
||||
result := newEndpointRetryResult()
|
||||
ntfySrv, slackSrv, mmSrv := newRetryServers(result)
|
||||
|
||||
defer ntfySrv.Close()
|
||||
defer slackSrv.Close()
|
||||
defer mmSrv.Close()
|
||||
|
||||
svc := buildAllEndpointRetryService(
|
||||
ntfySrv.URL, slackSrv.URL, mmSrv.URL,
|
||||
)
|
||||
|
||||
svc.SendNotification(
|
||||
context.Background(),
|
||||
"Multi-Retry", "testing", "error",
|
||||
)
|
||||
|
||||
assertAllEndpointsRetried(t, result)
|
||||
}
|
||||
|
||||
// endpointRetryResult tracks per-endpoint retry state: how many
// requests each fake server has seen and whether it has
// answered 200 yet. All fields are guarded by mu.
type endpointRetryResult struct {
	mu            sync.Mutex
	ntfyAttempts  int
	slackAttempts int
	mmAttempts    int
	ntfyOK        bool
	slackOK       bool
	mmOK          bool
}

// newEndpointRetryResult returns an empty tracker.
func newEndpointRetryResult() *endpointRetryResult {
	return new(endpointRetryResult)
}
|
||||
|
||||
func newRetryServers(
|
||||
r *endpointRetryResult,
|
||||
) (*httptest.Server, *httptest.Server, *httptest.Server) {
|
||||
mk := func(
|
||||
attempts *int, ok *bool,
|
||||
) *httptest.Server {
|
||||
return httptest.NewServer(
|
||||
http.HandlerFunc(
|
||||
func(w http.ResponseWriter, _ *http.Request) {
|
||||
r.mu.Lock()
|
||||
*attempts++
|
||||
n := *attempts
|
||||
r.mu.Unlock()
|
||||
|
||||
if n == 1 {
|
||||
w.WriteHeader(
|
||||
http.StatusServiceUnavailable,
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
r.mu.Lock()
|
||||
*ok = true
|
||||
r.mu.Unlock()
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
return mk(&r.ntfyAttempts, &r.ntfyOK),
|
||||
mk(&r.slackAttempts, &r.slackOK),
|
||||
mk(&r.mmAttempts, &r.mmOK)
|
||||
}
|
||||
|
||||
func buildAllEndpointRetryService(
|
||||
ntfyURL, slackURL, mmURL string,
|
||||
) *notify.Service {
|
||||
svc := notify.NewTestService(http.DefaultTransport)
|
||||
svc.SetSleepFunc(instantSleep)
|
||||
svc.SetRetryConfig(notify.RetryConfig{
|
||||
MaxRetries: 3,
|
||||
BaseDelay: time.Millisecond,
|
||||
MaxDelay: 10 * time.Millisecond,
|
||||
})
|
||||
|
||||
nu, _ := url.Parse(ntfyURL)
|
||||
su, _ := url.Parse(slackURL)
|
||||
mu, _ := url.Parse(mmURL)
|
||||
|
||||
svc.SetNtfyURL(nu)
|
||||
svc.SetSlackWebhookURL(su)
|
||||
svc.SetMattermostWebhookURL(mu)
|
||||
|
||||
return svc
|
||||
}
|
||||
|
||||
func assertAllEndpointsRetried(
|
||||
t *testing.T,
|
||||
r *endpointRetryResult,
|
||||
) {
|
||||
t.Helper()
|
||||
|
||||
waitForCondition(t, func() bool {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
return r.ntfyOK && r.slackOK && r.mmOK
|
||||
})
|
||||
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
if r.ntfyAttempts < 2 {
|
||||
t.Errorf(
|
||||
"ntfy: expected >= 2 attempts, got %d",
|
||||
r.ntfyAttempts,
|
||||
)
|
||||
}
|
||||
|
||||
if r.slackAttempts < 2 {
|
||||
t.Errorf(
|
||||
"slack: expected >= 2 attempts, got %d",
|
||||
r.slackAttempts,
|
||||
)
|
||||
}
|
||||
|
||||
if r.mmAttempts < 2 {
|
||||
t.Errorf(
|
||||
"mattermost: expected >= 2 attempts, got %d",
|
||||
r.mmAttempts,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendNotificationPermanentFailureLogsError(
|
||||
t *testing.T,
|
||||
) {
|
||||
t.Parallel()
|
||||
|
||||
var (
|
||||
mu sync.Mutex
|
||||
attempts int
|
||||
)
|
||||
|
||||
srv := httptest.NewServer(
|
||||
http.HandlerFunc(
|
||||
func(w http.ResponseWriter, _ *http.Request) {
|
||||
mu.Lock()
|
||||
attempts++
|
||||
mu.Unlock()
|
||||
|
||||
w.WriteHeader(
|
||||
http.StatusInternalServerError,
|
||||
)
|
||||
}),
|
||||
)
|
||||
defer srv.Close()
|
||||
|
||||
svc := newRetryTestService(srv.URL, "slack")
|
||||
svc.SetRetryConfig(notify.RetryConfig{
|
||||
MaxRetries: 2,
|
||||
BaseDelay: time.Millisecond,
|
||||
MaxDelay: 10 * time.Millisecond,
|
||||
})
|
||||
|
||||
svc.SendNotification(
|
||||
context.Background(),
|
||||
"Permanent Fail", "body", "error",
|
||||
)
|
||||
|
||||
// 1 initial + 2 retries = 3 total.
|
||||
waitForCondition(t, func() bool {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
return attempts >= 3
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user