#!/usr/bin/env node
'use strict';

/* eslint-disable no-console */

/**
 * watcher-manager.js — Top-level orchestrator for the Live Status v4 daemon.
 *
 * CLI: node watcher-manager.js start|stop|status
 *
 * Architecture:
 * - SessionMonitor polls sessions.json every 2s for new/ended sessions
 * - StatusWatcher watches transcript files via fs.watch (inotify)
 * - StatusBox manages Mattermost posts (throttle, circuit breaker, retry)
 * - HealthServer exposes /health endpoint
 * - Offset persistence: save/restore last read positions on restart
 * - Graceful shutdown: SIGTERM/SIGINT -> mark all boxes interrupted -> exit 0
 */

const fs = require('fs');
const { getConfig } = require('./config');
const { getLogger } = require('./logger');
const { SessionMonitor } = require('./session-monitor');
const { StatusWatcher } = require('./status-watcher');
const { StatusBox } = require('./status-box');
const { PluginClient } = require('./plugin-client');
// status-formatter is used inline via require() in helpers
const { HealthServer } = require('./health');
const { loadLabels } = require('./tool-labels');

// ---- CLI Router ----

// Dispatch on the first CLI argument. The handlers are function declarations
// further down the file, so they are hoisted and callable from here.
const cmd = process.argv[2];
if (cmd === 'start') {
  startDaemon().catch((err) => {
    console.error('Failed to start:', err.message);
    process.exit(1);
  });
} else if (cmd === 'stop') {
  stopDaemon();
} else if (cmd === 'status') {
  daemonStatus();
} else {
  console.log('Usage: node watcher-manager.js <start|stop|status>');
  process.exit(1);
}

// ---- PID File helpers ----

/**
 * Write the current process id to the PID file.
 *
 * @param {string} pidFile - Path of the PID file to (over)write.
 */
function writePidFile(pidFile) {
  // eslint-disable-next-line security/detect-non-literal-fs-filename
  fs.writeFileSync(pidFile, String(process.pid), 'utf8');
}

/**
 * Read a process id from the PID file.
 *
 * @param {string} pidFile - Path of the PID file.
 * @returns {number|null} The stored PID, or null when the file cannot be read
 *   or does not contain a positive integer.
 */
function readPidFile(pidFile) {
  let raw;
  try {
    // eslint-disable-next-line security/detect-non-literal-fs-filename
    raw = fs.readFileSync(pidFile, 'utf8');
  } catch (_e) {
    return null;
  }
  const pid = Number.parseInt(raw.trim(), 10);
  // Reject NaN, zero and negative values: callers hand the result to
  // process.kill(), where a non-positive pid addresses a process group
  // rather than a single process.
  return Number.isInteger(pid) && pid > 0 ? pid : null;
}

/**
 * Delete the PID file. Best-effort: any error (e.g. the file is already
 * gone) is ignored.
 *
 * @param {string} pidFile - Path of the PID file.
 */
function removePidFile(pidFile) {
  try {
    // eslint-disable-next-line security/detect-non-literal-fs-filename
    fs.unlinkSync(pidFile);
  } catch (_e) {
    /* ignore */
  }
}
/**
 * Check whether a process with the given PID exists.
 *
 * Sends signal 0, which performs the existence/permission check without
 * delivering a signal.
 *
 * @param {number} pid - Process id to probe.
 * @returns {boolean} True when the process exists, including when it exists
 *   but belongs to another user (EPERM); false otherwise.
 */
function isProcessRunning(pid) {
  try {
    process.kill(pid, 0);
    return true;
  } catch (err) {
    // EPERM means the process is there but we may not signal it.
    return Boolean(err) && err.code === 'EPERM';
  }
}

// ---- Offset persistence ----

/**
 * Load persisted per-session offsets.
 *
 * @param {string} offsetFile - Path of the JSON offsets file.
 * @returns {Object} The parsed offsets keyed by session key, or an empty
 *   object when the file is missing, unreadable, not valid JSON, or does not
 *   hold a JSON object.
 */
