Compare commits
1 Commits
feature/62
...
fix/88-abs
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b4f986a2e5 |
@@ -219,7 +219,7 @@ the following precedence (highest to lowest):
|
|||||||
|---------------------------------|--------------------------------------------|-------------|
|
|---------------------------------|--------------------------------------------|-------------|
|
||||||
| `PORT` | HTTP listen port | `8080` |
|
| `PORT` | HTTP listen port | `8080` |
|
||||||
| `DNSWATCHER_DEBUG` | Enable debug logging | `false` |
|
| `DNSWATCHER_DEBUG` | Enable debug logging | `false` |
|
||||||
| `DNSWATCHER_DATA_DIR` | Directory for state file | `./data` |
|
| `DNSWATCHER_DATA_DIR` | Directory for state file | `/var/lib/dnswatcher` |
|
||||||
| `DNSWATCHER_TARGETS` | Comma-separated DNS names (auto-classified via PSL) | `""` |
|
| `DNSWATCHER_TARGETS` | Comma-separated DNS names (auto-classified via PSL) | `""` |
|
||||||
| `DNSWATCHER_SLACK_WEBHOOK` | Slack incoming webhook URL | `""` |
|
| `DNSWATCHER_SLACK_WEBHOOK` | Slack incoming webhook URL | `""` |
|
||||||
| `DNSWATCHER_MATTERMOST_WEBHOOK` | Mattermost incoming webhook URL | `""` |
|
| `DNSWATCHER_MATTERMOST_WEBHOOK` | Mattermost incoming webhook URL | `""` |
|
||||||
@@ -244,7 +244,7 @@ list of DNS names before starting.
|
|||||||
```sh
|
```sh
|
||||||
PORT=8080
|
PORT=8080
|
||||||
DNSWATCHER_DEBUG=false
|
DNSWATCHER_DEBUG=false
|
||||||
DNSWATCHER_DATA_DIR=./data
|
DNSWATCHER_DATA_DIR=/var/lib/dnswatcher
|
||||||
DNSWATCHER_TARGETS=example.com,example.org,www.example.com,api.example.com,mail.example.org
|
DNSWATCHER_TARGETS=example.com,example.org,www.example.com,api.example.com,mail.example.org
|
||||||
DNSWATCHER_SLACK_WEBHOOK=https://hooks.slack.com/services/T.../B.../xxx
|
DNSWATCHER_SLACK_WEBHOOK=https://hooks.slack.com/services/T.../B.../xxx
|
||||||
DNSWATCHER_MATTERMOST_WEBHOOK=https://mattermost.example.com/hooks/xxx
|
DNSWATCHER_MATTERMOST_WEBHOOK=https://mattermost.example.com/hooks/xxx
|
||||||
|
|||||||
@@ -94,7 +94,7 @@ func setupViper(name string) {
|
|||||||
|
|
||||||
viper.SetDefault("PORT", defaultPort)
|
viper.SetDefault("PORT", defaultPort)
|
||||||
viper.SetDefault("DEBUG", false)
|
viper.SetDefault("DEBUG", false)
|
||||||
viper.SetDefault("DATA_DIR", "./data")
|
viper.SetDefault("DATA_DIR", "/var/lib/"+name)
|
||||||
viper.SetDefault("TARGETS", "")
|
viper.SetDefault("TARGETS", "")
|
||||||
viper.SetDefault("SLACK_WEBHOOK", "")
|
viper.SetDefault("SLACK_WEBHOOK", "")
|
||||||
viper.SetDefault("MATTERMOST_WEBHOOK", "")
|
viper.SetDefault("MATTERMOST_WEBHOOK", "")
|
||||||
|
|||||||
@@ -44,7 +44,7 @@ func TestNew_DefaultValues(t *testing.T) {
|
|||||||
|
|
||||||
assert.Equal(t, 8080, cfg.Port)
|
assert.Equal(t, 8080, cfg.Port)
|
||||||
assert.False(t, cfg.Debug)
|
assert.False(t, cfg.Debug)
|
||||||
assert.Equal(t, "./data", cfg.DataDir)
|
assert.Equal(t, "/var/lib/dnswatcher", cfg.DataDir)
|
||||||
assert.Equal(t, time.Hour, cfg.DNSInterval)
|
assert.Equal(t, time.Hour, cfg.DNSInterval)
|
||||||
assert.Equal(t, 12*time.Hour, cfg.TLSInterval)
|
assert.Equal(t, 12*time.Hour, cfg.TLSInterval)
|
||||||
assert.Equal(t, 7, cfg.TLSExpiryWarning)
|
assert.Equal(t, 7, cfg.TLSExpiryWarning)
|
||||||
@@ -245,7 +245,7 @@ func TestStatePath(t *testing.T) {
|
|||||||
dataDir string
|
dataDir string
|
||||||
want string
|
want string
|
||||||
}{
|
}{
|
||||||
{"default", "./data", "./data/state.json"},
|
{"default", "/var/lib/dnswatcher", "/var/lib/dnswatcher/state.json"},
|
||||||
{"absolute", "/var/lib/dw", "/var/lib/dw/state.json"},
|
{"absolute", "/var/lib/dw", "/var/lib/dw/state.json"},
|
||||||
{"nested", "/opt/app/data", "/opt/app/data/state.json"},
|
{"nested", "/opt/app/data", "/opt/app/data/state.json"},
|
||||||
{"empty", "", "/state.json"},
|
{"empty", "", "/state.json"},
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import (
|
|||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// NtfyPriority exports ntfyPriority for testing.
|
// NtfyPriority exports ntfyPriority for testing.
|
||||||
@@ -75,31 +74,3 @@ func (svc *Service) SendSlack(
|
|||||||
ctx, webhookURL, title, message, priority,
|
ctx, webhookURL, title, message, priority,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetRetryConfig overrides the retry configuration for
|
|
||||||
// testing.
|
|
||||||
func (svc *Service) SetRetryConfig(cfg RetryConfig) {
|
|
||||||
svc.retryConfig = cfg
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetSleepFunc overrides the sleep function so tests can
|
|
||||||
// eliminate real delays.
|
|
||||||
func (svc *Service) SetSleepFunc(
|
|
||||||
fn func(time.Duration) <-chan time.Time,
|
|
||||||
) {
|
|
||||||
svc.sleepFn = fn
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeliverWithRetry exports deliverWithRetry for testing.
|
|
||||||
func (svc *Service) DeliverWithRetry(
|
|
||||||
ctx context.Context,
|
|
||||||
endpoint string,
|
|
||||||
fn func(context.Context) error,
|
|
||||||
) error {
|
|
||||||
return svc.deliverWithRetry(ctx, endpoint, fn)
|
|
||||||
}
|
|
||||||
|
|
||||||
// BackoffDuration exports RetryConfig.backoff for testing.
|
|
||||||
func (rc RetryConfig) BackoffDuration(attempt int) time.Duration {
|
|
||||||
return rc.defaults().backoff(attempt)
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -113,8 +113,6 @@ type Service struct {
|
|||||||
slackWebhookURL *url.URL
|
slackWebhookURL *url.URL
|
||||||
mattermostWebhookURL *url.URL
|
mattermostWebhookURL *url.URL
|
||||||
history *AlertHistory
|
history *AlertHistory
|
||||||
retryConfig RetryConfig
|
|
||||||
sleepFn func(time.Duration) <-chan time.Time
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new notify Service.
|
// New creates a new notify Service.
|
||||||
@@ -205,19 +203,13 @@ func (svc *Service) dispatchNtfy(
|
|||||||
go func() {
|
go func() {
|
||||||
notifyCtx := context.WithoutCancel(ctx)
|
notifyCtx := context.WithoutCancel(ctx)
|
||||||
|
|
||||||
err := svc.deliverWithRetry(
|
err := svc.sendNtfy(
|
||||||
notifyCtx, "ntfy",
|
notifyCtx, svc.ntfyURL,
|
||||||
func(c context.Context) error {
|
|
||||||
return svc.sendNtfy(
|
|
||||||
c, svc.ntfyURL,
|
|
||||||
title, message, priority,
|
title, message, priority,
|
||||||
)
|
)
|
||||||
},
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
svc.log.Error(
|
svc.log.Error(
|
||||||
"failed to send ntfy notification "+
|
"failed to send ntfy notification",
|
||||||
"after retries",
|
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@@ -235,19 +227,13 @@ func (svc *Service) dispatchSlack(
|
|||||||
go func() {
|
go func() {
|
||||||
notifyCtx := context.WithoutCancel(ctx)
|
notifyCtx := context.WithoutCancel(ctx)
|
||||||
|
|
||||||
err := svc.deliverWithRetry(
|
err := svc.sendSlack(
|
||||||
notifyCtx, "slack",
|
notifyCtx, svc.slackWebhookURL,
|
||||||
func(c context.Context) error {
|
|
||||||
return svc.sendSlack(
|
|
||||||
c, svc.slackWebhookURL,
|
|
||||||
title, message, priority,
|
title, message, priority,
|
||||||
)
|
)
|
||||||
},
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
svc.log.Error(
|
svc.log.Error(
|
||||||
"failed to send slack notification "+
|
"failed to send slack notification",
|
||||||
"after retries",
|
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@@ -265,19 +251,13 @@ func (svc *Service) dispatchMattermost(
|
|||||||
go func() {
|
go func() {
|
||||||
notifyCtx := context.WithoutCancel(ctx)
|
notifyCtx := context.WithoutCancel(ctx)
|
||||||
|
|
||||||
err := svc.deliverWithRetry(
|
err := svc.sendSlack(
|
||||||
notifyCtx, "mattermost",
|
notifyCtx, svc.mattermostWebhookURL,
|
||||||
func(c context.Context) error {
|
|
||||||
return svc.sendSlack(
|
|
||||||
c, svc.mattermostWebhookURL,
|
|
||||||
title, message, priority,
|
title, message, priority,
|
||||||
)
|
)
|
||||||
},
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
svc.log.Error(
|
svc.log.Error(
|
||||||
"failed to send mattermost notification "+
|
"failed to send mattermost notification",
|
||||||
"after retries",
|
|
||||||
"error", err,
|
"error", err,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,139 +0,0 @@
|
|||||||
package notify
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"math"
|
|
||||||
"math/rand/v2"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Retry defaults.
|
|
||||||
const (
|
|
||||||
// DefaultMaxRetries is the number of additional attempts
|
|
||||||
// after the first failure.
|
|
||||||
DefaultMaxRetries = 5
|
|
||||||
|
|
||||||
// DefaultBaseDelay is the initial delay before the first
|
|
||||||
// retry attempt.
|
|
||||||
DefaultBaseDelay = 1 * time.Second
|
|
||||||
|
|
||||||
// DefaultMaxDelay caps the computed backoff delay.
|
|
||||||
DefaultMaxDelay = 60 * time.Second
|
|
||||||
|
|
||||||
// backoffMultiplier is the exponential growth factor.
|
|
||||||
backoffMultiplier = 2
|
|
||||||
|
|
||||||
// jitterFraction controls the ±random spread applied
|
|
||||||
// to each delay (0.25 = ±25%).
|
|
||||||
jitterFraction = 0.25
|
|
||||||
)
|
|
||||||
|
|
||||||
// RetryConfig holds tuning knobs for the retry loop.
|
|
||||||
// Zero values fall back to the package defaults above.
|
|
||||||
type RetryConfig struct {
|
|
||||||
MaxRetries int
|
|
||||||
BaseDelay time.Duration
|
|
||||||
MaxDelay time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
// defaults returns a copy with zero fields replaced by
|
|
||||||
// package defaults.
|
|
||||||
func (rc RetryConfig) defaults() RetryConfig {
|
|
||||||
if rc.MaxRetries <= 0 {
|
|
||||||
rc.MaxRetries = DefaultMaxRetries
|
|
||||||
}
|
|
||||||
|
|
||||||
if rc.BaseDelay <= 0 {
|
|
||||||
rc.BaseDelay = DefaultBaseDelay
|
|
||||||
}
|
|
||||||
|
|
||||||
if rc.MaxDelay <= 0 {
|
|
||||||
rc.MaxDelay = DefaultMaxDelay
|
|
||||||
}
|
|
||||||
|
|
||||||
return rc
|
|
||||||
}
|
|
||||||
|
|
||||||
// backoff computes the delay for attempt n (0-indexed) with
|
|
||||||
// jitter. The raw delay is BaseDelay * 2^n, capped at
|
|
||||||
// MaxDelay, then randomised by ±jitterFraction.
|
|
||||||
func (rc RetryConfig) backoff(attempt int) time.Duration {
|
|
||||||
raw := float64(rc.BaseDelay) *
|
|
||||||
math.Pow(backoffMultiplier, float64(attempt))
|
|
||||||
|
|
||||||
if raw > float64(rc.MaxDelay) {
|
|
||||||
raw = float64(rc.MaxDelay)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Apply jitter: uniform in [raw*(1-j), raw*(1+j)].
|
|
||||||
lo := raw * (1 - jitterFraction)
|
|
||||||
hi := raw * (1 + jitterFraction)
|
|
||||||
|
|
||||||
jittered := lo + rand.Float64()*(hi-lo) //nolint:gosec // jitter does not need crypto/rand
|
|
||||||
|
|
||||||
return time.Duration(jittered)
|
|
||||||
}
|
|
||||||
|
|
||||||
// deliverWithRetry calls fn, retrying on error with
|
|
||||||
// exponential backoff. It logs every failed attempt and
|
|
||||||
// returns the last error if all attempts are exhausted.
|
|
||||||
func (svc *Service) deliverWithRetry(
|
|
||||||
ctx context.Context,
|
|
||||||
endpoint string,
|
|
||||||
fn func(context.Context) error,
|
|
||||||
) error {
|
|
||||||
cfg := svc.retryConfig.defaults()
|
|
||||||
|
|
||||||
var lastErr error
|
|
||||||
|
|
||||||
// attempt 0 is the initial call; attempts 1..MaxRetries
|
|
||||||
// are retries.
|
|
||||||
for attempt := range cfg.MaxRetries + 1 {
|
|
||||||
lastErr = fn(ctx)
|
|
||||||
if lastErr == nil {
|
|
||||||
if attempt > 0 {
|
|
||||||
svc.log.Info(
|
|
||||||
"notification delivered after retry",
|
|
||||||
"endpoint", endpoint,
|
|
||||||
"attempt", attempt+1,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Last attempt — don't sleep, just return.
|
|
||||||
if attempt == cfg.MaxRetries {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
delay := cfg.backoff(attempt)
|
|
||||||
|
|
||||||
svc.log.Warn(
|
|
||||||
"notification delivery failed, retrying",
|
|
||||||
"endpoint", endpoint,
|
|
||||||
"attempt", attempt+1,
|
|
||||||
"maxAttempts", cfg.MaxRetries+1,
|
|
||||||
"retryIn", delay,
|
|
||||||
"error", lastErr,
|
|
||||||
)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
case <-svc.sleepFunc(delay):
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return lastErr
|
|
||||||
}
|
|
||||||
|
|
||||||
// sleepFunc returns a channel that closes after d.
|
|
||||||
// It is a field-level indirection so tests can override it.
|
|
||||||
func (svc *Service) sleepFunc(d time.Duration) <-chan time.Time {
|
|
||||||
if svc.sleepFn != nil {
|
|
||||||
return svc.sleepFn(d)
|
|
||||||
}
|
|
||||||
|
|
||||||
return time.After(d)
|
|
||||||
}
|
|
||||||
@@ -1,493 +0,0 @@
|
|||||||
package notify_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"net/http"
|
|
||||||
"net/http/httptest"
|
|
||||||
"net/url"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"sneak.berlin/go/dnswatcher/internal/notify"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Static test errors (err113).
|
|
||||||
var (
|
|
||||||
errTransient = errors.New("transient failure")
|
|
||||||
errPermanent = errors.New("permanent failure")
|
|
||||||
errFail = errors.New("fail")
|
|
||||||
)
|
|
||||||
|
|
||||||
// instantSleep returns a closed channel immediately, removing
|
|
||||||
// real delays from tests.
|
|
||||||
func instantSleep(_ time.Duration) <-chan time.Time {
|
|
||||||
ch := make(chan time.Time, 1)
|
|
||||||
ch <- time.Now()
|
|
||||||
|
|
||||||
return ch
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── backoff calculation ───────────────────────────────────
|
|
||||||
|
|
||||||
func TestBackoffDurationIncreases(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
cfg := notify.RetryConfig{
|
|
||||||
MaxRetries: 5,
|
|
||||||
BaseDelay: 1 * time.Second,
|
|
||||||
MaxDelay: 30 * time.Second,
|
|
||||||
}
|
|
||||||
|
|
||||||
prev := time.Duration(0)
|
|
||||||
|
|
||||||
// With jitter the exact value varies, but the trend
|
|
||||||
// should be increasing for the first few attempts.
|
|
||||||
for attempt := range 4 {
|
|
||||||
d := cfg.BackoffDuration(attempt)
|
|
||||||
if d <= 0 {
|
|
||||||
t.Fatalf(
|
|
||||||
"attempt %d: backoff must be positive, got %v",
|
|
||||||
attempt, d,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Allow jitter to occasionally flatten a step, but
|
|
||||||
// the midpoint (no-jitter) should be strictly higher.
|
|
||||||
midpoint := cfg.BaseDelay * (1 << attempt)
|
|
||||||
if attempt > 0 && midpoint <= prev {
|
|
||||||
t.Fatalf(
|
|
||||||
"midpoint should grow: attempt %d midpoint=%v prev=%v",
|
|
||||||
attempt, midpoint, prev,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
prev = midpoint
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBackoffDurationCappedAtMax(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
cfg := notify.RetryConfig{
|
|
||||||
MaxRetries: 5,
|
|
||||||
BaseDelay: 1 * time.Second,
|
|
||||||
MaxDelay: 5 * time.Second,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Attempt 10 would be 1024s without capping.
|
|
||||||
d := cfg.BackoffDuration(10)
|
|
||||||
|
|
||||||
// With ±25% jitter on a 5s cap: max is 6.25s.
|
|
||||||
const maxWithJitter = 5*time.Second +
|
|
||||||
5*time.Second/4 +
|
|
||||||
time.Millisecond // rounding margin
|
|
||||||
|
|
||||||
if d > maxWithJitter {
|
|
||||||
t.Errorf(
|
|
||||||
"backoff %v exceeds max+jitter %v",
|
|
||||||
d, maxWithJitter,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── deliverWithRetry ──────────────────────────────────────
|
|
||||||
|
|
||||||
func TestDeliverWithRetrySucceedsFirstAttempt(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
svc := notify.NewTestService(http.DefaultTransport)
|
|
||||||
svc.SetSleepFunc(instantSleep)
|
|
||||||
|
|
||||||
var calls atomic.Int32
|
|
||||||
|
|
||||||
err := svc.DeliverWithRetry(
|
|
||||||
context.Background(), "test",
|
|
||||||
func(_ context.Context) error {
|
|
||||||
calls.Add(1)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if calls.Load() != 1 {
|
|
||||||
t.Errorf("expected 1 call, got %d", calls.Load())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDeliverWithRetryRetriesOnFailure(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
svc := notify.NewTestService(http.DefaultTransport)
|
|
||||||
svc.SetSleepFunc(instantSleep)
|
|
||||||
svc.SetRetryConfig(notify.RetryConfig{
|
|
||||||
MaxRetries: 3,
|
|
||||||
BaseDelay: time.Millisecond,
|
|
||||||
MaxDelay: 10 * time.Millisecond,
|
|
||||||
})
|
|
||||||
|
|
||||||
var calls atomic.Int32
|
|
||||||
|
|
||||||
// Fail twice, then succeed on the third attempt.
|
|
||||||
err := svc.DeliverWithRetry(
|
|
||||||
context.Background(), "test",
|
|
||||||
func(_ context.Context) error {
|
|
||||||
n := calls.Add(1)
|
|
||||||
if n <= 2 {
|
|
||||||
return errTransient
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("expected success after retries: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if calls.Load() != 3 {
|
|
||||||
t.Errorf("expected 3 calls, got %d", calls.Load())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDeliverWithRetryExhaustsAttempts(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
svc := notify.NewTestService(http.DefaultTransport)
|
|
||||||
svc.SetSleepFunc(instantSleep)
|
|
||||||
svc.SetRetryConfig(notify.RetryConfig{
|
|
||||||
MaxRetries: 2,
|
|
||||||
BaseDelay: time.Millisecond,
|
|
||||||
MaxDelay: 10 * time.Millisecond,
|
|
||||||
})
|
|
||||||
|
|
||||||
var calls atomic.Int32
|
|
||||||
|
|
||||||
err := svc.DeliverWithRetry(
|
|
||||||
context.Background(), "test",
|
|
||||||
func(_ context.Context) error {
|
|
||||||
calls.Add(1)
|
|
||||||
|
|
||||||
return errPermanent
|
|
||||||
},
|
|
||||||
)
|
|
||||||
if err == nil {
|
|
||||||
t.Fatal("expected error when all retries exhausted")
|
|
||||||
}
|
|
||||||
|
|
||||||
if !errors.Is(err, errPermanent) {
|
|
||||||
t.Errorf("expected permanent failure, got: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 1 initial + 2 retries = 3 total.
|
|
||||||
if calls.Load() != 3 {
|
|
||||||
t.Errorf("expected 3 calls, got %d", calls.Load())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDeliverWithRetryRespectsContextCancellation(
|
|
||||||
t *testing.T,
|
|
||||||
) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
svc := notify.NewTestService(http.DefaultTransport)
|
|
||||||
svc.SetRetryConfig(notify.RetryConfig{
|
|
||||||
MaxRetries: 5,
|
|
||||||
BaseDelay: time.Millisecond,
|
|
||||||
MaxDelay: 10 * time.Millisecond,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Use a blocking sleep so the context cancellation is
|
|
||||||
// the only way out.
|
|
||||||
svc.SetSleepFunc(func(_ time.Duration) <-chan time.Time {
|
|
||||||
return make(chan time.Time) // never fires
|
|
||||||
})
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
|
|
||||||
done := make(chan error, 1)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
done <- svc.DeliverWithRetry(
|
|
||||||
ctx, "test",
|
|
||||||
func(_ context.Context) error {
|
|
||||||
return errFail
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Wait for the first failure + retry sleep to be
|
|
||||||
// entered, then cancel.
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
cancel()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case err := <-done:
|
|
||||||
if !errors.Is(err, context.Canceled) {
|
|
||||||
t.Errorf(
|
|
||||||
"expected context.Canceled, got: %v", err,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
case <-time.After(2 * time.Second):
|
|
||||||
t.Fatal("deliverWithRetry did not return after cancel")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── integration: SendNotification with retry ──────────────
|
|
||||||
|
|
||||||
func TestSendNotificationRetriesTransientFailure(
|
|
||||||
t *testing.T,
|
|
||||||
) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
var (
|
|
||||||
mu sync.Mutex
|
|
||||||
attempts int
|
|
||||||
)
|
|
||||||
|
|
||||||
srv := httptest.NewServer(
|
|
||||||
http.HandlerFunc(
|
|
||||||
func(w http.ResponseWriter, _ *http.Request) {
|
|
||||||
mu.Lock()
|
|
||||||
attempts++
|
|
||||||
n := attempts
|
|
||||||
mu.Unlock()
|
|
||||||
|
|
||||||
if n <= 2 {
|
|
||||||
w.WriteHeader(
|
|
||||||
http.StatusInternalServerError,
|
|
||||||
)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
svc := newRetryTestService(srv.URL, "ntfy")
|
|
||||||
|
|
||||||
svc.SendNotification(
|
|
||||||
context.Background(),
|
|
||||||
"Retry Test", "body", "warning",
|
|
||||||
)
|
|
||||||
|
|
||||||
waitForCondition(t, func() bool {
|
|
||||||
mu.Lock()
|
|
||||||
defer mu.Unlock()
|
|
||||||
|
|
||||||
return attempts >= 3
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// newRetryTestService creates a test service with instant
|
|
||||||
// sleep and low retry delays for the named endpoint.
|
|
||||||
func newRetryTestService(
|
|
||||||
rawURL, endpoint string,
|
|
||||||
) *notify.Service {
|
|
||||||
svc := notify.NewTestService(http.DefaultTransport)
|
|
||||||
svc.SetSleepFunc(instantSleep)
|
|
||||||
svc.SetRetryConfig(notify.RetryConfig{
|
|
||||||
MaxRetries: 3,
|
|
||||||
BaseDelay: time.Millisecond,
|
|
||||||
MaxDelay: 10 * time.Millisecond,
|
|
||||||
})
|
|
||||||
|
|
||||||
u, _ := url.Parse(rawURL)
|
|
||||||
|
|
||||||
switch endpoint {
|
|
||||||
case "ntfy":
|
|
||||||
svc.SetNtfyURL(u)
|
|
||||||
case "slack":
|
|
||||||
svc.SetSlackWebhookURL(u)
|
|
||||||
case "mattermost":
|
|
||||||
svc.SetMattermostWebhookURL(u)
|
|
||||||
}
|
|
||||||
|
|
||||||
return svc
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSendNotificationAllEndpointsRetrySetup(
|
|
||||||
t *testing.T,
|
|
||||||
) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
result := newEndpointRetryResult()
|
|
||||||
ntfySrv, slackSrv, mmSrv := newRetryServers(result)
|
|
||||||
|
|
||||||
defer ntfySrv.Close()
|
|
||||||
defer slackSrv.Close()
|
|
||||||
defer mmSrv.Close()
|
|
||||||
|
|
||||||
svc := buildAllEndpointRetryService(
|
|
||||||
ntfySrv.URL, slackSrv.URL, mmSrv.URL,
|
|
||||||
)
|
|
||||||
|
|
||||||
svc.SendNotification(
|
|
||||||
context.Background(),
|
|
||||||
"Multi-Retry", "testing", "error",
|
|
||||||
)
|
|
||||||
|
|
||||||
assertAllEndpointsRetried(t, result)
|
|
||||||
}
|
|
||||||
|
|
||||||
// endpointRetryResult tracks per-endpoint retry state.
|
|
||||||
type endpointRetryResult struct {
|
|
||||||
mu sync.Mutex
|
|
||||||
ntfyAttempts int
|
|
||||||
slackAttempts int
|
|
||||||
mmAttempts int
|
|
||||||
ntfyOK bool
|
|
||||||
slackOK bool
|
|
||||||
mmOK bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func newEndpointRetryResult() *endpointRetryResult {
|
|
||||||
return &endpointRetryResult{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newRetryServers(
|
|
||||||
r *endpointRetryResult,
|
|
||||||
) (*httptest.Server, *httptest.Server, *httptest.Server) {
|
|
||||||
mk := func(
|
|
||||||
attempts *int, ok *bool,
|
|
||||||
) *httptest.Server {
|
|
||||||
return httptest.NewServer(
|
|
||||||
http.HandlerFunc(
|
|
||||||
func(w http.ResponseWriter, _ *http.Request) {
|
|
||||||
r.mu.Lock()
|
|
||||||
*attempts++
|
|
||||||
n := *attempts
|
|
||||||
r.mu.Unlock()
|
|
||||||
|
|
||||||
if n == 1 {
|
|
||||||
w.WriteHeader(
|
|
||||||
http.StatusServiceUnavailable,
|
|
||||||
)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
r.mu.Lock()
|
|
||||||
*ok = true
|
|
||||||
r.mu.Unlock()
|
|
||||||
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
return mk(&r.ntfyAttempts, &r.ntfyOK),
|
|
||||||
mk(&r.slackAttempts, &r.slackOK),
|
|
||||||
mk(&r.mmAttempts, &r.mmOK)
|
|
||||||
}
|
|
||||||
|
|
||||||
func buildAllEndpointRetryService(
|
|
||||||
ntfyURL, slackURL, mmURL string,
|
|
||||||
) *notify.Service {
|
|
||||||
svc := notify.NewTestService(http.DefaultTransport)
|
|
||||||
svc.SetSleepFunc(instantSleep)
|
|
||||||
svc.SetRetryConfig(notify.RetryConfig{
|
|
||||||
MaxRetries: 3,
|
|
||||||
BaseDelay: time.Millisecond,
|
|
||||||
MaxDelay: 10 * time.Millisecond,
|
|
||||||
})
|
|
||||||
|
|
||||||
nu, _ := url.Parse(ntfyURL)
|
|
||||||
su, _ := url.Parse(slackURL)
|
|
||||||
mu, _ := url.Parse(mmURL)
|
|
||||||
|
|
||||||
svc.SetNtfyURL(nu)
|
|
||||||
svc.SetSlackWebhookURL(su)
|
|
||||||
svc.SetMattermostWebhookURL(mu)
|
|
||||||
|
|
||||||
return svc
|
|
||||||
}
|
|
||||||
|
|
||||||
func assertAllEndpointsRetried(
|
|
||||||
t *testing.T,
|
|
||||||
r *endpointRetryResult,
|
|
||||||
) {
|
|
||||||
t.Helper()
|
|
||||||
|
|
||||||
waitForCondition(t, func() bool {
|
|
||||||
r.mu.Lock()
|
|
||||||
defer r.mu.Unlock()
|
|
||||||
|
|
||||||
return r.ntfyOK && r.slackOK && r.mmOK
|
|
||||||
})
|
|
||||||
|
|
||||||
r.mu.Lock()
|
|
||||||
defer r.mu.Unlock()
|
|
||||||
|
|
||||||
if r.ntfyAttempts < 2 {
|
|
||||||
t.Errorf(
|
|
||||||
"ntfy: expected >= 2 attempts, got %d",
|
|
||||||
r.ntfyAttempts,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
if r.slackAttempts < 2 {
|
|
||||||
t.Errorf(
|
|
||||||
"slack: expected >= 2 attempts, got %d",
|
|
||||||
r.slackAttempts,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
if r.mmAttempts < 2 {
|
|
||||||
t.Errorf(
|
|
||||||
"mattermost: expected >= 2 attempts, got %d",
|
|
||||||
r.mmAttempts,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSendNotificationPermanentFailureLogsError(
|
|
||||||
t *testing.T,
|
|
||||||
) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
var (
|
|
||||||
mu sync.Mutex
|
|
||||||
attempts int
|
|
||||||
)
|
|
||||||
|
|
||||||
srv := httptest.NewServer(
|
|
||||||
http.HandlerFunc(
|
|
||||||
func(w http.ResponseWriter, _ *http.Request) {
|
|
||||||
mu.Lock()
|
|
||||||
attempts++
|
|
||||||
mu.Unlock()
|
|
||||||
|
|
||||||
w.WriteHeader(
|
|
||||||
http.StatusInternalServerError,
|
|
||||||
)
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
svc := newRetryTestService(srv.URL, "slack")
|
|
||||||
svc.SetRetryConfig(notify.RetryConfig{
|
|
||||||
MaxRetries: 2,
|
|
||||||
BaseDelay: time.Millisecond,
|
|
||||||
MaxDelay: 10 * time.Millisecond,
|
|
||||||
})
|
|
||||||
|
|
||||||
svc.SendNotification(
|
|
||||||
context.Background(),
|
|
||||||
"Permanent Fail", "body", "error",
|
|
||||||
)
|
|
||||||
|
|
||||||
// 1 initial + 2 retries = 3 total.
|
|
||||||
waitForCondition(t, func() bool {
|
|
||||||
mu.Lock()
|
|
||||||
defer mu.Unlock()
|
|
||||||
|
|
||||||
return attempts >= 3
|
|
||||||
})
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user