All checks were successful
check / check (push) Successful in 1m52s
- Fan out all targets for an event in parallel goroutines (fire-and-forget)
- Add per-target circuit breaker for retry targets (closed/open/half-open)
- Circuit breaker trips after 5 consecutive failures, 30s cooldown
- Open circuit skips delivery and reschedules after cooldown
- Half-open allows one probe delivery to test recovery
- HTTP/database/log targets unaffected (no circuit breaker)
- Recovery path also fans out in parallel
- Update README with parallel delivery and circuit breaker docs
163 lines
4.1 KiB
Go
163 lines
4.1 KiB
Go
package delivery
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// CircuitState enumerates the states a circuit breaker can be in.
type CircuitState int

const (
	// CircuitClosed is the zero value and the normal state: delivery
	// attempts are allowed through.
	CircuitClosed CircuitState = iota

	// CircuitOpen is the tripped state: delivery attempts are skipped
	// until the cooldown has expired.
	CircuitOpen

	// CircuitHalfOpen is the probing state: a single delivery is let
	// through to find out whether the target has recovered.
	CircuitHalfOpen
)
|
|
|
|
const (
	// defaultFailureThreshold is how many failures in a row are needed
	// for a circuit breaker to trip from closed to open.
	defaultFailureThreshold = 5

	// defaultCooldown is the time an open circuit waits before it may
	// move to half-open and let a probe delivery through.
	defaultCooldown = 30 * time.Second
)
|
|
|
|
// CircuitBreaker implements the circuit breaker pattern for a single
|
|
// delivery target. It tracks consecutive failures and prevents
|
|
// hammering a down target by temporarily stopping delivery attempts.
|
|
//
|
|
// States:
|
|
// - Closed (normal): deliveries flow through; consecutive failures
|
|
// are counted.
|
|
// - Open (tripped): deliveries are skipped; a cooldown timer is
|
|
// running. After the cooldown expires the state moves to HalfOpen.
|
|
// - HalfOpen (probing): one probe delivery is allowed. If it
|
|
// succeeds the circuit closes; if it fails the circuit reopens.
|
|
type CircuitBreaker struct {
|
|
mu sync.Mutex
|
|
state CircuitState
|
|
failures int
|
|
threshold int
|
|
cooldown time.Duration
|
|
lastFailure time.Time
|
|
}
|
|
|
|
// NewCircuitBreaker creates a circuit breaker with default settings.
|
|
func NewCircuitBreaker() *CircuitBreaker {
|
|
return &CircuitBreaker{
|
|
state: CircuitClosed,
|
|
threshold: defaultFailureThreshold,
|
|
cooldown: defaultCooldown,
|
|
}
|
|
}
|
|
|
|
// Allow checks whether a delivery attempt should proceed. It returns
|
|
// true if the delivery should be attempted, false if the circuit is
|
|
// open and the delivery should be skipped.
|
|
//
|
|
// When the circuit is open and the cooldown has elapsed, Allow
|
|
// transitions to half-open and permits exactly one probe delivery.
|
|
func (cb *CircuitBreaker) Allow() bool {
|
|
cb.mu.Lock()
|
|
defer cb.mu.Unlock()
|
|
|
|
switch cb.state {
|
|
case CircuitClosed:
|
|
return true
|
|
|
|
case CircuitOpen:
|
|
// Check if cooldown has elapsed
|
|
if time.Since(cb.lastFailure) >= cb.cooldown {
|
|
cb.state = CircuitHalfOpen
|
|
return true
|
|
}
|
|
return false
|
|
|
|
case CircuitHalfOpen:
|
|
// Only one probe at a time — reject additional attempts while
|
|
// a probe is in flight. The probe goroutine will call
|
|
// RecordSuccess or RecordFailure to resolve the state.
|
|
return false
|
|
|
|
default:
|
|
return true
|
|
}
|
|
}
|
|
|
|
// CooldownRemaining returns how much time is left before an open circuit
|
|
// transitions to half-open. Returns zero if the circuit is not open or
|
|
// the cooldown has already elapsed.
|
|
func (cb *CircuitBreaker) CooldownRemaining() time.Duration {
|
|
cb.mu.Lock()
|
|
defer cb.mu.Unlock()
|
|
|
|
if cb.state != CircuitOpen {
|
|
return 0
|
|
}
|
|
|
|
remaining := cb.cooldown - time.Since(cb.lastFailure)
|
|
if remaining < 0 {
|
|
return 0
|
|
}
|
|
return remaining
|
|
}
|
|
|
|
// RecordSuccess records a successful delivery and resets the circuit
|
|
// breaker to closed state with zero failures.
|
|
func (cb *CircuitBreaker) RecordSuccess() {
|
|
cb.mu.Lock()
|
|
defer cb.mu.Unlock()
|
|
|
|
cb.failures = 0
|
|
cb.state = CircuitClosed
|
|
}
|
|
|
|
// RecordFailure records a failed delivery. If the failure count reaches
|
|
// the threshold, the circuit trips open.
|
|
func (cb *CircuitBreaker) RecordFailure() {
|
|
cb.mu.Lock()
|
|
defer cb.mu.Unlock()
|
|
|
|
cb.failures++
|
|
cb.lastFailure = time.Now()
|
|
|
|
switch cb.state {
|
|
case CircuitClosed:
|
|
if cb.failures >= cb.threshold {
|
|
cb.state = CircuitOpen
|
|
}
|
|
|
|
case CircuitHalfOpen:
|
|
// Probe failed — reopen immediately
|
|
cb.state = CircuitOpen
|
|
}
|
|
}
|
|
|
|
// State returns the current circuit state. Safe for concurrent use.
|
|
func (cb *CircuitBreaker) State() CircuitState {
|
|
cb.mu.Lock()
|
|
defer cb.mu.Unlock()
|
|
return cb.state
|
|
}
|
|
|
|
// String returns the human-readable name of a circuit state.
|
|
func (s CircuitState) String() string {
|
|
switch s {
|
|
case CircuitClosed:
|
|
return "closed"
|
|
case CircuitOpen:
|
|
return "open"
|
|
case CircuitHalfOpen:
|
|
return "half-open"
|
|
default:
|
|
return "unknown"
|
|
}
|
|
}
|