function loadOffsets(offsetFile) {
  try {
    // eslint-disable-next-line security/detect-non-literal-fs-filename
    const parsed = JSON.parse(fs.readFileSync(offsetFile, 'utf8'));
    // Valid JSON can still be null, an array or a scalar; callers index the
    // result by session key and call Object.keys() on it.
    const isPlainObject = parsed !== null && typeof parsed === 'object' && !Array.isArray(parsed);
    return isPlainObject ? parsed : {};
  } catch (_e) {
    return {};
  }
}

/**
 * Persist the read offset, transcript file and start time of every session.
 * Best-effort: a write failure is ignored.
 *
 * @param {string} offsetFile - Path of the JSON offsets file.
 * @param {Iterable<[string, Object]>} sessions - Iterable of
 *   [sessionKey, state] pairs (e.g. a Map).
 */
function saveOffsets(offsetFile, sessions) {
  const offsets = {};
  for (const [key, state] of sessions) {
    offsets[key] = {
      lastOffset: state.lastOffset || 0,
      transcriptFile: state.transcriptFile,
      startTime: state.startTime,
    };
  }
  try {
    // eslint-disable-next-line security/detect-non-literal-fs-filename
    fs.writeFileSync(offsetFile, JSON.stringify(offsets, null, 2), 'utf8');
  } catch (_e) {
    /* ignore write error */
  }
}

// ---- Status box marker for post recovery ----

/**
 * Build the per-session marker string used to find an existing status post
 * after a restart.
 *
 * NOTE(review): the marker text was empty in this copy of the source (it
 * looks like an HTML comment that was stripped). An empty marker matches
 * every post, so a session-specific HTML comment is used here — confirm it
 * matches whatever the status formatter embeds in posts.
 *
 * @param {string} sessionKey - Session key to embed in the marker.
 * @returns {string} Marker string containing the session key.
 */
function makeMarker(sessionKey) {
  return `<!-- sw:${sessionKey} -->`;
}

// ---- Main daemon ----

/**
 * Start the daemon: write the PID file, construct the status box, optional
 * plugin client, watcher, monitor and health server, wire their events
 * together, and install SIGTERM/SIGINT handlers for graceful shutdown.
 *
 * Exits the process with code 1 when another daemon instance is already
 * running according to the PID file.
 *
 * @returns {Promise<void>} Resolves once all subsystems have been started.
 */
