Files
claude-telegram-live-feed/bin/tg-stream

379 lines
14 KiB
Plaintext
Executable File

#!/usr/bin/env bun
// tg-stream — stream text to Telegram via direct Bot API.
//
// Single-file hardened implementation. Provides:
// - global concurrency cap via O_EXCL atomic slot files (race-free, no flock)
// - per-fetch timeout via Promise.race (works around bun AbortController bugs)
// - exponential 429 retry honoring server retry_after + jitter
// - total wall-time budget via setTimeout self-kill
// - RSS memory cap via /proc/self/status watchdog
// - structured JSON failure log, rotated by line count
//
// Usage:
// tg-stream "your text"
// tg-stream --header "🔧 working" "body"
// tg-stream --no-stream "short ack"
// echo "from stdin" | tg-stream
// tg-stream --target 3 "longer answer..."
//
// Token: read from /root/.claude/channels/telegram/.env
// Default chat: TG_DEFAULT_CHAT env or 107806725 (Rooh)
//
// Env knobs (with defaults):
// TG_MAX_CONCURRENT 10 max parallel invocations
// TG_FETCH_TIMEOUT_MS 10000 per HTTP request
// TG_MAX_TOTAL_MS 60000 total invocation budget
// TG_MEM_CAP_MB 256 RSS cap before self-abort
// TG_DEFAULT_CHAT 107806725 default chat_id
// TG_LOG_FILE /host/root/.caret/log/tg-stream.log structured JSON log path
import fs from 'node:fs';
import path from 'node:path';
const ENV_FILE = '/root/.claude/channels/telegram/.env';
const DEFAULT_CHAT = process.env.TG_DEFAULT_CHAT || '107806725';

// Read an integer env knob, falling back to `fallback` when the variable is
// unset, empty, or not a number. A bare parseInt would yield NaN for e.g.
// TG_MAX_TOTAL_MS=abc: the budget timer would then fire almost immediately
// (exit 5), TG_MAX_CONCURRENT=abc would drop every run (exit 4), and
// TG_MEM_CAP_MB=abc would silently disable the memory watchdog.
function envInt(name, fallback) {
  const raw = process.env[name];
  if (raw === undefined || raw === '') return fallback;
  const n = Number.parseInt(raw, 10);
  if (Number.isFinite(n)) return n;
  process.stderr.write(`tg-stream: ignoring non-numeric ${name}=${raw}\n`);
  return fallback;
}

