#!/usr/bin/env bun
// tg-stream — stream text to Telegram via direct Bot API.
//
// Single-file hardened implementation. Provides:
//   - global concurrency cap via O_EXCL atomic slot files (race-free, no flock)
//   - per-fetch timeout via Promise.race (works around bun AbortController bugs)
//   - exponential 429 retry honoring server retry_after + jitter
//   - total wall-time budget via setTimeout self-kill
//   - RSS memory cap via /proc/self/status watchdog
//   - structured JSON failure log, rotated by line count
//
// Usage:
//   tg-stream "your text"
//   tg-stream --header "🔧 working" "body"
//   tg-stream --no-stream "short ack"
//   echo "from stdin" | tg-stream
//   tg-stream --target 3 "longer answer..."
//
// Token: read from /root/.claude/channels/telegram/.env
// Default chat: TG_DEFAULT_CHAT env or 107806725 (Rooh)
//
// Env knobs (with defaults):
//   TG_MAX_CONCURRENT    10         max parallel invocations
//   TG_FETCH_TIMEOUT_MS  10000      per HTTP request
//   TG_MAX_TOTAL_MS      60000      total invocation budget
//   TG_MEM_CAP_MB        256        RSS cap before self-abort
//   TG_DEFAULT_CHAT      107806725
//   TG_LOG_FILE          /host/root/.caret/log/tg-stream.log
|
|
|
|
import fs from 'node:fs';
|
|
import path from 'node:path';
|
|
|
|
// Location of the dotenv-style file that holds TELEGRAM_BOT_TOKEN.
const ENV_FILE = '/root/.claude/channels/telegram/.env';
// Chat used when --chat is not given.
const DEFAULT_CHAT = process.env.TG_DEFAULT_CHAT || '107806725';

/**
 * Read a positive integer knob from the environment.
 *
 * A missing, empty, non-numeric, zero or negative value falls back to the
 * default. Without this guard a typo such as TG_MAX_CONCURRENT=abc becomes
 * NaN, the slot loop never runs, and every invocation is dropped.
 *
 * @param {string} name - environment variable name
 * @param {number} fallback - value used when the variable is unusable
 * @returns {number} a positive integer
 */