async function startDaemon() {
  const config = getConfig();
  const logger = getLogger();

  // Check if already running
  const existingPid = readPidFile(config.pidFile);
  if (existingPid && isProcessRunning(existingPid)) {
    console.error(`Daemon already running (PID ${existingPid})`);
    process.exit(1);
  }
  removePidFile(config.pidFile);

  // Write PID file
  writePidFile(config.pidFile);
  logger.info({ pid: process.pid, pidFile: config.pidFile }, 'Status watcher daemon starting');

  // Load tool labels
  loadLabels(config.toolLabelsFile);

  // Load persisted offsets for restart recovery
  const savedOffsets = loadOffsets(config.offsetFile);
  logger.info({ count: Object.keys(savedOffsets).length }, 'Loaded persisted session offsets');

  // Shared state
  // Map: sessionKey -> { postId, channelId, agentId, rootPostId, usePlugin, children }
  const activeBoxes = new Map();

  // Pending sub-agents: spawnedBy key -> [info] (arrived before parent was added)
  const pendingSubAgents = new Map();

  const globalMetrics = {
    activeSessions: 0,
    updatesSent: 0,
    updatesFailed: 0,
    queueDepth: 0,
    lastError: null,
    circuit: { state: 'unknown' },
  };

  // Shared StatusBox instance (single http.Agent pool)
  const sharedStatusBox = new StatusBox({
    baseUrl: config.mm.baseUrl,
    token: config.mm.token,
    logger: logger.child({ module: 'status-box' }),
    throttleMs: config.throttleMs,
    maxMessageChars: config.maxMessageChars,
    maxRetries: config.maxRetries,
    maxSockets: config.mm.maxSockets,
  });

  // Plugin client (optional — enhances rendering via WebSocket instead of PUT)
  let pluginClient = null;
  let usePlugin = false;

  if (config.plugin && config.plugin.enabled && config.plugin.url && config.plugin.secret) {
    pluginClient = new PluginClient({
      pluginUrl: config.plugin.url,
      secret: config.plugin.secret,
      logger: logger.child({ module: 'plugin-client' }),
    });

    // Initial plugin detection (awaited before the monitor is started below)
    try {
      const healthy = await pluginClient.isHealthy();
      usePlugin = healthy;
      logger.info(
        { usePlugin, url: config.plugin.url },
        healthy
          ? 'Plugin detected — using WebSocket rendering mode'
          : 'Plugin not available — using REST API fallback',
      );
    } catch (_detectErr) {
      usePlugin = false;
      logger.warn('Plugin detection failed — using REST API fallback');
    }

    // Periodic re-detection
    setInterval(() => {
      pluginClient
        .isHealthy()
        .then((healthy) => {
          if (healthy !== usePlugin) {
            usePlugin = healthy;
            logger.info(
              { usePlugin },
              healthy ? 'Plugin came online' : 'Plugin went offline — fallback to REST API',
            );
          }
        })
        .catch((err) => {
          // A rejected health check must not become an unhandled rejection;
          // treat it like the initial detection does: plugin unavailable.
          if (usePlugin) {
            usePlugin = false;
            logger.warn(
              { err: err && err.message },
              'Plugin health check failed — fallback to REST API',
            );
          }
        });
    }, config.plugin.detectIntervalMs || 60000);
  }

  // StatusWatcher
  const watcher = new StatusWatcher({
    transcriptDir: config.transcriptDir,
    idleTimeoutS: config.idleTimeoutS,
    logger: logger.child({ module: 'status-watcher' }),
  });

  // SessionMonitor
  const monitor = new SessionMonitor({
    transcriptDir: config.transcriptDir,
    pollMs: config.sessionPollMs,
    defaultChannel: config.defaultChannel,
    logger: logger.child({ module: 'session-monitor' }),
  });

  // Health server
  const healthServer = new HealthServer({
    port: config.healthPort,
    logger: logger.child({ module: 'health' }),
    getMetrics: () => ({
      ...globalMetrics,
      ...sharedStatusBox.getMetrics(),
      activeSessions: activeBoxes.size,
      pluginEnabled: usePlugin,
    }),
  });

  // Format the state as text and update the post over REST; failures are
  // logged and counted in the metrics instead of being dropped.
  const pushRestUpdate = (sessionKey, box, state) => {
    const text = buildStatusText(box, state, activeBoxes, watcher, sessionKey);
    sharedStatusBox.updatePost(box.postId, text).catch((err) => {
      logger.error({ sessionKey, err }, 'Failed to update status post');
      globalMetrics.lastError = err.message;
      globalMetrics.updatesFailed++;
    });
  };

  // ---- Session Added ----
  monitor.on('session-added', async (info) => {
    const { sessionKey, transcriptFile, spawnedBy, channelId, rootPostId, agentId } = info;

    // Skip if no channel
    if (!channelId) {
      logger.debug({ sessionKey }, 'No channel for session — skipping');
      return;
    }

    // Enforce MAX_ACTIVE_SESSIONS
    if (activeBoxes.size >= config.maxActiveSessions) {
      logger.warn(
        { sessionKey, maxActiveSessions: config.maxActiveSessions },
        'Max active sessions reached — dropping session',
      );
      return;
    }

    // Sub-agent: skip creating own post (embedded in parent)
    if (spawnedBy) {
      const childInfo = {
        sessionKey,
        transcriptFile,
        channelId,
        agentId,
        spawnDepth: info.spawnDepth,
      };
      if (activeBoxes.has(spawnedBy)) {
        linkSubAgent(activeBoxes, watcher, spawnedBy, childInfo, logger);
      } else {
        // Parent not yet tracked — queue for later
        logger.debug({ sessionKey, spawnedBy }, 'Sub-agent queued (parent not yet tracked)');
        if (!pendingSubAgents.has(spawnedBy)) pendingSubAgents.set(spawnedBy, []);
        pendingSubAgents.get(spawnedBy).push(childInfo);
      }
      return;
    }

    let postId;

    // Check for existing post (restart recovery)
    const saved = savedOffsets[sessionKey]; // eslint-disable-line security/detect-object-injection
    if (saved) {
      // Try to find existing post in channel history
      postId = await findExistingPost(sharedStatusBox, channelId, sessionKey, logger);
    }

    // Create new post if none found
    if (!postId) {
      try {
        if (usePlugin && pluginClient) {
          // Plugin mode: create custom_livestatus post via plugin
          postId = await pluginClient.createSession(sessionKey, channelId, rootPostId, agentId);
          logger.info(
            { sessionKey, postId, channelId, mode: 'plugin' },
            'Created status box via plugin',
          );
        } else {
          // REST API fallback: create regular post with formatted text
          const initialText = buildInitialText(agentId, sessionKey);
          postId = await sharedStatusBox.createPost(channelId, initialText, rootPostId);
          logger.info(
            { sessionKey, postId, channelId, mode: 'rest' },
            'Created status box via REST API',
          );
        }
      } catch (err) {
        logger.error({ sessionKey, err }, 'Failed to create status post');
        globalMetrics.lastError = err.message;
        return;
      }
    }

    activeBoxes.set(sessionKey, {
      postId,
      channelId,
      agentId,
      rootPostId,
      usePlugin: usePlugin && !!pluginClient, // track which mode this session uses
      children: new Map(),
    });
    globalMetrics.activeSessions = activeBoxes.size;

    // Register in watcher
    const initialState = saved
      ? { lastOffset: saved.lastOffset, startTime: saved.startTime, agentId }
      : { agentId };
    watcher.addSession(sessionKey, transcriptFile, initialState);

    // Process any pending sub-agents that were waiting for this parent
    if (pendingSubAgents.has(sessionKey)) {
      const pending = pendingSubAgents.get(sessionKey);
      pendingSubAgents.delete(sessionKey);
      for (const childInfo of pending) {
        logger.debug(
          { childKey: childInfo.sessionKey, parentKey: sessionKey },
          'Processing queued sub-agent',
        );
        linkSubAgent(activeBoxes, watcher, sessionKey, childInfo, logger);
      }
    }
  });

  // ---- Session Removed ----
  monitor.on('session-removed', (sessionKey) => {
    // Don't immediately remove — let idle detection handle final flush
    logger.debug({ sessionKey }, 'Session removed from sessions.json');
  });

  // ---- Session Update (from watcher) ----
  watcher.on('session-update', (sessionKey, state) => {
    const box = activeBoxes.get(sessionKey);
    if (!box) {
      // Sub-agent: update parent
      updateParentWithChild(
        activeBoxes,
        watcher,
        sharedStatusBox,
        pluginClient,
        sessionKey,
        state,
        logger,
      );
      return;
    }

    if (box.usePlugin && pluginClient) {
      // Plugin mode: send structured data (WebSocket broadcast, no post edit)
      const children = (state.children || []).map((c) => ({
        session_key: c.sessionKey,
        agent_id: c.agentId,
        status: c.status,
        lines: c.lines || [],
        elapsed_ms: Date.now() - (c.startTime || Date.now()),
        token_count: c.tokenCount || 0,
      }));
      pluginClient
        .updateSession(sessionKey, {
          status: state.status,
          lines: state.lines,
          elapsed_ms: Date.now() - state.startTime,
          token_count: state.tokenCount || 0,
          children,
          start_time_ms: state.startTime,
        })
        .catch((err) => {
          // If plugin rejects (e.g. session not found), fall back to REST for this session
          logger.warn(
            { sessionKey, err: err.message },
            'Plugin update failed — falling back to REST',
          );
          box.usePlugin = false;
          pushRestUpdate(sessionKey, box, state);
        });
    } else {
      // REST API fallback: format text and PUT update post
      pushRestUpdate(sessionKey, box, state);
    }

    // Persist offsets on every update
    saveOffsets(config.offsetFile, watcher.sessions);
  });

  // ---- Session Idle (from watcher) ----
  watcher.on('session-idle', async (sessionKey, state) => {
    const box = activeBoxes.get(sessionKey);
    if (!box) {
      // Sub-agent completed
      updateParentWithChild(
        activeBoxes,
        watcher,
        sharedStatusBox,
        pluginClient,
        sessionKey,
        state,
        logger,
      );
      return;
    }

    // Check all children are complete before marking done
    const allChildrenDone = checkChildrenComplete(box, watcher);
    if (!allChildrenDone) {
      logger.debug({ sessionKey }, 'Parent waiting for child sessions to complete');
      return;
    }

    // Final update with done status
    const doneState = { ...state, status: 'done' };
    try {
      if (box.usePlugin && pluginClient) {
        // Plugin mode: mark session complete
        await pluginClient.deleteSession(sessionKey);
        logger.info(
          { sessionKey, postId: box.postId, mode: 'plugin' },
          'Session complete via plugin',
        );
      } else {
        // REST API fallback
        const text = buildStatusText(box, doneState, activeBoxes, watcher, sessionKey);
        await sharedStatusBox.forceFlush(box.postId);
        await sharedStatusBox.updatePost(box.postId, text);
        logger.info(
          { sessionKey, postId: box.postId, mode: 'rest' },
          'Session complete — status box updated',
        );
      }
    } catch (err) {
      logger.error({ sessionKey, err }, 'Failed to update final status');
    }

    // Clean up — remove from all tracking so session can be re-detected if it becomes active again
    activeBoxes.delete(sessionKey);
    watcher.removeSession(sessionKey);
    monitor.forgetSession(sessionKey);
    globalMetrics.activeSessions = activeBoxes.size;

    // Persist final offsets
    saveOffsets(config.offsetFile, watcher.sessions);
  });

  // ---- Start all subsystems ----
  await healthServer.start();

  try {
    watcher.start();
  } catch (err) {
    logger.warn({ err }, 'fs.watch failed — creating transcript dir');
    // eslint-disable-next-line security/detect-non-literal-fs-filename
    fs.mkdirSync(config.transcriptDir, { recursive: true });
    watcher.start();
  }

  monitor.start();

  logger.info('Status watcher daemon ready');

  // ---- Graceful shutdown ----
  let shuttingDown = false;

  async function shutdown(signal) {
    if (shuttingDown) return;
    shuttingDown = true;

    logger.info({ signal }, 'Shutting down gracefully');

    try {
      // Stop accepting new sessions
      monitor.stop();
      watcher.stop();

      // Mark all active boxes as interrupted
      const updates = [];
      for (const [sessionKey, box] of activeBoxes) {
        const state = watcher.getSessionState(sessionKey) || {
          sessionKey,
          status: 'interrupted',
          startTime: Date.now(),
          lines: [],
          agentId: box.agentId,
          depth: 0,
          tokenCount: 0,
          children: [],
        };
        const intState = { ...state, status: 'interrupted' };
        const text = buildStatusText(box, intState, activeBoxes, watcher, sessionKey);
        updates.push(
          sharedStatusBox
            .updatePost(box.postId, text)
            .catch((e) => logger.error({ sessionKey, e }, 'Shutdown update failed')),
        );
      }

      await Promise.allSettled(updates);
      await sharedStatusBox.flushAll();

      // Cleanup
      await healthServer.stop();
      sharedStatusBox.destroy();
      if (pluginClient) pluginClient.destroy();
    } catch (err) {
      logger.error({ err }, 'Error during shutdown');
    } finally {
      // Always release the PID file and exit, even if a cleanup step threw;
      // otherwise the daemon would linger with no monitor/watcher running.
      removePidFile(config.pidFile);
      logger.info('Shutdown complete');
      process.exit(0);
    }
  }

  process.on('SIGTERM', () => shutdown('SIGTERM'));
  process.on('SIGINT', () => shutdown('SIGINT'));

  // Offset persistence interval (every 30s)
  setInterval(() => {
    saveOffsets(config.offsetFile, watcher.sessions);
  }, 30000);
}
// ---- Helper functions ----

