Files
chat/internal/broker/broker.go
clawbot 1f54b281fd MVP: IRC envelope format, long-polling, per-client queues, SPA rewrite
Major changes:
- Consolidated schema into single migration with IRC envelope format
- Messages table stores command/from/to/body(JSON)/meta(JSON) per spec
- Per-client delivery queues (client_queues table) with fan-out
- In-memory broker for long-poll notifications (no busy polling)
- GET /messages supports ?after=<queue_id>&timeout=15 long-polling
- All commands (JOIN/PART/NICK/TOPIC/QUIT/PING) broadcast events
- Channels are ephemeral (deleted when last member leaves)
- PRIVMSG to nicks (DMs) fan out to both sender and recipient
- SPA rewritten in vanilla JS (no build step needed):
  - Long-poll via recursive fetch (not setInterval)
  - IRC envelope parsing with system message display
  - /nick, /join, /part, /msg, /quit commands
  - Unread indicators on inactive tabs
  - DM tabs from user list clicks
- Removed unused models package (was for UUID-based schema)
- Removed conflicting UUID-based db methods
- Increased HTTP write timeout to 60s for long-poll support
2026-02-10 18:09:10 -08:00

61 lines
1.3 KiB
Go

// Package broker provides an in-memory pub/sub for long-poll notifications.
package broker
import (
"sync"
)
// Broker notifies waiting clients when new messages are available.
//
// It tracks, per user ID, the channels handed out by Wait that have not yet
// been signalled by Notify or withdrawn by Remove. Wait, Notify and Remove
// all acquire mu before touching listeners.
type Broker struct {
// mu guards listeners.
mu sync.Mutex
// listeners holds the wait channels currently registered for each user ID.
// Remove deletes a key once its slice becomes empty, and Notify deletes the
// key for the user it signals.
listeners map[int64][]chan struct{} // userID -> list of waiting channels
}
// New returns a Broker whose listener registry is initialised and empty, so
// Wait can append to it immediately.
func New() *Broker {
	b := new(Broker)
	b.listeners = map[int64][]chan struct{}{}
	return b
}
// Wait registers a new listener for userID and returns its channel.
//
// The channel has a buffer of one. Notify signals it with a single
// non-blocking send (the channel is not closed), so a receive on the returned
// channel completes once Notify has been called for userID after this
// registration. A caller that gives up waiting (for example on timeout) can
// pass the channel to Remove to unregister it.
func (b *Broker) Wait(userID int64) chan struct{} {
	b.mu.Lock()
	defer b.mu.Unlock()

	signal := make(chan struct{}, 1)
	b.listeners[userID] = append(b.listeners[userID], signal)
	return signal
}
// Notify signals every channel currently registered for userID and clears
// that user's registrations, so each channel is signalled by at most one
// Notify call. It is a no-op when the user has no registered listeners.
func (b *Broker) Notify(userID int64) {
	// Detach the user's listeners while holding the lock.
	b.mu.Lock()
	pending, registered := b.listeners[userID]
	if registered {
		delete(b.listeners, userID)
	}
	b.mu.Unlock()

	// The sends happen after the lock is released and never block: if a
	// channel's one-slot buffer is already full the signal is dropped.
	for i := range pending {
		select {
		case pending[i] <- struct{}{}:
		default:
		}
	}
}
// Remove unregisters ch from userID's listeners (for cleanup on timeout).
//
// Only the first entry equal to ch is removed; if ch is not registered the
// listener list is left as it was. When the user ends up with no listeners,
// the user's map entry is deleted.
func (b *Broker) Remove(userID int64, ch chan struct{}) {
	b.mu.Lock()
	defer b.mu.Unlock()

	queue := b.listeners[userID]

	// Locate the first occurrence of ch, if any.
	idx := -1
	for i := range queue {
		if queue[i] == ch {
			idx = i
			break
		}
	}

	if idx >= 0 {
		// Splice the entry out in place, preserving the order of the rest.
		queue = append(queue[:idx], queue[idx+1:]...)
		b.listeners[userID] = queue
	}

	// Drop the key entirely rather than keeping an empty slice around.
	if len(queue) == 0 {
		delete(b.listeners, userID)
	}
}