- Fix .golangci.yml for v2 format (linters-settings -> linters.settings) - All production code now passes golangci-lint with zero issues - Line length 88, funlen 80/50, cyclop 15, dupl 100 - Extract shared helpers in db (scanChannels, scanInt64s, scanMessages) - Split runMigrations into applyMigration/execMigration - Fix fanOut return signature (remove unused int64) - Add fanOutSilent helper to avoid dogsled - Rewrite CLI code for lint compliance (nlreturn, wsl_v5, noctx, etc) - Rename CLI api package to chatapi to avoid revive var-naming - Fix all noinlineerr, mnd, perfsprint, funcorder issues - Fix db tests: extract helpers, add t.Parallel, proper error checks - Broker tests already clean - Handler integration tests still have lint issues (next commit)
66 lines
1.3 KiB
Go
66 lines
1.3 KiB
Go
// Package broker provides an in-memory pub/sub for long-poll notifications.
|
|
package broker
|
|
|
|
import (
|
|
"sync"
|
|
)
|
|
|
|
// Broker notifies waiting clients when new messages are available.
|
|
type Broker struct {
|
|
mu sync.Mutex
|
|
listeners map[int64][]chan struct{} // userID -> list of waiting channels
|
|
}
|
|
|
|
// New creates a new Broker.
|
|
func New() *Broker {
|
|
return &Broker{
|
|
listeners: make(map[int64][]chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Wait returns a channel that will be closed when a message is available for the user.
|
|
func (b *Broker) Wait(userID int64) chan struct{} {
|
|
ch := make(chan struct{}, 1)
|
|
|
|
b.mu.Lock()
|
|
b.listeners[userID] = append(b.listeners[userID], ch)
|
|
b.mu.Unlock()
|
|
|
|
return ch
|
|
}
|
|
|
|
// Notify wakes up all waiting clients for a user.
|
|
func (b *Broker) Notify(userID int64) {
|
|
b.mu.Lock()
|
|
waiters := b.listeners[userID]
|
|
delete(b.listeners, userID)
|
|
b.mu.Unlock()
|
|
|
|
for _, ch := range waiters {
|
|
select {
|
|
case ch <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// Remove removes a specific wait channel (for cleanup on timeout).
|
|
func (b *Broker) Remove(userID int64, ch chan struct{}) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
waiters := b.listeners[userID]
|
|
|
|
for i, w := range waiters {
|
|
if w == ch {
|
|
b.listeners[userID] = append(waiters[:i], waiters[i+1:]...)
|
|
|
|
break
|
|
}
|
|
}
|
|
|
|
if len(b.listeners[userID]) == 0 {
|
|
delete(b.listeners, userID)
|
|
}
|
|
}
|