/**
 * Attach a sub-agent session to its parent's status box and start watching
 * its transcript. Does nothing when the parent is not in `activeBoxes`.
 *
 * @param {Map<string, Object>} activeBoxes - Tracked status boxes by session key.
 * @param {Object} watcher - Watcher exposing `addSession(key, file, state)`.
 * @param {string} parentKey - Session key of the parent.
 * @param {Object} childInfo - `{ sessionKey, transcriptFile, agentId, spawnDepth }`.
 * @param {Object} logger - Logger exposing `info`.
 */
function linkSubAgent(activeBoxes, watcher, parentKey, childInfo, logger) {
  const parentBox = activeBoxes.get(parentKey);
  if (!parentBox) return;

  const { sessionKey, transcriptFile, agentId, spawnDepth } = childInfo;

  if (!parentBox.children) parentBox.children = new Map();
  parentBox.children.set(sessionKey, { sessionKey, transcriptFile, agentId });

  watcher.addSession(sessionKey, transcriptFile, {
    agentId,
    depth: spawnDepth || 1,
  });

  logger.info({ sessionKey, parent: parentKey }, 'Sub-agent linked to parent');
}

/**
 * Format the text of a freshly created status post: an active session that
 * starts now, with no lines, tokens or children.
 *
 * @param {string} agentId - Agent id of the session.
 * @param {string} sessionKey - Session key.
 * @returns {*} Whatever `format` from ./status-formatter returns.
 */
