package notify_test import ( "context" "errors" "net/http" "net/http/httptest" "net/url" "sync" "sync/atomic" "testing" "time" "sneak.berlin/go/dnswatcher/internal/notify" ) // Static test errors (err113). var ( errTransient = errors.New("transient failure") errPermanent = errors.New("permanent failure") errFail = errors.New("fail") ) // instantSleep returns a closed channel immediately, removing // real delays from tests. func instantSleep(_ time.Duration) <-chan time.Time { ch := make(chan time.Time, 1) ch <- time.Now() return ch } // ── backoff calculation ─────────────────────────────────── func TestBackoffDurationIncreases(t *testing.T) { t.Parallel() cfg := notify.RetryConfig{ MaxRetries: 5, BaseDelay: 1 * time.Second, MaxDelay: 30 * time.Second, } prev := time.Duration(0) // With jitter the exact value varies, but the trend // should be increasing for the first few attempts. for attempt := range 4 { d := cfg.BackoffDuration(attempt) if d <= 0 { t.Fatalf( "attempt %d: backoff must be positive, got %v", attempt, d, ) } // Allow jitter to occasionally flatten a step, but // the midpoint (no-jitter) should be strictly higher. midpoint := cfg.BaseDelay * (1 << attempt) if attempt > 0 && midpoint <= prev { t.Fatalf( "midpoint should grow: attempt %d midpoint=%v prev=%v", attempt, midpoint, prev, ) } prev = midpoint } } func TestBackoffDurationCappedAtMax(t *testing.T) { t.Parallel() cfg := notify.RetryConfig{ MaxRetries: 5, BaseDelay: 1 * time.Second, MaxDelay: 5 * time.Second, } // Attempt 10 would be 1024s without capping. d := cfg.BackoffDuration(10) // With ±25% jitter on a 5s cap: max is 6.25s. const maxWithJitter = 5*time.Second + 5*time.Second/4 + time.Millisecond // rounding margin if d > maxWithJitter { t.Errorf( "backoff %v exceeds max+jitter %v", d, maxWithJitter, ) } } // ── deliverWithRetry ────────────────────────────────────── func TestDeliverWithRetrySucceedsFirstAttempt(t *testing.T) { t.Parallel() svc := notify.NewTestService(http.DefaultTransport) svc.SetSleepFunc(instantSleep) var calls atomic.Int32 err := svc.DeliverWithRetry( context.Background(), "test", func(_ context.Context) error { calls.Add(1) return nil }, ) if err != nil { t.Fatalf("unexpected error: %v", err) } if calls.Load() != 1 { t.Errorf("expected 1 call, got %d", calls.Load()) } } func TestDeliverWithRetryRetriesOnFailure(t *testing.T) { t.Parallel() svc := notify.NewTestService(http.DefaultTransport) svc.SetSleepFunc(instantSleep) svc.SetRetryConfig(notify.RetryConfig{ MaxRetries: 3, BaseDelay: time.Millisecond, MaxDelay: 10 * time.Millisecond, }) var calls atomic.Int32 // Fail twice, then succeed on the third attempt. err := svc.DeliverWithRetry( context.Background(), "test", func(_ context.Context) error { n := calls.Add(1) if n <= 2 { return errTransient } return nil }, ) if err != nil { t.Fatalf("expected success after retries: %v", err) } if calls.Load() != 3 { t.Errorf("expected 3 calls, got %d", calls.Load()) } } func TestDeliverWithRetryExhaustsAttempts(t *testing.T) { t.Parallel() svc := notify.NewTestService(http.DefaultTransport) svc.SetSleepFunc(instantSleep) svc.SetRetryConfig(notify.RetryConfig{ MaxRetries: 2, BaseDelay: time.Millisecond, MaxDelay: 10 * time.Millisecond, }) var calls atomic.Int32 err := svc.DeliverWithRetry( context.Background(), "test", func(_ context.Context) error { calls.Add(1) return errPermanent }, ) if err == nil { t.Fatal("expected error when all retries exhausted") } if !errors.Is(err, errPermanent) { t.Errorf("expected permanent failure, got: %v", err) } // 1 initial + 2 retries = 3 total. if calls.Load() != 3 { t.Errorf("expected 3 calls, got %d", calls.Load()) } } func TestDeliverWithRetryRespectsContextCancellation( t *testing.T, ) { t.Parallel() svc := notify.NewTestService(http.DefaultTransport) svc.SetRetryConfig(notify.RetryConfig{ MaxRetries: 5, BaseDelay: time.Millisecond, MaxDelay: 10 * time.Millisecond, }) // Use a blocking sleep so the context cancellation is // the only way out. svc.SetSleepFunc(func(_ time.Duration) <-chan time.Time { return make(chan time.Time) // never fires }) ctx, cancel := context.WithCancel(context.Background()) done := make(chan error, 1) go func() { done <- svc.DeliverWithRetry( ctx, "test", func(_ context.Context) error { return errFail }, ) }() // Wait for the first failure + retry sleep to be // entered, then cancel. time.Sleep(50 * time.Millisecond) cancel() select { case err := <-done: if !errors.Is(err, context.Canceled) { t.Errorf( "expected context.Canceled, got: %v", err, ) } case <-time.After(2 * time.Second): t.Fatal("deliverWithRetry did not return after cancel") } } // ── integration: SendNotification with retry ────────────── func TestSendNotificationRetriesTransientFailure( t *testing.T, ) { t.Parallel() var ( mu sync.Mutex attempts int ) srv := httptest.NewServer( http.HandlerFunc( func(w http.ResponseWriter, _ *http.Request) { mu.Lock() attempts++ n := attempts mu.Unlock() if n <= 2 { w.WriteHeader( http.StatusInternalServerError, ) return } w.WriteHeader(http.StatusOK) }), ) defer srv.Close() svc := newRetryTestService(srv.URL, "ntfy") svc.SendNotification( context.Background(), "Retry Test", "body", "warning", ) waitForCondition(t, func() bool { mu.Lock() defer mu.Unlock() return attempts >= 3 }) } // newRetryTestService creates a test service with instant // sleep and low retry delays for the named endpoint. func newRetryTestService( rawURL, endpoint string, ) *notify.Service { svc := notify.NewTestService(http.DefaultTransport) svc.SetSleepFunc(instantSleep) svc.SetRetryConfig(notify.RetryConfig{ MaxRetries: 3, BaseDelay: time.Millisecond, MaxDelay: 10 * time.Millisecond, }) u, _ := url.Parse(rawURL) switch endpoint { case "ntfy": svc.SetNtfyURL(u) case "slack": svc.SetSlackWebhookURL(u) case "mattermost": svc.SetMattermostWebhookURL(u) } return svc } func TestSendNotificationAllEndpointsRetrySetup( t *testing.T, ) { t.Parallel() result := newEndpointRetryResult() ntfySrv, slackSrv, mmSrv := newRetryServers(result) defer ntfySrv.Close() defer slackSrv.Close() defer mmSrv.Close() svc := buildAllEndpointRetryService( ntfySrv.URL, slackSrv.URL, mmSrv.URL, ) svc.SendNotification( context.Background(), "Multi-Retry", "testing", "error", ) assertAllEndpointsRetried(t, result) } // endpointRetryResult tracks per-endpoint retry state. type endpointRetryResult struct { mu sync.Mutex ntfyAttempts int slackAttempts int mmAttempts int ntfyOK bool slackOK bool mmOK bool } func newEndpointRetryResult() *endpointRetryResult { return &endpointRetryResult{} } func newRetryServers( r *endpointRetryResult, ) (*httptest.Server, *httptest.Server, *httptest.Server) { mk := func( attempts *int, ok *bool, ) *httptest.Server { return httptest.NewServer( http.HandlerFunc( func(w http.ResponseWriter, _ *http.Request) { r.mu.Lock() *attempts++ n := *attempts r.mu.Unlock() if n == 1 { w.WriteHeader( http.StatusServiceUnavailable, ) return } r.mu.Lock() *ok = true r.mu.Unlock() w.WriteHeader(http.StatusOK) }), ) } return mk(&r.ntfyAttempts, &r.ntfyOK), mk(&r.slackAttempts, &r.slackOK), mk(&r.mmAttempts, &r.mmOK) } func buildAllEndpointRetryService( ntfyURL, slackURL, mmURL string, ) *notify.Service { svc := notify.NewTestService(http.DefaultTransport) svc.SetSleepFunc(instantSleep) svc.SetRetryConfig(notify.RetryConfig{ MaxRetries: 3, BaseDelay: time.Millisecond, MaxDelay: 10 * time.Millisecond, }) nu, _ := url.Parse(ntfyURL) su, _ := url.Parse(slackURL) mu, _ := url.Parse(mmURL) svc.SetNtfyURL(nu) svc.SetSlackWebhookURL(su) svc.SetMattermostWebhookURL(mu) return svc } func assertAllEndpointsRetried( t *testing.T, r *endpointRetryResult, ) { t.Helper() waitForCondition(t, func() bool { r.mu.Lock() defer r.mu.Unlock() return r.ntfyOK && r.slackOK && r.mmOK }) r.mu.Lock() defer r.mu.Unlock() if r.ntfyAttempts < 2 { t.Errorf( "ntfy: expected >= 2 attempts, got %d", r.ntfyAttempts, ) } if r.slackAttempts < 2 { t.Errorf( "slack: expected >= 2 attempts, got %d", r.slackAttempts, ) } if r.mmAttempts < 2 { t.Errorf( "mattermost: expected >= 2 attempts, got %d", r.mmAttempts, ) } } func TestSendNotificationPermanentFailureLogsError( t *testing.T, ) { t.Parallel() var ( mu sync.Mutex attempts int ) srv := httptest.NewServer( http.HandlerFunc( func(w http.ResponseWriter, _ *http.Request) { mu.Lock() attempts++ mu.Unlock() w.WriteHeader( http.StatusInternalServerError, ) }), ) defer srv.Close() svc := newRetryTestService(srv.URL, "slack") svc.SetRetryConfig(notify.RetryConfig{ MaxRetries: 2, BaseDelay: time.Millisecond, MaxDelay: 10 * time.Millisecond, }) svc.SendNotification( context.Background(), "Permanent Fail", "body", "error", ) // 1 initial + 2 retries = 3 total. waitForCondition(t, func() bool { mu.Lock() defer mu.Unlock() return attempts >= 3 }) }