Update Gitea poller docs to dispatcher architecture (closes #4) #6

Closed
clawbot wants to merge 1 commits from update-poller-docs into main

View File

@ -189,50 +189,75 @@ arrive instantly.
Setup: add a webhook on each Gitea repo (or use an organization-level webhook)
pointing to `https://your-openclaw-host/hooks/gitea`. OpenClaw handles the rest.
#### Option B: Notification Poller (Local Machine Behind NAT)
#### Option B: Notification Poller + Dispatcher (Local Machine Behind NAT)
If your OpenClaw runs on a dedicated local machine behind NAT (like a home Mac
or Linux workstation), Gitea can't reach it directly. This is our setup —
OpenClaw runs on a Mac Studio on a home LAN.
The solution: a lightweight Python script that polls the Gitea notifications API
every few seconds. When new notifications appear, it writes a flag file that the
agent checks during heartbeats.
The solution: a Python script that polls the Gitea notifications API, triages
each notification, and spawns an isolated agent session per actionable item.
Response time is ~15-30 seconds.
**Evolution note:** We originally used a flag-file approach (poller writes
flag → agent checks during heartbeat → ~30 min latency). This was replaced by
the dispatcher pattern below, which is near-realtime.
Key design decisions:
- **The poller never marks notifications as read.** The agent does that after
processing. This prevents lost notifications if the agent fails to process.
- **It tracks notification IDs, not counts.** Only fires on genuinely new
- **The poller IS the dispatcher.** It fetches notification details, checks
whether the agent is mentioned or assigned, and spawns agents directly.
No middleman session needed.
- **One agent per actionable notification.** Each spawns via
`openclaw cron add --session isolated` with full context (API token, issue
URL, instructions) baked into the message. Parallel notifications get parallel
agents.
- **Marks notifications as read immediately.** Prevents re-processing. The
agent's job is to respond, not to manage notification state.
- **Tracks notification IDs, not counts.** Only fires on genuinely new
notifications, not re-reads of existing ones.
- **Flag file instead of wake events.** We initially used OpenClaw's
`/hooks/wake` endpoint, but wake events target the main (DM) session — any
model response during processing leaked to DM as a notification. The flag file
approach is processed during heartbeats, where output routing is controlled.
- **Triage before dispatch.** Not every notification is actionable. The poller
checks: is the agent @-mentioned (in issue body or latest comment)? Is the
issue/PR assigned to the agent? Is the agent's comment already the latest
(no response needed)?
- **Assignment scan as backup.** A secondary loop periodically scans watched
repos for open issues assigned to the agent that were recently updated but
have no agent response. This catches cases where notifications aren't
generated (API-created issues, self-assignment).
- **Strict scope enforcement.** Each spawned agent's prompt includes a SCOPE
constraint: "You are responsible for ONLY this issue. Do NOT touch any other
issues or PRs." This prevents rogue agents from creating unauthorized work.
- **Priority rule.** Agent prompts explicitly state that the user's instructions
in the issue override all boilerplate rules (e.g., if the user asks for a DM
response, the agent should DM).
- **Zero dependencies.** Just Python stdlib. Runs anywhere.
Tradeoff: notifications are processed at heartbeat cadence (~30 min) instead of
realtime. For code review and issue triage, this is fine.
```python
#!/usr/bin/env python3
"""
Gitea notification poller (flag-file approach).
Polls for unread notifications and writes a flag file when new ones
appear. The agent checks this flag during heartbeats and processes
notifications via the Gitea API directly.
Gitea notification poller + dispatcher.
Two polling loops:
1. Notification-based: detects new notifications (mentions, assignments)
and dispatches agents for actionable ones.
2. Assignment-based: periodically checks for open issues/PRs assigned to
the agent that have no recent response. Catches cases where
notifications aren't generated.
Required env vars:
GITEA_URL - Gitea instance URL
GITEA_TOKEN - Gitea API token
GITEA_URL - Gitea instance URL
GITEA_TOKEN - Gitea API token
Optional env vars:
FLAG_PATH - Path to flag file (default: workspace/memory/gitea-notify-flag)
POLL_DELAY - Delay between polls in seconds (default: 5)
POLL_DELAY - Delay between polls in seconds (default: 15)
COOLDOWN - Minimum seconds between dispatches (default: 30)
ASSIGNMENT_INTERVAL - Seconds between assignment scans (default: 120)
OPENCLAW_BIN - Path to openclaw binary
"""
import json
import os
import subprocess
import sys
import time
import urllib.request
@ -240,15 +265,24 @@ import urllib.error
GITEA_URL = os.environ.get("GITEA_URL", "").rstrip("/")
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
POLL_DELAY = int(os.environ.get("POLL_DELAY", "5"))
FLAG_PATH = os.environ.get(
"FLAG_PATH",
os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"memory",
"gitea-notify-flag",
),
)
POLL_DELAY = int(os.environ.get("POLL_DELAY", "15"))
COOLDOWN = int(os.environ.get("COOLDOWN", "30"))
ASSIGNMENT_INTERVAL = int(os.environ.get("ASSIGNMENT_INTERVAL", "120"))
OPENCLAW_BIN = os.environ.get("OPENCLAW_BIN", "/opt/homebrew/bin/openclaw")
# Mattermost channel for status updates (customize to your setup)
GIT_CHANNEL = "channel:YOUR_GIT_CHANNEL_ID"
# Repos to scan for assigned issues
WATCHED_REPOS = [
"your-org/repo1",
"your-org/repo2",
]
# Track dispatched issues to prevent duplicates
dispatched_issues = set()
BOT_USERNAME = "your-bot-username" # e.g. "clawbot"
def check_config():
@ -257,53 +291,260 @@ def check_config():
sys.exit(1)
def gitea_unread_ids():
req = urllib.request.Request(
f"{GITEA_URL}/api/v1/notifications?status-types=unread",
headers={"Authorization": f"token {GITEA_TOKEN}"},
)
def gitea_api(method, path, data=None):
"""Call Gitea API, return parsed JSON or None on error."""
url = f"{GITEA_URL}/api/v1{path}"
body = json.dumps(data).encode() if data else None
headers = {"Authorization": f"token {GITEA_TOKEN}"}
if body:
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, headers=headers, method=method, data=body)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
return {n["id"] for n in json.loads(resp.read())}
with urllib.request.urlopen(req, timeout=15) as resp:
raw = resp.read()
return json.loads(raw) if raw else None
except Exception as e:
print(f"WARN: Gitea API failed: {e}", file=sys.stderr, flush=True)
return set()
print(f"WARN: Gitea API {method} {path}: {e}",
file=sys.stderr, flush=True)
return None
def write_flag(count):
os.makedirs(os.path.dirname(FLAG_PATH), exist_ok=True)
with open(FLAG_PATH, "w") as f:
f.write(json.dumps({
"ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"count": count,
}))
def get_unread_notifications():
result = gitea_api("GET", "/notifications?status-types=unread")
return result if isinstance(result, list) else []
def mark_notification_read(notif_id):
gitea_api("PATCH", f"/notifications/threads/{notif_id}")
def needs_bot_response(repo_full, issue_number):
"""True if bot is NOT the author of the most recent comment."""
comments = gitea_api(
"GET", f"/repos/{repo_full}/issues/{issue_number}/comments"
)
if comments and len(comments) > 0:
if comments[-1].get("user", {}).get("login") == BOT_USERNAME:
return False
return True
def is_actionable_notification(notif):
"""Check if a notification needs agent action.
Returns (actionable, reason, issue_number)."""
subject = notif.get("subject", {})
repo = notif.get("repository", {})
repo_full = repo.get("full_name", "")
url = subject.get("url", "")
number = url.rstrip("/").split("/")[-1] if url else ""
if not number or not number.isdigit():
return False, "no issue number", None
issue = gitea_api("GET", f"/repos/{repo_full}/issues/{number}")
if not issue:
return False, "couldn't fetch issue", number
# Check assignment
assignees = [a.get("login") for a in (issue.get("assignees") or [])]
if BOT_USERNAME in assignees:
if needs_bot_response(repo_full, number):
return True, f"assigned to {BOT_USERNAME}", number
return False, "assigned but already responded", number
# Check issue body for @mention
issue_body = issue.get("body", "") or ""
issue_author = issue.get("user", {}).get("login", "")
if f"@{BOT_USERNAME}" in issue_body and issue_author != BOT_USERNAME:
if needs_bot_response(repo_full, number):
return True, f"@-mentioned in body by {issue_author}", number
# Check latest comment for @mention
comments = gitea_api(
"GET", f"/repos/{repo_full}/issues/{number}/comments"
)
if comments:
last = comments[-1]
author = last.get("user", {}).get("login", "")
body = last.get("body", "") or ""
if author == BOT_USERNAME:
return False, "own comment is latest", number
if f"@{BOT_USERNAME}" in body:
return True, f"@-mentioned in comment by {author}", number
return False, "not mentioned or assigned", number
def spawn_agent(repo_full, issue_number, title, subject_type, reason):
"""Spawn an isolated agent to handle one issue/PR."""
dispatch_key = f"{repo_full}#{issue_number}"
if dispatch_key in dispatched_issues:
return
dispatched_issues.add(dispatch_key)
repo_short = repo_full.split("/")[-1]
job_name = f"gitea-{repo_short}-{issue_number}-{int(time.time())}"
# Build agent prompt with full context
msg = (
f"Gitea notification: {reason} on {subject_type} #{issue_number} "
f"'{title}' in {repo_full}.\n\n"
f"Gitea API base: {GITEA_URL}/api/v1\n"
f"Gitea token: {GITEA_TOKEN}\n\n"
f"SCOPE (STRICT): You are responsible for ONLY {subject_type} "
f"#{issue_number} in {repo_full}. Do NOT create PRs, branches, "
f"comments, or take any action on ANY other issue or PR.\n\n"
f"PRIORITY RULE: The user's instructions in the issue/PR take "
f"priority over ALL other rules. If asked to respond in DM, do so. "
f"Later instructions override earlier ones.\n\n"
f"Instructions:\n"
f"1. Read ALL existing comments on #{issue_number} via API\n"
f"2. Follow the user's instructions\n"
f"3. If code work needed: clone to $(mktemp -d), make changes, "
f"run make check, push, comment on the issue/PR\n"
f"4. Default: post work reports as Gitea comments\n"
f"5. Don't post duplicate comments if yours is already the latest"
)
try:
result = subprocess.run(
[
OPENCLAW_BIN, "cron", "add",
"--name", job_name,
"--at", "1s",
"--message", msg,
"--delete-after-run",
"--session", "isolated",
"--no-deliver",
"--thinking", "low",
"--timeout-seconds", "300",
],
capture_output=True, text=True, timeout=15,
)
if result.returncode == 0:
print(f" → Agent spawned: {job_name}", flush=True)
else:
print(f" → Spawn failed: {result.stderr.strip()[:200]}",
flush=True)
dispatched_issues.discard(dispatch_key)
except Exception as e:
print(f" → Spawn error: {e}", file=sys.stderr, flush=True)
dispatched_issues.discard(dispatch_key)
def dispatch_notifications(notifications):
"""Triage notifications and spawn agents for actionable ones."""
for notif in notifications:
subject = notif.get("subject", {})
repo = notif.get("repository", {})
repo_full = repo.get("full_name", "")
title = subject.get("title", "")[:60]
notif_id = notif.get("id")
subject_type = subject.get("type", "").lower()
is_act, reason, issue_num = is_actionable_notification(notif)
if notif_id:
mark_notification_read(notif_id)
if is_act:
print(f" ACTIONABLE: {repo_full} #{issue_num} ({reason})",
flush=True)
spawn_agent(repo_full, issue_num, title, subject_type, reason)
else:
print(f" skip: {repo_full} #{issue_num} ({reason})", flush=True)
def scan_assigned_issues():
"""Backup scan: find assigned issues needing response."""
for repo_full in WATCHED_REPOS:
for issue_type in ["issues", "pulls"]:
items = gitea_api(
"GET",
f"/repos/{repo_full}/issues?state=open&type={issue_type}"
f"&assignee={BOT_USERNAME}&sort=updated&limit=10"
)
if not items:
continue
for item in items:
number = str(item["number"])
dispatch_key = f"{repo_full}#{number}"
if dispatch_key in dispatched_issues:
continue
if not needs_bot_response(repo_full, number):
continue
kind = "PR" if issue_type == "pulls" else "issue"
print(f" [assign-scan] {repo_full} {kind} #{number}",
flush=True)
spawn_agent(
repo_full, number, item.get("title", "")[:60],
"pull" if issue_type == "pulls" else "issue",
f"assigned to {BOT_USERNAME}"
)
def main():
check_config()
print(f"Gitea poller started (delay={POLL_DELAY}s, flag={FLAG_PATH})", flush=True)
last_seen_ids = gitea_unread_ids()
print(f"Initial unread: {len(last_seen_ids)}", flush=True)
print(f"Gitea poller+dispatcher (poll={POLL_DELAY}s, "
f"cooldown={COOLDOWN}s, assign_scan={ASSIGNMENT_INTERVAL}s)",
flush=True)
seen_ids = set(n["id"] for n in get_unread_notifications())
last_dispatch_time = 0
last_assign_scan = 0
print(f"Initial unread: {len(seen_ids)} (draining)", flush=True)
while True:
time.sleep(POLL_DELAY)
current_ids = gitea_unread_ids()
new_ids = current_ids - last_seen_ids
if not new_ids:
last_seen_ids = current_ids
continue
ts = time.strftime("%H:%M:%S")
print(f"[{ts}] {len(new_ids)} new ({len(current_ids)} total), flag written", flush=True)
write_flag(len(new_ids))
last_seen_ids = current_ids
now = time.time()
# Notification polling
notifications = get_unread_notifications()
current_ids = {n["id"] for n in notifications}
new_ids = current_ids - seen_ids
if new_ids:
ts = time.strftime("%H:%M:%S")
new_notifs = [n for n in notifications if n["id"] in new_ids]
print(f"[{ts}] {len(new_ids)} new notification(s)", flush=True)
if now - last_dispatch_time >= COOLDOWN:
dispatch_notifications(new_notifs)
last_dispatch_time = now
else:
remaining = int(COOLDOWN - (now - last_dispatch_time))
print(f" → Cooldown ({remaining}s remaining)", flush=True)
seen_ids = current_ids
# Assignment scan (less frequent)
if now - last_assign_scan >= ASSIGNMENT_INTERVAL:
scan_assigned_issues()
last_assign_scan = now
if __name__ == "__main__":
main()
```
Run it as a background service (launchd on macOS, systemd on Linux) with the env
vars set. It's intentionally simple — no frameworks, no async, no dependencies.
Run it as a background service (launchd on macOS, systemd on Linux) with
`GITEA_URL` and `GITEA_TOKEN` set. Customize `WATCHED_REPOS`, `BOT_USERNAME`,
and `GIT_CHANNEL` for your setup. It's intentionally simple — no frameworks,
no async, no dependencies.
**Lessons learned during development:**
- `openclaw cron add --at` uses formats like `1s`, `20m` — not `+5s` or `+0s`.
- `--no-deliver` is incompatible with `--session main`. Use
`--session isolated` with `--no-deliver`.
- `--system-event` targets the main DM session. If your agent is active in a
channel session, it won't see system events. Use `--session isolated` with
`--message` instead.
- Isolated agent sessions don't have access to workspace files (TOOLS.md, etc).
Bake all credentials and instructions directly into the agent prompt.
- Agents WILL go out of scope unless the SCOPE constraint is extremely explicit
and uses strong language ("violating scope is a critical failure").
- When the user's explicit instructions in an issue conflict with boilerplate
rules in the agent prompt, the agent will follow the boilerplate unless the
prompt explicitly says "user instructions take priority."
### The Daily Diary