function buildInitialText(agentId, sessionKey) {
  const { format } = require('./status-formatter');
  return format({
    sessionKey,
    status: 'active',
    startTime: Date.now(),
    lines: [],
    agentId,
    depth: 0,
    tokenCount: 0,
    children: [],
  });
}

/**
 * Format the status text for a box, nesting the current watcher state of each
 * of its children (at depth 1). Children the watcher has no state for are
 * omitted.
 *
 * @param {Object} box - Status box; its `children` Map (if any) is read.
 * @param {Object} state - Session state to format.
 * @param {Map<string, Object>} activeBoxes - Unused; kept for call compatibility.
 * @param {Object} watcher - Watcher exposing `getSessionState(key)`.
 * @param {string} _sessionKey - Unused; kept for call compatibility.
 * @returns {*} Whatever `format` from ./status-formatter returns.
 */
function buildStatusText(box, state, activeBoxes, watcher, _sessionKey) {
  const { format } = require('./status-formatter');

  // Build child states for nesting
  const childStates = [];
  if (box.children && box.children.size > 0) {
    for (const childKey of box.children.keys()) {
      const childWatcherState = watcher.getSessionState(childKey);
      if (childWatcherState) {
        childStates.push({ ...childWatcherState, depth: 1 });
      }
    }
  }

  return format({ ...state, children: childStates });
}

