feat: Phase 0+1 — repo sync, pino, lint fixes, core components

Phase 0:
- Synced latest live-status.js from workspace (9928 bytes)
- Fixed 43 lint issues: empty catch blocks, console statements
- Added pino dependency
- Created src/tool-labels.json with all known tool mappings
- make check passes

Phase 1 (Core Components):
- src/config.js: env-var config with validation, throws on missing required vars
- src/logger.js: pino singleton with child loggers, level validation
- src/circuit-breaker.js: CLOSED/OPEN/HALF_OPEN state machine with callbacks
- src/tool-labels.js: exact/prefix/regex tool->label resolver with external override
- src/status-box.js: Mattermost post manager (keepAlive, throttle, retry, circuit breaker)
- src/status-formatter.js: pure SessionState->text formatter (nested, compact)
- src/health.js: HTTP health endpoint + metrics
- src/status-watcher.js: JSONL file watcher (inotify, compaction detection, idle detection)

Tests:
- test/unit/config.test.js: 7 tests
- test/unit/circuit-breaker.test.js: 12 tests
- test/unit/logger.test.js: 5 tests
- test/unit/status-formatter.test.js: 20 tests
- test/unit/tool-labels.test.js: 15 tests

All 59 unit tests pass. make check clean.
This commit is contained in:
sol
2026-03-07 17:26:53 +00:00
parent b3ec2c61db
commit 43cfebee96
21 changed files with 2691 additions and 287 deletions

143
src/circuit-breaker.js Normal file
View File

@@ -0,0 +1,143 @@
'use strict';
/**
* circuit-breaker.js — Circuit breaker for API resilience.
*
* States:
* CLOSED — Normal operation. Failures tracked.
* OPEN — Too many failures. Calls rejected immediately.
* HALF_OPEN — Cooldown expired. One probe call allowed.
*
* Transition rules:
* CLOSED -> OPEN: failures >= threshold
* OPEN -> HALF_OPEN: cooldown expired
* HALF_OPEN -> CLOSED: probe succeeds
* HALF_OPEN -> OPEN: probe fails
*/
const STATE = {
CLOSED: 'closed',
OPEN: 'open',
HALF_OPEN: 'half_open',
};
class CircuitBreaker {
/**
* @param {object} opts
* @param {number} opts.threshold - Consecutive failures to open (default 5)
* @param {number} opts.cooldownMs - Milliseconds before half-open probe (default 30000)
* @param {Function} [opts.onStateChange] - Called with (newState, oldState)
* @param {object} [opts.logger] - Optional logger
*/
constructor(opts = {}) {
this.threshold = opts.threshold || 5;
this.cooldownMs = opts.cooldownMs || 30000;
this.onStateChange = opts.onStateChange || null;
this.logger = opts.logger || null;
this.state = STATE.CLOSED;
this.failures = 0;
this.openedAt = null;
this.lastError = null;
}
/**
* Execute a function through the circuit breaker.
* Throws CircuitOpenError if the circuit is open.
* @param {Function} fn - Async function to execute
* @returns {Promise<*>}
*/
async execute(fn) {
if (this.state === STATE.OPEN) {
const elapsed = Date.now() - this.openedAt;
if (elapsed >= this.cooldownMs) {
this._transition(STATE.HALF_OPEN);
} else {
throw new CircuitOpenError(
`Circuit open (${Math.ceil((this.cooldownMs - elapsed) / 1000)}s remaining)`,
this.lastError,
);
}
}
try {
const result = await fn();
this._onSuccess();
return result;
} catch (err) {
this._onFailure(err);
throw err;
}
}
_onSuccess() {
if (this.state === STATE.HALF_OPEN) {
this._transition(STATE.CLOSED);
}
this.failures = 0;
this.lastError = null;
}
_onFailure(err) {
this.lastError = err;
this.failures++;
if (this.state === STATE.HALF_OPEN) {
// Probe failed — reopen
this.openedAt = Date.now();
this._transition(STATE.OPEN);
} else if (this.state === STATE.CLOSED && this.failures >= this.threshold) {
this.openedAt = Date.now();
this._transition(STATE.OPEN);
}
}
_transition(newState) {
const oldState = this.state;
this.state = newState;
if (newState === STATE.CLOSED) {
this.failures = 0;
this.openedAt = null;
}
if (this.logger) {
this.logger.warn({ from: oldState, to: newState }, 'Circuit breaker state change');
}
if (this.onStateChange) {
this.onStateChange(newState, oldState);
}
}
getState() {
return this.state;
}
getMetrics() {
return {
state: this.state,
failures: this.failures,
threshold: this.threshold,
openedAt: this.openedAt,
lastError: this.lastError ? this.lastError.message : null,
};
}
reset() {
this.state = STATE.CLOSED;
this.failures = 0;
this.openedAt = null;
this.lastError = null;
}
}
class CircuitOpenError extends Error {
constructor(message, cause) {
super(message);
this.name = 'CircuitOpenError';
this.cause = cause;
}
}
module.exports = { CircuitBreaker, CircuitOpenError, STATE };

112
src/config.js Normal file
View File

@@ -0,0 +1,112 @@
'use strict';
/**
* config.js — Centralized env-var config with validation.
* All config is read from environment variables.
* Throws on missing required variables at startup.
*/
function getEnv(name, defaultValue, required = false) {
const val = process.env[name];
if (val === undefined || val === '') {
if (required) {
throw new Error(`Required environment variable ${name} is not set`);
}
return defaultValue;
}
return val;
}
function getEnvInt(name, defaultValue, required = false) {
const val = getEnv(name, undefined, required);
if (val === undefined) return defaultValue;
const n = parseInt(val, 10);
if (isNaN(n)) throw new Error(`Environment variable ${name} must be an integer, got: ${val}`);
return n;
}
function getEnvBool(name, defaultValue) {
const val = process.env[name];
if (val === undefined || val === '') return defaultValue;
return val === '1' || val.toLowerCase() === 'true' || val.toLowerCase() === 'yes';
}
/**
* Build and validate the config object.
* Called once at startup; throws on invalid config.
*/
function buildConfig() {
const config = {
// Mattermost API
mm: {
token: getEnv('MM_BOT_TOKEN', null, true),
baseUrl: getEnv('MM_BASE_URL', 'https://slack.solio.tech'),
maxSockets: getEnvInt('MM_MAX_SOCKETS', 4),
},
// Transcript directory (OpenClaw agents)
transcriptDir: getEnv('TRANSCRIPT_DIR', '/home/node/.openclaw/agents'),
// Timing
throttleMs: getEnvInt('THROTTLE_MS', 500),
idleTimeoutS: getEnvInt('IDLE_TIMEOUT_S', 60),
sessionPollMs: getEnvInt('SESSION_POLL_MS', 2000),
// Limits
maxActiveSessions: getEnvInt('MAX_ACTIVE_SESSIONS', 20),
maxMessageChars: getEnvInt('MAX_MESSAGE_CHARS', 15000),
maxStatusLines: getEnvInt('MAX_STATUS_LINES', 20),
maxRetries: getEnvInt('MAX_RETRIES', 3),
// Circuit breaker
circuitBreakerThreshold: getEnvInt('CIRCUIT_BREAKER_THRESHOLD', 5),
circuitBreakerCooldownS: getEnvInt('CIRCUIT_BREAKER_COOLDOWN_S', 30),
// Health check
healthPort: getEnvInt('HEALTH_PORT', 9090),
// Logging
logLevel: getEnv('LOG_LEVEL', 'info'),
// PID file
pidFile: getEnv('PID_FILE', '/tmp/status-watcher.pid'),
// Offset persistence
offsetFile: getEnv('OFFSET_FILE', '/tmp/status-watcher-offsets.json'),
// Optional external tool labels override
toolLabelsFile: getEnv('TOOL_LABELS_FILE', null),
// Fallback channel for non-MM sessions (null = skip)
defaultChannel: getEnv('DEFAULT_CHANNEL', null),
// Feature flags
enableFsWatch: getEnvBool('ENABLE_FS_WATCH', true),
};
// Validate MM base URL
try {
new URL(config.mm.baseUrl);
} catch (_e) {
throw new Error(`MM_BASE_URL is not a valid URL: ${config.mm.baseUrl}`);
}
return config;
}
// Singleton — built once, exported
let _config = null;
function getConfig() {
if (!_config) {
_config = buildConfig();
}
return _config;
}
// Allow resetting config in tests
function resetConfig() {
_config = null;
}
module.exports = { getConfig, resetConfig, buildConfig };