const MAX_CONCURRENT = envInt('TG_MAX_CONCURRENT', 10);
const FETCH_TIMEOUT_MS = envInt('TG_FETCH_TIMEOUT_MS', 10000);
const MAX_TOTAL_MS = envInt('TG_MAX_TOTAL_MS', 60000);
const MEM_CAP_MB = envInt('TG_MEM_CAP_MB', 256);
const MAX_429_RETRIES = 3;
const SLOT_DIR = '/tmp/tg-stream-slots';
const LOG_FILE = process.env.TG_LOG_FILE || '/host/root/.caret/log/tg-stream.log';
// Number of newest log entries kept when the log file is rotated.
const LOG_KEEP_LINES = 1000;
// ---------------------------------------------------------------------------
// Structured JSON log with cheap line-count rotation.
//
// Appends one JSON object per line to LOG_FILE. On roughly 2% of calls the
// file is trimmed to its newest LOG_KEEP_LINES entries, so the rotation cost
// is amortized instead of paid on every write. All I/O errors are swallowed:
// logging is best-effort and must never break the send path.
//
// level - severity string ('info' | 'warn' | 'error' in this file)
// msg   - short event name
// extra - extra fields merged into the record (spread last, so they can
//         override ts/pid/level/msg)
function log(level, msg, extra = {}) {
  const line = JSON.stringify({
    ts: new Date().toISOString(), pid: process.pid, level, msg, ...extra,
  }) + '\n';
  try {
    try { fs.mkdirSync(path.dirname(LOG_FILE), { recursive: true }); } catch (_) {}
    fs.appendFileSync(LOG_FILE, line);
    if (Math.random() < 0.02) {
      const lines = fs.readFileSync(LOG_FILE, 'utf8').split('\n');
      // Every entry ends with '\n', so split() yields a trailing ''. Drop it
      // so the count and the slice below are in real entries; otherwise only
      // LOG_KEEP_LINES - 1 entries would survive rotation.
      if (lines.at(-1) === '') lines.pop();
      if (lines.length > LOG_KEEP_LINES) {
        fs.writeFileSync(LOG_FILE, lines.slice(-LOG_KEEP_LINES).join('\n') + '\n');
      }
    }
  } catch (_) { /* log dir not writable — degrade silently */ }
}
// ---------------------------------------------------------------------------
// Concurrency cap via O_EXCL atomic slot files (race-free, no flock).
//
// Each running invocation owns one file SLOT_DIR/slot.<i> (i < MAX_CONCURRENT)
// containing its PID. O_CREAT|O_EXCL makes claiming atomic: only one process
// can create a given file. Files whose mtime is older than MAX_TOTAL_MS are
// treated as leftovers from dead runs and removed before claiming.
//
// Returns the claimed slot path, or null when every slot is taken.
// Throws on unexpected filesystem errors (anything but EEXIST on open).
function acquireSlot() {
  try { fs.mkdirSync(SLOT_DIR, { recursive: true }); } catch (_) {}
  const now = Date.now();
  for (const name of fs.readdirSync(SLOT_DIR)) {
    const full = path.join(SLOT_DIR, name);
    try {
      // NOTE: stat -> unlink is not atomic; a slot re-claimed by another
      // process in between could be removed. Acceptable for a soft cap.
      if (now - fs.statSync(full).mtimeMs > MAX_TOTAL_MS) fs.unlinkSync(full);
    } catch (_) {}
  }
  const flags = fs.constants.O_CREAT | fs.constants.O_EXCL | fs.constants.O_WRONLY;
  for (let i = 0; i < MAX_CONCURRENT; i++) {
    const slot = path.join(SLOT_DIR, `slot.${i}`);
    let fd;
    try {
      fd = fs.openSync(slot, flags);
    } catch (e) {
      if (e.code === 'EEXIST') continue;
      throw e;
    }
    try {
      fs.writeSync(fd, String(process.pid));
    } catch (e) {
      // Don't leave a claimed-but-empty slot occupying the cap until it goes
      // stale just because the PID write failed (e.g. disk full).
      try { fs.unlinkSync(slot); } catch (_) {}
      throw e;
    } finally {
      // Always close, so a failed write does not leak the descriptor.
      fs.closeSync(fd);
    }
    return slot;
  }
  return null;
}

