// Package broker provides an in-memory pub/sub for long-poll notifications. package broker import ( "sync" ) // Broker notifies waiting clients when new messages are available. type Broker struct { mu sync.Mutex listeners map[int64][]chan struct{} } // New creates a new Broker. func New() *Broker { return &Broker{ //nolint:exhaustruct // mu has zero-value default listeners: make(map[int64][]chan struct{}), } } // Wait returns a channel that will be closed when a message // is available for the user. func (b *Broker) Wait(userID int64) chan struct{} { waitCh := make(chan struct{}, 1) b.mu.Lock() b.listeners[userID] = append( b.listeners[userID], waitCh, ) b.mu.Unlock() return waitCh } // Notify wakes up all waiting clients for a user. func (b *Broker) Notify(userID int64) { b.mu.Lock() waiters := b.listeners[userID] delete(b.listeners, userID) b.mu.Unlock() for _, waiter := range waiters { select { case waiter <- struct{}{}: default: } } } // Remove removes a specific wait channel (for cleanup on timeout). func (b *Broker) Remove( userID int64, waitCh chan struct{}, ) { b.mu.Lock() defer b.mu.Unlock() waiters := b.listeners[userID] for i, waiter := range waiters { if waiter == waitCh { b.listeners[userID] = append( waiters[:i], waiters[i+1:]..., ) break } } if len(b.listeners[userID]) == 0 { delete(b.listeners, userID) } }