/**
 * Refresh the parent's status box after one of its sub-agents changed.
 *
 * Looks up the first box whose `children` contains `childKey`; if the watcher
 * has state for that parent, pushes an update via the plugin (when the box is
 * in plugin mode) or via a REST post update. Failures are logged, not thrown.
 *
 * @param {Map<string, Object>} activeBoxes - Tracked status boxes by session key.
 * @param {Object} watcher - Watcher exposing `getSessionState(key)`.
 * @param {Object} statusBox - Status box client exposing `updatePost`.
 * @param {Object|null} pluginClient - Plugin client, or null when disabled.
 * @param {string} childKey - Session key of the sub-agent.
 * @param {Object} childState - State of the sub-agent (currently unused).
 * @param {Object} logger - Logger exposing `error`.
 */
function updateParentWithChild(
  activeBoxes,
  watcher,
  statusBox,
  pluginClient,
  childKey,
  childState,
  logger,
) {
  // Find parent
  for (const [parentKey, box] of activeBoxes) {
    if (!box.children || !box.children.has(childKey)) continue;

    const parentState = watcher.getSessionState(parentKey);
    if (!parentState) return;

    if (box.usePlugin && pluginClient) {
      // Plugin mode: send structured update for parent.
      // NOTE(review): `children` is sent empty here, unlike the REST branch
      // which nests child states — confirm this is intended.
      pluginClient
        .updateSession(parentKey, {
          status: parentState.status,
          lines: parentState.lines,
          elapsed_ms: Date.now() - parentState.startTime,
          token_count: parentState.tokenCount || 0,
          children: [],
          start_time_ms: parentState.startTime,
        })
        .catch((err) => {
          logger.error(
            { parentKey, childKey, err: err.message },
            'Failed to update parent via plugin',
          );
        });
    } else {
      const text = buildStatusText(box, parentState, activeBoxes, watcher, parentKey);
      statusBox.updatePost(box.postId, text).catch((err) => {
        logger.error({ parentKey, childKey, err }, 'Failed to update parent');
      });
    }
    return;
  }
}

