Update Gitea poller docs to dispatcher architecture (closes #4) #6

Closed
clawbot wants to merge 1 commit from update-poller-docs into main

@@ -189,50 +189,75 @@ arrive instantly.
 Setup: add a webhook on each Gitea repo (or use an organization-level webhook)
 pointing to `https://your-openclaw-host/hooks/gitea`. OpenClaw handles the rest.
 
-#### Option B: Notification Poller (Local Machine Behind NAT)
+#### Option B: Notification Poller + Dispatcher (Local Machine Behind NAT)
 
 If your OpenClaw runs on a dedicated local machine behind NAT (like a home Mac
 or Linux workstation), Gitea can't reach it directly. This is our setup:
 OpenClaw runs on a Mac Studio on a home LAN.
 
-The solution: a lightweight Python script that polls the Gitea notifications API
-every few seconds. When new notifications appear, it writes a flag file that the
-agent checks during heartbeats. Response time is ~15-30 seconds.
+The solution: a Python script that polls the Gitea notifications API, triages
+each notification, and spawns an isolated agent session per actionable item.
+
+**Evolution note:** We originally used a flag-file approach (poller writes
+flag → agent checks during heartbeat → ~30 min latency). This was replaced by
+the dispatcher pattern below, which is near-realtime.
 
 Key design decisions:
 
-- **The poller never marks notifications as read.** The agent does that after
-  processing. This prevents lost notifications if the agent fails to process.
-- **It tracks notification IDs, not counts.** Only fires on genuinely new
+- **The poller IS the dispatcher.** It fetches notification details, checks
+  whether the agent is mentioned or assigned, and spawns agents directly.
+  No middleman session needed.
+- **One agent per actionable notification.** Each spawns via
+  `openclaw cron add --session isolated` with full context (API token, issue
+  URL, instructions) baked into the message. Parallel notifications get parallel
+  agents.
+- **Marks notifications as read immediately.** Prevents re-processing. The
+  agent's job is to respond, not to manage notification state.
+- **Tracks notification IDs, not counts.** Only fires on genuinely new
   notifications, not re-reads of existing ones.
-- **Flag file instead of wake events.** We initially used OpenClaw's
-  `/hooks/wake` endpoint, but wake events target the main (DM) session; any
-  model response during processing leaked to DM as a notification. The flag file
-  approach is processed during heartbeats, where output routing is controlled.
+- **Triage before dispatch.** Not every notification is actionable. The poller
+  checks: is the agent @-mentioned (in issue body or latest comment)? Is the
+  issue/PR assigned to the agent? Is the agent's comment already the latest
+  (no response needed)?
+- **Assignment scan as backup.** A secondary loop periodically scans watched
+  repos for open issues assigned to the agent that were recently updated but
+  have no agent response. This catches cases where notifications aren't
+  generated (API-created issues, self-assignment).
+- **Strict scope enforcement.** Each spawned agent's prompt includes a SCOPE
+  constraint: "You are responsible for ONLY this issue. Do NOT touch any other
+  issues or PRs." This prevents rogue agents from creating unauthorized work.
+- **Priority rule.** Agent prompts explicitly state that the user's instructions
+  in the issue override all boilerplate rules (e.g., if the user asks for a DM
+  response, the agent should DM).
 - **Zero dependencies.** Just Python stdlib. Runs anywhere.
 
-Tradeoff: notifications are processed at heartbeat cadence (~30 min) instead of
-realtime. For code review and issue triage, this is fine.
-
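The "IDs, not counts" decision can be illustrated with a minimal sketch. The helper name `new_notification_ids` and the sample IDs are hypothetical and not part of the poller. The example shows a case where the unread count is unchanged between two polls even though a genuinely new notification has arrived, which a count comparison would miss.

```python
def new_notification_ids(seen_ids, current_ids):
    """Return IDs that are unread now but were not unread on the previous poll."""
    return set(current_ids) - set(seen_ids)


# Previous poll: notifications 101 and 102 were unread.
previous = {101, 102}
# Current poll: 101 was read elsewhere and 103 arrived, so the count is still 2.
current = {102, 103}

# A count comparison sees no change between the two polls.
count_changed = len(current) != len(previous)
# The ID comparison still isolates the one new notification.
fresh = new_notification_ids(previous, current)

print(count_changed, fresh)
```

Replacing the stored set with the current set after every poll also keeps it from growing without bound, since IDs that are no longer unread drop out.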
 ```python
 #!/usr/bin/env python3
 """
-Gitea notification poller (flag-file approach).
-Polls for unread notifications and writes a flag file when new ones
-appear. The agent checks this flag during heartbeats and processes
-notifications via the Gitea API directly.
+Gitea notification poller + dispatcher.
+
+Two polling loops:
+1. Notification-based: detects new notifications (mentions, assignments)
+   and dispatches agents for actionable ones.
+2. Assignment-based: periodically checks for open issues/PRs assigned to
+   the agent that have no recent response. Catches cases where
+   notifications aren't generated.
 
 Required env vars:
     GITEA_URL - Gitea instance URL
     GITEA_TOKEN - Gitea API token
 
 Optional env vars:
-    FLAG_PATH - Path to flag file (default: workspace/memory/gitea-notify-flag)
-    POLL_DELAY - Delay between polls in seconds (default: 5)
+    POLL_DELAY - Delay between polls in seconds (default: 15)
+    COOLDOWN - Minimum seconds between dispatches (default: 30)
+    ASSIGNMENT_INTERVAL - Seconds between assignment scans (default: 120)
+    OPENCLAW_BIN - Path to openclaw binary
 """
 
 import json
 import os
+import subprocess
 import sys
 import time
 import urllib.request
@@ -240,15 +265,24 @@ import urllib.error
 
 GITEA_URL = os.environ.get("GITEA_URL", "").rstrip("/")
 GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
-POLL_DELAY = int(os.environ.get("POLL_DELAY", "5"))
-FLAG_PATH = os.environ.get(
-    "FLAG_PATH",
-    os.path.join(
-        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
-        "memory",
-        "gitea-notify-flag",
-    ),
-)
+POLL_DELAY = int(os.environ.get("POLL_DELAY", "15"))
+COOLDOWN = int(os.environ.get("COOLDOWN", "30"))
+ASSIGNMENT_INTERVAL = int(os.environ.get("ASSIGNMENT_INTERVAL", "120"))
+OPENCLAW_BIN = os.environ.get("OPENCLAW_BIN", "/opt/homebrew/bin/openclaw")
+
+# Mattermost channel for status updates (customize to your setup)
+GIT_CHANNEL = "channel:YOUR_GIT_CHANNEL_ID"
+
+# Repos to scan for assigned issues
+WATCHED_REPOS = [
+    "your-org/repo1",
+    "your-org/repo2",
+]
+
+# Track dispatched issues to prevent duplicates
+dispatched_issues = set()
+
+BOT_USERNAME = "your-bot-username"  # e.g. "clawbot"
 
 
 def check_config():
@@ -257,53 +291,260 @@ def check_config():
         sys.exit(1)
 
 
-def gitea_unread_ids():
-    req = urllib.request.Request(
-        f"{GITEA_URL}/api/v1/notifications?status-types=unread",
-        headers={"Authorization": f"token {GITEA_TOKEN}"},
-    )
+def gitea_api(method, path, data=None):
+    """Call Gitea API, return parsed JSON or None on error."""
+    url = f"{GITEA_URL}/api/v1{path}"
+    body = json.dumps(data).encode() if data else None
+    headers = {"Authorization": f"token {GITEA_TOKEN}"}
+    if body:
+        headers["Content-Type"] = "application/json"
+    req = urllib.request.Request(url, headers=headers, method=method, data=body)
     try:
-        with urllib.request.urlopen(req, timeout=10) as resp:
-            return {n["id"] for n in json.loads(resp.read())}
+        with urllib.request.urlopen(req, timeout=15) as resp:
+            raw = resp.read()
+            return json.loads(raw) if raw else None
     except Exception as e:
-        print(f"WARN: Gitea API failed: {e}", file=sys.stderr, flush=True)
-        return set()
+        print(f"WARN: Gitea API {method} {path}: {e}",
+              file=sys.stderr, flush=True)
+        return None
 
 
-def write_flag(count):
-    os.makedirs(os.path.dirname(FLAG_PATH), exist_ok=True)
-    with open(FLAG_PATH, "w") as f:
-        f.write(json.dumps({
-            "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
-            "count": count,
-        }))
+def get_unread_notifications():
+    result = gitea_api("GET", "/notifications?status-types=unread")
+    return result if isinstance(result, list) else []
+
+
+def mark_notification_read(notif_id):
+    gitea_api("PATCH", f"/notifications/threads/{notif_id}")
+
+
+def needs_bot_response(repo_full, issue_number):
+    """True if bot is NOT the author of the most recent comment."""
+    comments = gitea_api(
+        "GET", f"/repos/{repo_full}/issues/{issue_number}/comments"
+    )
+    if comments and len(comments) > 0:
+        if comments[-1].get("user", {}).get("login") == BOT_USERNAME:
+            return False
+    return True
+
+
+def is_actionable_notification(notif):
+    """Check if a notification needs agent action.
+    Returns (actionable, reason, issue_number)."""
+    subject = notif.get("subject", {})
+    repo = notif.get("repository", {})
+    repo_full = repo.get("full_name", "")
+    url = subject.get("url", "")
+    number = url.rstrip("/").split("/")[-1] if url else ""
+    if not number or not number.isdigit():
+        return False, "no issue number", None
+
+    issue = gitea_api("GET", f"/repos/{repo_full}/issues/{number}")
+    if not issue:
+        return False, "couldn't fetch issue", number
+
+    # Check assignment
+    assignees = [a.get("login") for a in (issue.get("assignees") or [])]
+    if BOT_USERNAME in assignees:
+        if needs_bot_response(repo_full, number):
+            return True, f"assigned to {BOT_USERNAME}", number
+        return False, "assigned but already responded", number
+
+    # Check issue body for @mention
+    issue_body = issue.get("body", "") or ""
+    issue_author = issue.get("user", {}).get("login", "")
+    if f"@{BOT_USERNAME}" in issue_body and issue_author != BOT_USERNAME:
+        if needs_bot_response(repo_full, number):
+            return True, f"@-mentioned in body by {issue_author}", number
+
+    # Check latest comment for @mention
+    comments = gitea_api(
+        "GET", f"/repos/{repo_full}/issues/{number}/comments"
+    )
+    if comments:
+        last = comments[-1]
+        author = last.get("user", {}).get("login", "")
+        body = last.get("body", "") or ""
+        if author == BOT_USERNAME:
+            return False, "own comment is latest", number
+        if f"@{BOT_USERNAME}" in body:
+            return True, f"@-mentioned in comment by {author}", number
+
+    return False, "not mentioned or assigned", number
+
+
+def spawn_agent(repo_full, issue_number, title, subject_type, reason):
+    """Spawn an isolated agent to handle one issue/PR."""
+    dispatch_key = f"{repo_full}#{issue_number}"
+    if dispatch_key in dispatched_issues:
+        return
+    dispatched_issues.add(dispatch_key)
+
+    repo_short = repo_full.split("/")[-1]
+    job_name = f"gitea-{repo_short}-{issue_number}-{int(time.time())}"
+
+    # Build agent prompt with full context
+    msg = (
+        f"Gitea notification: {reason} on {subject_type} #{issue_number} "
+        f"'{title}' in {repo_full}.\n\n"
+        f"Gitea API base: {GITEA_URL}/api/v1\n"
+        f"Gitea token: {GITEA_TOKEN}\n\n"
+        f"SCOPE (STRICT): You are responsible for ONLY {subject_type} "
+        f"#{issue_number} in {repo_full}. Do NOT create PRs, branches, "
+        f"comments, or take any action on ANY other issue or PR.\n\n"
+        f"PRIORITY RULE: The user's instructions in the issue/PR take "
+        f"priority over ALL other rules. If asked to respond in DM, do so. "
+        f"Later instructions override earlier ones.\n\n"
+        f"Instructions:\n"
+        f"1. Read ALL existing comments on #{issue_number} via API\n"
+        f"2. Follow the user's instructions\n"
+        f"3. If code work needed: clone to $(mktemp -d), make changes, "
+        f"run make check, push, comment on the issue/PR\n"
+        f"4. Default: post work reports as Gitea comments\n"
+        f"5. Don't post duplicate comments if yours is already the latest"
+    )
+
+    try:
+        result = subprocess.run(
+            [
+                OPENCLAW_BIN, "cron", "add",
+                "--name", job_name,
+                "--at", "1s",
+                "--message", msg,
+                "--delete-after-run",
+                "--session", "isolated",
+                "--no-deliver",
+                "--thinking", "low",
+                "--timeout-seconds", "300",
+            ],
+            capture_output=True, text=True, timeout=15,
+        )
+        if result.returncode == 0:
+            print(f" → Agent spawned: {job_name}", flush=True)
+        else:
+            print(f" → Spawn failed: {result.stderr.strip()[:200]}",
+                  flush=True)
+            dispatched_issues.discard(dispatch_key)
+    except Exception as e:
+        print(f" → Spawn error: {e}", file=sys.stderr, flush=True)
+        dispatched_issues.discard(dispatch_key)
+
+
+def dispatch_notifications(notifications):
+    """Triage notifications and spawn agents for actionable ones."""
+    for notif in notifications:
+        subject = notif.get("subject", {})
+        repo = notif.get("repository", {})
+        repo_full = repo.get("full_name", "")
+        title = subject.get("title", "")[:60]
+        notif_id = notif.get("id")
+        subject_type = subject.get("type", "").lower()
+
+        is_act, reason, issue_num = is_actionable_notification(notif)
+
+        if notif_id:
+            mark_notification_read(notif_id)
+
+        if is_act:
+            print(f" ACTIONABLE: {repo_full} #{issue_num} ({reason})",
+                  flush=True)
+            spawn_agent(repo_full, issue_num, title, subject_type, reason)
+        else:
+            print(f" skip: {repo_full} #{issue_num} ({reason})", flush=True)
+
+
+def scan_assigned_issues():
+    """Backup scan: find assigned issues needing response."""
+    for repo_full in WATCHED_REPOS:
+        for issue_type in ["issues", "pulls"]:
+            items = gitea_api(
+                "GET",
+                f"/repos/{repo_full}/issues?state=open&type={issue_type}"
+                f"&assignee={BOT_USERNAME}&sort=updated&limit=10"
+            )
+            if not items:
+                continue
+            for item in items:
+                number = str(item["number"])
+                dispatch_key = f"{repo_full}#{number}"
+                if dispatch_key in dispatched_issues:
+                    continue
+                if not needs_bot_response(repo_full, number):
+                    continue
+                kind = "PR" if issue_type == "pulls" else "issue"
+                print(f" [assign-scan] {repo_full} {kind} #{number}",
+                      flush=True)
+                spawn_agent(
+                    repo_full, number, item.get("title", "")[:60],
+                    "pull" if issue_type == "pulls" else "issue",
+                    f"assigned to {BOT_USERNAME}"
+                )
 
 
 def main():
     check_config()
-    print(f"Gitea poller started (delay={POLL_DELAY}s, flag={FLAG_PATH})", flush=True)
-    last_seen_ids = gitea_unread_ids()
-    print(f"Initial unread: {len(last_seen_ids)}", flush=True)
+    print(f"Gitea poller+dispatcher (poll={POLL_DELAY}s, "
+          f"cooldown={COOLDOWN}s, assign_scan={ASSIGNMENT_INTERVAL}s)",
+          flush=True)
+
+    seen_ids = set(n["id"] for n in get_unread_notifications())
+    last_dispatch_time = 0
+    last_assign_scan = 0
+    print(f"Initial unread: {len(seen_ids)} (draining)", flush=True)
 
     while True:
         time.sleep(POLL_DELAY)
-        current_ids = gitea_unread_ids()
-        new_ids = current_ids - last_seen_ids
-        if not new_ids:
-            last_seen_ids = current_ids
-            continue
-        ts = time.strftime("%H:%M:%S")
-        print(f"[{ts}] {len(new_ids)} new ({len(current_ids)} total), flag written", flush=True)
-        write_flag(len(new_ids))
-        last_seen_ids = current_ids
+        now = time.time()
+
+        # Notification polling
+        notifications = get_unread_notifications()
+        current_ids = {n["id"] for n in notifications}
+        new_ids = current_ids - seen_ids
+        if new_ids:
+            ts = time.strftime("%H:%M:%S")
+            new_notifs = [n for n in notifications if n["id"] in new_ids]
+            print(f"[{ts}] {len(new_ids)} new notification(s)", flush=True)
+            if now - last_dispatch_time >= COOLDOWN:
+                dispatch_notifications(new_notifs)
+                last_dispatch_time = now
+            else:
+                remaining = int(COOLDOWN - (now - last_dispatch_time))
+                print(f" → Cooldown ({remaining}s remaining)", flush=True)
+
+        seen_ids = current_ids
+
+        # Assignment scan (less frequent)
+        if now - last_assign_scan >= ASSIGNMENT_INTERVAL:
+            scan_assigned_issues()
+            last_assign_scan = now
 
 
 if __name__ == "__main__":
     main()
 ```
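One behavior of the new main loop is worth spelling out. As written, notifications that arrive during the cooldown window appear to be logged and then folded into `seen_ids` without being dispatched or marked read, so the notification loop would not pick them up again and only the assignment scan could still catch them. If that is not the intent, one option is to hold deferred IDs until the cooldown expires. The sketch below is an assumption about how that could look, not the author's implementation; `CooldownGate`, `offer`, and the fake clock are hypothetical names.

```python
import time


class CooldownGate:
    """Holds back notification IDs during a cooldown instead of dropping them."""

    def __init__(self, cooldown_seconds, clock=time.monotonic):
        self.cooldown = cooldown_seconds
        self.clock = clock          # injectable so the gate can be tested
        self.last_dispatch = None   # None means nothing has been dispatched yet
        self.pending = set()

    def offer(self, new_ids):
        """Queue new IDs; return the set to dispatch now (empty while cooling down)."""
        self.pending |= set(new_ids)
        now = self.clock()
        cooling = (self.last_dispatch is not None
                   and now - self.last_dispatch < self.cooldown)
        if cooling or not self.pending:
            return set()
        ready, self.pending = self.pending, set()
        self.last_dispatch = now
        return ready


# Usage with a fake clock so the timing is deterministic.
fake_now = [0.0]
gate = CooldownGate(30, clock=lambda: fake_now[0])
first = gate.offer({1})       # nothing dispatched before, so this goes out
fake_now[0] = 10.0
second = gate.offer({2})      # within the cooldown, so ID 2 is held
fake_now[0] = 45.0
third = gate.offer(set())     # cooldown elapsed, so the held ID is released
```

In the poller, the loop would call `offer(new_ids)` on every iteration, including iterations with no new IDs, so that held notifications are released once the cooldown passes.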
 
-Run it as a background service (launchd on macOS, systemd on Linux) with the env
-vars set. It's intentionally simple: no frameworks, no async, no dependencies.
+Run it as a background service (launchd on macOS, systemd on Linux) with
+`GITEA_URL` and `GITEA_TOKEN` set. Customize `WATCHED_REPOS`, `BOT_USERNAME`,
+and `GIT_CHANNEL` for your setup. It's intentionally simple: no frameworks,
+no async, no dependencies.
+
+**Lessons learned during development:**
+
+- `openclaw cron add --at` uses formats like `1s`, `20m`, not `+5s` or `+0s`.
+- `--no-deliver` is incompatible with `--session main`. Use
+  `--session isolated` with `--no-deliver`.
+- `--system-event` targets the main DM session. If your agent is active in a
+  channel session, it won't see system events. Use `--session isolated` with
+  `--message` instead.
+- Isolated agent sessions don't have access to workspace files (TOOLS.md, etc).
+  Bake all credentials and instructions directly into the agent prompt.
+- Agents WILL go out of scope unless the SCOPE constraint is extremely explicit
+  and uses strong language ("violating scope is a critical failure").
+- When the user's explicit instructions in an issue conflict with boilerplate
+  rules in the agent prompt, the agent will follow the boilerplate unless the
+  prompt explicitly says "user instructions take priority."
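The first lesson, about the `--at` format, can be guarded in code. This is a minimal sketch that assumes only the `s` and `m` suffixes seen in the examples above; whether `openclaw` accepts other units or a zero delay is not confirmed here. `format_at_delay` is a hypothetical helper name.

```python
def format_at_delay(seconds):
    """Format a delay for `--at` as e.g. "1s" or "20m", with no leading "+"."""
    # Assumption: zero and negative delays are rejected, since "+0s" is listed
    # as a format that did not work and no zero-delay form is documented.
    if not isinstance(seconds, int) or seconds < 1:
        raise ValueError("delay must be a positive whole number of seconds")
    # Use minutes only when the delay is an exact number of minutes.
    if seconds % 60 == 0:
        return f"{seconds // 60}m"
    return f"{seconds}s"


print(format_at_delay(1), format_at_delay(1200), format_at_delay(90))
```

Building the argument through one helper keeps a stray `+` prefix from reaching the `openclaw cron add` call.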
 
 ### The Daily Diary