package broker_test import ( "sync" "testing" "time" "git.eeqj.de/sneak/chat/internal/broker" ) func TestNewBroker(t *testing.T) { t.Parallel() brk := broker.New() if brk == nil { t.Fatal("expected non-nil broker") } } func TestWaitAndNotify(t *testing.T) { t.Parallel() brk := broker.New() waitCh := brk.Wait(1) go func() { time.Sleep(10 * time.Millisecond) brk.Notify(1) }() select { case <-waitCh: case <-time.After(2 * time.Second): t.Fatal("timeout") } } func TestNotifyWithoutWaiters(t *testing.T) { t.Parallel() brk := broker.New() brk.Notify(42) // should not panic. } func TestRemove(t *testing.T) { t.Parallel() brk := broker.New() waitCh := brk.Wait(1) brk.Remove(1, waitCh) brk.Notify(1) select { case <-waitCh: t.Fatal("should not receive after remove") case <-time.After(50 * time.Millisecond): } } func TestMultipleWaiters(t *testing.T) { t.Parallel() brk := broker.New() waitCh1 := brk.Wait(1) waitCh2 := brk.Wait(1) brk.Notify(1) select { case <-waitCh1: case <-time.After(time.Second): t.Fatal("ch1 timeout") } select { case <-waitCh2: case <-time.After(time.Second): t.Fatal("ch2 timeout") } } func TestConcurrentWaitNotify(t *testing.T) { t.Parallel() brk := broker.New() var waitGroup sync.WaitGroup const concurrency = 100 for idx := range concurrency { waitGroup.Add(1) go func(uid int64) { defer waitGroup.Done() waitCh := brk.Wait(uid) brk.Notify(uid) select { case <-waitCh: case <-time.After(time.Second): t.Error("timeout") } }(int64(idx % 10)) } waitGroup.Wait() } func TestRemoveNonexistent(t *testing.T) { t.Parallel() brk := broker.New() waitCh := make(chan struct{}, 1) brk.Remove(999, waitCh) // should not panic. }