/**
 * Tell whether none of a box's children is still active.
 *
 * @param {Object} box - Status box; its `children` Map (if any) is read.
 * @param {Object} watcher - Watcher exposing `getSessionState(key)`.
 * @returns {boolean} False when at least one child has watcher state with
 *   status 'active'; true otherwise (including when there are no children).
 */
function checkChildrenComplete(box, watcher) {
  if (!box.children || box.children.size === 0) return true;
  for (const childKey of box.children.keys()) {
    const childState = watcher.getSessionState(childKey);
    if (childState && childState.status === 'active') return false;
  }
  return true;
}

/**
 * Search a channel for an existing status post carrying this session's marker
 * (restart recovery).
 *
 * @param {Object} statusBox - Status box client; its `_apiCallWithRetry`
 *   method is used to call the posts search endpoint.
 * @param {string} channelId - Channel to search.
 * @param {string} sessionKey - Session whose marker to look for.
 * @param {Object} logger - Logger exposing `info` and `debug`.
 * @returns {Promise<string|null>} Id of the first post whose message contains
 *   the marker, or null when none is found, the marker is empty, or the
 *   search fails.
 */
async function findExistingPost(statusBox, channelId, sessionKey, logger) {
  const marker = makeMarker(sessionKey);
  // An empty marker is contained in every message, so it would "recover" an
  // arbitrary post; treat it as "nothing to search for".
  if (!marker) return null;

  try {
    // Use the status box's _apiCallWithRetry method to search posts
    const result = await statusBox._apiCallWithRetry('POST', '/api/v4/posts/search', {
      channel_id: channelId,
      terms: marker,
      is_or_search: false,
    });

    const posts = result && result.posts ? Object.values(result.posts) : [];
    const match = posts.find((post) => post.message && post.message.includes(marker));
    if (match) {
      logger.info(
        { sessionKey, postId: match.id },
        'Found existing status post (restart recovery)',
      );
      return match.id;
    }
  } catch (err) {
    // Search failed — the caller creates a new post instead
    logger.debug(
      { sessionKey, err: err && err.message },
      'Existing post search failed',
    );
  }
  return null;
}

// ---- Stop command ----

/**
 * CLI `stop`: send SIGTERM to the daemon recorded in the PID file.
 * Removes the PID file when the recorded process is not running.
 */
function stopDaemon() {
  // Need to read config for pidFile location
  process.env.MM_BOT_TOKEN = process.env.MM_BOT_TOKEN || 'placeholder';
  let config;
  try {
    config = getConfig();
  } catch (_e) {
    config = { pidFile: '/tmp/status-watcher.pid' };
  }

  const pid = readPidFile(config.pidFile);
  if (!pid) {
    console.log('Daemon not running (no PID file)');
    return;
  }

  if (!isProcessRunning(pid)) {
    console.log(`Daemon not running (PID ${pid} not found)`);
    removePidFile(config.pidFile);
    return;
  }

  console.log(`Stopping daemon (PID ${pid})...`);
  try {
    process.kill(pid, 'SIGTERM');
  } catch (err) {
    // The process may have exited since the check above, or we may lack
    // permission to signal it; report instead of crashing with a stack trace.
    console.error(`Failed to stop daemon (PID ${pid}): ${err.message}`);
    process.exitCode = 1;
  }
}

// ---- Status command ----

/**
 * CLI `status`: print whether the daemon recorded in the PID file is running
 * and, if so, the URL of its health endpoint.
 */
function daemonStatus() {
  process.env.MM_BOT_TOKEN = process.env.MM_BOT_TOKEN || 'placeholder';
  let config;
  try {
    config = getConfig();
  } catch (_e) {
    config = { pidFile: '/tmp/status-watcher.pid', healthPort: 9090 };
  }

  const pid = readPidFile(config.pidFile);
  if (!pid) {
    console.log('Status: stopped');
    return;
  }

  if (!isProcessRunning(pid)) {
    console.log(`Status: stopped (stale PID ${pid})`);
    return;
  }

  console.log(`Status: running (PID ${pid})`);
  console.log(`Health: http://localhost:${config.healthPort}/health`);
}