Implement queue backpressure with gradual message dropping
- Add gradual message dropping based on queue utilization - Start dropping messages at 50% queue capacity - Drop rate increases linearly from 0% at 50% to 100% at full - Uses random drops to maintain fair distribution - Helps prevent queue overflow under high load
This commit is contained in:
parent
b6ad50f23f
commit
8e12c07396
@ -8,6 +8,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
@ -29,6 +30,10 @@ const (
|
|||||||
bytesPerKB = 1024
|
bytesPerKB = 1024
|
||||||
bytesPerMB = 1024 * 1024
|
bytesPerMB = 1024 * 1024
|
||||||
maxConcurrentHandlers = 800 // Maximum number of concurrent message handlers
|
maxConcurrentHandlers = 800 // Maximum number of concurrent message handlers
|
||||||
|
|
||||||
|
// Backpressure constants
|
||||||
|
backpressureThreshold = 0.5 // Start dropping at 50% queue utilization
|
||||||
|
backpressureSlope = 2.0 // Slope for linear drop probability increase
|
||||||
)
|
)
|
||||||
|
|
||||||
// MessageHandler is an interface for handling RIS messages
|
// MessageHandler is an interface for handling RIS messages
|
||||||
@ -74,7 +79,8 @@ type Streamer struct {
|
|||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
running bool
|
running bool
|
||||||
metrics *metrics.Tracker
|
metrics *metrics.Tracker
|
||||||
totalDropped uint64 // Total dropped messages across all handlers
|
totalDropped uint64 // Total dropped messages across all handlers
|
||||||
|
random *rand.Rand // Random number generator for backpressure drops
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new RIS streamer
|
// New creates a new RIS streamer
|
||||||
@ -86,6 +92,8 @@ func New(logger *logger.Logger, metrics *metrics.Tracker) *Streamer {
|
|||||||
},
|
},
|
||||||
handlers: make([]*handlerInfo, 0),
|
handlers: make([]*handlerInfo, 0),
|
||||||
metrics: metrics,
|
metrics: metrics,
|
||||||
|
//nolint:gosec // Non-cryptographic randomness is fine for backpressure
|
||||||
|
random: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -526,15 +534,26 @@ func (s *Streamer) stream(ctx context.Context) error {
|
|||||||
// Dispatch to interested handlers
|
// Dispatch to interested handlers
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
for _, info := range s.handlers {
|
for _, info := range s.handlers {
|
||||||
if info.handler.WantsMessage(msg.Type) {
|
if !info.handler.WantsMessage(msg.Type) {
|
||||||
select {
|
continue
|
||||||
case info.queue <- &msg:
|
}
|
||||||
// Message queued successfully
|
|
||||||
default:
|
// Check if we should drop due to backpressure
|
||||||
// Queue is full, drop the message
|
if s.shouldDropForBackpressure(info) {
|
||||||
atomic.AddUint64(&info.metrics.droppedCount, 1)
|
atomic.AddUint64(&info.metrics.droppedCount, 1)
|
||||||
atomic.AddUint64(&s.totalDropped, 1)
|
atomic.AddUint64(&s.totalDropped, 1)
|
||||||
}
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to queue the message
|
||||||
|
select {
|
||||||
|
case info.queue <- &msg:
|
||||||
|
// Message queued successfully
|
||||||
|
default:
|
||||||
|
// Queue is full, drop the message
|
||||||
|
atomic.AddUint64(&info.metrics.droppedCount, 1)
|
||||||
|
atomic.AddUint64(&s.totalDropped, 1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.mu.RUnlock()
|
s.mu.RUnlock()
|
||||||
@ -546,3 +565,25 @@ func (s *Streamer) stream(ctx context.Context) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// shouldDropForBackpressure determines if a message should be dropped based on queue utilization
|
||||||
|
func (s *Streamer) shouldDropForBackpressure(info *handlerInfo) bool {
|
||||||
|
// Calculate queue utilization
|
||||||
|
queueLen := len(info.queue)
|
||||||
|
queueCap := cap(info.queue)
|
||||||
|
utilization := float64(queueLen) / float64(queueCap)
|
||||||
|
|
||||||
|
// No drops below threshold
|
||||||
|
if utilization < backpressureThreshold {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate drop probability (0.0 at threshold, 1.0 at 100% full)
|
||||||
|
dropProbability := (utilization - backpressureThreshold) * backpressureSlope
|
||||||
|
if dropProbability > 1.0 {
|
||||||
|
dropProbability = 1.0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Random drop based on probability
|
||||||
|
return s.random.Float64() < dropProbability
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user