function envInt(name, fallback) {
  const parsed = Number.parseInt(process.env[name] ?? '', 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

const MAX_CONCURRENT = envInt('TG_MAX_CONCURRENT', 10);
const FETCH_TIMEOUT_MS = envInt('TG_FETCH_TIMEOUT_MS', 10000);
const MAX_TOTAL_MS = envInt('TG_MAX_TOTAL_MS', 60000);
const MEM_CAP_MB = envInt('TG_MEM_CAP_MB', 256);
// Retry budget shared by rate-limit replies and transport errors.
const MAX_429_RETRIES = 3;
// Directory holding one file per in-flight invocation.
const SLOT_DIR = '/tmp/tg-stream-slots';
const LOG_FILE = process.env.TG_LOG_FILE || '/host/root/.caret/log/tg-stream.log';
// Rotation keeps at most this many trailing lines of the log.
const LOG_KEEP_LINES = 1000;
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Structured JSON log with cheap line-count rotation
|
|
/**
 * Append one JSON record (timestamp, pid, level, message, extra fields) to
 * LOG_FILE. On roughly 2% of calls the file is re-read and, if it has grown
 * past LOG_KEEP_LINES lines, rewritten with only the trailing lines.
 *
 * Filesystem failures are swallowed on purpose: logging is best-effort and
 * must never break the send path.
 *
 * @param {string} level - severity label, e.g. 'info', 'warn', 'error'
 * @param {string} msg - short event description
 * @param {object} [extra] - additional fields merged into the record
 */
function log(level, msg, extra = {}) {
  const record = { ts: new Date().toISOString(), pid: process.pid, level, msg, ...extra };
  const line = `${JSON.stringify(record)}\n`;
  try {
    try {
      fs.mkdirSync(path.dirname(LOG_FILE), { recursive: true });
    } catch (_) { /* the append below decides whether the directory is usable */ }
    fs.appendFileSync(LOG_FILE, line);

    const rotateNow = Math.random() < 0.02;
    if (!rotateNow) return;

    const lines = fs.readFileSync(LOG_FILE, 'utf8').split('\n');
    if (lines.length > LOG_KEEP_LINES) {
      fs.writeFileSync(LOG_FILE, lines.slice(-LOG_KEEP_LINES).join('\n'));
    }
  } catch (_) { /* log dir not writable — degrade silently */ }
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Concurrency cap via O_EXCL atomic slot files (race-free, no flock).
|
|
/**
 * Claim one of MAX_CONCURRENT slot files under SLOT_DIR.
 *
 * Slot files whose mtime is older than MAX_TOTAL_MS are removed first, so
 * slots left behind by invocations that never released them are reclaimed.
 * Each slot is then tried with O_CREAT | O_EXCL, which fails with EEXIST
 * when another process already holds it.
 *
 * @returns {string|null} path of the claimed slot file, or null when every
 *   slot is taken
 * @throws any filesystem error other than EEXIST while claiming a slot
 */
function acquireSlot() {
  try { fs.mkdirSync(SLOT_DIR, { recursive: true }); } catch (_) {}

  // Sweep stale slots; a file vanishing mid-sweep is not an error.
  const now = Date.now();
  for (const name of fs.readdirSync(SLOT_DIR)) {
    const full = path.join(SLOT_DIR, name);
    try {
      const stat = fs.statSync(full);
      if (now - stat.mtimeMs > MAX_TOTAL_MS) fs.unlinkSync(full);
    } catch (_) {}
  }

  const flags = fs.constants.O_CREAT | fs.constants.O_EXCL | fs.constants.O_WRONLY;
  for (let i = 0; i < MAX_CONCURRENT; i++) {
    const slot = path.join(SLOT_DIR, `slot.${i}`);
    let fd;
    try {
      fd = fs.openSync(slot, flags);
    } catch (e) {
      if (e.code === 'EEXIST') continue; // held by another invocation
      throw e;
    }
    try {
      fs.writeSync(fd, String(process.pid));
    } catch (e) {
      // We created the file, so remove it rather than leave a slot occupied
      // until the stale sweep reclaims it.
      try { fs.unlinkSync(slot); } catch (_) {}
      throw e;
    } finally {
      // Close on every path so a failed write cannot leak the descriptor.
      fs.closeSync(fd);
    }
    return slot;
  }
  return null;
}
|
|
|
|
/**
 * Delete a slot file previously returned by acquireSlot.
 * A falsy path is ignored, and so is any unlink failure (for example when
 * the file is already gone).
 *
 * @param {string|null|undefined} slotPath - slot file to remove
 */
function releaseSlot(slotPath) {
  if (slotPath) {
    try {
      fs.unlinkSync(slotPath);
    } catch (_) { /* nothing left to release */ }
  }
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Memory watchdog — self-abort if RSS exceeds the cap
|
|
// Handle of the running watchdog interval; null until startMemWatch runs.
let memWatchTimer = null;

/**
 * Start a once-per-second watchdog that reads VmRSS from /proc/self/status
 * and exits the process with code 3 when it exceeds MEM_CAP_MB.
 * Calling it again while a watchdog is running does nothing. The timer is
 * unref'd so it does not keep the process alive on its own.
 */
function startMemWatch() {
  if (memWatchTimer) return;

  const checkRss = () => {
    try {
      const status = fs.readFileSync('/proc/self/status', 'utf8');
      const match = status.match(/VmRSS:\s+(\d+) kB/);
      if (!match) return;
      const mb = parseInt(match[1], 10) / 1024;
      if (mb > MEM_CAP_MB) {
        log('error', 'memory cap exceeded, aborting', { vmrss_mb: mb, cap_mb: MEM_CAP_MB });
        process.exit(3);
      }
    } catch (_) { /* status file unreadable — skip this tick */ }
  };

  memWatchTimer = setInterval(checkRss, 1000);
  memWatchTimer.unref();
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
/**
 * Read the bot token from a dotenv-style file.
 *
 * Only a line whose key is exactly TELEGRAM_BOT_TOKEN (optionally preceded
 * by `export`) is accepted, so commented-out lines and keys that merely end
 * in that name are skipped. One pair of matching surrounding quotes is
 * removed from the value.
 *
 * @param {string} [envFile=ENV_FILE] - path of the file to read
 * @returns {string} the token
 * @throws {Error} when the file cannot be read or holds no usable token
 */
function readToken(envFile = ENV_FILE) {
  const text = fs.readFileSync(envFile, 'utf8');
  const m = text.match(/^[ \t]*(?:export[ \t]+)?TELEGRAM_BOT_TOKEN[ \t]*=[ \t]*(.+)$/m);
  if (!m) throw new Error(`TELEGRAM_BOT_TOKEN not found in ${envFile}`);
  // trim() also drops the trailing \r of CRLF files, which `.` matches.
  const raw = m[1].trim();
  const quoted = raw.match(/^(["'])(.*)\1$/);
  const token = quoted ? quoted[2] : raw;
  if (!token) throw new Error(`TELEGRAM_BOT_TOKEN not found in ${envFile}`);
  return token;
}
|
|
|
|
/**
 * Parse command-line arguments into an options object.
 *
 * Arguments that are not recognised flags are joined with single spaces and
 * become the message text. `-h` / `--help` prints the help and exits 0.
 * A flag given without a value, or a numeric flag given an unusable value,
 * keeps its default and produces a warning on stderr instead of storing
 * `undefined` or NaN in the result.
 *
 * @param {string[]} argv - arguments after the script name
 * @returns {{chat: string, gap: number, batch: number, target: number,
 *   rtt: number, stream: boolean, header: string, cursor: string,
 *   wordWrap: boolean, text: (string|null)}} parsed options; `text` is null
 *   when no positional text was given
 */
function parseArgs(argv) {
  const opts = {
    chat: DEFAULT_CHAT, gap: 0, batch: 0, target: 2, rtt: 300,
    stream: true, header: '', cursor: '▌', wordWrap: true, text: null,
  };
  const rest = [];
  let i = 0;

  const warn = (message) => process.stderr.write(`tg-stream: ${message}\n`);

  // Consume the argument after a flag; undefined when the flag is last.
  const nextValue = (flag) => {
    if (i + 1 >= argv.length) {
      warn(`${flag} needs a value; keeping default`);
      return undefined;
    }
    i += 1;
    return argv[i];
  };

  // Consume and validate a numeric argument, falling back when unusable.
  const nextNumber = (flag, parse, isUsable, fallback) => {
    const raw = nextValue(flag);
    if (raw === undefined) return fallback;
    const value = parse(raw);
    if (Number.isFinite(value) && isUsable(value)) return value;
    warn(`ignoring invalid ${flag} value "${raw}"; keeping default`);
    return fallback;
  };

  const toInt = (raw) => Number.parseInt(raw, 10);
  const nonNegative = (n) => n >= 0;
  const positive = (n) => n > 0;

  for (; i < argv.length; i++) {
    const a = argv[i];
    switch (a) {
      case '--chat': opts.chat = nextValue(a) ?? opts.chat; break;
      case '--gap': opts.gap = nextNumber(a, toInt, nonNegative, opts.gap); break;
      // 0 keeps the auto-sized batch derived from --target.
      case '--batch': opts.batch = nextNumber(a, toInt, nonNegative, opts.batch); break;
      // target is used as a divisor when sizing batches, so it must be > 0.
      case '--target': opts.target = nextNumber(a, Number.parseFloat, positive, opts.target); break;
      case '--rtt': opts.rtt = nextNumber(a, toInt, nonNegative, opts.rtt); break;
      case '--no-stream': opts.stream = false; break;
      case '--no-words': opts.wordWrap = false; break;
      case '--header': opts.header = nextValue(a) ?? opts.header; break;
      case '--cursor': opts.cursor = nextValue(a) ?? opts.cursor; break;
      case '-h':
      case '--help':
        printHelp();
        process.exit(0);
        break;
      default:
        rest.push(a);
    }
  }

  if (rest.length) opts.text = rest.join(' ');
  return opts;
}
|
|
|
|
/**
 * Write the usage text to stderr: invocation forms, options, the current
 * values of the environment knobs, and the exit codes.
 */
function printHelp() {
  process.stderr.write(`tg-stream — hardened streaming Telegram CLI

Usage:
  tg-stream [options] "text"
  echo "text" | tg-stream [options]

Options:
  --chat ID        Target chat_id (default: ${DEFAULT_CHAT})
  --gap MS         Extra delay between edits in ms (default: 0)
  --batch N        Characters per edit (default: auto from --target)
  --target SECS    Target stream duration in seconds (default: 2)
  --rtt MS         Assumed per-edit RTT for auto-batch (default: 300)
  --no-words       Don't snap edits to word boundaries
  --no-stream      Send as a single message, no streaming
  --header TEXT    Static header line above the streamed body
  --cursor CHAR    Cursor glyph (default: ▌)
  -h, --help       Show this help

Env knobs:
  TG_MAX_CONCURRENT (${MAX_CONCURRENT})    TG_FETCH_TIMEOUT_MS (${FETCH_TIMEOUT_MS})
  TG_MAX_TOTAL_MS (${MAX_TOTAL_MS})    TG_MEM_CAP_MB (${MEM_CAP_MB})
  TG_LOG_FILE (${LOG_FILE})

Exit codes:
  0  ok                    1  send failed
  2  bad args              3  memory cap exceeded
  4  back-pressure drop    5  total time budget exceeded
`);
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Promise.race fetch timeout (works around bun AbortController issues)
|
|
/**
 * POST/GET a URL and parse the response body as JSON, giving up after
 * `timeoutMs`.
 *
 * The deadline covers the whole exchange — connecting, receiving headers
 * AND reading/parsing the body — so a server that sends headers and then
 * stalls the body cannot hang the caller past the timeout. The request is
 * raced against a timer rather than aborted; the losing request is simply
 * abandoned.
 *
 * @param {string} url - request URL
 * @param {object} init - options passed through to fetch
 * @param {number} timeoutMs - deadline for the complete request
 * @returns {Promise<any>} the parsed JSON body
 * @throws {Error} `fetch timeout after <n>ms` on deadline, or whatever
 *   fetch / JSON parsing rejects with
 */
async function fetchJsonTimed(url, init, timeoutMs) {
  let timer;
  const deadline = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`fetch timeout after ${timeoutMs}ms`)), timeoutMs);
  });
  try {
    // Body parsing is part of the raced promise, not a step after the race.
    const request = fetch(url, init).then((res) => res.json());
    return await Promise.race([request, deadline]);
  } finally {
    clearTimeout(timer);
  }
}
|
|
|
|
/**
 * Build a Bot API caller bound to `token`.
 *
 * The returned function POSTs `body` form-encoded to the given API method
 * and resolves with the parsed reply. It never rejects: after the retry
 * budget is spent a transport failure resolves to
 * `{ ok: false, description }`.
 *
 * Retries (at most MAX_429_RETRIES, one counter for both kinds):
 *   - a non-ok reply carrying `parameters.retry_after` waits for the larger
 *     of the server hint and an exponential backoff, plus up to 1s jitter;
 *   - a thrown fetch/parse error waits for exponential backoff plus jitter.
 *
 * @param {string} token - bot token
 * @returns {(method: string, body: object) => Promise<object>}
 */
const TG = (token) => async (method, body) => {
  const endpoint = `https://api.telegram.org/bot${token}/${method}`;
  const request = {
    method: 'POST',
    headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
    body: new URLSearchParams(body),
  };
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  // `continue` runs the loop's update clause, so each retry bumps `attempt`.
  for (let attempt = 0; ; attempt++) {
    try {
      const reply = await fetchJsonTimed(endpoint, request, FETCH_TIMEOUT_MS);
      if (reply.ok) return reply;

      const retryAfter = reply.parameters?.retry_after;
      if (!retryAfter || attempt >= MAX_429_RETRIES) {
        log('error', 'tg api non-ok', { method, code: reply.error_code, desc: reply.description });
        return reply;
      }

      const backoffMs = 2 ** attempt * 1000;
      const hintMs = retryAfter * 1000;
      const jitterMs = Math.floor(Math.random() * 1000);
      const waitMs = Math.max(backoffMs, hintMs) + jitterMs;
      log('warn', '429 retry', { method, attempt, wait_ms: waitMs, retry_after: retryAfter });
      await pause(waitMs);
      continue;
    } catch (err) {
      if (attempt >= MAX_429_RETRIES) {
        log('error', 'fetch error, giving up', { method, err: String(err?.message || err) });
        return { ok: false, description: String(err?.message || err) };
      }
      const waitMs = 2 ** attempt * 1000 + Math.floor(Math.random() * 1000);
      log('warn', 'fetch error, retrying', { method, attempt, wait_ms: waitMs, err: String(err?.message || err) });
      await pause(waitMs);
      continue;
    }
  }
};
|
|
|
|
// ---------------------------------------------------------------------------
|
|
/**
 * Entry point: read the text (arguments or stdin), claim a concurrency
 * slot, then send the text to the chat either as plain messages or as
 * "streamed" messages that are edited repeatedly until complete.
 *
 * Always terminates through process.exit:
 *   0 ok · 1 send failed · 2 no text · 4 no free slot ·
 *   5 total time budget exceeded · 130/143 on SIGINT/SIGTERM.
 * Message ids of the sent messages are printed comma-separated on stdout.
 */
async function main() {
  startMemWatch();

  const opts = parseArgs(process.argv.slice(2));
  if (!opts.text && !process.stdin.isTTY) opts.text = fs.readFileSync(0, 'utf8');
  // Trim BEFORE the emptiness check so whitespace-only input is rejected
  // here instead of being sent to the API as an empty message.
  opts.text = (opts.text ?? '').replace(/\s+$/, '');
  if (!opts.text) { printHelp(); process.exit(2); }

  const slot = acquireSlot();
  if (!slot) {
    log('warn', 'back-pressure drop', { reason: 'max concurrent', cap: MAX_CONCURRENT });
    process.stderr.write(`tg-stream: dropped (>= ${MAX_CONCURRENT} concurrent)\n`);
    process.exit(4);
  }

  // Wall-clock budget for the whole invocation; unref'd so it never keeps
  // an otherwise finished process alive.
  const overall = setTimeout(() => {
    log('error', 'total time budget exceeded', { ms: MAX_TOTAL_MS });
    releaseSlot(slot);
    process.exit(5);
  }, MAX_TOTAL_MS);
  overall.unref();

  process.on('SIGINT', () => { releaseSlot(slot); process.exit(130); });
  process.on('SIGTERM', () => { releaseSlot(slot); process.exit(143); });

  // Telegram message size ceiling is 4096 chars. We leave headroom for the
  // header + cursor glyph, so split at 3900 to be safe.
  const MAX_CHUNK_CHARS = 3900;

  // True when index `at` falls between the two halves of a UTF-16 surrogate
  // pair, i.e. cutting there would leave a lone surrogate on each side.
  const splitsSurrogatePair = (str, at) => {
    if (at <= 0 || at >= str.length) return false;
    const before = str.charCodeAt(at - 1);
    const after = str.charCodeAt(at);
    return before >= 0xd800 && before <= 0xdbff && after >= 0xdc00 && after <= 0xdfff;
  };

  // Split text into chunks of <= limit chars, preferring word boundaries.
  function splitIntoChunks(text, limit) {
    // A non-positive limit would move the cursor backwards forever.
    if (!(limit >= 1)) throw new RangeError(`chunk limit must be >= 1, got ${limit}`);
    if (text.length <= limit) return [text];
    const chunks = [];
    let pos = 0;
    while (pos < text.length) {
      if (text.length - pos <= limit) { chunks.push(text.slice(pos)); break; }
      const end = pos + limit;
      // Walk back to the last whitespace within a reasonable window
      const windowStart = Math.max(pos + Math.floor(limit * 0.7), end - 300);
      let split = -1;
      for (let j = end; j > windowStart; j--) {
        if (/\s/.test(text[j - 1])) { split = j; break; }
      }
      if (split < 0) {
        split = end; // hard split if no whitespace found
        // Back off one unit rather than cut an emoji in half, as long as
        // the chunk stays non-empty.
        if (splitsSurrogatePair(text, split) && split - 1 > pos) split -= 1;
      }
      chunks.push(text.slice(pos, split));
      pos = split;
      // Skip leading whitespace of next chunk
      while (pos < text.length && /\s/.test(text[pos])) pos++;
    }
    return chunks;
  }

  let exitCode = 0;
  try {
    const token = readToken();
    const tg = TG(token);
    const headerStr = opts.header ? opts.header + '\n' : '';
    const chunkLimit = MAX_CHUNK_CHARS - headerStr.length - 4;
    if (chunkLimit < 1) {
      throw new Error(`header too long (${headerStr.length} chars) to leave room for message text`);
    }
    const chunks = splitIntoChunks(opts.text, chunkLimit);
    if (chunks.length > 1) {
      log('info', 'splitting long message', { total_chars: opts.text.length, chunks: chunks.length });
    }

    // Non-streaming path: send each chunk as its own plain message.
    if (!opts.stream) {
      const mids = [];
      for (const chunk of chunks) {
        const j = await tg('sendMessage', { chat_id: opts.chat, text: headerStr + chunk });
        if (!j.ok) { console.error('send failed:', j.description); exitCode = 1; break; }
        mids.push(j.result.message_id);
      }
      if (mids.length) console.log(mids.join(','));
    } else {
      // Streaming path: stream each chunk into its own bubble sequentially.
      const mids = [];
      for (let ci = 0; ci < chunks.length; ci++) {
        const chunkText = chunks[ci];

        // Placeholder bubble showing only the cursor; later edits fill it.
        const init = await tg('sendMessage', { chat_id: opts.chat, text: headerStr + opts.cursor });
        if (!init.ok) { console.error('placeholder failed:', init.description); exitCode = 1; break; }
        const mid = init.result.message_id;
        mids.push(mid);

        // Auto-batch sized from this chunk's length, not the whole text.
        let batch = opts.batch;
        if (!batch || batch < 1) {
          const perEdit = opts.rtt + opts.gap;
          batch = Math.max(1, Math.ceil(chunkText.length * perEdit / (opts.target * 1000)));
        }

        // Offsets at which an edit is sent; the last one is the chunk end.
        const breakpoints = [];
        let pos = 0;
        while (pos < chunkText.length) {
          let next = pos + batch;
          if (opts.wordWrap && next < chunkText.length) {
            // Take the first whitespace within half a batch either side.
            const reach = Math.ceil(batch / 2);
            let best = -1;
            for (let j = Math.max(pos + 1, next - reach); j <= Math.min(chunkText.length, next + reach); j++) {
              if (/\s/.test(chunkText[j - 1])) { best = j; break; }
            }
            if (best > 0) next = best;
          }
          if (next > chunkText.length) next = chunkText.length;
          // Never show half of a surrogate pair; include the second half.
          if (splitsSurrogatePair(chunkText, next)) next += 1;
          breakpoints.push(next);
          pos = next;
        }

        // Alternate two glyphs so the default cursor appears to blink.
        const altCursor = opts.cursor === '▌' ? '▐' : opts.cursor;
        const cursors = [opts.cursor, altCursor];
        const results = [];
        let lastSent = '';

        // Every breakpoint but the last gets an intermediate edit.
        for (let k = 0; k < breakpoints.length - 1; k++) {
          const i = breakpoints[k];
          const partial = chunkText.slice(0, i);
          if (partial === lastSent) continue;
          lastSent = partial;
          const text = headerStr + partial + cursors[k % 2];
          results.push(await tg('editMessageText', { chat_id: opts.chat, message_id: mid, text }));
          if (opts.gap > 0) await new Promise(r => setTimeout(r, opts.gap));
        }
        // Final edit: full chunk text, no cursor
        const finalRes = await tg('editMessageText', { chat_id: opts.chat, message_id: mid, text: headerStr + chunkText });
        results.push(finalRes);

        const fail = results.filter(r => !r.ok);
        if (fail.length && !finalRes.ok) {
          // Final edit failed — user doesn't see the full text. Escalate.
          console.error(`stream chunk ${ci+1}/${chunks.length} final edit failed: ${finalRes.description}`);
          exitCode = 1;
        } else if (fail.length) {
          // Only intermediate edits failed; the full text is visible.
          console.error(`stream chunk ${ci+1}/${chunks.length}: ${results.length - fail.length}/${results.length} edits ok (final ok)`);
        }
      }
      if (mids.length) console.log(mids.join(','));
    }
  } catch (e) {
    log('error', 'main exception', { err: String(e?.message || e) });
    console.error(e?.message || e);
    exitCode = 1;
  } finally {
    clearTimeout(overall);
    releaseSlot(slot);
    if (memWatchTimer) clearInterval(memWatchTimer);
    // MUST exit from finally so early returns from try still honor exitCode.
    // Otherwise `return` inside try jumps straight past the post-finally line.
    process.exit(exitCode);
  }
}
|
|
|
|
// Last-resort handler: anything that escapes main() is logged and the
// process exits with code 1.
main().catch((err) => {
  const detail = String(err?.message || err);
  log('error', 'unhandled', { err: detail });
  process.exit(1);
});
|