118
src/health.js Normal file
View File

@@ -0,0 +1,118 @@
'use strict';
/**
* health.js — HTTP health endpoint + metrics.
*
* GET /health -> JSON { status, activeSessions, uptime, lastError, metrics }
* GET /metrics -> JSON { detailed metrics }
*/
/* eslint-disable no-console */
const http = require('http');
class HealthServer {
/**
* @param {object} opts
* @param {number} opts.port - Port to listen on (0 = disabled)
* @param {Function} opts.getMetrics - Callback that returns metrics object
* @param {object} [opts.logger] - pino logger
*/
constructor(opts) {
this.port = opts.port;
this.getMetrics = opts.getMetrics;
this.logger = opts.logger || null;
this.server = null;
this.startTime = Date.now();
}
start() {
if (this.port === 0) {
if (this.logger) this.logger.info('Health server disabled (port=0)');
return Promise.resolve();
}
return new Promise((resolve, reject) => {
this.server = http.createServer((req, res) => {
this._handleRequest(req, res);
});
this.server.on('error', (err) => {
if (this.logger) {
this.logger.error({ err }, 'Health server error');
} else {
console.error('Health server error:', err.message);
}
reject(err);
});
this.server.listen(this.port, '127.0.0.1', () => {
if (this.logger) {
this.logger.info({ port: this.port }, 'Health server listening');
}
resolve();
});
});
}
stop() {
return new Promise((resolve) => {
if (!this.server) {
resolve();
return;
}
this.server.close(() => resolve());
});
}
_handleRequest(req, res) {
const url = new URL(req.url, `http://localhost:${this.port}`);
if (req.method !== 'GET') {
res.writeHead(405, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Method not allowed' }));
return;
}
let body;
switch (url.pathname) {
case '/health':
body = this._buildHealthResponse();
break;
case '/metrics':
body = this._buildMetricsResponse();
break;
default:
res.writeHead(404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Not found' }));
return;
}
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(body, null, 2));
}
_buildHealthResponse() {
const metrics = this.getMetrics();
const status = metrics.circuit && metrics.circuit.state === 'open' ? 'degraded' : 'healthy';
return {
status,
uptime: Math.floor((Date.now() - this.startTime) / 1000),
activeSessions: metrics.activeSessions || 0,
lastError: metrics.lastError || null,
metrics: {
updates_sent: metrics.updatesSent || 0,
updates_failed: metrics.updatesFailed || 0,
circuit_state: metrics.circuit ? metrics.circuit.state : 'unknown',
queue_depth: metrics.queueDepth || 0,
},
};
}
_buildMetricsResponse() {
return this.getMetrics();
}
}
module.exports = { HealthServer };

View File

@@ -1,5 +1,7 @@
#!/usr/bin/env node
/* eslint-disable no-console */
const https = require('https');
const http = require('http');
const fs = require('fs');
@@ -12,272 +14,315 @@ const options = {};
const otherArgs = [];
for (let i = 0; i < args.length; i++) {
const arg = args[i];
const next = args[i + 1];
if (arg === '--channel' && next) { options.channel = next; i++; }
else if (arg === '--reply-to' && next) { options.replyTo = next; i++; }
else if (arg === '--agent' && next) { options.agent = next; i++; }
else if (arg === '--token' && next) { options.token = next; i++; }
else if (arg === '--host' && next) { options.host = next; i++; }
else if (arg === '--rich') { options.rich = true; }
else if (!command && ['create', 'update', 'complete', 'error', 'delete'].includes(arg)) {
command = arg;
} else {
otherArgs.push(arg);
}
const arg = args[i]; // eslint-disable-line security/detect-object-injection
const next = args[i + 1]; // eslint-disable-line security/detect-object-injection
if (arg === '--channel' && next) {
options.channel = next;
i++;
} else if (arg === '--reply-to' && next) {
options.replyTo = next;
i++;
} else if (arg === '--agent' && next) {
options.agent = next;
i++;
} else if (arg === '--token' && next) {
options.token = next;
i++;
} else if (arg === '--host' && next) {
options.host = next;
i++;
} else if (arg === '--rich') {
options.rich = true;
} else if (!command && ['create', 'update', 'complete', 'error', 'delete'].includes(arg)) {
command = arg;
} else {
otherArgs.push(arg);
}
}
// --- LOAD CONFIG ---
function loadConfig() {
const searchPaths = [
process.env.OPENCLAW_CONFIG_DIR && path.join(process.env.OPENCLAW_CONFIG_DIR, 'openclaw.json'),
process.env.XDG_CONFIG_HOME && path.join(process.env.XDG_CONFIG_HOME, 'openclaw.json'),
path.join(process.env.HOME || '/root', '.openclaw', 'openclaw.json'),
'/home/node/.openclaw/openclaw.json'
].filter(Boolean);
const searchPaths = [
process.env.OPENCLAW_CONFIG_DIR && path.join(process.env.OPENCLAW_CONFIG_DIR, 'openclaw.json'),
process.env.XDG_CONFIG_HOME && path.join(process.env.XDG_CONFIG_HOME, 'openclaw.json'),
path.join(process.env.HOME || '/root', '.openclaw', 'openclaw.json'),
'/home/node/.openclaw/openclaw.json',
].filter(Boolean);
for (const p of searchPaths) {
try { return JSON.parse(fs.readFileSync(p, 'utf8')); }
catch (_) {}
for (const p of searchPaths) {
try {
// eslint-disable-next-line security/detect-non-literal-fs-filename
return JSON.parse(fs.readFileSync(p, 'utf8'));
} catch (_e) {
/* file not found or invalid JSON — try next path */
}
return null;
}
return null;
}
function resolveToken(config) {
if (options.token) return options.token;
if (process.env.MM_BOT_TOKEN) return process.env.MM_BOT_TOKEN;
if (!config) return null;
if (options.token) return options.token;
if (process.env.MM_BOT_TOKEN) return process.env.MM_BOT_TOKEN;
if (!config) return null;
const mm = config.mattermost || (config.channels && config.channels.mattermost) || {};
const accounts = mm.accounts || {};
const mm = config.mattermost || (config.channels && config.channels.mattermost) || {};
const accounts = mm.accounts || {};
if (options.agent) {
try {
const mapPath = path.join(__dirname, 'agent-accounts.json');
const agentMap = JSON.parse(fs.readFileSync(mapPath, 'utf8'));
const accName = agentMap[options.agent];
if (accName && accounts[accName] && accounts[accName].botToken) {
return accounts[accName].botToken;
}
} catch (_) {}
if (options.agent) {
try {
const mapPath = path.join(__dirname, 'agent-accounts.json');
const agentMap = JSON.parse(fs.readFileSync(mapPath, 'utf8'));
// eslint-disable-next-line security/detect-object-injection
const accName = agentMap[options.agent];
// eslint-disable-next-line security/detect-object-injection
if (accName && accounts[accName] && accounts[accName].botToken) {
// eslint-disable-next-line security/detect-object-injection
return accounts[accName].botToken;
}
} catch (_e) {
/* agent-accounts.json not found or agent not mapped */
}
}
if (accounts.default && accounts.default.botToken) return accounts.default.botToken;
for (const acc of Object.values(accounts)) {
if (acc.botToken) return acc.botToken;
}
return null;
if (accounts.default && accounts.default.botToken) return accounts.default.botToken;
for (const acc of Object.values(accounts)) {
if (acc.botToken) return acc.botToken;
}
return null;
}
function resolveHost(config) {
if (options.host) return options.host;
if (process.env.MM_HOST) return process.env.MM_HOST;
if (config) {
const mm = config.mattermost || (config.channels && config.channels.mattermost) || {};
const baseUrl = mm.baseUrl || '';
if (baseUrl) { try { return new URL(baseUrl).hostname; } catch (_) {} }
if (options.host) return options.host;
if (process.env.MM_HOST) return process.env.MM_HOST;
if (config) {
const mm = config.mattermost || (config.channels && config.channels.mattermost) || {};
const baseUrl = mm.baseUrl || '';
if (baseUrl) {
try {
return new URL(baseUrl).hostname;
} catch (_e) {
/* invalid URL */
}
}
return 'localhost';
}
return 'localhost';
}
function resolvePort(config) {
if (process.env.MM_PORT) return parseInt(process.env.MM_PORT, 10);
if (config) {
const mm = config.mattermost || (config.channels && config.channels.mattermost) || {};
const baseUrl = mm.baseUrl || '';
if (baseUrl) {
try {
const url = new URL(baseUrl);
return url.port ? parseInt(url.port, 10) : (url.protocol === 'https:' ? 443 : 80);
} catch (_) {}
}
if (process.env.MM_PORT) return parseInt(process.env.MM_PORT, 10);
if (config) {
const mm = config.mattermost || (config.channels && config.channels.mattermost) || {};
const baseUrl = mm.baseUrl || '';
if (baseUrl) {
try {
const url = new URL(baseUrl);
return url.port ? parseInt(url.port, 10) : url.protocol === 'https:' ? 443 : 80;
} catch (_e) {
/* invalid URL — use default port */
}
}
return 443;
}
return 443;
}
function resolveProtocol(config) {
if (config) {
const mm = config.mattermost || (config.channels && config.channels.mattermost) || {};
if ((mm.baseUrl || '').startsWith('http://')) return 'http';
}
return 'https';
if (config) {
const mm = config.mattermost || (config.channels && config.channels.mattermost) || {};
if ((mm.baseUrl || '').startsWith('http://')) return 'http';
}
return 'https';
}
// --- BUILD CONFIG ---
const ocConfig = loadConfig();
const CONFIG = {
host: resolveHost(ocConfig),
port: resolvePort(ocConfig),
protocol: resolveProtocol(ocConfig),
token: resolveToken(ocConfig),
channel_id: options.channel || process.env.MM_CHANNEL_ID || process.env.CHANNEL_ID
host: resolveHost(ocConfig),
port: resolvePort(ocConfig),
protocol: resolveProtocol(ocConfig),
token: resolveToken(ocConfig),
channel_id: options.channel || process.env.MM_CHANNEL_ID || process.env.CHANNEL_ID,
};
if (!CONFIG.token) {
console.error('Error: No bot token found.');
console.error(' Set MM_BOT_TOKEN, use --token, or configure openclaw.json');
process.exit(1);
console.error('Error: No bot token found.');
console.error(' Set MM_BOT_TOKEN, use --token, or configure openclaw.json');
process.exit(1);
}
// --- HTTP REQUEST ---
function request(method, apiPath, data) {
const transport = CONFIG.protocol === 'https' ? https : http;
return new Promise((resolve, reject) => {
const req = transport.request({
hostname: CONFIG.host, port: CONFIG.port,
path: '/api/v4' + apiPath, method,
headers: { 'Authorization': `Bearer ${CONFIG.token}`, 'Content-Type': 'application/json' }
}, (res) => {
let body = '';
res.on('data', (chunk) => body += chunk);
res.on('end', () => {
if (res.statusCode >= 200 && res.statusCode < 300) {
try { resolve(JSON.parse(body)); } catch (_) { resolve(body); }
} else {
let msg = `HTTP ${res.statusCode}`;
try { msg = JSON.parse(body).message || msg; } catch (_) {}
reject(new Error(msg));
}
});
const transport = CONFIG.protocol === 'https' ? https : http;
return new Promise((resolve, reject) => {
const req = transport.request(
{
hostname: CONFIG.host,
port: CONFIG.port,
path: '/api/v4' + apiPath,
method,
headers: { Authorization: `Bearer ${CONFIG.token}`, 'Content-Type': 'application/json' },
},
(res) => {
let body = '';
res.on('data', (chunk) => (body += chunk));
res.on('end', () => {
if (res.statusCode >= 200 && res.statusCode < 300) {
try {
resolve(JSON.parse(body));
} catch (_e) {
resolve(body);
}
} else {
let msg = `HTTP ${res.statusCode}`;
try {
msg = JSON.parse(body).message || msg;
} catch (_e) {
/* use default msg */
}
reject(new Error(msg));
}
});
req.on('error', (e) => reject(e));
if (data) req.write(JSON.stringify(data));
req.end();
});
},
);
req.on('error', (e) => reject(e));
if (data) req.write(JSON.stringify(data));
req.end();
});
}
// --- RICH ATTACHMENT HELPERS ---
const RICH_STYLES = {
create: { color: '#FFA500', prefix: '⏳', status: '🔄 Running' },
update: { color: '#FFA500', prefix: '🔄', status: '🔄 Running' },
complete: { color: '#36A64F', prefix: '✅', status: '✅ Complete' },
error: { color: '#DC3545', prefix: '❌', status: '❌ Error' }
create: { color: '#FFA500', prefix: '⏳', status: '🔄 Running' },
update: { color: '#FFA500', prefix: '🔄', status: '🔄 Running' },
complete: { color: '#36A64F', prefix: '✅', status: '✅ Complete' },
error: { color: '#DC3545', prefix: '❌', status: '❌ Error' },
};
function buildAttachment(cmd, text) {
const style = RICH_STYLES[cmd] || RICH_STYLES.update;
const agentName = options.agent || 'unknown';
const style = RICH_STYLES[cmd] || RICH_STYLES.update; // eslint-disable-line security/detect-object-injection
const agentName = options.agent || 'unknown';
// Split text: first line = title, rest = log body
const lines = text.split('\n');
const title = `${style.prefix} ${lines[0]}`;
const body = lines.length > 1 ? lines.slice(1).join('\n') : '';
// Split text: first line = title, rest = log body
const lines = text.split('\n');
const title = `${style.prefix} ${lines[0]}`;
const body = lines.length > 1 ? lines.slice(1).join('\n') : '';
return {
color: style.color,
title: title,
text: body || undefined,
fields: [
{ short: true, title: 'Agent', value: agentName },
{ short: true, title: 'Status', value: style.status }
]
};
return {
color: style.color,
title: title,
text: body || undefined,
fields: [
{ short: true, title: 'Agent', value: agentName },
{ short: true, title: 'Status', value: style.status },
],
};
}
// --- COMMANDS ---
async function createPost(text, cmd) {
if (!CONFIG.channel_id) {
console.error('Error: Channel ID required. Use --channel <id> or set MM_CHANNEL_ID.');
process.exit(1);
}
if (!text || !text.trim()) {
console.error('Error: Message text is required for create.');
process.exit(1);
}
try {
let payload;
if (options.rich) {
payload = {
channel_id: CONFIG.channel_id,
message: '',
props: { attachments: [buildAttachment(cmd || 'create', text)] }
};
} else {
payload = { channel_id: CONFIG.channel_id, message: text };
}
if (options.replyTo) payload.root_id = options.replyTo;
const result = await request('POST', '/posts', payload);
console.log(result.id);
} catch (e) {
console.error('Error (create):', e.message);
process.exit(1);
if (!CONFIG.channel_id) {
console.error('Error: Channel ID required. Use --channel <id> or set MM_CHANNEL_ID.');
process.exit(1);
}
if (!text || !text.trim()) {
console.error('Error: Message text is required for create.');
process.exit(1);
}
try {
let payload;
if (options.rich) {
payload = {
channel_id: CONFIG.channel_id,
message: '',
props: { attachments: [buildAttachment(cmd || 'create', text)] },
};
} else {
payload = { channel_id: CONFIG.channel_id, message: text };
}
if (options.replyTo) payload.root_id = options.replyTo;
const result = await request('POST', '/posts', payload);
console.log(result.id);
} catch (e) {
console.error('Error (create):', e.message);
process.exit(1);
}
}
async function updatePost(postId, text, cmd) {
if (!postId) {
console.error('Error: Post ID is required for update.');
process.exit(1);
}
if (!text || !text.trim()) {
console.error('Error: Message text is required for update.');
process.exit(1);
}
try {
if (options.rich) {
await request('PUT', `/posts/${postId}`, {
id: postId,
message: '',
props: { attachments: [buildAttachment(cmd || 'update', text)] }
});
} else {
const current = await request('GET', `/posts/${postId}`);
await request('PUT', `/posts/${postId}`, {
id: postId, message: text, props: current.props
});
}
console.log('updated');
} catch (e) {
console.error('Error (update):', e.message);
process.exit(1);
if (!postId) {
console.error('Error: Post ID is required for update.');
process.exit(1);
}
if (!text || !text.trim()) {
console.error('Error: Message text is required for update.');
process.exit(1);
}
try {
if (options.rich) {
await request('PUT', `/posts/${postId}`, {
id: postId,
message: '',
props: { attachments: [buildAttachment(cmd || 'update', text)] },
});
} else {
const current = await request('GET', `/posts/${postId}`);
await request('PUT', `/posts/${postId}`, {
id: postId,
message: text,
props: current.props,
});
}
console.log('updated');
} catch (e) {
console.error('Error (update):', e.message);
process.exit(1);
}
}
async function deletePost(postId) {
if (!postId) {
console.error('Error: Post ID is required for delete.');
process.exit(1);
}
try {
await request('DELETE', `/posts/${postId}`);
console.log('deleted');
} catch (e) {
console.error('Error (delete):', e.message);
process.exit(1);
}
if (!postId) {
console.error('Error: Post ID is required for delete.');
process.exit(1);
}
try {
await request('DELETE', `/posts/${postId}`);
console.log('deleted');
} catch (e) {
console.error('Error (delete):', e.message);
process.exit(1);
}
}
// --- CLI ROUTER ---
if (command === 'create') {
createPost(otherArgs.join(' '), 'create');
createPost(otherArgs.join(' '), 'create');
} else if (command === 'update') {
updatePost(otherArgs[0], otherArgs.slice(1).join(' '), 'update');
updatePost(otherArgs[0], otherArgs.slice(1).join(' '), 'update');
} else if (command === 'complete') {
updatePost(otherArgs[0], otherArgs.slice(1).join(' '), 'complete');
updatePost(otherArgs[0], otherArgs.slice(1).join(' '), 'complete');
} else if (command === 'error') {
updatePost(otherArgs[0], otherArgs.slice(1).join(' '), 'error');
updatePost(otherArgs[0], otherArgs.slice(1).join(' '), 'error');
} else if (command === 'delete') {
deletePost(otherArgs[0]);
deletePost(otherArgs[0]);
} else {
console.log('Usage:');
console.log(' live-status [options] create <text>');
console.log(' live-status [options] update <id> <text>');
console.log(' live-status [options] complete <id> <text>');
console.log(' live-status [options] error <id> <text>');
console.log(' live-status [options] delete <id>');
console.log('');
console.log('Options:');
console.log(' --rich Use rich message attachments (colored cards)');
console.log(' --channel ID Target channel');
console.log(' --reply-to ID Post as thread reply');
console.log(' --agent NAME Use bot token mapped to this agent');
console.log(' --token TOKEN Explicit bot token (overrides all)');
console.log(' --host HOST Mattermost hostname');
console.log('');
console.log('Rich mode colors:');
console.log(' create/update → Orange (running)');
console.log(' complete → Green (done)');
console.log(' error → Red (failed)');
process.exit(1);
console.log('Usage:');
console.log(' live-status [options] create <text>');
console.log(' live-status [options] update <id> <text>');
console.log(' live-status [options] complete <id> <text>');
console.log(' live-status [options] error <id> <text>');
console.log(' live-status [options] delete <id>');
console.log('');
console.log('Options:');
console.log(' --rich Use rich message attachments (colored cards)');
console.log(' --channel ID Target channel');
console.log(' --reply-to ID Post as thread reply');
console.log(' --agent NAME Use bot token mapped to this agent');
console.log(' --token TOKEN Explicit bot token (overrides all)');
console.log(' --host HOST Mattermost hostname');
console.log('');
console.log('Rich mode colors:');
console.log(' create/update → Orange (running)');
console.log(' complete → Green (done)');
console.log(' error → Red (failed)');
process.exit(1);
}

43
src/logger.js Normal file
View File

@@ -0,0 +1,43 @@
'use strict';
/**
* logger.js — pino wrapper with default config.
* Singleton logger; supports session-scoped child loggers.
*/
const pino = require('pino');
let _logger = null;
function getLogger() {
if (!_logger) {
// Get log level from env directly (avoid circular dep with config.js)
const rawLevel = process.env.LOG_LEVEL;
const validLevels = ['trace', 'debug', 'info', 'warn', 'error', 'fatal', 'silent'];
const level = rawLevel && validLevels.includes(rawLevel) ? rawLevel : 'info';
_logger = pino({
level,
base: { pid: process.pid },
timestamp: pino.stdTimeFunctions.isoTime,
});
}
return _logger;
}
/**
* Create a child logger scoped to a session.
* @param {string} sessionKey
* @returns {pino.Logger}
*/
function sessionLogger(sessionKey) {
return getLogger().child({ sessionKey });
}
/**
* Reset the logger singleton (for tests).
*/
function resetLogger() {
_logger = null;
}
module.exports = { getLogger, sessionLogger, resetLogger };

328
src/status-box.js Normal file
View File

@@ -0,0 +1,328 @@
'use strict';
/**
* status-box.js — Mattermost post manager.
*
* Features:
* - Shared http.Agent (keepAlive, maxSockets)
* - createPost / updatePost with circuit breaker
* - Throttle: leading-edge fires immediately, trailing flush after THROTTLE_MS
* - Message size guard (truncate to MAX_MESSAGE_CHARS)
* - Retry with exponential backoff on 429/5xx (up to MAX_RETRIES)
* - Structured logs for every API call
*/
const https = require('https');
const http = require('http');
const { CircuitBreaker } = require('./circuit-breaker');
const DEFAULT_THROTTLE_MS = 500;
const DEFAULT_MAX_CHARS = 15000;
const DEFAULT_MAX_RETRIES = 3;
const DEFAULT_MAX_SOCKETS = 4;
class StatusBox {
/**
* @param {object} opts
* @param {string} opts.baseUrl - Mattermost base URL
* @param {string} opts.token - Bot token
* @param {object} [opts.logger] - pino logger
* @param {number} [opts.throttleMs]
* @param {number} [opts.maxMessageChars]
* @param {number} [opts.maxRetries]
* @param {number} [opts.maxSockets]
* @param {CircuitBreaker} [opts.circuitBreaker]
*/
constructor(opts) {
this.baseUrl = opts.baseUrl;
this.token = opts.token;
this.logger = opts.logger || null;
this.throttleMs = opts.throttleMs || DEFAULT_THROTTLE_MS;
this.maxMessageChars = opts.maxMessageChars || DEFAULT_MAX_CHARS;
this.maxRetries = opts.maxRetries || DEFAULT_MAX_RETRIES;
const parsedUrl = new URL(this.baseUrl);
this.hostname = parsedUrl.hostname;
this.port = parsedUrl.port
? parseInt(parsedUrl.port, 10)
: parsedUrl.protocol === 'https:'
? 443
: 80;
this.isHttps = parsedUrl.protocol === 'https:';
const maxSockets = opts.maxSockets || DEFAULT_MAX_SOCKETS;
this.agent = new (this.isHttps ? https : http).Agent({
keepAlive: true,
maxSockets,
});
this.circuitBreaker =
opts.circuitBreaker ||
new CircuitBreaker({
threshold: 5,
cooldownMs: 30000,
logger: this.logger,
});
// Metrics
this.metrics = {
updatesSent: 0,
updatesFailed: 0,
queueDepth: 0,
};
// Throttle state per postId
// Map<postId, { pending: string|null, timer: NodeJS.Timeout|null, lastFiredAt: number }>
this._throttleState = new Map();
}
/**
* Create a new Mattermost post.
* @param {string} channelId
* @param {string} text
* @param {string} [rootId] - Thread root post ID
* @returns {Promise<string>} Post ID
*/
async createPost(channelId, text, rootId) {
const body = { channel_id: channelId, message: this._truncate(text) };
if (rootId) body.root_id = rootId;
const post = await this._apiCall('POST', '/api/v4/posts', body);
if (this.logger) this.logger.debug({ postId: post.id, channelId }, 'Created status post');
this.metrics.updatesSent++;
return post.id;
}
/**
* Update a Mattermost post (throttled).
* Leading edge fires immediately; subsequent calls within throttleMs are batched.
* Guaranteed trailing flush when activity stops.
*
* @param {string} postId
* @param {string} text
* @returns {Promise<void>}
*/
updatePost(postId, text) {
return new Promise((resolve, reject) => {
let state = this._throttleState.get(postId);
if (!state) {
state = { pending: null, timer: null, lastFiredAt: 0, resolvers: [] };
this._throttleState.set(postId, state);
}
state.resolvers.push({ resolve, reject });
state.pending = text;
this.metrics.queueDepth = this._throttleState.size;
const now = Date.now();
const elapsed = now - state.lastFiredAt;
if (elapsed >= this.throttleMs && !state.timer) {
// Leading edge: fire immediately
this._flushUpdate(postId);
} else {
// Trailing flush: schedule if not already scheduled
if (!state.timer) {
state.timer = setTimeout(() => {
this._flushUpdate(postId);
}, this.throttleMs - elapsed);
}
// If timer already scheduled, pending text was updated above — it will flush latest text
}
});
}
/**
* Flush the pending update for a postId.
* @private
*/
async _flushUpdate(postId) {
const state = this._throttleState.get(postId);
if (!state || state.pending === null) return;
const text = state.pending;
const resolvers = [...state.resolvers];
state.pending = null;
state.resolvers = [];
state.lastFiredAt = Date.now();
if (state.timer) {
clearTimeout(state.timer);
state.timer = null;
}
this.metrics.queueDepth = Math.max(0, this.metrics.queueDepth - 1);
try {
await this._apiCallWithRetry('PUT', `/api/v4/posts/${postId}`, {
id: postId,
message: this._truncate(text),
});
this.metrics.updatesSent++;
resolvers.forEach(({ resolve }) => resolve());
} catch (err) {
this.metrics.updatesFailed++;
resolvers.forEach(({ reject }) => reject(err));
}
}
/**
* Force-flush any pending update for a postId (used on shutdown).
* @param {string} postId
* @returns {Promise<void>}
*/
async forceFlush(postId) {
const state = this._throttleState.get(postId);
if (!state) return;
if (state.timer) {
clearTimeout(state.timer);
state.timer = null;
}
if (state.pending !== null) {
await this._flushUpdate(postId);
}
}
/**
* Force-flush all pending updates.
* @returns {Promise<void>}
*/
async flushAll() {
const postIds = [...this._throttleState.keys()];
await Promise.allSettled(postIds.map((id) => this.forceFlush(id)));
}
/**
* Delete a post.
* @param {string} postId
* @returns {Promise<void>}
*/
async deletePost(postId) {
await this._apiCall('DELETE', `/api/v4/posts/${postId}`, null);
}
/**
* Truncate text to maxMessageChars.
* @private
*/
_truncate(text) {
if (text.length <= this.maxMessageChars) return text;
const suffix = '\n...(truncated)';
return text.slice(0, this.maxMessageChars - suffix.length) + suffix;
}
/**
* Make an API call through the circuit breaker with retries.
* @private
*/
async _apiCallWithRetry(method, path, body) {
return this.circuitBreaker.execute(() => this._retryApiCall(method, path, body));
}
/**
* Make an API call directly (no circuit breaker, for createPost).
* @private
*/
async _apiCall(method, apiPath, body) {
return this.circuitBreaker.execute(() => this._retryApiCall(method, apiPath, body));
}
/**
* Retry logic for API calls.
* @private
*/
async _retryApiCall(method, apiPath, body, attempt = 0) {
try {
return await this._httpRequest(method, apiPath, body);
} catch (err) {
const isRetryable = err.statusCode === 429 || (err.statusCode >= 500 && err.statusCode < 600);
if (isRetryable && attempt < this.maxRetries) {
const delayMs = Math.min(1000 * Math.pow(2, attempt), 10000);
if (this.logger) {
this.logger.warn(
{ attempt, delayMs, statusCode: err.statusCode },
'API call failed, retrying',
);
}
await sleep(delayMs);
return this._retryApiCall(method, apiPath, body, attempt + 1);
}
throw err;
}
}
/**
* Low-level HTTP request.
* @private
*/
_httpRequest(method, apiPath, body) {
const transport = this.isHttps ? https : http;
const bodyStr = body ? JSON.stringify(body) : null;
return new Promise((resolve, reject) => {
const reqOpts = {
hostname: this.hostname,
port: this.port,
path: apiPath,
method,
agent: this.agent,
headers: {
Authorization: `Bearer ${this.token}`,
'Content-Type': 'application/json',
},
};
if (bodyStr) {
reqOpts.headers['Content-Length'] = Buffer.byteLength(bodyStr);
}
const req = transport.request(reqOpts, (res) => {
let data = '';
res.on('data', (chunk) => (data += chunk));
res.on('end', () => {
if (res.statusCode >= 200 && res.statusCode < 300) {
try {
resolve(data ? JSON.parse(data) : {});
} catch (_e) {
resolve({});
}
} else {
let msg = `HTTP ${res.statusCode}`;
try {
msg = JSON.parse(data).message || msg;
} catch (_e) {
/* use default */
}
const err = new Error(msg);
err.statusCode = res.statusCode;
reject(err);
}
});
});
req.on('error', reject);
if (bodyStr) req.write(bodyStr);
req.end();
});
}
getMetrics() {
return {
...this.metrics,
circuit: this.circuitBreaker.getMetrics(),
};
}
destroy() {
this.agent.destroy();
// Clear all throttle timers
for (const [, state] of this._throttleState) {
if (state.timer) clearTimeout(state.timer);
}
this._throttleState.clear();
}
}
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
module.exports = { StatusBox };

142
src/status-formatter.js Normal file
View File

@@ -0,0 +1,142 @@
'use strict';
/**
* status-formatter.js — Pure function: SessionState -> formatted Mattermost text.
*
* Output format:
* [ACTIVE] main | 38s
* Reading live-status source code...
* exec: ls /agents/sessions [OK]
* Analyzing agent configurations...
* Sub-agent: proj035-planner
* Reading protocol...
* [DONE] 28s
* [DONE] 53s | 12.4k tokens
*/
const MAX_STATUS_LINES = parseInt(process.env.MAX_STATUS_LINES, 10) || 20;
const MAX_LINE_CHARS = 120;
/**
* Format a SessionState into a Mattermost text string.
*
* @param {object} sessionState
* @param {string} sessionState.sessionKey
* @param {string} sessionState.status - 'active' | 'done' | 'error' | 'interrupted'
* @param {number} sessionState.startTime - ms since epoch
* @param {Array<string>} sessionState.lines - Status lines (most recent activity)
* @param {Array<object>} [sessionState.children] - Child session states
* @param {number} [sessionState.tokenCount] - Token count if available
* @param {string} [sessionState.agentId] - Agent ID (e.g. "main")
* @param {number} [sessionState.depth] - Nesting depth (0 = top-level)
* @returns {string}
*/
function format(sessionState, opts = {}) {
const maxLines = opts.maxLines || MAX_STATUS_LINES;
const depth = sessionState.depth || 0;
const indent = ' '.repeat(depth);
const lines = [];
// Header line
const elapsed = formatElapsed(Date.now() - sessionState.startTime);
const agentId = sessionState.agentId || extractAgentId(sessionState.sessionKey);
const statusPrefix = statusIcon(sessionState.status);
lines.push(`${indent}${statusPrefix} ${agentId} | ${elapsed}`);
// Status lines (trimmed to maxLines, most recent)
const statusLines = (sessionState.lines || []).slice(-maxLines);
for (const line of statusLines) {
lines.push(`${indent} ${truncateLine(line)}`);
}
// Child sessions (sub-agents)
if (sessionState.children && sessionState.children.length > 0) {
for (const child of sessionState.children) {
const childLines = format(child, { maxLines: Math.floor(maxLines / 2), ...opts }).split('\n');
lines.push(...childLines);
}
}
// Footer line (only for done/error/interrupted)
if (sessionState.status !== 'active') {
const tokenStr = sessionState.tokenCount
? ` | ${formatTokens(sessionState.tokenCount)} tokens`
: '';
lines.push(`${indent} [${sessionState.status.toUpperCase()}] ${elapsed}${tokenStr}`);
}
return lines.join('\n');
}
/**
* Format elapsed milliseconds as human-readable string.
* @param {number} ms
* @returns {string}
*/
function formatElapsed(ms) {
if (ms < 0) ms = 0;
const s = Math.floor(ms / 1000);
const m = Math.floor(s / 60);
const h = Math.floor(m / 60);
if (h > 0) return `${h}h${m % 60}m`;
if (m > 0) return `${m}m${s % 60}s`;
return `${s}s`;
}
/**
* Format token count as compact string (e.g. 12400 -> "12.4k").
* @param {number} count
* @returns {string}
*/
function formatTokens(count) {
if (count >= 1000000) return `${(count / 1000000).toFixed(1)}M`;
if (count >= 1000) return `${(count / 1000).toFixed(1)}k`;
return String(count);
}
/**
* Get status icon prefix.
* @param {string} status
* @returns {string}
*/
function statusIcon(status) {
switch (status) {
case 'active':
return '[ACTIVE]';
case 'done':
return '[DONE]';
case 'error':
return '[ERROR]';
case 'interrupted':
return '[INTERRUPTED]';
default:
return '[UNKNOWN]';
}
}
/**
* Truncate a line to MAX_LINE_CHARS.
* @param {string} line
* @returns {string}
*/
function truncateLine(line) {
if (line.length <= MAX_LINE_CHARS) return line;
return line.slice(0, MAX_LINE_CHARS - 3) + '...';
}
/**
* Extract agent ID from session key.
* Session key format: "agent:main:mattermost:channel:abc123:thread:xyz"
* @param {string} sessionKey
* @returns {string}
*/
function extractAgentId(sessionKey) {
if (!sessionKey) return 'unknown';
const parts = sessionKey.split(':');
// "agent:main:..." -> "main"
if (parts[0] === 'agent' && parts[1]) return parts[1];
return sessionKey.split(':')[0] || 'unknown';
}
module.exports = { format, formatElapsed, formatTokens, statusIcon, truncateLine, extractAgentId };

394
src/status-watcher.js Normal file
View File

@@ -0,0 +1,394 @@
'use strict';
/**
* status-watcher.js — Core JSONL watcher.
*
* - fs.watch on TRANSCRIPT_DIR (recursive)
* - On file change: read new bytes, parse JSONL, update SessionState
* - Map parsed events to status lines
* - Detect file truncation (compaction) -> reset offset
* - Debounce updates via status-box.js throttle
* - Idle detection: pendingToolCalls==0 AND no new lines for IDLE_TIMEOUT_S
* - Emits: 'session-update' (sessionKey, sessionState)
* 'session-idle' (sessionKey)
*/
const fs = require('fs');
const path = require('path');
const { EventEmitter } = require('events');
const { resolve: resolveLabel } = require('./tool-labels');
class StatusWatcher extends EventEmitter {
/**
* @param {object} opts
* @param {string} opts.transcriptDir - Base transcript directory
* @param {number} [opts.idleTimeoutS] - Idle timeout in seconds
* @param {object} [opts.logger] - pino logger
*/
constructor(opts) {
super();
this.transcriptDir = opts.transcriptDir;
this.idleTimeoutS = opts.idleTimeoutS || 60;
this.logger = opts.logger || null;
// Map<sessionKey, SessionState>
this.sessions = new Map();
// Map<filePath, sessionKey>
this.fileToSession = new Map();
// fs.Watcher instance
this._watcher = null;
this._running = false;
}
/**
* Register a session to watch.
* @param {string} sessionKey
* @param {string} transcriptFile - Absolute path to {uuid}.jsonl
* @param {object} [initialState] - Pre-populated state (from offset recovery)
*/
addSession(sessionKey, transcriptFile, initialState = {}) {
if (this.sessions.has(sessionKey)) return;
const state = {
sessionKey,
transcriptFile,
status: 'active',
startTime: initialState.startTime || Date.now(),
lines: initialState.lines || [],
pendingToolCalls: 0,
lastOffset: initialState.lastOffset || 0,
lastActivityAt: Date.now(),
agentId: initialState.agentId || extractAgentId(sessionKey),
depth: initialState.depth || 0,
tokenCount: 0,
children: [],
idleTimer: null,
};
this.sessions.set(sessionKey, state);
this.fileToSession.set(transcriptFile, sessionKey);
if (this.logger) {
this.logger.debug({ sessionKey, transcriptFile }, 'Session added to watcher');
}
// Immediately read any existing content
this._readFile(sessionKey, state);
}
/**
* Remove a session from watching.
* @param {string} sessionKey
*/
removeSession(sessionKey) {
const state = this.sessions.get(sessionKey);
if (!state) return;
if (state.idleTimer) clearTimeout(state.idleTimer);
this.fileToSession.delete(state.transcriptFile);
this.sessions.delete(sessionKey);
if (this.logger) {
this.logger.debug({ sessionKey }, 'Session removed from watcher');
}
}
/**
* Get current session state (for offset persistence).
* @param {string} sessionKey
* @returns {object|null}
*/
getSessionState(sessionKey) {
return this.sessions.get(sessionKey) || null;
}
/**
* Start the file system watcher.
*/
start() {
if (this._running) return;
this._running = true;
try {
this._watcher = fs.watch(this.transcriptDir, { recursive: true }, (eventType, filename) => {
if (!filename) return;
const fullPath = path.resolve(this.transcriptDir, filename);
this._onFileChange(fullPath);
});
this._watcher.on('error', (err) => {
if (this.logger) this.logger.error({ err }, 'fs.watch error');
this.emit('error', err);
});
if (this.logger) {
this.logger.info({ dir: this.transcriptDir }, 'StatusWatcher started (fs.watch)');
}
} catch (err) {
if (this.logger) {
this.logger.error({ err }, 'Failed to start fs.watch — transcriptDir may not exist');
}
this._running = false;
throw err;
}
}
/**
* Stop the file system watcher.
*/
stop() {
this._running = false;
if (this._watcher) {
this._watcher.close();
this._watcher = null;
}
// Clear all idle timers
for (const [, state] of this.sessions) {
if (state.idleTimer) {
clearTimeout(state.idleTimer);
state.idleTimer = null;
}
}
if (this.logger) this.logger.info('StatusWatcher stopped');
}
/**
* Handle a file change event.
* @private
*/
_onFileChange(fullPath) {
// Only process .jsonl files
if (!fullPath.endsWith('.jsonl')) return;
const sessionKey = this.fileToSession.get(fullPath);
if (!sessionKey) return;
const state = this.sessions.get(sessionKey);
if (!state) return;
this._readFile(sessionKey, state);
}
/**
* Read new bytes from a transcript file.
* @private
*/
_readFile(sessionKey, state) {
let fd;
try {
fd = fs.openSync(state.transcriptFile, 'r');
const stat = fs.fstatSync(fd);
const fileSize = stat.size;
// Detect file truncation (compaction)
if (fileSize < state.lastOffset) {
if (this.logger) {
this.logger.warn(
{ sessionKey, fileSize, lastOffset: state.lastOffset },
'Transcript truncated (compaction detected) — resetting offset',
);
}
state.lastOffset = 0;
state.lines = ['[session compacted - continuing]'];
state.pendingToolCalls = 0;
}
if (fileSize <= state.lastOffset) {
fs.closeSync(fd);
return;
}
// Read new bytes
const bytesToRead = fileSize - state.lastOffset;
const buffer = Buffer.allocUnsafe(bytesToRead);
const bytesRead = fs.readSync(fd, buffer, 0, bytesToRead, state.lastOffset);
fs.closeSync(fd);
state.lastOffset += bytesRead;
// Parse JSONL lines
const chunk = buffer.toString('utf8', 0, bytesRead);
const lines = chunk.split('\n').filter((l) => l.trim());
for (const line of lines) {
this._parseLine(sessionKey, state, line);
}
// Update activity timestamp
state.lastActivityAt = Date.now();
// Schedule idle check
this._scheduleIdleCheck(sessionKey, state);
// Emit update event
this.emit('session-update', sessionKey, this._sanitizeState(state));
} catch (err) {
if (fd !== undefined) {
try {
fs.closeSync(fd);
} catch (_e) {
/* ignore close error */
}
}
if (this.logger) {
this.logger.error({ sessionKey, err }, 'Error reading transcript file');
}
}
}
/**
* Parse a single JSONL line and update session state.
* @private
*/
_parseLine(sessionKey, state, line) {
let record;
try {
record = JSON.parse(line);
} catch (_e) {
// Skip malformed lines
return;
}
const { type } = record;
switch (type) {
case 'tool_call': {
state.pendingToolCalls++;
const toolName = record.name || record.tool || 'unknown';
const label = resolveLabel(toolName);
const statusLine = ` ${toolName}: ${label}`;
state.lines.push(statusLine);
break;
}
case 'tool_result': {
if (state.pendingToolCalls > 0) state.pendingToolCalls--;
const toolName = record.name || record.tool || 'unknown';
// Update the last tool_call line for this tool to show [OK] or [ERR]
const marker = record.error ? '[ERR]' : '[OK]';
const idx = findLastIndex(state.lines, (l) => l.includes(` ${toolName}:`));
if (idx >= 0) {
// Replace placeholder with result
state.lines[idx] = state.lines[idx].replace(/( \[OK\]| \[ERR\])?$/, ` ${marker}`);
}
break;
}
case 'assistant': {
// Assistant text chunk
const text = (record.text || record.content || '').trim();
if (text) {
const truncated = text.length > 80 ? text.slice(0, 77) + '...' : text;
state.lines.push(truncated);
}
break;
}
case 'usage': {
// Token usage update
if (record.total_tokens) state.tokenCount = record.total_tokens;
else if (record.input_tokens || record.output_tokens) {
state.tokenCount = (record.input_tokens || 0) + (record.output_tokens || 0);
}
break;
}
case 'session_start': {
state.startTime = record.timestamp ? new Date(record.timestamp).getTime() : state.startTime;
break;
}
default:
// Ignore unknown record types
break;
}
}
/**
* Schedule an idle check for a session.
* @private
*/
_scheduleIdleCheck(sessionKey, state) {
if (state.idleTimer) {
clearTimeout(state.idleTimer);
}
state.idleTimer = setTimeout(() => {
this._checkIdle(sessionKey);
}, this.idleTimeoutS * 1000);
}
/**
* Check if a session is idle.
* @private
*/
_checkIdle(sessionKey) {
const state = this.sessions.get(sessionKey);
if (!state) return;
const elapsed = Date.now() - state.lastActivityAt;
const idleMs = this.idleTimeoutS * 1000;
if (elapsed >= idleMs && state.pendingToolCalls === 0) {
if (this.logger) {
this.logger.info({ sessionKey, elapsedS: Math.floor(elapsed / 1000) }, 'Session idle');
}
state.status = 'done';
state.idleTimer = null;
this.emit('session-idle', sessionKey, this._sanitizeState(state));
} else {
// Reschedule
this._scheduleIdleCheck(sessionKey, state);
}
}
/**
* Return a safe copy of session state (without circular refs, timers).
* @private
*/
_sanitizeState(state) {
return {
sessionKey: state.sessionKey,
transcriptFile: state.transcriptFile,
status: state.status,
startTime: state.startTime,
lines: [...state.lines],
pendingToolCalls: state.pendingToolCalls,
lastOffset: state.lastOffset,
lastActivityAt: state.lastActivityAt,
agentId: state.agentId,
depth: state.depth,
tokenCount: state.tokenCount,
children: state.children,
};
}
}
/**
* Extract agent ID from session key.
* @param {string} sessionKey
* @returns {string}
*/
function extractAgentId(sessionKey) {
if (!sessionKey) return 'unknown';
const parts = sessionKey.split(':');
if (parts[0] === 'agent' && parts[1]) return parts[1];
return parts[0] || 'unknown';
}
/**
* Find the last index in an array satisfying a predicate.
* @param {Array} arr
* @param {Function} predicate
* @returns {number}
*/
function findLastIndex(arr, predicate) {
for (let i = arr.length - 1; i >= 0; i--) {
if (predicate(arr[i])) return i; // eslint-disable-line security/detect-object-injection
}
return -1;
}
module.exports = { StatusWatcher };

109
src/tool-labels.js Normal file
View File

@@ -0,0 +1,109 @@
'use strict';
/**
* tool-labels.js — Pattern-matching tool name -> label resolver.
*
* Resolution order:
* 1. Exact match (e.g. "exec" -> "Running command...")
* 2. Prefix match (e.g. "camofox_*" -> "Using browser...")
* 3. Regex match (e.g. /^claude_/ -> "Running Claude Code...")
* 4. Default label ("Working...")
*/
const path = require('path');
const fs = require('fs');
let _labels = null;
let _externalFile = null;
/**
* Load tool labels from JSON file(s).
* Merges external override on top of built-in defaults.
* @param {string|null} externalFile - Path to external JSON override
*/
function loadLabels(externalFile = null) {
_externalFile = externalFile;
// Load built-in defaults
const builtinPath = path.join(__dirname, 'tool-labels.json');
let builtin = {};
try {
// eslint-disable-next-line security/detect-non-literal-fs-filename
builtin = JSON.parse(fs.readFileSync(builtinPath, 'utf8'));
} catch (_e) {
/* use empty defaults if file missing */
}
let external = {};
if (externalFile) {
try {
// eslint-disable-next-line security/detect-non-literal-fs-filename
external = JSON.parse(fs.readFileSync(externalFile, 'utf8'));
} catch (_e) {
/* external file missing or invalid — use built-in only */
}
}
// Merge: external overrides built-in
_labels = {
exact: Object.assign({}, builtin.exact || {}, external.exact || {}),
prefix: Object.assign({}, builtin.prefix || {}, external.prefix || {}),
regex: [...(builtin.regex || []), ...(external.regex || [])],
default: external.default !== undefined ? external.default : builtin.default || 'Working...',
};
return _labels;
}
/**
* Resolve a tool name to a human-readable label.
* @param {string} toolName
* @returns {string}
*/
function resolve(toolName) {
if (!_labels) loadLabels(_externalFile);
const labels = _labels;
// 1. Exact match
if (Object.prototype.hasOwnProperty.call(labels.exact, toolName)) {
return labels.exact[toolName]; // eslint-disable-line security/detect-object-injection
}
// 2. Prefix match
for (const [prefix, label] of Object.entries(labels.prefix)) {
if (toolName.startsWith(prefix)) {
return label;
}
}
// 3. Regex match (patterns stored as strings like "/^claude_/i")
for (const entry of labels.regex || []) {
const pattern = typeof entry === 'string' ? entry : entry.pattern;
const label = typeof entry === 'string' ? labels.default : entry.label;
if (pattern) {
try {
const match = pattern.match(/^\/(.+)\/([gimuy]*)$/);
if (match) {
const re = new RegExp(match[1], match[2]); // eslint-disable-line security/detect-non-literal-regexp
if (re.test(toolName)) return label;
}
} catch (_e) {
/* invalid regex — skip */
}
}
}
// 4. Default
return labels.default;
}
/**
* Reset labels (for tests).
*/
function resetLabels() {
_labels = null;
_externalFile = null;
}
module.exports = { loadLabels, resolve, resetLabels };

41
src/tool-labels.json Normal file
View File

@@ -0,0 +1,41 @@
{
"_comment": "Tool name to human-readable label mapping. Supports exact match, prefix match (end with *), and regex (start with /).",
"exact": {
"Read": "Reading file...",
"Write": "Writing file...",
"Edit": "Editing file...",
"exec": "Running command...",
"process": "Managing process...",
"web_search": "Searching the web...",
"web_fetch": "Fetching URL...",
"browser": "Controlling browser...",
"canvas": "Drawing canvas...",
"nodes": "Querying nodes...",
"message": "Sending message...",
"tts": "Generating speech...",
"subagents": "Managing sub-agents...",
"image": "Analyzing image...",
"camofox_create_tab": "Opening browser tab...",
"camofox_close_tab": "Closing browser tab...",
"camofox_navigate": "Navigating browser...",
"camofox_click": "Clicking element...",
"camofox_type": "Typing in browser...",
"camofox_scroll": "Scrolling page...",
"camofox_screenshot": "Taking screenshot...",
"camofox_snapshot": "Capturing page snapshot...",
"camofox_list_tabs": "Listing browser tabs...",
"camofox_import_cookies": "Importing cookies...",
"claude_code_start": "Starting Claude Code task...",
"claude_code_status": "Checking Claude Code status...",
"claude_code_output": "Reading Claude Code output...",
"claude_code_cancel": "Cancelling Claude Code task...",
"claude_code_cleanup": "Cleaning up Claude Code sessions...",
"claude_code_sessions": "Listing Claude Code sessions..."
},
"prefix": {
"camofox_": "Using browser...",
"claude_code_": "Running Claude Code...",
"nodes_": "Querying nodes..."
},
"default": "Working..."
}