// Release a slot returned by acquireSlot(). No-op for null/empty; a file that
// is already gone (reaped or released) is ignored.
function releaseSlot(slotPath) {
  if (!slotPath) return;
  try { fs.unlinkSync(slotPath); } catch (_) {}
}
// ---------------------------------------------------------------------------
// Memory watchdog: once a second, read this process's resident set size
// (VmRSS from /proc/self/status) and exit with code 3 when it is above
// MEM_CAP_MB. Calling it again while running is a no-op. The interval is
// unref'd so it never keeps the process alive on its own. Read/parse errors
// are ignored, so without /proc the watchdog simply never fires.
let memWatchTimer = null;
function startMemWatch() {
  if (memWatchTimer !== null) return;
  const sampleRss = () => {
    try {
      const found = /VmRSS:\s+(\d+) kB/.exec(fs.readFileSync('/proc/self/status', 'utf8'));
      if (found === null) return;
      const rssMb = parseInt(found[1], 10) / 1024;
      if (rssMb > MEM_CAP_MB) {
        log('error', 'memory cap exceeded, aborting', { vmrss_mb: rssMb, cap_mb: MEM_CAP_MB });
        process.exit(3);
      }
    } catch (_) {
      // Best-effort only: missing /proc, unreadable status, etc.
    }
  };
  memWatchTimer = setInterval(sampleRss, 1000);
  memWatchTimer.unref();
}
// ---------------------------------------------------------------------------
// Read the bot token from a dotenv-style file (default: ENV_FILE).
//
// Accepts `TELEGRAM_BOT_TOKEN=...` or `export TELEGRAM_BOT_TOKEN=...` at the
// start of a line (leading blanks allowed); the first such line wins.
// Commented-out lines and keys that merely end in TELEGRAM_BOT_TOKEN (e.g.
// OLD_TELEGRAM_BOT_TOKEN) are ignored. Surrounding whitespace and one pair of
// matching quotes are stripped, since a quoted value would otherwise end up
// verbatim inside the API URL.
//
// Throws if the file cannot be read, or if no non-empty token is found.
function readToken(file = ENV_FILE) {
  const m = fs.readFileSync(file, 'utf8')
    .match(/^[ \t]*(?:export[ \t]+)?TELEGRAM_BOT_TOKEN[ \t]*=(.*)$/m);
  if (!m) throw new Error(`TELEGRAM_BOT_TOKEN not found in ${file}`);
  let token = m[1].trim();
  const quote = token[0];
  if (token.length >= 2 && (quote === '"' || quote === "'") && token.endsWith(quote)) {
    token = token.slice(1, -1).trim();
  }
  if (!token) throw new Error(`TELEGRAM_BOT_TOKEN is empty in ${file}`);
  return token;
}
// Parse CLI arguments into an options object (flags are listed in printHelp).
//
// Value flags consume the next argv element. A value flag with nothing after
// it, or a numeric flag whose value is not a finite number (for --target: not
// > 0), is reported on stderr and exits with code 2 ("bad args"). Without
// this, `--cursor` as the last argument would put the literal "undefined" in
// the message, and `--rtt abc` would produce a NaN batch size that silently
// disables streaming. -h/--help prints help and exits 0. All other arguments
// are joined with single spaces into `text` (null when there are none).
function parseArgs(argv) {
  const opts = {
    chat: DEFAULT_CHAT, gap: 0, batch: 0, target: 2, rtt: 300,
    stream: true, header: '', cursor: '▌', wordWrap: true, text: null,
  };
  const rest = [];
  const fail = (msg) => {
    process.stderr.write(`tg-stream: ${msg}\n`);
    process.exit(2);
  };
  const toInt = (s) => parseInt(s, 10);
  let i = 0;
  // Consume and return the argument following the flag at argv[i].
  const takeValue = (flag) => {
    if (i + 1 >= argv.length) fail(`${flag} requires a value`);
    return argv[++i];
  };
  // Consume the next argument and parse it as a finite number.
  const takeNumber = (flag, parse) => {
    const raw = takeValue(flag);
    const n = parse(raw);
    if (!Number.isFinite(n)) fail(`${flag} expects a number, got "${raw}"`);
    return n;
  };
  for (; i < argv.length; i++) {
    const a = argv[i];
    if (a === '--chat') opts.chat = takeValue(a);
    else if (a === '--gap') opts.gap = takeNumber(a, toInt);
    else if (a === '--batch') opts.batch = takeNumber(a, toInt);
    else if (a === '--target') {
      opts.target = takeNumber(a, parseFloat);
      if (opts.target <= 0) fail(`--target must be > 0, got "${argv[i]}"`);
    }
    else if (a === '--rtt') opts.rtt = takeNumber(a, toInt);
    else if (a === '--no-stream') opts.stream = false;
    else if (a === '--no-words') opts.wordWrap = false;
    else if (a === '--header') opts.header = takeValue(a);
    else if (a === '--cursor') opts.cursor = takeValue(a);
    else if (a === '-h' || a === '--help') { printHelp(); process.exit(0); }
    else rest.push(a);
  }
  if (rest.length) opts.text = rest.join(' ');
  return opts;
}
// Print usage, flags, the effective env-knob values, and the exit codes to
// stderr (main() uses stdout only for the sent message ids). The knob values
// are interpolated from the resolved module constants, so the help reflects
// any env overrides in effect for this run.
// NOTE(review): the --no-stream line says "single message", but main() still
// splits long bodies into several messages; TG_DEFAULT_CHAT appears only as
// the --chat default, not under "Env knobs".
function printHelp() {
process.stderr.write(`tg-stream — hardened streaming Telegram CLI
Usage:
tg-stream [options] "text"
echo "text" | tg-stream [options]
Options:
--chat ID Target chat_id (default: ${DEFAULT_CHAT})
--gap MS Extra delay between edits in ms (default: 0)
--batch N Characters per edit (default: auto from --target)
--target SECS Target stream duration in seconds (default: 2)
--rtt MS Assumed per-edit RTT for auto-batch (default: 300)
--no-words Don't snap edits to word boundaries
--no-stream Send as a single message, no streaming
--header TEXT Static header line above the streamed body
--cursor CHAR Cursor glyph (default: ▌)
-h, --help Show this help
Env knobs:
TG_MAX_CONCURRENT (${MAX_CONCURRENT}) TG_FETCH_TIMEOUT_MS (${FETCH_TIMEOUT_MS})
TG_MAX_TOTAL_MS (${MAX_TOTAL_MS}) TG_MEM_CAP_MB (${MEM_CAP_MB})
TG_LOG_FILE (${LOG_FILE})
Exit codes:
0 ok 1 send failed
2 bad args 3 memory cap exceeded
4 back-pressure drop 5 total time budget exceeded
`);
}
// ---------------------------------------------------------------------------
// Fetch `url` and decode its JSON body, failing if the whole round trip
// (response headers AND body) takes longer than `timeoutMs`.
//
// Uses Promise.race against a timer instead of AbortController (works around
// bun AbortController issues). Consequence: on timeout the request is
// abandoned, not cancelled, and may still complete in the background.
//
// Rejects with Error('fetch timeout after <ms>ms') on timeout; network and
// JSON-decode errors propagate unchanged.
async function fetchJsonTimed(url, init, timeoutMs) {
  let timer;
  const timeoutP = new Promise((_, rej) => {
    timer = setTimeout(() => rej(new Error(`fetch timeout after ${timeoutMs}ms`)), timeoutMs);
  });
  try {
    // Race the body read as well: a server that sends headers and then
    // stalls would otherwise leave r.json() pending with no timeout at all.
    return await Promise.race([fetch(url, init).then((r) => r.json()), timeoutP]);
  } finally {
    clearTimeout(timer);
  }
}
// Build a Telegram Bot API caller bound to `token`.
//
// The returned async (method, body) function POSTs `body` form-encoded to
// https://api.telegram.org/bot<token>/<method> and resolves to the decoded
// reply. It does not reject on API or network failure:
//   - reply.ok           -> returned as-is
//   - retry_after hint   -> wait max(2^attempt s, retry_after s) + jitter and
//                           retry, up to MAX_429_RETRIES times
//   - other non-ok reply -> logged and returned
//   - thrown error       -> wait 2^attempt s + jitter and retry, up to
//                           MAX_429_RETRIES times; then { ok: false, description }
const TG = (token) => {
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
  const describe = (e) => String(e?.message || e);

  return async (method, body) => {
    const url = `https://api.telegram.org/bot${token}/${method}`;
    const init = {
      method: 'POST',
      headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
      body: new URLSearchParams(body),
    };
    for (let attempt = 0; ; attempt++) {
      const canRetry = attempt < MAX_429_RETRIES;
      try {
        const reply = await fetchJsonTimed(url, init, FETCH_TIMEOUT_MS);
        if (reply.ok) return reply;
        const retryAfter = reply.parameters?.retry_after;
        if (!retryAfter || !canRetry) {
          log('error', 'tg api non-ok', { method, code: reply.error_code, desc: reply.description });
          return reply;
        }
        const wait = Math.max(2 ** attempt * 1000, retryAfter * 1000) + Math.floor(Math.random() * 1000);
        log('warn', '429 retry', { method, attempt, wait_ms: wait, retry_after: retryAfter });
        await pause(wait);
      } catch (e) {
        if (!canRetry) {
          log('error', 'fetch error, giving up', { method, err: describe(e) });
          return { ok: false, description: describe(e) };
        }
        const wait = 2 ** attempt * 1000 + Math.floor(Math.random() * 1000);
        log('warn', 'fetch error, retrying', { method, attempt, wait_ms: wait, err: describe(e) });
        await pause(wait);
      }
    }
  };
};
// ---------------------------------------------------------------------------
// Telegram's message size ceiling is 4096 chars. We leave headroom for the
// header + cursor glyph, so bodies are split at 3900 to be safe.
const MAX_CHUNK_CHARS = 3900;
// Chars reserved in each chunk for the streaming cursor glyph.
const CURSOR_HEADROOM = 4;

