package broker import ( "sync" "testing" "time" ) func TestNewBroker(t *testing.T) { b := New() if b == nil { t.Fatal("expected non-nil broker") } } func TestWaitAndNotify(t *testing.T) { b := New() ch := b.Wait(1) go func() { time.Sleep(10 * time.Millisecond) b.Notify(1) }() select { case <-ch: case <-time.After(2 * time.Second): t.Fatal("timeout") } } func TestNotifyWithoutWaiters(t *testing.T) { b := New() b.Notify(42) // should not panic } func TestRemove(t *testing.T) { b := New() ch := b.Wait(1) b.Remove(1, ch) b.Notify(1) select { case <-ch: t.Fatal("should not receive after remove") case <-time.After(50 * time.Millisecond): } } func TestMultipleWaiters(t *testing.T) { b := New() ch1 := b.Wait(1) ch2 := b.Wait(1) b.Notify(1) select { case <-ch1: case <-time.After(time.Second): t.Fatal("ch1 timeout") } select { case <-ch2: case <-time.After(time.Second): t.Fatal("ch2 timeout") } } func TestConcurrentWaitNotify(t *testing.T) { b := New() var wg sync.WaitGroup for i := 0; i < 100; i++ { wg.Add(1) go func(uid int64) { defer wg.Done() ch := b.Wait(uid) b.Notify(uid) select { case <-ch: case <-time.After(time.Second): t.Error("timeout") } }(int64(i % 10)) } wg.Wait() } func TestRemoveNonexistent(t *testing.T) { b := New() ch := make(chan struct{}, 1) b.Remove(999, ch) // should not panic }