Compare commits

..

1 Commits

Author SHA1 Message Date
clawbot
c30a280d6e update Gitea poller docs to dispatcher architecture (closes #4)
Some checks failed
check / check (push) Failing after 11s
2026-02-28 06:30:25 -08:00

View File

@ -195,47 +195,54 @@ If your OpenClaw runs on a dedicated local machine behind NAT (like a home Mac
or Linux workstation), Gitea can't reach it directly. This is our setup —
OpenClaw runs on a Mac Studio on a home LAN.
The solution: a Python script that both polls and dispatches. It polls the Gitea
notifications API every 15 seconds, triages each notification (checking
assignment and @-mentions), marks them as read, and spawns one isolated agent
session per actionable item via `openclaw cron add --session isolated`.
The solution: a Python script that polls the Gitea notifications API, triages
each notification, and spawns an isolated agent session per actionable item.
Response time is ~15-30 seconds.
The poller also runs a secondary **assignment scan** every 2 minutes, checking
all watched repos for open issues/PRs assigned to the bot that were recently
updated and still need a response. This catches cases where notifications aren't
generated (e.g. self-assignment, API-created issues).
**Evolution note:** We originally used a flag-file approach (poller writes
flag → agent checks during heartbeat → ~30 min latency). This was replaced by
the dispatcher pattern below, which is near-realtime.
Key design decisions:
- **The poller IS the dispatcher.** No flag files, no heartbeat dependency. The
poller triages notifications and spawns agents directly.
- **Marks notifications as read immediately.** Each notification is marked read
as it's processed, preventing re-dispatch on the next poll.
- **One agent per issue.** Each spawned agent gets a `SCOPE` instruction
limiting it to one specific issue/PR. Agents post results as Gitea comments,
not DMs.
- **Dedup tracking.** An in-memory `dispatched_issues` set prevents spawning
multiple agents for the same issue within one poller lifetime.
- **`--no-deliver` instead of `--announce`.** Agents report via Gitea API
directly. The `--announce` flag on isolated sessions had delivery failures.
- **Assignment scan filters by recency.** Only issues updated in the last 5
minutes are considered, preventing re-dispatch for stale assigned issues.
- **The poller IS the dispatcher.** It fetches notification details, checks
whether the agent is mentioned or assigned, and spawns agents directly.
No middleman session needed.
- **One agent per actionable notification.** Each spawns via
`openclaw cron add --session isolated` with full context (API token, issue
URL, instructions) baked into the message. Parallel notifications get parallel
agents.
- **Marks notifications as read immediately.** Prevents re-processing. The
agent's job is to respond, not to manage notification state.
- **Tracks notification IDs, not counts.** Only fires on genuinely new
notifications, not re-reads of existing ones.
- **Triage before dispatch.** Not every notification is actionable. The poller
checks: is the agent @-mentioned (in issue body or latest comment)? Is the
issue/PR assigned to the agent? Is the agent's comment already the latest
(no response needed)?
- **Assignment scan as backup.** A secondary loop periodically scans watched
repos for open issues assigned to the agent that were recently updated but
have no agent response. This catches cases where notifications aren't
generated (API-created issues, self-assignment).
- **Strict scope enforcement.** Each spawned agent's prompt includes a SCOPE
constraint: "You are responsible for ONLY this issue. Do NOT touch any other
issues or PRs." This prevents rogue agents from creating unauthorized work.
- **Priority rule.** Agent prompts explicitly state that the user's instructions
in the issue override all boilerplate rules (e.g., if the user asks for a DM
response, the agent should DM).
- **Zero dependencies.** Just Python stdlib. Runs anywhere.
Response time: ~1560s from notification to agent comment (vs ~30 min with the
old heartbeat approach).
```python
#!/usr/bin/env python3
"""
Gitea notification poller + dispatcher.
Two polling loops:
1. Notification-based: detects new notifications (mentions, assignments by
other users) and dispatches agents for actionable ones.
1. Notification-based: detects new notifications (mentions, assignments)
and dispatches agents for actionable ones.
2. Assignment-based: periodically checks for open issues/PRs assigned to
the bot that have no recent bot comment. Catches cases where
notifications aren't generated (e.g. self-assignment, API-created issues).
the agent that have no recent response. Catches cases where
notifications aren't generated.
Required env vars:
GITEA_URL - Gitea instance URL
@ -262,19 +269,30 @@ POLL_DELAY = int(os.environ.get("POLL_DELAY", "15"))
COOLDOWN = int(os.environ.get("COOLDOWN", "30"))
ASSIGNMENT_INTERVAL = int(os.environ.get("ASSIGNMENT_INTERVAL", "120"))
OPENCLAW_BIN = os.environ.get("OPENCLAW_BIN", "/opt/homebrew/bin/openclaw")
BOT_USER = "clawbot" # Change to your bot's Gitea username
# Mattermost channel for status updates (customize to your setup)
GIT_CHANNEL = "channel:YOUR_GIT_CHANNEL_ID"
# Repos to scan for assigned issues
WATCHED_REPOS = [
# "org/repo1",
# "org/repo2",
"your-org/repo1",
"your-org/repo2",
]
# Track dispatched issues to prevent duplicates
dispatched_issues = set()
BOT_USERNAME = "your-bot-username" # e.g. "clawbot"
def check_config():
if not GITEA_URL or not GITEA_TOKEN:
print("ERROR: GITEA_URL and GITEA_TOKEN required", file=sys.stderr)
sys.exit(1)
def gitea_api(method, path, data=None):
"""Call Gitea API, return parsed JSON or None on error."""
url = f"{GITEA_URL}/api/v1{path}"
body = json.dumps(data).encode() if data else None
headers = {"Authorization": f"token {GITEA_TOKEN}"}
@ -286,24 +304,38 @@ def gitea_api(method, path, data=None):
raw = resp.read()
return json.loads(raw) if raw else None
except Exception as e:
print(f"WARN: {method} {path}: {e}", file=sys.stderr, flush=True)
print(f"WARN: Gitea API {method} {path}: {e}",
file=sys.stderr, flush=True)
return None
def get_unread_notifications():
result = gitea_api("GET", "/notifications?status-types=unread")
return result if isinstance(result, list) else []
def mark_notification_read(notif_id):
gitea_api("PATCH", f"/notifications/threads/{notif_id}")
def needs_bot_response(repo_full, issue_number):
"""True if the bot is NOT the author of the most recent comment."""
comments = gitea_api("GET", f"/repos/{repo_full}/issues/{issue_number}/comments")
"""True if bot is NOT the author of the most recent comment."""
comments = gitea_api(
"GET", f"/repos/{repo_full}/issues/{issue_number}/comments"
)
if comments and len(comments) > 0:
if comments[-1].get("user", {}).get("login") == BOT_USER:
if comments[-1].get("user", {}).get("login") == BOT_USERNAME:
return False
return True
def is_actionable(notif):
"""Returns (actionable, reason, issue_number)."""
def is_actionable_notification(notif):
"""Check if a notification needs agent action.
Returns (actionable, reason, issue_number)."""
subject = notif.get("subject", {})
repo = notif.get("repository", {})
repo_full = repo.get("full_name", "")
url = subject.get("url", "")
number = url.rstrip("/").split("/")[-1] if url else ""
if not number or not number.isdigit():
@ -313,29 +345,38 @@ def is_actionable(notif):
if not issue:
return False, "couldn't fetch issue", number
# Check assignment
assignees = [a.get("login") for a in (issue.get("assignees") or [])]
if BOT_USER in assignees:
if BOT_USERNAME in assignees:
if needs_bot_response(repo_full, number):
return True, f"assigned to {BOT_USER}", number
return True, f"assigned to {BOT_USERNAME}", number
return False, "assigned but already responded", number
# Check issue body for @mention
issue_body = issue.get("body", "") or ""
if f"@{BOT_USER}" in issue_body and issue.get("user", {}).get("login") != BOT_USER:
issue_author = issue.get("user", {}).get("login", "")
if f"@{BOT_USERNAME}" in issue_body and issue_author != BOT_USERNAME:
if needs_bot_response(repo_full, number):
return True, f"@-mentioned in body", number
return True, f"@-mentioned in body by {issue_author}", number
comments = gitea_api("GET", f"/repos/{repo_full}/issues/{number}/comments")
# Check latest comment for @mention
comments = gitea_api(
"GET", f"/repos/{repo_full}/issues/{number}/comments"
)
if comments:
last = comments[-1]
if last.get("user", {}).get("login") == BOT_USER:
author = last.get("user", {}).get("login", "")
body = last.get("body", "") or ""
if author == BOT_USERNAME:
return False, "own comment is latest", number
if f"@{BOT_USER}" in (last.get("body") or ""):
return True, f"@-mentioned in comment", number
if f"@{BOT_USERNAME}" in body:
return True, f"@-mentioned in comment by {author}", number
return False, "not mentioned or assigned", number
def spawn_agent(repo_full, issue_number, title, subject_type, reason):
"""Spawn an isolated agent to handle one issue/PR."""
dispatch_key = f"{repo_full}#{issue_number}"
if dispatch_key in dispatched_issues:
return
@ -343,72 +384,140 @@ def spawn_agent(repo_full, issue_number, title, subject_type, reason):
repo_short = repo_full.split("/")[-1]
job_name = f"gitea-{repo_short}-{issue_number}-{int(time.time())}"
# Build agent prompt with full context
msg = (
f"Gitea: {reason} on {subject_type} #{issue_number} "
f"'{title}' in {repo_full}.\n"
f"API: {GITEA_URL}/api/v1 | Token: {GITEA_TOKEN}\n"
f"SCOPE: Only {subject_type} #{issue_number} in {repo_full}.\n"
f"Read all comments, do the work, post results as Gitea comments."
f"Gitea notification: {reason} on {subject_type} #{issue_number} "
f"'{title}' in {repo_full}.\n\n"
f"Gitea API base: {GITEA_URL}/api/v1\n"
f"Gitea token: {GITEA_TOKEN}\n\n"
f"SCOPE (STRICT): You are responsible for ONLY {subject_type} "
f"#{issue_number} in {repo_full}. Do NOT create PRs, branches, "
f"comments, or take any action on ANY other issue or PR.\n\n"
f"PRIORITY RULE: The user's instructions in the issue/PR take "
f"priority over ALL other rules. If asked to respond in DM, do so. "
f"Later instructions override earlier ones.\n\n"
f"Instructions:\n"
f"1. Read ALL existing comments on #{issue_number} via API\n"
f"2. Follow the user's instructions\n"
f"3. If code work needed: clone to $(mktemp -d), make changes, "
f"run make check, push, comment on the issue/PR\n"
f"4. Default: post work reports as Gitea comments\n"
f"5. Don't post duplicate comments if yours is already the latest"
)
try:
subprocess.run(
[OPENCLAW_BIN, "cron", "add",
"--name", job_name, "--at", "1s",
"--message", msg, "--delete-after-run",
"--session", "isolated", "--no-deliver",
"--thinking", "low", "--timeout-seconds", "300"],
result = subprocess.run(
[
OPENCLAW_BIN, "cron", "add",
"--name", job_name,
"--at", "1s",
"--message", msg,
"--delete-after-run",
"--session", "isolated",
"--no-deliver",
"--thinking", "low",
"--timeout-seconds", "300",
],
capture_output=True, text=True, timeout=15,
)
if result.returncode == 0:
print(f" → Agent spawned: {job_name}", flush=True)
else:
print(f" → Spawn failed: {result.stderr.strip()[:200]}",
flush=True)
dispatched_issues.discard(dispatch_key)
except Exception as e:
print(f"Spawn error: {e}", file=sys.stderr, flush=True)
print(f"Spawn error: {e}", file=sys.stderr, flush=True)
dispatched_issues.discard(dispatch_key)
def dispatch_notifications(notifications):
"""Triage notifications and spawn agents for actionable ones."""
for notif in notifications:
subject = notif.get("subject", {})
repo = notif.get("repository", {})
repo_full = repo.get("full_name", "")
title = subject.get("title", "")[:60]
notif_id = notif.get("id")
subject_type = subject.get("type", "").lower()
is_act, reason, issue_num = is_actionable_notification(notif)
if notif_id:
mark_notification_read(notif_id)
if is_act:
print(f" ACTIONABLE: {repo_full} #{issue_num} ({reason})",
flush=True)
spawn_agent(repo_full, issue_num, title, subject_type, reason)
else:
print(f" skip: {repo_full} #{issue_num} ({reason})", flush=True)
def scan_assigned_issues():
"""Backup scan: find assigned issues needing response."""
for repo_full in WATCHED_REPOS:
for issue_type in ["issues", "pulls"]:
items = gitea_api(
"GET",
f"/repos/{repo_full}/issues?state=open&type={issue_type}"
f"&assignee={BOT_USERNAME}&sort=updated&limit=10"
)
if not items:
continue
for item in items:
number = str(item["number"])
dispatch_key = f"{repo_full}#{number}"
if dispatch_key in dispatched_issues:
continue
if not needs_bot_response(repo_full, number):
continue
kind = "PR" if issue_type == "pulls" else "issue"
print(f" [assign-scan] {repo_full} {kind} #{number}",
flush=True)
spawn_agent(
repo_full, number, item.get("title", "")[:60],
"pull" if issue_type == "pulls" else "issue",
f"assigned to {BOT_USERNAME}"
)
def main():
print(f"Poller started (poll={POLL_DELAY}s, cooldown={COOLDOWN}s)", flush=True)
seen_ids = set(n["id"] for n in (gitea_api("GET", "/notifications?status-types=unread") or []))
last_dispatch = 0
check_config()
print(f"Gitea poller+dispatcher (poll={POLL_DELAY}s, "
f"cooldown={COOLDOWN}s, assign_scan={ASSIGNMENT_INTERVAL}s)",
flush=True)
seen_ids = set(n["id"] for n in get_unread_notifications())
last_dispatch_time = 0
last_assign_scan = 0
print(f"Initial unread: {len(seen_ids)} (draining)", flush=True)
while True:
time.sleep(POLL_DELAY)
now = time.time()
# Notification polling
notifs = gitea_api("GET", "/notifications?status-types=unread") or []
current_ids = {n["id"] for n in notifs}
notifications = get_unread_notifications()
current_ids = {n["id"] for n in notifications}
new_ids = current_ids - seen_ids
if new_ids and now - last_dispatch >= COOLDOWN:
for n in [n for n in notifs if n["id"] in new_ids]:
nid = n.get("id")
if nid:
gitea_api("PATCH", f"/notifications/threads/{nid}")
is_act, reason, num = is_actionable(n)
if is_act:
repo = n["repository"]["full_name"]
title = n["subject"]["title"][:60]
stype = n["subject"].get("type", "").lower()
spawn_agent(repo, num, title, stype, reason)
last_dispatch = now
if new_ids:
ts = time.strftime("%H:%M:%S")
new_notifs = [n for n in notifications if n["id"] in new_ids]
print(f"[{ts}] {len(new_ids)} new notification(s)", flush=True)
if now - last_dispatch_time >= COOLDOWN:
dispatch_notifications(new_notifs)
last_dispatch_time = now
else:
remaining = int(COOLDOWN - (now - last_dispatch_time))
print(f" → Cooldown ({remaining}s remaining)", flush=True)
seen_ids = current_ids
# Assignment scan (less frequent)
if now - last_assign_scan >= ASSIGNMENT_INTERVAL:
for repo in WATCHED_REPOS:
for itype in ["issues", "pulls"]:
items = gitea_api("GET",
f"/repos/{repo}/issues?state=open&type={itype}"
f"&assignee={BOT_USER}&sort=updated&limit=10") or []
for item in items:
num = str(item["number"])
if f"{repo}#{num}" in dispatched_issues:
continue
# Only recently updated items (5 min)
# ... add is_recently_updated() check here
if needs_bot_response(repo, num):
spawn_agent(repo, num, item["title"][:60],
"pull" if itype == "pulls" else "issue",
f"assigned to {BOT_USER}")
scan_assigned_issues()
last_assign_scan = now
@ -416,8 +525,26 @@ if __name__ == "__main__":
main()
```
Run it as a background service (launchd on macOS, systemd on Linux) with the env
vars set. It's intentionally simple — no frameworks, no async, no dependencies.
Run it as a background service (launchd on macOS, systemd on Linux) with
`GITEA_URL` and `GITEA_TOKEN` set. Customize `WATCHED_REPOS`, `BOT_USERNAME`,
and `GIT_CHANNEL` for your setup. It's intentionally simple — no frameworks,
no async, no dependencies.
**Lessons learned during development:**
- `openclaw cron add --at` uses formats like `1s`, `20m` — not `+5s` or `+0s`.
- `--no-deliver` is incompatible with `--session main`. Use
`--session isolated` with `--no-deliver`.
- `--system-event` targets the main DM session. If your agent is active in a
channel session, it won't see system events. Use `--session isolated` with
`--message` instead.
- Isolated agent sessions don't have access to workspace files (TOOLS.md, etc).
Bake all credentials and instructions directly into the agent prompt.
- Agents WILL go out of scope unless the SCOPE constraint is extremely explicit
and uses strong language ("violating scope is a critical failure").
- When the user's explicit instructions in an issue conflict with boilerplate
rules in the agent prompt, the agent will follow the boilerplate unless the
prompt explicitly says "user instructions take priority."
### The Daily Diary