// True when cutting `text` at index `i` would separate a UTF-16 surrogate
// pair (e.g. most emoji), leaving a broken half-character on each side.
function splitsSurrogatePair(text, i) {
  if (i <= 0 || i >= text.length) return false;
  const hi = text.charCodeAt(i - 1);
  const lo = text.charCodeAt(i);
  return hi >= 0xd800 && hi <= 0xdbff && lo >= 0xdc00 && lo <= 0xdfff;
}

// Split `text` into chunks of at most `limit` UTF-16 units, preferring to
// break after whitespace found in the last min(30%, 300) units of each
// window. Leading whitespace of every following chunk is dropped. A hard
// split (no whitespace nearby) never cuts a surrogate pair.
function splitIntoChunks(text, limit) {
  // A non-positive limit would never advance `pos` (infinite loop).
  const max = Math.max(1, limit);
  if (text.length <= max) return [text];
  const chunks = [];
  let pos = 0;
  while (pos < text.length) {
    if (text.length - pos <= max) { chunks.push(text.slice(pos)); break; }
    const end = pos + max;
    // Walk back to the last whitespace within a reasonable window.
    const windowStart = Math.max(pos + Math.floor(max * 0.7), end - 300);
    let split = -1;
    for (let j = end; j > windowStart; j--) {
      if (/\s/.test(text[j - 1])) { split = j; break; }
    }
    if (split < 0) {
      split = end;
      if (splitsSurrogatePair(text, split)) split = split - 1 > pos ? split - 1 : split + 1;
    }
    chunks.push(text.slice(pos, split));
    pos = split;
    // Skip leading whitespace of the next chunk.
    while (pos < text.length && /\s/.test(text[pos])) pos++;
  }
  return chunks;
}

