All checks were successful
check / check (push) Successful in 2m19s
Security: - Add channel membership check before PRIVMSG (prevents non-members from sending) - Add membership check on history endpoint (channels require membership, DMs scoped to own nick) - Enforce MaxBytesReader on all POST request bodies - Fix rand.Read error being silently ignored in token generation Data integrity: - Fix TOCTOU race in GetOrCreateChannel using INSERT OR IGNORE + SELECT Build: - Add CGO_ENABLED=0 to golangci-lint install in Dockerfile (fixes alpine build) Linting: - Strict .golangci.yml: only wsl disabled (deprecated in v2) - Re-enable exhaustruct, depguard, godot, wrapcheck, varnamelen - Fix linters-settings -> linters.settings for v2 config format - Fix ALL lint findings in actual code (no linter config weakening) - Wrap all external package errors (wrapcheck) - Fill struct fields or add targeted nolint:exhaustruct where appropriate - Rename short variables (ts->timestamp, n->bufIndex, etc.) - Add depguard deny policy for io/ioutil and math/rand - Exclude G704 (SSRF) in gosec config (CLI client takes user-configured URLs) Tests: - Add security tests (TestNonMemberCannotSend, TestHistoryNonMember) - Split TestInsertAndPollMessages for reduced complexity - Fix parallel test safety (viper global state prevents parallelism) - Use t.Context() instead of context.Background() in tests Docker build verified passing locally.
74 lines
1.4 KiB
Go
74 lines
1.4 KiB
Go
// Package broker provides an in-memory pub/sub for long-poll notifications.
|
|
package broker
|
|
|
|
import (
|
|
"sync"
|
|
)
|
|
|
|
// Broker notifies waiting clients when new messages are available.
//
// For each user ID it tracks the wait channels that have been registered
// and not yet signalled or withdrawn. A Broker contains a sync.Mutex and
// therefore must not be copied; share it by pointer.
type Broker struct {
	// mu guards listeners.
	mu sync.Mutex
	// listeners maps a user ID to the wait channels registered for it.
	listeners map[int64][]chan struct{}
}
|
|
|
|
// New creates a new Broker.
|
|
func New() *Broker {
|
|
return &Broker{ //nolint:exhaustruct // mu has zero-value default
|
|
listeners: make(map[int64][]chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Wait returns a channel that will be closed when a message
|
|
// is available for the user.
|
|
func (b *Broker) Wait(userID int64) chan struct{} {
|
|
waitCh := make(chan struct{}, 1)
|
|
|
|
b.mu.Lock()
|
|
b.listeners[userID] = append(
|
|
b.listeners[userID], waitCh,
|
|
)
|
|
b.mu.Unlock()
|
|
|
|
return waitCh
|
|
}
|
|
|
|
// Notify wakes up all waiting clients for a user.
|
|
func (b *Broker) Notify(userID int64) {
|
|
b.mu.Lock()
|
|
waiters := b.listeners[userID]
|
|
delete(b.listeners, userID)
|
|
b.mu.Unlock()
|
|
|
|
for _, waiter := range waiters {
|
|
select {
|
|
case waiter <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// Remove removes a specific wait channel (for cleanup on timeout).
|
|
func (b *Broker) Remove(
|
|
userID int64,
|
|
waitCh chan struct{},
|
|
) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
waiters := b.listeners[userID]
|
|
|
|
for i, waiter := range waiters {
|
|
if waiter == waitCh {
|
|
b.listeners[userID] = append(
|
|
waiters[:i], waiters[i+1:]...,
|
|
)
|
|
|
|
break
|
|
}
|
|
}
|
|
|
|
if len(b.listeners[userID]) == 0 {
|
|
delete(b.listeners, userID)
|
|
}
|
|
}
|