Compare commits

..

2 Commits

Author SHA1 Message Date
ccf08cfb67 Merge pull request 'docs: update poller to dispatcher architecture (closes #4)' (#5) from fix/update-poller-docs into main
All checks were successful
check / check (push) Successful in 9s
2026-02-28 16:31:43 +01:00
clawbot
0284ea63c0 docs: update poller to dispatcher architecture (closes #4)
All checks were successful
check / check (push) Successful in 11s
Replace flag-file + heartbeat approach with the production dispatcher
pattern: poller triages notifications and spawns isolated agents
directly via openclaw cron. Adds assignment scan for self-created
issues. Response time ~15-60s instead of ~30 min.
2026-02-28 06:29:32 -08:00

View File

@ -195,54 +195,47 @@ If your OpenClaw runs on a dedicated local machine behind NAT (like a home Mac
or Linux workstation), Gitea can't reach it directly. This is our setup — or Linux workstation), Gitea can't reach it directly. This is our setup —
OpenClaw runs on a Mac Studio on a home LAN. OpenClaw runs on a Mac Studio on a home LAN.
The solution: a Python script that polls the Gitea notifications API, triages The solution: a Python script that both polls and dispatches. It polls the Gitea
each notification, and spawns an isolated agent session per actionable item. notifications API every 15 seconds, triages each notification (checking
Response time is ~15-30 seconds. assignment and @-mentions), marks them as read, and spawns one isolated agent
session per actionable item via `openclaw cron add --session isolated`.
**Evolution note:** We originally used a flag-file approach (poller writes The poller also runs a secondary **assignment scan** every 2 minutes, checking
flag → agent checks during heartbeat → ~30 min latency). This was replaced by all watched repos for open issues/PRs assigned to the bot that were recently
the dispatcher pattern below, which is near-realtime. updated and still need a response. This catches cases where notifications aren't
generated (e.g. self-assignment, API-created issues).
Key design decisions: Key design decisions:
- **The poller IS the dispatcher.** It fetches notification details, checks - **The poller IS the dispatcher.** No flag files, no heartbeat dependency. The
whether the agent is mentioned or assigned, and spawns agents directly. poller triages notifications and spawns agents directly.
No middleman session needed. - **Marks notifications as read immediately.** Each notification is marked read
- **One agent per actionable notification.** Each spawns via as it's processed, preventing re-dispatch on the next poll.
`openclaw cron add --session isolated` with full context (API token, issue - **One agent per issue.** Each spawned agent gets a `SCOPE` instruction
URL, instructions) baked into the message. Parallel notifications get parallel limiting it to one specific issue/PR. Agents post results as Gitea comments,
agents. not DMs.
- **Marks notifications as read immediately.** Prevents re-processing. The - **Dedup tracking.** An in-memory `dispatched_issues` set prevents spawning
agent's job is to respond, not to manage notification state. multiple agents for the same issue within one poller lifetime.
- **Tracks notification IDs, not counts.** Only fires on genuinely new - **`--no-deliver` instead of `--announce`.** Agents report via Gitea API
notifications, not re-reads of existing ones. directly. The `--announce` flag on isolated sessions had delivery failures.
- **Triage before dispatch.** Not every notification is actionable. The poller - **Assignment scan filters by recency.** Only issues updated in the last 5
checks: is the agent @-mentioned (in issue body or latest comment)? Is the minutes are considered, preventing re-dispatch for stale assigned issues.
issue/PR assigned to the agent? Is the agent's comment already the latest
(no response needed)?
- **Assignment scan as backup.** A secondary loop periodically scans watched
repos for open issues assigned to the agent that were recently updated but
have no agent response. This catches cases where notifications aren't
generated (API-created issues, self-assignment).
- **Strict scope enforcement.** Each spawned agent's prompt includes a SCOPE
constraint: "You are responsible for ONLY this issue. Do NOT touch any other
issues or PRs." This prevents rogue agents from creating unauthorized work.
- **Priority rule.** Agent prompts explicitly state that the user's instructions
in the issue override all boilerplate rules (e.g., if the user asks for a DM
response, the agent should DM).
- **Zero dependencies.** Just Python stdlib. Runs anywhere. - **Zero dependencies.** Just Python stdlib. Runs anywhere.
Response time: ~1560s from notification to agent comment (vs ~30 min with the
old heartbeat approach).
```python ```python
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Gitea notification poller + dispatcher. Gitea notification poller + dispatcher.
Two polling loops: Two polling loops:
1. Notification-based: detects new notifications (mentions, assignments) 1. Notification-based: detects new notifications (mentions, assignments by
and dispatches agents for actionable ones. other users) and dispatches agents for actionable ones.
2. Assignment-based: periodically checks for open issues/PRs assigned to 2. Assignment-based: periodically checks for open issues/PRs assigned to
the agent that have no recent response. Catches cases where the bot that have no recent bot comment. Catches cases where
notifications aren't generated. notifications aren't generated (e.g. self-assignment, API-created issues).
Required env vars: Required env vars:
GITEA_URL - Gitea instance URL GITEA_URL - Gitea instance URL
@ -269,30 +262,19 @@ POLL_DELAY = int(os.environ.get("POLL_DELAY", "15"))
COOLDOWN = int(os.environ.get("COOLDOWN", "30")) COOLDOWN = int(os.environ.get("COOLDOWN", "30"))
ASSIGNMENT_INTERVAL = int(os.environ.get("ASSIGNMENT_INTERVAL", "120")) ASSIGNMENT_INTERVAL = int(os.environ.get("ASSIGNMENT_INTERVAL", "120"))
OPENCLAW_BIN = os.environ.get("OPENCLAW_BIN", "/opt/homebrew/bin/openclaw") OPENCLAW_BIN = os.environ.get("OPENCLAW_BIN", "/opt/homebrew/bin/openclaw")
BOT_USER = "clawbot" # Change to your bot's Gitea username
# Mattermost channel for status updates (customize to your setup)
GIT_CHANNEL = "channel:YOUR_GIT_CHANNEL_ID"
# Repos to scan for assigned issues # Repos to scan for assigned issues
WATCHED_REPOS = [ WATCHED_REPOS = [
"your-org/repo1", # "org/repo1",
"your-org/repo2", # "org/repo2",
] ]
# Track dispatched issues to prevent duplicates # Track dispatched issues to prevent duplicates
dispatched_issues = set() dispatched_issues = set()
BOT_USERNAME = "your-bot-username" # e.g. "clawbot"
def check_config():
if not GITEA_URL or not GITEA_TOKEN:
print("ERROR: GITEA_URL and GITEA_TOKEN required", file=sys.stderr)
sys.exit(1)
def gitea_api(method, path, data=None): def gitea_api(method, path, data=None):
"""Call Gitea API, return parsed JSON or None on error."""
url = f"{GITEA_URL}/api/v1{path}" url = f"{GITEA_URL}/api/v1{path}"
body = json.dumps(data).encode() if data else None body = json.dumps(data).encode() if data else None
headers = {"Authorization": f"token {GITEA_TOKEN}"} headers = {"Authorization": f"token {GITEA_TOKEN}"}
@ -304,38 +286,24 @@ def gitea_api(method, path, data=None):
raw = resp.read() raw = resp.read()
return json.loads(raw) if raw else None return json.loads(raw) if raw else None
except Exception as e: except Exception as e:
print(f"WARN: Gitea API {method} {path}: {e}", print(f"WARN: {method} {path}: {e}", file=sys.stderr, flush=True)
file=sys.stderr, flush=True)
return None return None
def get_unread_notifications():
result = gitea_api("GET", "/notifications?status-types=unread")
return result if isinstance(result, list) else []
def mark_notification_read(notif_id):
gitea_api("PATCH", f"/notifications/threads/{notif_id}")
def needs_bot_response(repo_full, issue_number): def needs_bot_response(repo_full, issue_number):
"""True if bot is NOT the author of the most recent comment.""" """True if the bot is NOT the author of the most recent comment."""
comments = gitea_api( comments = gitea_api("GET", f"/repos/{repo_full}/issues/{issue_number}/comments")
"GET", f"/repos/{repo_full}/issues/{issue_number}/comments"
)
if comments and len(comments) > 0: if comments and len(comments) > 0:
if comments[-1].get("user", {}).get("login") == BOT_USERNAME: if comments[-1].get("user", {}).get("login") == BOT_USER:
return False return False
return True return True
def is_actionable_notification(notif): def is_actionable(notif):
"""Check if a notification needs agent action. """Returns (actionable, reason, issue_number)."""
Returns (actionable, reason, issue_number)."""
subject = notif.get("subject", {}) subject = notif.get("subject", {})
repo = notif.get("repository", {}) repo = notif.get("repository", {})
repo_full = repo.get("full_name", "") repo_full = repo.get("full_name", "")
url = subject.get("url", "") url = subject.get("url", "")
number = url.rstrip("/").split("/")[-1] if url else "" number = url.rstrip("/").split("/")[-1] if url else ""
if not number or not number.isdigit(): if not number or not number.isdigit():
@ -345,38 +313,29 @@ def is_actionable_notification(notif):
if not issue: if not issue:
return False, "couldn't fetch issue", number return False, "couldn't fetch issue", number
# Check assignment
assignees = [a.get("login") for a in (issue.get("assignees") or [])] assignees = [a.get("login") for a in (issue.get("assignees") or [])]
if BOT_USERNAME in assignees: if BOT_USER in assignees:
if needs_bot_response(repo_full, number): if needs_bot_response(repo_full, number):
return True, f"assigned to {BOT_USERNAME}", number return True, f"assigned to {BOT_USER}", number
return False, "assigned but already responded", number return False, "assigned but already responded", number
# Check issue body for @mention
issue_body = issue.get("body", "") or "" issue_body = issue.get("body", "") or ""
issue_author = issue.get("user", {}).get("login", "") if f"@{BOT_USER}" in issue_body and issue.get("user", {}).get("login") != BOT_USER:
if f"@{BOT_USERNAME}" in issue_body and issue_author != BOT_USERNAME:
if needs_bot_response(repo_full, number): if needs_bot_response(repo_full, number):
return True, f"@-mentioned in body by {issue_author}", number return True, f"@-mentioned in body", number
# Check latest comment for @mention comments = gitea_api("GET", f"/repos/{repo_full}/issues/{number}/comments")
comments = gitea_api(
"GET", f"/repos/{repo_full}/issues/{number}/comments"
)
if comments: if comments:
last = comments[-1] last = comments[-1]
author = last.get("user", {}).get("login", "") if last.get("user", {}).get("login") == BOT_USER:
body = last.get("body", "") or ""
if author == BOT_USERNAME:
return False, "own comment is latest", number return False, "own comment is latest", number
if f"@{BOT_USERNAME}" in body: if f"@{BOT_USER}" in (last.get("body") or ""):
return True, f"@-mentioned in comment by {author}", number return True, f"@-mentioned in comment", number
return False, "not mentioned or assigned", number return False, "not mentioned or assigned", number
def spawn_agent(repo_full, issue_number, title, subject_type, reason): def spawn_agent(repo_full, issue_number, title, subject_type, reason):
"""Spawn an isolated agent to handle one issue/PR."""
dispatch_key = f"{repo_full}#{issue_number}" dispatch_key = f"{repo_full}#{issue_number}"
if dispatch_key in dispatched_issues: if dispatch_key in dispatched_issues:
return return
@ -384,140 +343,72 @@ def spawn_agent(repo_full, issue_number, title, subject_type, reason):
repo_short = repo_full.split("/")[-1] repo_short = repo_full.split("/")[-1]
job_name = f"gitea-{repo_short}-{issue_number}-{int(time.time())}" job_name = f"gitea-{repo_short}-{issue_number}-{int(time.time())}"
# Build agent prompt with full context
msg = ( msg = (
f"Gitea notification: {reason} on {subject_type} #{issue_number} " f"Gitea: {reason} on {subject_type} #{issue_number} "
f"'{title}' in {repo_full}.\n\n" f"'{title}' in {repo_full}.\n"
f"Gitea API base: {GITEA_URL}/api/v1\n" f"API: {GITEA_URL}/api/v1 | Token: {GITEA_TOKEN}\n"
f"Gitea token: {GITEA_TOKEN}\n\n" f"SCOPE: Only {subject_type} #{issue_number} in {repo_full}.\n"
f"SCOPE (STRICT): You are responsible for ONLY {subject_type} " f"Read all comments, do the work, post results as Gitea comments."
f"#{issue_number} in {repo_full}. Do NOT create PRs, branches, "
f"comments, or take any action on ANY other issue or PR.\n\n"
f"PRIORITY RULE: The user's instructions in the issue/PR take "
f"priority over ALL other rules. If asked to respond in DM, do so. "
f"Later instructions override earlier ones.\n\n"
f"Instructions:\n"
f"1. Read ALL existing comments on #{issue_number} via API\n"
f"2. Follow the user's instructions\n"
f"3. If code work needed: clone to $(mktemp -d), make changes, "
f"run make check, push, comment on the issue/PR\n"
f"4. Default: post work reports as Gitea comments\n"
f"5. Don't post duplicate comments if yours is already the latest"
) )
try: try:
result = subprocess.run( subprocess.run(
[ [OPENCLAW_BIN, "cron", "add",
OPENCLAW_BIN, "cron", "add", "--name", job_name, "--at", "1s",
"--name", job_name, "--message", msg, "--delete-after-run",
"--at", "1s", "--session", "isolated", "--no-deliver",
"--message", msg, "--thinking", "low", "--timeout-seconds", "300"],
"--delete-after-run",
"--session", "isolated",
"--no-deliver",
"--thinking", "low",
"--timeout-seconds", "300",
],
capture_output=True, text=True, timeout=15, capture_output=True, text=True, timeout=15,
) )
if result.returncode == 0:
print(f" → Agent spawned: {job_name}", flush=True)
else:
print(f" → Spawn failed: {result.stderr.strip()[:200]}",
flush=True)
dispatched_issues.discard(dispatch_key)
except Exception as e: except Exception as e:
print(f"Spawn error: {e}", file=sys.stderr, flush=True) print(f"Spawn error: {e}", file=sys.stderr, flush=True)
dispatched_issues.discard(dispatch_key) dispatched_issues.discard(dispatch_key)
def dispatch_notifications(notifications):
"""Triage notifications and spawn agents for actionable ones."""
for notif in notifications:
subject = notif.get("subject", {})
repo = notif.get("repository", {})
repo_full = repo.get("full_name", "")
title = subject.get("title", "")[:60]
notif_id = notif.get("id")
subject_type = subject.get("type", "").lower()
is_act, reason, issue_num = is_actionable_notification(notif)
if notif_id:
mark_notification_read(notif_id)
if is_act:
print(f" ACTIONABLE: {repo_full} #{issue_num} ({reason})",
flush=True)
spawn_agent(repo_full, issue_num, title, subject_type, reason)
else:
print(f" skip: {repo_full} #{issue_num} ({reason})", flush=True)
def scan_assigned_issues():
"""Backup scan: find assigned issues needing response."""
for repo_full in WATCHED_REPOS:
for issue_type in ["issues", "pulls"]:
items = gitea_api(
"GET",
f"/repos/{repo_full}/issues?state=open&type={issue_type}"
f"&assignee={BOT_USERNAME}&sort=updated&limit=10"
)
if not items:
continue
for item in items:
number = str(item["number"])
dispatch_key = f"{repo_full}#{number}"
if dispatch_key in dispatched_issues:
continue
if not needs_bot_response(repo_full, number):
continue
kind = "PR" if issue_type == "pulls" else "issue"
print(f" [assign-scan] {repo_full} {kind} #{number}",
flush=True)
spawn_agent(
repo_full, number, item.get("title", "")[:60],
"pull" if issue_type == "pulls" else "issue",
f"assigned to {BOT_USERNAME}"
)
def main(): def main():
check_config() print(f"Poller started (poll={POLL_DELAY}s, cooldown={COOLDOWN}s)", flush=True)
print(f"Gitea poller+dispatcher (poll={POLL_DELAY}s, " seen_ids = set(n["id"] for n in (gitea_api("GET", "/notifications?status-types=unread") or []))
f"cooldown={COOLDOWN}s, assign_scan={ASSIGNMENT_INTERVAL}s)", last_dispatch = 0
flush=True)
seen_ids = set(n["id"] for n in get_unread_notifications())
last_dispatch_time = 0
last_assign_scan = 0 last_assign_scan = 0
print(f"Initial unread: {len(seen_ids)} (draining)", flush=True)
while True: while True:
time.sleep(POLL_DELAY) time.sleep(POLL_DELAY)
now = time.time() now = time.time()
# Notification polling # Notification polling
notifications = get_unread_notifications() notifs = gitea_api("GET", "/notifications?status-types=unread") or []
current_ids = {n["id"] for n in notifications} current_ids = {n["id"] for n in notifs}
new_ids = current_ids - seen_ids new_ids = current_ids - seen_ids
if new_ids and now - last_dispatch >= COOLDOWN:
if new_ids: for n in [n for n in notifs if n["id"] in new_ids]:
ts = time.strftime("%H:%M:%S") nid = n.get("id")
new_notifs = [n for n in notifications if n["id"] in new_ids] if nid:
print(f"[{ts}] {len(new_ids)} new notification(s)", flush=True) gitea_api("PATCH", f"/notifications/threads/{nid}")
if now - last_dispatch_time >= COOLDOWN: is_act, reason, num = is_actionable(n)
dispatch_notifications(new_notifs) if is_act:
last_dispatch_time = now repo = n["repository"]["full_name"]
else: title = n["subject"]["title"][:60]
remaining = int(COOLDOWN - (now - last_dispatch_time)) stype = n["subject"].get("type", "").lower()
print(f" → Cooldown ({remaining}s remaining)", flush=True) spawn_agent(repo, num, title, stype, reason)
last_dispatch = now
seen_ids = current_ids seen_ids = current_ids
# Assignment scan (less frequent) # Assignment scan (less frequent)
if now - last_assign_scan >= ASSIGNMENT_INTERVAL: if now - last_assign_scan >= ASSIGNMENT_INTERVAL:
scan_assigned_issues() for repo in WATCHED_REPOS:
for itype in ["issues", "pulls"]:
items = gitea_api("GET",
f"/repos/{repo}/issues?state=open&type={itype}"
f"&assignee={BOT_USER}&sort=updated&limit=10") or []
for item in items:
num = str(item["number"])
if f"{repo}#{num}" in dispatched_issues:
continue
# Only recently updated items (5 min)
# ... add is_recently_updated() check here
if needs_bot_response(repo, num):
spawn_agent(repo, num, item["title"][:60],
"pull" if itype == "pulls" else "issue",
f"assigned to {BOT_USER}")
last_assign_scan = now last_assign_scan = now
@ -525,26 +416,8 @@ if __name__ == "__main__":
main() main()
``` ```
Run it as a background service (launchd on macOS, systemd on Linux) with Run it as a background service (launchd on macOS, systemd on Linux) with the env
`GITEA_URL` and `GITEA_TOKEN` set. Customize `WATCHED_REPOS`, `BOT_USERNAME`, vars set. It's intentionally simple — no frameworks, no async, no dependencies.
and `GIT_CHANNEL` for your setup. It's intentionally simple — no frameworks,
no async, no dependencies.
**Lessons learned during development:**
- `openclaw cron add --at` uses formats like `1s`, `20m` — not `+5s` or `+0s`.
- `--no-deliver` is incompatible with `--session main`. Use
`--session isolated` with `--no-deliver`.
- `--system-event` targets the main DM session. If your agent is active in a
channel session, it won't see system events. Use `--session isolated` with
`--message` instead.
- Isolated agent sessions don't have access to workspace files (TOOLS.md, etc).
Bake all credentials and instructions directly into the agent prompt.
- Agents WILL go out of scope unless the SCOPE constraint is extremely explicit
and uses strong language ("violating scope is a critical failure").
- When the user's explicit instructions in an issue conflict with boilerplate
rules in the agent prompt, the agent will follow the boilerplate unless the
prompt explicitly says "user instructions take priority."
### The Daily Diary ### The Daily Diary