// Prefix lengths at which successive streaming edits are sent. Steps are
// about `batch` units; with `wordWrap`, each step snaps to the first
// whitespace within +/- batch/2 of its target. Ends at text.length.
function computeBreakpoints(text, batch, wordWrap) {
  const breakpoints = [];
  let pos = 0;
  while (pos < text.length) {
    let next = pos + batch;
    if (wordWrap && next < text.length) {
      const window = Math.ceil(batch / 2);
      const last = Math.min(text.length, next + window);
      for (let j = Math.max(pos + 1, next - window); j <= last; j++) {
        if (/\s/.test(text[j - 1])) { next = j; break; }
      }
    }
    if (next > text.length) next = text.length;
    // Never show a partial ending in half of an emoji.
    if (splitsSurrogatePair(text, next)) next++;
    breakpoints.push(next);
    pos = next;
  }
  return breakpoints;
}

// Characters per edit so that streaming `len` chars takes about opts.target
// seconds at opts.rtt + opts.gap ms per edit. Non-finite results (e.g. NaN
// from a bad --rtt, Infinity from --target 0) fall back to one edit.
function autoBatch(len, opts) {
  const perEdit = opts.rtt + opts.gap;
  const batch = Math.ceil(len * perEdit / (opts.target * 1000));
  return Number.isFinite(batch) ? Math.max(1, batch) : Math.max(1, len);
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Stream one chunk into a fresh message bubble. It sends a cursor
// placeholder, grows the text with editMessageText at each breakpoint
// (alternating the cursor glyph), then makes a final cursor-free edit with
// the full chunk. `label` ("i/n") is used in stderr diagnostics.
// Returns { mid, ok }. `mid` is null when the placeholder could not be sent.
// `ok` is false when the final edit failed, so the user may not see the full
// text.
async function streamChunk(tg, opts, headerStr, chunkText, label) {
  const placeholder = await tg('sendMessage', { chat_id: opts.chat, text: headerStr + opts.cursor });
  if (!placeholder.ok) {
    console.error('placeholder failed:', placeholder.description);
    return { mid: null, ok: false };
  }
  const mid = placeholder.result.message_id;
  // Auto-batch is sized from this chunk's length, not the whole text.
  const batch = opts.batch >= 1 ? opts.batch : autoBatch(chunkText.length, opts);
  const breakpoints = computeBreakpoints(chunkText, batch, opts.wordWrap);
  const cursors = [opts.cursor, opts.cursor === '▌' ? '▐' : opts.cursor];
  const results = [];
  let lastSent = '';
  // The last breakpoint is the whole chunk; it is sent below without a cursor.
  for (let k = 0; k < breakpoints.length - 1; k++) {
    const partial = chunkText.slice(0, breakpoints[k]);
    if (partial === lastSent) continue;
    lastSent = partial;
    const text = headerStr + partial + cursors[k % 2];
    results.push(await tg('editMessageText', { chat_id: opts.chat, message_id: mid, text }));
    if (opts.gap > 0) await sleep(opts.gap);
  }
  const finalRes = await tg('editMessageText', { chat_id: opts.chat, message_id: mid, text: headerStr + chunkText });
  results.push(finalRes);
  if (!finalRes.ok) {
    console.error(`stream chunk ${label} final edit failed: ${finalRes.description}`);
    return { mid, ok: false };
  }
  const failed = results.filter((r) => !r.ok).length;
  if (failed) {
    console.error(`stream chunk ${label}: ${results.length - failed}/${results.length} edits ok (final ok)`);
  }
  return { mid, ok: true };
}

// Entry point. It reads the text from args or stdin, claims a concurrency
// slot, then sends or streams the text as one or more bubbles of at most
// MAX_CHUNK_CHARS. It prints the sent message ids comma-separated on stdout
// and exits with the documented exit code.
async function main() {
  startMemWatch();
  const opts = parseArgs(process.argv.slice(2));
  if (!opts.text && !process.stdin.isTTY) opts.text = fs.readFileSync(0, 'utf8');
  // Trim before the emptiness check, so whitespace-only input is "bad args"
  // (exit 2) rather than an empty-text API error after claiming a slot.
  opts.text = (opts.text ?? '').replace(/\s+$/, '');
  if (!opts.text) { printHelp(); process.exit(2); }

  const headerStr = opts.header ? opts.header + '\n' : '';
  const bodyLimit = MAX_CHUNK_CHARS - headerStr.length - CURSOR_HEADROOM;
  if (bodyLimit < 1) {
    process.stderr.write(`tg-stream: --header too long (${opts.header.length} chars)\n`);
    process.exit(2);
  }

  const slot = acquireSlot();
  if (!slot) {
    log('warn', 'back-pressure drop', { reason: 'max concurrent', cap: MAX_CONCURRENT });
    process.stderr.write(`tg-stream: dropped (>= ${MAX_CONCURRENT} concurrent)\n`);
    process.exit(4);
  }
  // Release at most once: a second unlink could delete a slot file that
  // another process has meanwhile claimed under the same name.
  let released = false;
  const release = () => {
    if (released) return;
    released = true;
    releaseSlot(slot);
  };
  // 'exit' also fires for process.exit() calls made elsewhere (e.g. the
  // memory watchdog's exit 3), which would otherwise leave the slot held
  // until it goes stale.
  process.on('exit', release);
  const overall = setTimeout(() => {
    log('error', 'total time budget exceeded', { ms: MAX_TOTAL_MS });
    release();
    process.exit(5);
  }, MAX_TOTAL_MS);
  overall.unref();
  process.on('SIGINT', () => { release(); process.exit(130); });
  process.on('SIGTERM', () => { release(); process.exit(143); });

  let exitCode = 0;
  try {
    const tg = TG(readToken());
    const chunks = splitIntoChunks(opts.text, bodyLimit);
    if (chunks.length > 1) {
      log('info', 'splitting long message', { total_chars: opts.text.length, chunks: chunks.length });
    }
    const mids = [];
    if (!opts.stream) {
      // Non-streaming: each chunk goes out as its own plain message.
      for (const chunk of chunks) {
        const j = await tg('sendMessage', { chat_id: opts.chat, text: headerStr + chunk });
        if (!j.ok) { console.error('send failed:', j.description); exitCode = 1; break; }
        mids.push(j.result.message_id);
      }
    } else {
      // Streaming: chunks are streamed one after another, each in its own bubble.
      for (let ci = 0; ci < chunks.length; ci++) {
        const { mid, ok } = await streamChunk(tg, opts, headerStr, chunks[ci], `${ci + 1}/${chunks.length}`);
        if (mid === null) { exitCode = 1; break; }
        mids.push(mid);
        if (!ok) exitCode = 1;
      }
    }
    if (mids.length) console.log(mids.join(','));
  } catch (e) {
    log('error', 'main exception', { err: String(e?.message || e) });
    console.error(e?.message || e);
    exitCode = 1;
  } finally {
    clearTimeout(overall);
    release();
    if (memWatchTimer) clearInterval(memWatchTimer);
    // MUST exit from finally so every path out of the try honors exitCode.
    process.exit(exitCode);
  }
}
main().catch(e => { log('error', 'unhandled', { err: String(e?.message || e) }); process.exit(1); });