From ee474cdd7f8a41c0c123f040889f60fd1e62dbb7 Mon Sep 17 00:00:00 2001 From: Caret Date: Mon, 6 Apr 2026 11:55:16 +0000 Subject: [PATCH] =?UTF-8?q?Initial=20commit=20=E2=80=94=20tg-stream,=20tg-?= =?UTF-8?q?task,=20Claude=20Code=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 132 +++++++ bin/tg-stream | 378 +++++++++++++++++++ bin/tg-task | 103 +++++ docs/telegram-plugin-duplicate-poller-bug.md | 57 +++ hooks/bash-heartbeat-post.sh | 35 ++ hooks/bash-heartbeat-pre.sh | 57 +++ hooks/redirect-telegram-reply.sh | 61 +++ 7 files changed, 823 insertions(+) create mode 100644 README.md create mode 100755 bin/tg-stream create mode 100755 bin/tg-task create mode 100644 docs/telegram-plugin-duplicate-poller-bug.md create mode 100755 hooks/bash-heartbeat-post.sh create mode 100755 hooks/bash-heartbeat-pre.sh create mode 100755 hooks/redirect-telegram-reply.sh diff --git a/README.md b/README.md new file mode 100644 index 0000000..1c28903 --- /dev/null +++ b/README.md @@ -0,0 +1,132 @@ +# claude-telegram-live-feed + +A hardened character-by-character streaming Telegram CLI for AI assistants (built for [Claude Code](https://claude.com/claude-code)), plus the set of Claude Code hooks that enforce streaming-by-default and auto-heartbeat long commands. + +The core idea: when an AI assistant replies to a Telegram chat, the user shouldn't wait in silence for a wall of text. Instead, the reply appears as a single message that grows word-by-word in front of them, with an alternating cursor glyph, finishing in 1-3 seconds. Long-running tasks produce automatic "starting" and "done in Ns" heartbeats so the channel never goes silent. + +Built as part of the OpenClaw agent system, but usable by any tool that can shell out. + +## Components + +### `bin/tg-stream` — the streamer + +A single-file [bun](https://bun.com) script. 
Reads a message from arguments or stdin, sends a placeholder to Telegram, then edits it repeatedly with a growing prefix so the user sees a typing animation. Auto-picks chunk size based on message length and a target stream duration (default 2 seconds).
+
+**Features**
+- Direct calls to `api.telegram.org/bot/editMessageText` — no MCP middleman
+- Word-boundary snapping so chunks end at spaces
+- Alternating `▌` / `▐` cursor to dodge Telegram's "message not modified" whitespace rejection
+- Auto-split just under 4000 characters to stay safely under Telegram's 4096-character message limit — long replies become multiple sequential bubbles
+- Header line (`--header "🔧 working"`) stays fixed while the body grows
+- Plain-send mode (`--no-stream`) for fast one-line acknowledgements
+
+**Safety contracts**
+- Global concurrency cap via `O_EXCL` atomic slot files (default: 10 concurrent; race-free, no `flock`)
+- Per-fetch timeout via `Promise.race` (default: 10 seconds; works around [bun AbortController bugs](https://github.com/oven-sh/bun/issues/13302))
+- Total wall-time budget with self-kill (default: 60 seconds)
+- Exponential 429 retry with jitter, honoring server-suggested `retry_after`
+- RSS memory watchdog reading `/proc/self/status` (default cap: 256 MB)
+- Structured JSON failure log with line-count rotation (default: `/host/root/.caret/log/tg-stream.log`)
+
+**Usage**
+```bash
+tg-stream "your text"                        # stream to default chat in ~2 seconds
+tg-stream --header "🔧 working" "body text"  # static prefix + streamed body
+tg-stream --target 3 "longer answer..." 
# stretch the stream to 3 seconds +tg-stream --no-stream "short ack" # plain send, no streaming +echo "from stdin" | tg-stream +tg-stream --chat 1234567 "explicit chat id" +``` + +**Env knobs** +| var | default | purpose | +|-------------------------|------------------------------------------|-------------------------------------| +| `TG_DEFAULT_CHAT` | first env default or hardcoded | target chat_id when `--chat` omitted | +| `TG_MAX_CONCURRENT` | `10` | global concurrency cap | +| `TG_FETCH_TIMEOUT_MS` | `10000` | per HTTP request timeout | +| `TG_MAX_TOTAL_MS` | `60000` | total invocation wall budget | +| `TG_MEM_CAP_MB` | `256` | RSS cap before self-abort | +| `TG_LOG_FILE` | `/host/root/.caret/log/tg-stream.log` | structured JSON log path | + +**Token source:** expects a file at `/root/.claude/channels/telegram/.env` containing `TELEGRAM_BOT_TOKEN=...`. Adjust the `ENV_FILE` constant at the top of `tg-stream` if your path differs. + +**Exit codes** +| code | meaning | +|------|-------------------------------------------| +| `0` | success | +| `1` | send failed (final attempt errored) | +| `2` | bad arguments | +| `3` | memory cap exceeded | +| `4` | back-pressure drop (≥ cap concurrent) | +| `5` | total time budget exceeded | + +### `bin/tg-task` — the long-command wrapper + +A bash wrapper that turns any shell command into a self-announcing task. It sends a "🔧 starting · label" message to Telegram before running, an "⏳ still on it · Ns elapsed" heartbeat every 8 seconds while running, and a "✅ done in Ns · label" completion message (with truncated stdout/stderr) after. + +**Usage** +```bash +tg-task "label" -- +tg-task --target 3 "rendering 8K avatar" -- python3 /tmp/avatar.py +HEARTBEAT_SECS=5 tg-task "deploy" -- ./deploy.sh production +``` + +Use it for anything that might exceed 5 seconds. No more waiting in silence. 
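
The control flow `tg-task` implements can be sketched in a few lines of plain shell. This is a simplified sketch, not the shipped script: `notify` is a hypothetical stand-in that prints where the real wrapper calls `tg-stream`, and output capture/reporting is omitted.

```shell
# Sketch of the tg-task pattern: announce, background the command,
# poll it once a second, heartbeat on an interval, report at the end.
notify() { printf '%s\n' "[tg] $*"; }   # stand-in for a tg-stream call

run_with_heartbeats() {
  label=$1; shift
  hb=${HEARTBEAT_SECS:-8}
  start=$(date +%s)
  notify "🔧 starting · $label"
  "$@" &                                # run the wrapped command in background
  pid=$!
  last=$start
  while kill -0 "$pid" 2>/dev/null; do  # poll while the command is alive
    sleep 1
    now=$(date +%s)
    if [ $((now - last)) -ge "$hb" ]; then
      notify "⏳ still on it · $label · $((now - start))s elapsed"
      last=$now
    fi
  done
  wait "$pid"; rc=$?                    # reap and recover the exit status
  notify "✅ done in $(( $(date +%s) - start ))s · $label (rc=$rc)"
  return "$rc"
}

out=$(HEARTBEAT_SECS=1 run_with_heartbeats "demo" sleep 3)
printf '%s\n' "$out"
```

Backgrounding the command and polling `kill -0` keeps the heartbeat cadence independent of whether the wrapped command produces any output of its own.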
+ +### `hooks/` — Claude Code PreToolUse / PostToolUse hooks + +Three hooks that enforce the streaming behavior infrastructurally instead of relying on the assistant remembering to use the right tool. + +**`redirect-telegram-reply.sh`** — PreToolUse hook matched against `mcp__plugin_telegram_telegram__reply`. Blocks plain Telegram reply calls without a `files` attachment (because those should stream), passes attachment-bearing calls through (because `tg-stream` doesn't do attachments yet). The assistant is physically unable to send a non-streamed reply once this hook is installed. + +**`bash-heartbeat-pre.sh`** — PreToolUse hook matched against `Bash`. Fires a `tg-stream --no-stream "🔧 description"` in the background before every Bash call, except for a few noisy patterns (`tg-stream` itself to avoid loops, `ls`/`cat`/`echo`). + +**`bash-heartbeat-post.sh`** — PostToolUse hook matched against `Bash`. Pairs with the pre hook via a small state file and, if the Bash call took longer than 5 seconds, fires a `✅ done in Ns · description` completion message. + +**Install** (in `~/.claude/settings.json`) +```json +{ + "hooks": { + "PreToolUse": [ + { + "matcher": "mcp__plugin_telegram_telegram__reply", + "hooks": [{ "type": "command", "command": "/path/to/hooks/redirect-telegram-reply.sh" }] + }, + { + "matcher": "Bash", + "hooks": [{ "type": "command", "command": "/path/to/hooks/bash-heartbeat-pre.sh" }] + } + ], + "PostToolUse": [ + { + "matcher": "Bash", + "hooks": [{ "type": "command", "command": "/path/to/hooks/bash-heartbeat-post.sh" }] + } + ] + } +} +``` + +The hooks use `node` to parse the JSON stdin payload, not `jq`, because `jq` isn't always installed. + +## Dependencies + +- [bun](https://bun.com) for `tg-stream` +- `bash`, `node` for the hooks and `tg-task` +- `curl` is not required — `tg-stream` uses bun's built-in `fetch` + +## Why this exists + +Telegram bot API responses from an LLM usually arrive as one big wall of text, which feels dead. 
By streaming edits to a single message, the reply appears to be "typed" in real time. Combined with hook enforcement, the assistant cannot accidentally regress to the wall-of-text behavior, and long-running tasks produce automatic heartbeats so the user never stares at a blank chat for more than a few seconds. + +Built as part of the OpenClaw agent infrastructure. Battle-tested against: +- 20 parallel invocations → concurrency cap holds at 10, excess cleanly drops with exit 4, no slot leaks +- Forced 1ms fetch timeout → retry path engages, 3 attempts with exponential backoff, clean exit 1 +- 5400-character payload → auto-splits into 2 sequential streaming bubbles, no `MESSAGE_TOO_LONG` errors +- Production deploy under real Telegram traffic → no perceptible regression vs the earlier unhardened version + +See `docs/` for additional notes including an upstream plugin bug draft for the MCP Telegram plugin's duplicate-poller race. + +## License + +MIT. Use it, copy it, adapt it. diff --git a/bin/tg-stream b/bin/tg-stream new file mode 100755 index 0000000..f01871e --- /dev/null +++ b/bin/tg-stream @@ -0,0 +1,378 @@ +#!/usr/bin/env bun +// tg-stream — stream text to Telegram via direct Bot API. +// +// Single-file hardened implementation. Provides: +// - global concurrency cap via O_EXCL atomic slot files (race-free, no flock) +// - per-fetch timeout via Promise.race (works around bun AbortController bugs) +// - exponential 429 retry honoring server retry_after + jitter +// - total wall-time budget via setTimeout self-kill +// - RSS memory cap via /proc/self/status watchdog +// - structured JSON failure log, rotated by line count +// +// Usage: +// tg-stream "your text" +// tg-stream --header "🔧 working" "body" +// tg-stream --no-stream "short ack" +// echo "from stdin" | tg-stream +// tg-stream --target 3 "longer answer..." 
+// +// Token: read from /root/.claude/channels/telegram/.env +// Default chat: TG_DEFAULT_CHAT env or 107806725 (Rooh) +// +// Env knobs (with defaults): +// TG_MAX_CONCURRENT 10 max parallel invocations +// TG_FETCH_TIMEOUT_MS 10000 per HTTP request +// TG_MAX_TOTAL_MS 60000 total invocation budget +// TG_MEM_CAP_MB 256 RSS cap before self-abort +// TG_DEFAULT_CHAT 107806725 + +import fs from 'node:fs'; +import path from 'node:path'; + +const ENV_FILE = '/root/.claude/channels/telegram/.env'; +const DEFAULT_CHAT = process.env.TG_DEFAULT_CHAT || '107806725'; +const MAX_CONCURRENT = parseInt(process.env.TG_MAX_CONCURRENT || '10', 10); +const FETCH_TIMEOUT_MS = parseInt(process.env.TG_FETCH_TIMEOUT_MS || '10000', 10); +const MAX_TOTAL_MS = parseInt(process.env.TG_MAX_TOTAL_MS || '60000', 10); +const MEM_CAP_MB = parseInt(process.env.TG_MEM_CAP_MB || '256', 10); +const MAX_429_RETRIES = 3; +const SLOT_DIR = '/tmp/tg-stream-slots'; +const LOG_FILE = process.env.TG_LOG_FILE || '/host/root/.caret/log/tg-stream.log'; +const LOG_KEEP_LINES = 1000; + +// --------------------------------------------------------------------------- +// Structured JSON log with cheap line-count rotation +function log(level, msg, extra = {}) { + const line = JSON.stringify({ + ts: new Date().toISOString(), pid: process.pid, level, msg, ...extra, + }) + '\n'; + try { + try { fs.mkdirSync(path.dirname(LOG_FILE), { recursive: true }); } catch (_) {} + fs.appendFileSync(LOG_FILE, line); + if (Math.random() < 0.02) { + const all = fs.readFileSync(LOG_FILE, 'utf8').split('\n'); + if (all.length > LOG_KEEP_LINES) { + fs.writeFileSync(LOG_FILE, all.slice(-LOG_KEEP_LINES).join('\n')); + } + } + } catch (_) { /* log dir not writable — degrade silently */ } +} + +// --------------------------------------------------------------------------- +// Concurrency cap via O_EXCL atomic slot files (race-free, no flock). 
+function acquireSlot() { + try { fs.mkdirSync(SLOT_DIR, { recursive: true }); } catch (_) {} + const now = Date.now(); + for (const name of fs.readdirSync(SLOT_DIR)) { + const full = path.join(SLOT_DIR, name); + try { + const stat = fs.statSync(full); + if (now - stat.mtimeMs > MAX_TOTAL_MS) fs.unlinkSync(full); + } catch (_) {} + } + for (let i = 0; i < MAX_CONCURRENT; i++) { + const slot = path.join(SLOT_DIR, `slot.${i}`); + try { + const fd = fs.openSync(slot, fs.constants.O_CREAT | fs.constants.O_EXCL | fs.constants.O_WRONLY); + fs.writeSync(fd, String(process.pid)); + fs.closeSync(fd); + return slot; + } catch (e) { + if (e.code !== 'EEXIST') throw e; + } + } + return null; +} + +function releaseSlot(slotPath) { + if (!slotPath) return; + try { fs.unlinkSync(slotPath); } catch (_) {} +} + +// --------------------------------------------------------------------------- +// Memory watchdog — self-abort if RSS exceeds the cap +let memWatchTimer = null; +function startMemWatch() { + if (memWatchTimer) return; + memWatchTimer = setInterval(() => { + try { + const status = fs.readFileSync('/proc/self/status', 'utf8'); + const m = status.match(/VmRSS:\s+(\d+) kB/); + if (m) { + const mb = parseInt(m[1], 10) / 1024; + if (mb > MEM_CAP_MB) { + log('error', 'memory cap exceeded, aborting', { vmrss_mb: mb, cap_mb: MEM_CAP_MB }); + process.exit(3); + } + } + } catch (_) {} + }, 1000); + memWatchTimer.unref(); +} + +// --------------------------------------------------------------------------- +function readToken() { + const m = fs.readFileSync(ENV_FILE, 'utf8').match(/TELEGRAM_BOT_TOKEN=(.+)/); + if (!m) throw new Error(`TELEGRAM_BOT_TOKEN not found in ${ENV_FILE}`); + return m[1].trim(); +} + +function parseArgs(argv) { + const opts = { + chat: DEFAULT_CHAT, gap: 0, batch: 0, target: 2, rtt: 300, + stream: true, header: '', cursor: '▌', wordWrap: true, text: null, + }; + const rest = []; + for (let i = 0; i < argv.length; i++) { + const a = argv[i]; + if (a === '--chat') 
opts.chat = argv[++i]; + else if (a === '--gap') opts.gap = parseInt(argv[++i], 10); + else if (a === '--batch') opts.batch = parseInt(argv[++i], 10); + else if (a === '--target') opts.target = parseFloat(argv[++i]); + else if (a === '--rtt') opts.rtt = parseInt(argv[++i], 10); + else if (a === '--no-stream') opts.stream = false; + else if (a === '--no-words') opts.wordWrap = false; + else if (a === '--header') opts.header = argv[++i]; + else if (a === '--cursor') opts.cursor = argv[++i]; + else if (a === '-h' || a === '--help') { printHelp(); process.exit(0); } + else rest.push(a); + } + if (rest.length) opts.text = rest.join(' '); + return opts; +} + +function printHelp() { + process.stderr.write(`tg-stream — hardened streaming Telegram CLI + +Usage: + tg-stream [options] "text" + echo "text" | tg-stream [options] + +Options: + --chat ID Target chat_id (default: ${DEFAULT_CHAT}) + --gap MS Extra delay between edits in ms (default: 0) + --batch N Characters per edit (default: auto from --target) + --target SECS Target stream duration in seconds (default: 2) + --rtt MS Assumed per-edit RTT for auto-batch (default: 300) + --no-words Don't snap edits to word boundaries + --no-stream Send as a single message, no streaming + --header TEXT Static header line above the streamed body + --cursor CHAR Cursor glyph (default: ▌) + -h, --help Show this help + +Env knobs: + TG_MAX_CONCURRENT (${MAX_CONCURRENT}) TG_FETCH_TIMEOUT_MS (${FETCH_TIMEOUT_MS}) + TG_MAX_TOTAL_MS (${MAX_TOTAL_MS}) TG_MEM_CAP_MB (${MEM_CAP_MB}) + TG_LOG_FILE (${LOG_FILE}) + +Exit codes: + 0 ok 1 send failed + 2 bad args 3 memory cap exceeded + 4 back-pressure drop 5 total time budget exceeded +`); +} + +// --------------------------------------------------------------------------- +// Promise.race fetch timeout (works around bun AbortController issues) +async function fetchJsonTimed(url, init, timeoutMs) { + let timer; + const timeoutP = new Promise((_, rej) => { + timer = setTimeout(() => rej(new 
Error(`fetch timeout after ${timeoutMs}ms`)), timeoutMs); + }); + try { + const r = await Promise.race([fetch(url, init), timeoutP]); + return await r.json(); + } finally { + clearTimeout(timer); + } +} + +const TG = (token) => async (method, body) => { + const url = `https://api.telegram.org/bot${token}/${method}`; + const init = { + method: 'POST', + headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, + body: new URLSearchParams(body), + }; + let attempt = 0; + while (true) { + try { + const j = await fetchJsonTimed(url, init, FETCH_TIMEOUT_MS); + if (j.ok) return j; + if (j.parameters?.retry_after && attempt < MAX_429_RETRIES) { + const expBackoff = 2 ** attempt * 1000; + const serverHint = j.parameters.retry_after * 1000; + const jitter = Math.floor(Math.random() * 1000); + const wait = Math.max(expBackoff, serverHint) + jitter; + log('warn', '429 retry', { method, attempt, wait_ms: wait, retry_after: j.parameters.retry_after }); + await new Promise(r => setTimeout(r, wait)); + attempt++; + continue; + } + log('error', 'tg api non-ok', { method, code: j.error_code, desc: j.description }); + return j; + } catch (e) { + if (attempt < MAX_429_RETRIES) { + const wait = 2 ** attempt * 1000 + Math.floor(Math.random() * 1000); + log('warn', 'fetch error, retrying', { method, attempt, wait_ms: wait, err: String(e?.message || e) }); + await new Promise(r => setTimeout(r, wait)); + attempt++; + continue; + } + log('error', 'fetch error, giving up', { method, err: String(e?.message || e) }); + return { ok: false, description: String(e?.message || e) }; + } + } +}; + +// --------------------------------------------------------------------------- +async function main() { + startMemWatch(); + + const opts = parseArgs(process.argv.slice(2)); + if (!opts.text && !process.stdin.isTTY) opts.text = fs.readFileSync(0, 'utf8'); + if (!opts.text) { printHelp(); process.exit(2); } + opts.text = opts.text.replace(/\s+$/, ''); + + const slot = acquireSlot(); + if (!slot) 
{ + log('warn', 'back-pressure drop', { reason: 'max concurrent', cap: MAX_CONCURRENT }); + process.stderr.write(`tg-stream: dropped (>= ${MAX_CONCURRENT} concurrent)\n`); + process.exit(4); + } + + const overall = setTimeout(() => { + log('error', 'total time budget exceeded', { ms: MAX_TOTAL_MS }); + releaseSlot(slot); + process.exit(5); + }, MAX_TOTAL_MS); + overall.unref(); + + process.on('SIGINT', () => { releaseSlot(slot); process.exit(130); }); + process.on('SIGTERM', () => { releaseSlot(slot); process.exit(143); }); + + // Telegram message size ceiling is 4096 chars. We leave headroom for the + // header + cursor glyph, so split at 3900 to be safe. + const MAX_CHUNK_CHARS = 3900; + + // Split text into chunks of <= MAX_CHUNK_CHARS, preferring word boundaries. + function splitIntoChunks(text, limit) { + if (text.length <= limit) return [text]; + const chunks = []; + let pos = 0; + while (pos < text.length) { + if (text.length - pos <= limit) { chunks.push(text.slice(pos)); break; } + let end = pos + limit; + // Walk back to the last whitespace within a reasonable window + const windowStart = Math.max(pos + Math.floor(limit * 0.7), end - 300); + let split = -1; + for (let j = end; j > windowStart; j--) { + if (/\s/.test(text[j - 1])) { split = j; break; } + } + if (split < 0) split = end; // hard split if no whitespace found + chunks.push(text.slice(pos, split)); + pos = split; + // Skip leading whitespace of next chunk + while (pos < text.length && /\s/.test(text[pos])) pos++; + } + return chunks; + } + + let exitCode = 0; + try { + const token = readToken(); + const tg = TG(token); + const headerStr = opts.header ? opts.header + '\n' : ''; + const chunks = splitIntoChunks(opts.text, MAX_CHUNK_CHARS - headerStr.length - 4); + if (chunks.length > 1) { + log('info', 'splitting long message', { total_chars: opts.text.length, chunks: chunks.length }); + } + + // Non-streaming path: send each chunk as its own plain message. 
+ if (!opts.stream) { + const mids = []; + for (const chunk of chunks) { + const j = await tg('sendMessage', { chat_id: opts.chat, text: headerStr + chunk }); + if (!j.ok) { console.error('send failed:', j.description); exitCode = 1; break; } + mids.push(j.result.message_id); + } + if (mids.length) console.log(mids.join(',')); + } else { + // Streaming path: stream each chunk into its own bubble sequentially. + const mids = []; + for (let ci = 0; ci < chunks.length; ci++) { + const chunkText = chunks[ci]; + + const init = await tg('sendMessage', { chat_id: opts.chat, text: headerStr + opts.cursor }); + if (!init.ok) { console.error('placeholder failed:', init.description); exitCode = 1; break; } + const mid = init.result.message_id; + mids.push(mid); + + // Auto-batch sized from this chunk's length, not the whole text. + let batch = opts.batch; + if (!batch || batch < 1) { + const perEdit = opts.rtt + opts.gap; + batch = Math.max(1, Math.ceil(chunkText.length * perEdit / (opts.target * 1000))); + } + + const breakpoints = []; + let pos = 0; + while (pos < chunkText.length) { + let next = pos + batch; + if (opts.wordWrap && next < chunkText.length) { + const window = Math.ceil(batch / 2); + let best = -1; + for (let j = Math.max(pos + 1, next - window); j <= Math.min(chunkText.length, next + window); j++) { + if (/\s/.test(chunkText[j - 1])) { best = j; break; } + } + if (best > 0) next = best; + } + if (next > chunkText.length) next = chunkText.length; + breakpoints.push(next); + pos = next; + } + + const altCursor = opts.cursor === '▌' ? 
'▐' : opts.cursor; + const cursors = [opts.cursor, altCursor]; + const results = []; + let lastSent = ''; + + for (let k = 0; k < breakpoints.length - 1; k++) { + const i = breakpoints[k]; + const partial = chunkText.slice(0, i); + if (partial === lastSent) continue; + lastSent = partial; + const text = headerStr + partial + cursors[k % 2]; + results.push(await tg('editMessageText', { chat_id: opts.chat, message_id: mid, text })); + if (opts.gap > 0) await new Promise(r => setTimeout(r, opts.gap)); + } + // Final edit: full chunk text, no cursor + const finalRes = await tg('editMessageText', { chat_id: opts.chat, message_id: mid, text: headerStr + chunkText }); + results.push(finalRes); + + const fail = results.filter(r => !r.ok); + if (fail.length && !finalRes.ok) { + // Final edit failed — user doesn't see the full text. Escalate. + console.error(`stream chunk ${ci+1}/${chunks.length} final edit failed: ${finalRes.description}`); + exitCode = 1; + } else if (fail.length) { + console.error(`stream chunk ${ci+1}/${chunks.length}: ${results.length - fail.length}/${results.length} edits ok (final ok)`); + } + } + if (mids.length) console.log(mids.join(',')); + } + } catch (e) { + log('error', 'main exception', { err: String(e?.message || e) }); + console.error(e?.message || e); + exitCode = 1; + } finally { + clearTimeout(overall); + releaseSlot(slot); + if (memWatchTimer) clearInterval(memWatchTimer); + // MUST exit from finally so early returns from try still honor exitCode. + // Otherwise `return` inside try jumps straight past the post-finally line. + process.exit(exitCode); + } +} + +main().catch(e => { log('error', 'unhandled', { err: String(e?.message || e) }); process.exit(1); }); diff --git a/bin/tg-task b/bin/tg-task new file mode 100755 index 0000000..a02f4b5 --- /dev/null +++ b/bin/tg-task @@ -0,0 +1,103 @@ +#!/usr/bin/env bash +# tg-task — wrap any long-running command so Telegram gets: +# 1. 
an immediate "starting" heartbeat before the command runs +# 2. an interim "still running (Ns elapsed)" heartbeat every HEARTBEAT_SECS +# 3. a final "done in Ns" + the command's stdout (truncated) +# +# Usage: +# tg-task "label" -- +# tg-task --target 3 "rendering avatar" -- python3 /tmp/avatar.py +# tg-task "apt install python3" -- apt-get install -y python3 +# +# Env: +# HEARTBEAT_SECS — interim heartbeat interval (default 8s) +# TG_STREAM — path to tg-stream (default /host/root/openclaw/tg-stream) +# TG_CHAT — chat to notify (default tg-stream's default — StarCros) +# +# Why this exists: the rule "send a heartbeat every ~5 seconds during long +# work" was previously a behavioral memory that I had to remember every +# session. That burns tokens and I can forget. Wrapping it as a script makes +# the behavior deterministic and free. + +set -u +TG_STREAM="${TG_STREAM:-/host/root/openclaw/tg-stream}" +HEARTBEAT_SECS="${HEARTBEAT_SECS:-8}" +TARGET=2 + +# Parse options before -- +LABEL="" +while [[ $# -gt 0 ]]; do + case "$1" in + --target) TARGET="$2"; shift 2;; + --) shift; break;; + *) + if [[ -z "$LABEL" ]]; then LABEL="$1"; shift + else break + fi + ;; + esac +done + +if [[ -z "$LABEL" ]]; then + echo "usage: tg-task [--target SECS] \"label\" -- " >&2 + exit 2 +fi +if [[ $# -eq 0 ]]; then + echo "tg-task: missing command after --" >&2 + exit 2 +fi + +START_TS=$(date +%s) + +# Heartbeat 1: starting +"$TG_STREAM" --target "$TARGET" "🔧 starting · $LABEL" >/dev/null 2>&1 || true + +# Run command in background, capture output +TMPOUT=$(mktemp /tmp/tg-task-out.XXXXXX) +TMPERR=$(mktemp /tmp/tg-task-err.XXXXXX) +"$@" >"$TMPOUT" 2>"$TMPERR" & +PID=$! 
+ +# Heartbeat loop +LAST_HEARTBEAT=$START_TS +while kill -0 "$PID" 2>/dev/null; do + sleep 1 + NOW=$(date +%s) + if (( NOW - LAST_HEARTBEAT >= HEARTBEAT_SECS )); then + ELAPSED=$((NOW - START_TS)) + "$TG_STREAM" --no-stream "⏳ still on it · $LABEL · ${ELAPSED}s elapsed" >/dev/null 2>&1 || true + LAST_HEARTBEAT=$NOW + fi +done + +wait "$PID" +RC=$? +END_TS=$(date +%s) +DURATION=$((END_TS - START_TS)) + +# Final report +OUT_SNIPPET=$(tail -c 1500 "$TMPOUT") +ERR_SNIPPET=$(tail -c 500 "$TMPERR") +STATUS=$([[ $RC -eq 0 ]] && echo "✅ done" || echo "❌ failed (rc=$RC)") + +REPORT="$STATUS · $LABEL · ${DURATION}s" +if [[ -n "$OUT_SNIPPET" ]]; then + REPORT="$REPORT + +stdout: +$OUT_SNIPPET" +fi +if [[ -n "$ERR_SNIPPET" && $RC -ne 0 ]]; then + REPORT="$REPORT + +stderr: +$ERR_SNIPPET" +fi + +"$TG_STREAM" --target "$TARGET" "$REPORT" >/dev/null 2>&1 || true + +# Also dump the full output to stdout/stderr so the caller can pipe/inspect +cat "$TMPOUT" +[[ -s "$TMPERR" ]] && cat "$TMPERR" >&2 +rm -f "$TMPOUT" "$TMPERR" +exit $RC diff --git a/docs/telegram-plugin-duplicate-poller-bug.md b/docs/telegram-plugin-duplicate-poller-bug.md new file mode 100644 index 0000000..1341795 --- /dev/null +++ b/docs/telegram-plugin-duplicate-poller-bug.md @@ -0,0 +1,57 @@ +# Bug: Telegram plugin spawns one poller per Claude session → silent message drops + +**Plugin:** `claude-plugins-official/external_plugins/telegram` +**Symptom:** When two or more Claude Code sessions are running concurrently, incoming Telegram messages to the bot are randomly dropped — the user sends N messages, the bot only sees a subset, and replies arrive late or not at all. No error is surfaced anywhere. 
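
The drop pattern needs no Telegram access to demonstrate. A toy shell model of the Bot API's confirm-on-fetch offset (an illustration of the single-consumer semantics, not real API calls) shows two sessions each ending up with a disjoint subset of the traffic:

```shell
# Toy model: the server keeps ONE confirmed offset per bot token. Whichever
# poller fetches the next update also confirms it, so every update is
# delivered to exactly one poller. Two pollers taking turns each see half.
offset=0
a_seen="" b_seen=""
for turn in 1 2 3 4 5 6 7 8 9 10; do
  update=$offset                  # the single next unconfirmed update id
  offset=$((offset + 1))          # confirming hides it from the other poller
  if [ $((turn % 2)) -eq 1 ]; then a_seen="$a_seen $update"
  else b_seen="$b_seen $update"; fi
done
printf 'session A saw:%s\n' "$a_seen"   # 0 2 4 6 8
printf 'session B saw:%s\n' "$b_seen"   # 1 3 5 7 9
```

With N pollers the same model gives each one roughly a 1/N view of inbound updates, matching the random drops observed below.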
+ +## Root cause + +Each Claude Code session starts its own copy of the Telegram plugin via: + +``` +bun run --cwd --shell=bun --silent start +``` + +Observed on this host (two concurrent sessions): + +``` +PID 25 pts/0 claude +PID 58 pts/0 bun run .../external_plugins/telegram start +PID 487 pts/1 claude +PID 588 pts/1 bun run .../external_plugins/telegram start +``` + +Both bun processes long-poll Telegram's `getUpdates` against the **same bot token**. Per Telegram Bot API semantics and the known tdlib issue [tdlib/telegram-bot-api#43](https://github.com/tdlib/telegram-bot-api/issues/43), concurrent `getUpdates` calls against one token race: whichever call acks an update first marks it confirmed server-side, and the other poller never sees it. With N concurrent pollers, on average ~(N-1)/N of any individual poller's "view" is missing updates. Since the plugin in any given Claude session only acts on what *its* poller sees, messages get silently dropped from the user's perspective. + +This is not a Telegram bug — Telegram's `getUpdates` is documented as single-consumer. It's a plugin architecture bug. + +## Reproduction + +1. Open two Claude Code sessions on the same machine that both load this plugin (same bot token). +2. From a Telegram user paired to the bot, send 5 messages back-to-back. +3. Observe: only some messages produce a reply; the rest vanish without trace. +4. Kill one of the two `bun .../telegram start` processes. +5. Resend 5 messages — all arrive and get replies. + +Confirmed reproducible on this host (2026-04-06): killing PID 58 immediately fixed message delivery for the surviving session. + +## Why no logs + +`/root/.claude/plugins/data/telegram-claude-plugins-official/` is empty — the plugin emits no per-message audit log, so the drop is invisible unless you happen to compare what the user sent vs what the agent saw. + +## Suggested fix + +Plugin needs a singleton lock per bot token. Options: + +1. 
**File lock** (`flock` on `/tmp/telegram-plugin-.lock`) — first plugin instance to start grabs the lock and runs the poller; subsequent instances detect the lock and instead attach to a local Unix socket / named pipe owned by the leader to receive updates fan-out. On leader exit, a follower takes over. +2. **Webhook mode** instead of long polling — Telegram delivers each update exactly once to the configured URL, sidestepping the race entirely. Requires a public endpoint or tunnel, so harder to set up but more robust. +3. **Out-of-process daemon** — ship the poller as a separate long-lived service (systemd unit / docker container) and have each Claude session's plugin instance act as a thin client that subscribes to it. Cleanest separation of concerns. + +Option 1 is the smallest change. Option 3 is the right long-term shape, especially since multiple Claude sessions are clearly an intended use case. + +## Workaround until fix lands + +Run a single Claude session, OR manually `kill` all but one `bun run .../telegram start` process whenever a second session starts. Add a startup hook that does this automatically. + +## Also worth logging + +The plugin should write a per-message log line (`update_id received`, `chat_id`, `acted/ignored`) to `plugins/data/telegram-claude-plugins-official/`. Even one line per inbound update would have saved hours of guessing here. diff --git a/hooks/bash-heartbeat-post.sh b/hooks/bash-heartbeat-post.sh new file mode 100755 index 0000000..9027138 --- /dev/null +++ b/hooks/bash-heartbeat-post.sh @@ -0,0 +1,35 @@ +#!/usr/bin/env bash +# PostToolUse hook on the Bash tool — if the wrapped command took longer than +# DURATION_FLOOR seconds, send a "done in Ns ·