diff --git a/.prettierignore b/.prettierignore index da84ba4..f082476 100644 --- a/.prettierignore +++ b/.prettierignore @@ -2,3 +2,4 @@ node_modules/ coverage/ dist/ package-lock.json +Makefile diff --git a/Makefile b/Makefile index f5176a1..fa7fd33 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,9 @@ export NODE_ENV := development export NODE_PATH := /usr/local/lib/node_modules -.PHONY: check install test lint fmt fmt-check secret-scan +.PHONY: check install test test-integration lint fmt fmt-check secret-scan -check: install lint fmt-check secret-scan test +check: install lint fmt-check secret-scan test test-integration install: npm install @@ -11,6 +11,9 @@ install: test: node --test test/unit/*.test.js +test-integration: + node --test test/integration/*.test.js + lint: eslint . diff --git a/package.json b/package.json index 79c72f3..b8e874a 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ "stop": "node src/watcher-manager.js stop", "status": "node src/watcher-manager.js status", "test": "node --test test/unit/*.test.js", - "test:integration": "node --test test/integration/*.test.js" + "test-integration": "node --test test/integration/*.test.js" }, "dependencies": { "pino": "^9.14.0" diff --git a/src/session-monitor.js b/src/session-monitor.js new file mode 100644 index 0000000..744b2d9 --- /dev/null +++ b/src/session-monitor.js @@ -0,0 +1,281 @@ +'use strict'; + +/** + * session-monitor.js — Polls sessions.json every 2s to detect new/ended sessions. + * + * sessions.json format (per agent): + * { + * "agent:main:mattermost:channel:abc123:thread:xyz": { + * "sessionId": "uuid", + * "spawnedBy": null | "agent:main:...", + * "spawnDepth": 0, + * "label": "proj035-planner", + * "channel": "mattermost" + * } + * } + * + * Emits: + * 'session-added' ({ sessionKey, transcriptFile, spawnedBy, channelId, rootPostId, agentId }) + * 'session-removed' (sessionKey) + */ + +const fs = require('fs'); +const path = require('path'); +const { EventEmitter } = require('events'); + +class SessionMonitor extends EventEmitter { + /** + * @param {object} opts + * @param {string} opts.transcriptDir - Base /home/node/.openclaw/agents directory + * @param {number} [opts.pollMs] - Poll interval in ms (default 2000) + * @param {string|null} [opts.defaultChannel] - Fallback channel ID for non-MM sessions + * @param {object} [opts.logger] - pino logger + */ + constructor(opts) { + super(); + this.transcriptDir = opts.transcriptDir; + this.pollMs = opts.pollMs || 2000; + this.defaultChannel = opts.defaultChannel || null; + this.logger = opts.logger || null; + + // Map + this._knownSessions = new Map(); + this._pollTimer = null; + this._running = false; + } + + start() { + if (this._running) return; + this._running = true; + + // Initial scan + this._poll(); + + this._pollTimer = setInterval(() => { + this._poll(); + }, this.pollMs); + + if (this.logger) this.logger.info({ pollMs: this.pollMs }, 'SessionMonitor started'); + } + + stop() { + this._running = false; + if (this._pollTimer) { + clearInterval(this._pollTimer); + this._pollTimer = null; + } + if (this.logger) this.logger.info('SessionMonitor stopped'); + } + + /** + * Get all agent directories under transcriptDir. + * @private + * @returns {string[]} Agent IDs + */ + _getAgentDirs() { + try { + // eslint-disable-next-line security/detect-non-literal-fs-filename + return fs.readdirSync(this.transcriptDir).filter((name) => { + try { + // eslint-disable-next-line security/detect-non-literal-fs-filename + return fs.statSync(path.join(this.transcriptDir, name)).isDirectory(); + } catch (_e) { + return false; + } + }); + } catch (_e) { + return []; + } + } + + /** + * Read sessions.json for a given agent. + * @private + * @param {string} agentId + * @returns {object} Sessions map + */ + _readSessionsJson(agentId) { + const sessionsPath = path.join(this.transcriptDir, agentId, 'sessions', 'sessions.json'); + try { + // eslint-disable-next-line security/detect-non-literal-fs-filename + const raw = fs.readFileSync(sessionsPath, 'utf8'); + return JSON.parse(raw); + } catch (_e) { + return {}; + } + } + + /** + * Resolve the transcript file path for a session. + * @private + * @param {string} agentId + * @param {string} sessionId - UUID + * @returns {string} + */ + _transcriptPath(agentId, sessionId) { + return path.join(this.transcriptDir, agentId, 'sessions', `${sessionId}.jsonl`); + } + + /** + * Parse channel ID from session key. + * Session key format: "agent:main:mattermost:channel:{channelId}:thread:{threadId}" + * or: "agent:main:mattermost:dm:{userId}" + * @param {string} sessionKey + * @returns {string|null} + */ + static parseChannelId(sessionKey) { + const parts = sessionKey.split(':'); + // agent:main:mattermost:channel:CHANNEL_ID:... + const chanIdx = parts.indexOf('channel'); + if (chanIdx >= 0 && parts[chanIdx + 1]) { + return parts[chanIdx + 1]; // eslint-disable-line security/detect-object-injection + } + // agent:main:mattermost:dm:USER_ID (use as channel) + const dmIdx = parts.indexOf('dm'); + if (dmIdx >= 0 && parts[dmIdx + 1]) { + return parts[dmIdx + 1]; // eslint-disable-line security/detect-object-injection + } + return null; + } + + /** + * Parse root post ID (thread ID) from session key. + * @param {string} sessionKey + * @returns {string|null} + */ + static parseRootPostId(sessionKey) { + const parts = sessionKey.split(':'); + const threadIdx = parts.indexOf('thread'); + if (threadIdx >= 0 && parts[threadIdx + 1]) { + return parts[threadIdx + 1]; // eslint-disable-line security/detect-object-injection + } + return null; + } + + /** + * Extract agent ID from session key. + * @param {string} sessionKey + * @returns {string} + */ + static parseAgentId(sessionKey) { + const parts = sessionKey.split(':'); + if (parts[0] === 'agent' && parts[1]) return parts[1]; + return parts[0] || 'unknown'; + } + + /** + * Determine if a session is a Mattermost session. + * @param {string} sessionKey + * @returns {boolean} + */ + static isMattermostSession(sessionKey) { + return sessionKey.includes(':mattermost:') || sessionKey.includes(':mm:'); + } + + /** + * Poll all agents' sessions.json files for changes. + * @private + */ + _poll() { + if (!this._running) return; + + const agentDirs = this._getAgentDirs(); + const currentSessions = new Map(); + + for (const agentId of agentDirs) { + const sessions = this._readSessionsJson(agentId); + for (const [sessionKey, entry] of Object.entries(sessions)) { + const sessionId = entry.sessionId || entry.uuid; + if (!sessionId) continue; + + currentSessions.set(sessionKey, { + agentId, + sessionKey, + sessionId, + spawnedBy: entry.spawnedBy || null, + spawnDepth: entry.spawnDepth || 0, + label: entry.label || null, + channel: entry.channel || null, + }); + } + } + + // Detect added sessions + for (const [sessionKey, entry] of currentSessions) { + if (!this._knownSessions.has(sessionKey)) { + this._onSessionAdded(entry); + } + } + + // Detect removed sessions + for (const [sessionKey] of this._knownSessions) { + if (!currentSessions.has(sessionKey)) { + this._onSessionRemoved(sessionKey); + } + } + + this._knownSessions = currentSessions; + } + + /** + * Handle a newly detected session. + * @private + */ + _onSessionAdded(entry) { + const { agentId, sessionKey, sessionId, spawnedBy, spawnDepth, label } = entry; + + const transcriptFile = this._transcriptPath(agentId, sessionId); + + // Resolve channel ID from session key + let channelId = SessionMonitor.parseChannelId(sessionKey); + + // Fall back to default channel for non-MM sessions + if (!channelId && !SessionMonitor.isMattermostSession(sessionKey)) { + channelId = this.defaultChannel; + if (!channelId) { + if (this.logger) { + this.logger.debug({ sessionKey }, 'Skipping non-MM session (no channel, no default)'); + } + return; + } + } + + const rootPostId = SessionMonitor.parseRootPostId(sessionKey); + const parsedAgentId = SessionMonitor.parseAgentId(sessionKey); + + if (this.logger) { + this.logger.info({ sessionKey, agentId, channelId, spawnedBy }, 'Session detected'); + } + + this.emit('session-added', { + sessionKey, + transcriptFile, + spawnedBy, + spawnDepth, + channelId, + rootPostId, + agentId: label || parsedAgentId, + }); + } + + /** + * Handle a removed session. + * @private + */ + _onSessionRemoved(sessionKey) { + if (this.logger) { + this.logger.info({ sessionKey }, 'Session ended'); + } + this.emit('session-removed', sessionKey); + } + + /** + * Get list of currently known sessions. + * @returns {Map} + */ + getKnownSessions() { + return new Map(this._knownSessions); + } +} + +module.exports = { SessionMonitor }; diff --git a/src/watcher-manager.js b/src/watcher-manager.js new file mode 100644 index 0000000..57c9089 --- /dev/null +++ b/src/watcher-manager.js @@ -0,0 +1,533 @@ +#!/usr/bin/env node +'use strict'; + +/* eslint-disable no-console */ + +/** + * watcher-manager.js — Top-level orchestrator for the Live Status v4 daemon. + * + * CLI: node watcher-manager.js start|stop|status + * + * Architecture: + * - SessionMonitor polls sessions.json every 2s for new/ended sessions + * - StatusWatcher watches transcript files via fs.watch (inotify) + * - StatusBox manages Mattermost posts (throttle, circuit breaker, retry) + * - HealthServer exposes /health endpoint + * - Offset persistence: save/restore last read positions on restart + * - Graceful shutdown: SIGTERM/SIGINT -> mark all boxes interrupted -> exit 0 + */ + +const fs = require('fs'); + +const { getConfig } = require('./config'); +const { getLogger } = require('./logger'); +const { SessionMonitor } = require('./session-monitor'); +const { StatusWatcher } = require('./status-watcher'); +const { StatusBox } = require('./status-box'); +// status-formatter is used inline via require() in helpers +const { HealthServer } = require('./health'); +const { loadLabels } = require('./tool-labels'); + +// ---- CLI Router ---- +const cmd = process.argv[2]; +if (cmd === 'start') { + startDaemon().catch((err) => { + console.error('Failed to start:', err.message); + process.exit(1); + }); +} else if (cmd === 'stop') { + stopDaemon(); +} else if (cmd === 'status') { + daemonStatus(); +} else { + console.log('Usage: node watcher-manager.js '); + process.exit(1); +} + +// ---- PID File helpers ---- +function writePidFile(pidFile) { + // eslint-disable-next-line security/detect-non-literal-fs-filename + fs.writeFileSync(pidFile, String(process.pid), 'utf8'); +} + +function readPidFile(pidFile) { + try { + // eslint-disable-next-line security/detect-non-literal-fs-filename + return parseInt(fs.readFileSync(pidFile, 'utf8').trim(), 10); + } catch (_e) { + return null; + } +} + +function removePidFile(pidFile) { + try { + // eslint-disable-next-line security/detect-non-literal-fs-filename + fs.unlinkSync(pidFile); + } catch (_e) { + /* ignore */ + } +} + +function isProcessRunning(pid) { + try { + process.kill(pid, 0); + return true; + } catch (_e) { + return false; + } +} + +// ---- Offset persistence ---- +function loadOffsets(offsetFile) { + try { + // eslint-disable-next-line security/detect-non-literal-fs-filename + return JSON.parse(fs.readFileSync(offsetFile, 'utf8')); + } catch (_e) { + return {}; + } +} + +function saveOffsets(offsetFile, sessions) { + const offsets = {}; + for (const [key, state] of sessions) { + offsets[key] = { + lastOffset: state.lastOffset || 0, + transcriptFile: state.transcriptFile, + startTime: state.startTime, + }; + } + try { + // eslint-disable-next-line security/detect-non-literal-fs-filename + fs.writeFileSync(offsetFile, JSON.stringify(offsets, null, 2), 'utf8'); + } catch (_e) { + /* ignore write error */ + } +} + +// ---- Status box marker for post recovery ---- +function makeMarker(sessionKey) { + return ``; +} + +// ---- Main daemon ---- +async function startDaemon() { + const config = getConfig(); + const logger = getLogger(); + + // Check if already running + const existingPid = readPidFile(config.pidFile); + if (existingPid && isProcessRunning(existingPid)) { + console.error(`Daemon already running (PID ${existingPid})`); + process.exit(1); + } + removePidFile(config.pidFile); + + // Write PID file + writePidFile(config.pidFile); + logger.info({ pid: process.pid, pidFile: config.pidFile }, 'Status watcher daemon starting'); + + // Load tool labels + loadLabels(config.toolLabelsFile); + + // Load persisted offsets for restart recovery + const savedOffsets = loadOffsets(config.offsetFile); + logger.info({ count: Object.keys(savedOffsets).length }, 'Loaded persisted session offsets'); + + // Shared state + // Map + const activeBoxes = new Map(); + let globalMetrics = { + activeSessions: 0, + updatesSent: 0, + updatesFailed: 0, + queueDepth: 0, + lastError: null, + circuit: { state: 'unknown' }, + }; + + // Shared StatusBox instance (single http.Agent pool) + const sharedStatusBox = new StatusBox({ + baseUrl: config.mm.baseUrl, + token: config.mm.token, + logger: logger.child({ module: 'status-box' }), + throttleMs: config.throttleMs, + maxMessageChars: config.maxMessageChars, + maxRetries: config.maxRetries, + maxSockets: config.mm.maxSockets, + }); + + // StatusWatcher + const watcher = new StatusWatcher({ + transcriptDir: config.transcriptDir, + idleTimeoutS: config.idleTimeoutS, + logger: logger.child({ module: 'status-watcher' }), + }); + + // SessionMonitor + const monitor = new SessionMonitor({ + transcriptDir: config.transcriptDir, + pollMs: config.sessionPollMs, + defaultChannel: config.defaultChannel, + logger: logger.child({ module: 'session-monitor' }), + }); + + // Health server + const healthServer = new HealthServer({ + port: config.healthPort, + logger: logger.child({ module: 'health' }), + getMetrics: () => ({ + ...globalMetrics, + ...sharedStatusBox.getMetrics(), + activeSessions: activeBoxes.size, + }), + }); + + // ---- Session Added ---- + monitor.on('session-added', async (info) => { + const { sessionKey, transcriptFile, spawnedBy, channelId, rootPostId, agentId } = info; + + // Skip if no channel + if (!channelId) { + logger.debug({ sessionKey }, 'No channel for session — skipping'); + return; + } + + // Enforce MAX_ACTIVE_SESSIONS + if (activeBoxes.size >= config.maxActiveSessions) { + logger.warn( + { sessionKey, maxActiveSessions: config.maxActiveSessions }, + 'Max active sessions reached — dropping session', + ); + return; + } + + // Sub-agent: skip creating own post (embedded in parent) + if (spawnedBy && activeBoxes.has(spawnedBy)) { + const parentBox = activeBoxes.get(spawnedBy); + // Link child to parent's session state + const childState = { + sessionKey, + transcriptFile, + spawnedBy, + parentPostId: parentBox.postId, + channelId, + depth: info.spawnDepth || 0, + agentId, + }; + parentBox.children = parentBox.children || new Map(); + parentBox.children.set(sessionKey, childState); + + // Register in watcher + watcher.addSession(sessionKey, transcriptFile, { + agentId, + depth: info.spawnDepth || 1, + }); + logger.info({ sessionKey, parent: spawnedBy }, 'Sub-agent linked to parent'); + return; + } + + let postId; + + // Check for existing post (restart recovery) + const saved = savedOffsets[sessionKey]; // eslint-disable-line security/detect-object-injection + if (saved) { + // Try to find existing post in channel history + postId = await findExistingPost(sharedStatusBox, channelId, sessionKey, logger); + } + + // Create new post if none found + if (!postId) { + try { + const initialText = buildInitialText(agentId, sessionKey); + postId = await sharedStatusBox.createPost(channelId, initialText, rootPostId); + logger.info({ sessionKey, postId, channelId }, 'Created status box'); + } catch (err) { + logger.error({ sessionKey, err }, 'Failed to create status post'); + globalMetrics.lastError = err.message; + return; + } + } + + activeBoxes.set(sessionKey, { + postId, + channelId, + agentId, + rootPostId, + children: new Map(), + }); + globalMetrics.activeSessions = activeBoxes.size; + + // Register in watcher + const initialState = saved + ? { lastOffset: saved.lastOffset, startTime: saved.startTime, agentId } + : { agentId }; + watcher.addSession(sessionKey, transcriptFile, initialState); + }); + + // ---- Session Removed ---- + monitor.on('session-removed', (sessionKey) => { + // Don't immediately remove — let idle detection handle final flush + logger.debug({ sessionKey }, 'Session removed from sessions.json'); + }); + + // ---- Session Update (from watcher) ---- + watcher.on('session-update', (sessionKey, state) => { + const box = activeBoxes.get(sessionKey); + if (!box) { + // Sub-agent: update parent + updateParentWithChild(activeBoxes, watcher, sharedStatusBox, sessionKey, state, logger); + return; + } + + // Build status text + const text = buildStatusText(box, state, activeBoxes, watcher, sessionKey); + sharedStatusBox.updatePost(box.postId, text).catch((err) => { + logger.error({ sessionKey, err }, 'Failed to update status post'); + globalMetrics.lastError = err.message; + globalMetrics.updatesFailed++; + }); + + // Persist offsets periodically + saveOffsets(config.offsetFile, watcher.sessions); + }); + + // ---- Session Idle (from watcher) ---- + watcher.on('session-idle', async (sessionKey, state) => { + const box = activeBoxes.get(sessionKey); + if (!box) { + // Sub-agent completed + updateParentWithChild(activeBoxes, watcher, sharedStatusBox, sessionKey, state, logger); + return; + } + + // Check all children are complete before marking done + const allChildrenDone = checkChildrenComplete(box, watcher); + if (!allChildrenDone) { + logger.debug({ sessionKey }, 'Parent waiting for child sessions to complete'); + return; + } + + // Final update with done status + const doneState = { ...state, status: 'done' }; + const text = buildStatusText(box, doneState, activeBoxes, watcher, sessionKey); + + try { + await sharedStatusBox.forceFlush(box.postId); + await sharedStatusBox.updatePost(box.postId, text); + logger.info({ sessionKey, postId: box.postId }, 'Session complete — status box updated'); + } catch (err) { + logger.error({ sessionKey, err }, 'Failed to update final status'); + } + + // Clean up + activeBoxes.delete(sessionKey); + watcher.removeSession(sessionKey); + globalMetrics.activeSessions = activeBoxes.size; + + // Persist final offsets + saveOffsets(config.offsetFile, watcher.sessions); + }); + + // ---- Start all subsystems ---- + await healthServer.start(); + + try { + watcher.start(); + } catch (err) { + logger.warn({ err }, 'fs.watch failed — creating transcript dir'); + // eslint-disable-next-line security/detect-non-literal-fs-filename + fs.mkdirSync(config.transcriptDir, { recursive: true }); + watcher.start(); + } + + monitor.start(); + + logger.info('Status watcher daemon ready'); + + // ---- Graceful shutdown ---- + let shuttingDown = false; + async function shutdown(signal) { + if (shuttingDown) return; + shuttingDown = true; + + logger.info({ signal }, 'Shutting down gracefully'); + + // Stop accepting new sessions + monitor.stop(); + watcher.stop(); + + // Mark all active boxes as interrupted + const updates = []; + for (const [sessionKey, box] of activeBoxes) { + const state = watcher.getSessionState(sessionKey) || { + sessionKey, + status: 'interrupted', + startTime: Date.now(), + lines: [], + agentId: box.agentId, + depth: 0, + tokenCount: 0, + children: [], + }; + const intState = { ...state, status: 'interrupted' }; + const text = buildStatusText(box, intState, activeBoxes, watcher, sessionKey); + updates.push( + sharedStatusBox + .updatePost(box.postId, text) + .catch((e) => logger.error({ sessionKey, e }, 'Shutdown update failed')), + ); + } + await Promise.allSettled(updates); + await sharedStatusBox.flushAll(); + + // Cleanup + await healthServer.stop(); + sharedStatusBox.destroy(); + removePidFile(config.pidFile); + + logger.info('Shutdown complete'); + process.exit(0); + } + + process.on('SIGTERM', () => shutdown('SIGTERM')); + process.on('SIGINT', () => shutdown('SIGINT')); + + // Offset persistence interval (every 30s) + setInterval(() => { + saveOffsets(config.offsetFile, watcher.sessions); + }, 30000); +} + +// ---- Helper functions ---- + +function buildInitialText(agentId, sessionKey) { + const { format } = require('./status-formatter'); + return format({ + sessionKey, + status: 'active', + startTime: Date.now(), + lines: [], + agentId, + depth: 0, + tokenCount: 0, + children: [], + }); +} + +function buildStatusText(box, state, activeBoxes, watcher, _sessionKey) { + const { format } = require('./status-formatter'); + + // Build child states for nesting + const childStates = []; + if (box.children && box.children.size > 0) { + for (const [childKey] of box.children) { + const childWatcherState = watcher.getSessionState(childKey); + if (childWatcherState) { + childStates.push({ ...childWatcherState, depth: 1 }); + } + } + } + + return format({ ...state, children: childStates }); +} + +function updateParentWithChild(activeBoxes, watcher, statusBox, childKey, childState, logger) { + // Find parent + for (const [parentKey, box] of activeBoxes) { + if (box.children && box.children.has(childKey)) { + const parentState = watcher.getSessionState(parentKey); + if (!parentState) return; + + const text = buildStatusText(box, parentState, activeBoxes, watcher, parentKey); + statusBox + .updatePost(box.postId, text) + .catch((err) => logger.error({ parentKey, childKey, err }, 'Failed to update parent')); + return; + } + } +} + +function checkChildrenComplete(box, watcher) { + if (!box.children || box.children.size === 0) return true; + for (const [childKey] of box.children) { + const childState = watcher.getSessionState(childKey); + if (childState && childState.status === 'active') return false; + } + return true; +} + +async function findExistingPost(statusBox, channelId, sessionKey, logger) { + // Search channel history for a post with our marker + // This uses the Mattermost search API + const marker = makeMarker(sessionKey); + try { + // Use the internal _apiCall method to search posts + const result = await statusBox._apiCallWithRetry('POST', '/api/v4/posts/search', { + channel_id: channelId, + terms: marker, + is_or_search: false, + }); + if (result && result.posts) { + for (const post of Object.values(result.posts)) { + if (post.message && post.message.includes(marker)) { + logger.info( + { sessionKey, postId: post.id }, + 'Found existing status post (restart recovery)', + ); + return post.id; + } + } + } + } catch (_e) { + /* search failed — create new post */ + } + return null; +} + +// ---- Stop command ---- +function stopDaemon() { + // Need to read config for pidFile location + process.env.MM_BOT_TOKEN = process.env.MM_BOT_TOKEN || 'placeholder'; + let config; + try { + config = getConfig(); + } catch (_e) { + config = { pidFile: '/tmp/status-watcher.pid' }; + } + + const pid = readPidFile(config.pidFile); + if (!pid) { + console.log('Daemon not running (no PID file)'); + return; + } + if (!isProcessRunning(pid)) { + console.log(`Daemon not running (PID ${pid} not found)`); + removePidFile(config.pidFile); + return; + } + console.log(`Stopping daemon (PID ${pid})...`); + process.kill(pid, 'SIGTERM'); +} + +// ---- Status command ---- +function daemonStatus() { + process.env.MM_BOT_TOKEN = process.env.MM_BOT_TOKEN || 'placeholder'; + let config; + try { + config = getConfig(); + } catch (_e) { + config = { pidFile: '/tmp/status-watcher.pid', healthPort: 9090 }; + } + + const pid = readPidFile(config.pidFile); + if (!pid) { + console.log('Status: stopped'); + return; + } + if (!isProcessRunning(pid)) { + console.log(`Status: stopped (stale PID ${pid})`); + return; + } + console.log(`Status: running (PID ${pid})`); + console.log(`Health: http://localhost:${config.healthPort}/health`); +} diff --git a/test/integration/session-monitor.test.js b/test/integration/session-monitor.test.js new file mode 100644 index 0000000..3f4cd24 --- /dev/null +++ b/test/integration/session-monitor.test.js @@ -0,0 +1,251 @@ +'use strict'; + +/** + * Integration tests for session-monitor.js + * Tests session detection by writing mock sessions.json files. + */ + +const { describe, it, beforeEach, afterEach } = require('node:test'); +const assert = require('node:assert/strict'); +const fs = require('fs'); +const path = require('path'); +const os = require('os'); + +const { SessionMonitor } = require('../../src/session-monitor'); + +function createTmpDir() { + return fs.mkdtempSync(path.join(os.tmpdir(), 'sm-test-')); +} + +function writeSessionsJson(dir, agentId, sessions) { + const agentDir = path.join(dir, agentId, 'sessions'); + fs.mkdirSync(agentDir, { recursive: true }); + fs.writeFileSync(path.join(agentDir, 'sessions.json'), JSON.stringify(sessions, null, 2)); +} + +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +describe('SessionMonitor', () => { + let tmpDir; + let monitor; + + beforeEach(() => { + tmpDir = createTmpDir(); + }); + + afterEach(() => { + if (monitor) { + monitor.stop(); + monitor = null; + } + try { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } catch (_e) { + /* ignore */ + } + }); + + describe('parseChannelId()', () => { + it('parses channel ID from mattermost session key', () => { + const key = 'agent:main:mattermost:channel:abc123:thread:xyz'; + assert.equal(SessionMonitor.parseChannelId(key), 'abc123'); + }); + + it('parses DM channel', () => { + const key = 'agent:main:mattermost:dm:user456'; + assert.equal(SessionMonitor.parseChannelId(key), 'user456'); + }); + + it('returns null for non-MM session', () => { + const key = 'agent:main:hook:session:xyz'; + assert.equal(SessionMonitor.parseChannelId(key), null); + }); + }); + + describe('parseRootPostId()', () => { + it('parses thread ID from session key', () => { + const key = 'agent:main:mattermost:channel:abc123:thread:rootpost999'; + assert.equal(SessionMonitor.parseRootPostId(key), 'rootpost999'); + }); + + it('returns null if no thread', () => { + const key = 'agent:main:mattermost:channel:abc123'; + assert.equal(SessionMonitor.parseRootPostId(key), null); + }); + }); + + describe('parseAgentId()', () => { + it('extracts agent ID', () => { + assert.equal(SessionMonitor.parseAgentId('agent:main:mattermost:channel:abc'), 'main'); + assert.equal(SessionMonitor.parseAgentId('agent:coder-agent:session'), 'coder-agent'); + }); + }); + + describe('isMattermostSession()', () => { + it('detects mattermost sessions', () => { + assert.equal(SessionMonitor.isMattermostSession('agent:main:mattermost:channel:abc'), true); + assert.equal(SessionMonitor.isMattermostSession('agent:main:hook:abc'), false); + }); + }); + + describe('session-added event', () => { + it('emits session-added for new session in sessions.json', async () => { + const sessionKey = 'agent:main:mattermost:channel:testchan:thread:testroot'; + const sessionId = 'aaaa-1111-bbbb-2222'; + + writeSessionsJson(tmpDir, 'main', { + [sessionKey]: { + sessionId, + spawnedBy: null, + spawnDepth: 0, + label: null, + channel: 'mattermost', + }, + }); + + monitor = new SessionMonitor({ + transcriptDir: tmpDir, + pollMs: 50, + }); + + const added = []; + monitor.on('session-added', (info) => added.push(info)); + monitor.start(); + + await sleep(200); + + assert.equal(added.length, 1); + assert.equal(added[0].sessionKey, sessionKey); + assert.equal(added[0].channelId, 'testchan'); + assert.equal(added[0].rootPostId, 'testroot'); + assert.ok(added[0].transcriptFile.endsWith(`${sessionId}.jsonl`)); + }); + + it('emits session-removed when session disappears', async () => { + const sessionKey = 'agent:main:mattermost:channel:testchan:thread:testroot'; + const sessionId = 'cccc-3333-dddd-4444'; + + writeSessionsJson(tmpDir, 'main', { + [sessionKey]: { sessionId, spawnedBy: null, spawnDepth: 0, channel: 'mattermost' }, + }); + + monitor = new SessionMonitor({ transcriptDir: tmpDir, pollMs: 50 }); + + const removed = []; + monitor.on('session-removed', (key) => removed.push(key)); + monitor.start(); + + await sleep(200); + assert.equal(removed.length, 0); + + // Remove the session + writeSessionsJson(tmpDir, 'main', {}); + + await sleep(200); + assert.equal(removed.length, 1); + assert.equal(removed[0], sessionKey); + }); + + it('skips non-MM sessions with no default channel', async () => { + const sessionKey = 'agent:main:hook:session:xyz'; + writeSessionsJson(tmpDir, 'main', { + [sessionKey]: { sessionId: 'hook-uuid', channel: 'hook' }, + }); + + monitor = new SessionMonitor({ transcriptDir: tmpDir, pollMs: 50, defaultChannel: null }); + + const added = []; + monitor.on('session-added', (info) => added.push(info)); + monitor.start(); + + await sleep(200); + assert.equal(added.length, 0); + }); + + it('includes non-MM sessions when defaultChannel is set', async () => { + const sessionKey = 'agent:main:hook:session:xyz'; + writeSessionsJson(tmpDir, 'main', { + [sessionKey]: { sessionId: 'hook-uuid', channel: 'hook' }, + }); + + monitor = new SessionMonitor({ + transcriptDir: tmpDir, + pollMs: 50, + defaultChannel: 'default-channel-id', + }); + + const added = []; + monitor.on('session-added', (info) => added.push(info)); + monitor.start(); + + await sleep(200); + assert.equal(added.length, 1); + assert.equal(added[0].channelId, 'default-channel-id'); + }); + + it('detects multiple agents', async () => { + writeSessionsJson(tmpDir, 'main', { + 'agent:main:mattermost:channel:c1:thread:t1': { + sessionId: 'sess-main', + spawnedBy: null, + channel: 'mattermost', + }, + }); + writeSessionsJson(tmpDir, 'coder-agent', { + 'agent:coder-agent:mattermost:channel:c2:thread:t2': { + sessionId: 'sess-coder', + spawnedBy: null, + channel: 'mattermost', + }, + }); + + monitor = new SessionMonitor({ transcriptDir: tmpDir, pollMs: 50 }); + + const added = []; + monitor.on('session-added', (info) => added.push(info)); + monitor.start(); + + await sleep(200); + assert.equal(added.length, 2); + const keys = added.map((s) => s.sessionKey).sort(); + assert.ok(keys.includes('agent:main:mattermost:channel:c1:thread:t1')); + assert.ok(keys.includes('agent:coder-agent:mattermost:channel:c2:thread:t2')); + }); + + it('handles malformed sessions.json gracefully', async () => { + const agentDir = path.join(tmpDir, 'main', 'sessions'); + fs.mkdirSync(agentDir, { recursive: true }); + fs.writeFileSync(path.join(agentDir, 'sessions.json'), 'not valid json'); + + monitor = new SessionMonitor({ transcriptDir: tmpDir, pollMs: 50 }); + + const added = []; + monitor.on('session-added', (info) => added.push(info)); + monitor.start(); + + await sleep(200); + // Should not throw and should produce no sessions + assert.equal(added.length, 0); + }); + }); + + describe('getKnownSessions()', () => { + it('returns current known sessions', async () => { + const sessionKey = 'agent:main:mattermost:channel:c1:thread:t1'; + writeSessionsJson(tmpDir, 'main', { + [sessionKey]: { sessionId: 'test-uuid', channel: 'mattermost' }, + }); + + monitor = new SessionMonitor({ transcriptDir: tmpDir, pollMs: 50 }); + monitor.start(); + + await sleep(200); + + const sessions = monitor.getKnownSessions(); + assert.equal(sessions.size, 1); + assert.ok(sessions.has(sessionKey)); + }); + }); +}); diff --git a/test/integration/status-watcher.test.js b/test/integration/status-watcher.test.js new file mode 100644 index 0000000..b8c945b --- /dev/null +++ b/test/integration/status-watcher.test.js @@ -0,0 +1,267 @@ +'use strict'; + +/** + * Integration tests for status-watcher.js + * Tests JSONL file watching and event emission. + */ + +const { describe, it, beforeEach, afterEach } = require('node:test'); +const assert = require('node:assert/strict'); +const fs = require('fs'); +const path = require('path'); +const os = require('os'); + +const { StatusWatcher } = require('../../src/status-watcher'); + +function createTmpDir() { + return fs.mkdtempSync(path.join(os.tmpdir(), 'sw-test-')); +} + +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function appendLine(file, obj) { + fs.appendFileSync(file, JSON.stringify(obj) + '\n'); +} + +describe('StatusWatcher', () => { + let tmpDir; + let watcher; + + beforeEach(() => { + tmpDir = createTmpDir(); + }); + + afterEach(() => { + if (watcher) { + watcher.stop(); + watcher = null; + } + try { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } catch (_e) { + /* ignore */ + } + }); + + describe('session management', () => { + it('addSession() registers a session', () => { + watcher = new StatusWatcher({ transcriptDir: tmpDir, idleTimeoutS: 600 }); + const file = path.join(tmpDir, 'test.jsonl'); + fs.writeFileSync(file, ''); + + watcher.addSession('test:key', file); + assert.ok(watcher.sessions.has('test:key')); + assert.ok(watcher.fileToSession.has(file)); + }); + + it('removeSession() cleans up', () => { + watcher = new StatusWatcher({ transcriptDir: tmpDir, idleTimeoutS: 600 }); + const file = path.join(tmpDir, 'test.jsonl'); + fs.writeFileSync(file, ''); + + watcher.addSession('test:key', file); + watcher.removeSession('test:key'); + assert.ok(!watcher.sessions.has('test:key')); + assert.ok(!watcher.fileToSession.has(file)); + }); + + it('getSessionState() returns state', () => { + watcher = new StatusWatcher({ transcriptDir: tmpDir, idleTimeoutS: 600 }); + const file = path.join(tmpDir, 'test.jsonl'); + fs.writeFileSync(file, ''); + watcher.addSession('test:key', file); + const state = watcher.getSessionState('test:key'); + assert.ok(state); + assert.equal(state.sessionKey, 'test:key'); + }); + }); + + describe('JSONL parsing', () => { + it('reads existing content on addSession', () => { + const file = path.join(tmpDir, 'test.jsonl'); + fs.writeFileSync(file, ''); + appendLine(file, { type: 'assistant', text: 'Hello world' }); + appendLine(file, { type: 'tool_call', name: 'exec', id: '1' }); + + watcher = new StatusWatcher({ transcriptDir: tmpDir, idleTimeoutS: 600 }); + watcher.addSession('test:key', file); + + const state = watcher.getSessionState('test:key'); + assert.ok(state.lines.length > 0); + assert.ok(state.lines.some((l) => l.includes('exec'))); + }); + + it('emits session-update when file changes', async () => { + const file = path.join(tmpDir, 'test.jsonl'); + fs.writeFileSync(file, ''); + + watcher = new StatusWatcher({ transcriptDir: tmpDir, idleTimeoutS: 600 }); + watcher.start(); + watcher.addSession('test:key', file); + + const updates = []; + watcher.on('session-update', (key, state) => updates.push({ key, state })); + + await sleep(100); + appendLine(file, { type: 'assistant', text: 'Starting task...' }); + await sleep(500); + + assert.ok(updates.length > 0); + assert.equal(updates[0].key, 'test:key'); + }); + + it('parses tool_call and tool_result pairs', () => { + const file = path.join(tmpDir, 'test.jsonl'); + fs.writeFileSync(file, ''); + appendLine(file, { type: 'tool_call', name: 'exec', id: '1' }); + appendLine(file, { type: 'tool_result', name: 'exec', id: '1' }); + + watcher = new StatusWatcher({ transcriptDir: tmpDir, idleTimeoutS: 600 }); + watcher.addSession('test:key', file); + + const state = watcher.getSessionState('test:key'); + assert.equal(state.pendingToolCalls, 0); + // Line should show [OK] + const execLine = state.lines.find((l) => l.includes('exec')); + assert.ok(execLine); + assert.ok(execLine.includes('[OK]')); + }); + + it('tracks pendingToolCalls correctly', () => { + const file = path.join(tmpDir, 'test.jsonl'); + fs.writeFileSync(file, ''); + appendLine(file, { type: 'tool_call', name: 'exec', id: '1' }); + appendLine(file, { type: 'tool_call', name: 'Read', id: '2' }); + appendLine(file, { type: 'tool_result', name: 'exec', id: '1' }); + + watcher = new StatusWatcher({ transcriptDir: tmpDir, idleTimeoutS: 600 }); + watcher.addSession('test:key', file); + + const state = watcher.getSessionState('test:key'); + assert.equal(state.pendingToolCalls, 1); // Read still pending + }); + + it('tracks token usage', () => { + const file = path.join(tmpDir, 'test.jsonl'); + fs.writeFileSync(file, ''); + appendLine(file, { type: 'usage', input_tokens: 1000, output_tokens: 500 }); + + watcher = new StatusWatcher({ transcriptDir: tmpDir, idleTimeoutS: 600 }); + watcher.addSession('test:key', file); + + const state = watcher.getSessionState('test:key'); + assert.equal(state.tokenCount, 1500); + }); + + it('detects file truncation (compaction)', () => { + const file = path.join(tmpDir, 'test.jsonl'); + + // Write some content + fs.writeFileSync( + file, + JSON.stringify({ type: 'assistant', text: 'Hello' }) + + '\n' + + JSON.stringify({ type: 'tool_call', name: 'exec', id: '1' }) + + '\n', + ); + + watcher = new StatusWatcher({ transcriptDir: tmpDir, idleTimeoutS: 600 }); + watcher.addSession('test:key', file); + + const state = watcher.getSessionState('test:key'); + const originalOffset = state.lastOffset; + assert.ok(originalOffset > 0); + + // Truncate the file (simulate compaction) + fs.writeFileSync(file, JSON.stringify({ type: 'assistant', text: 'Resumed' }) + '\n'); + + // Force a re-read + watcher._readFile('test:key', state); + + // Offset should have reset + assert.ok( + state.lastOffset < originalOffset || + state.lines.includes('[session compacted - continuing]'), + ); + }); + + it('handles malformed JSONL lines gracefully', () => { + const file = path.join(tmpDir, 'test.jsonl'); + fs.writeFileSync(file, 'not valid json\n{"type":"assistant","text":"ok"}\n'); + + watcher = new StatusWatcher({ transcriptDir: tmpDir, idleTimeoutS: 600 }); + + // Should not throw + assert.doesNotThrow(() => { + watcher.addSession('test:key', file); + }); + + const state = watcher.getSessionState('test:key'); + assert.ok(state); + assert.ok(state.lines.some((l) => l.includes('ok'))); + }); + }); + + describe('idle detection', () => { + it('emits session-idle after timeout with no activity', async () => { + const file = path.join(tmpDir, 'test.jsonl'); + fs.writeFileSync(file, ''); + appendLine(file, { type: 'assistant', text: 'Starting...' }); + + watcher = new StatusWatcher({ transcriptDir: tmpDir, idleTimeoutS: 0.2 }); // 200ms + watcher.addSession('test:key', file); + + const idle = []; + watcher.on('session-idle', (key) => idle.push(key)); + + await sleep(600); + assert.ok(idle.length > 0); + assert.equal(idle[0], 'test:key'); + }); + + it('resets idle timer on new activity', async () => { + const file = path.join(tmpDir, 'test.jsonl'); + fs.writeFileSync(file, ''); + + watcher = new StatusWatcher({ transcriptDir: tmpDir, idleTimeoutS: 0.3 }); // 300ms + watcher.start(); + watcher.addSession('test:key', file); + + const idle = []; + watcher.on('session-idle', (key) => idle.push(key)); + + // Write activity at 150ms (before 300ms timeout) + await sleep(150); + appendLine(file, { type: 'assistant', text: 'Still working...' }); + + // At 350ms: idle timer was reset, so no idle yet + await sleep(200); + assert.equal(idle.length, 0); + + // At 600ms: idle timer fires (150+300=450ms > 200+300=500ms... need to wait full 300ms) + await sleep(400); + assert.equal(idle.length, 1); + }); + }); + + describe('offset recovery', () => { + it('resumes from saved offset', () => { + const file = path.join(tmpDir, 'test.jsonl'); + const line1 = JSON.stringify({ type: 'assistant', text: 'First' }) + '\n'; + const line2 = JSON.stringify({ type: 'assistant', text: 'Second' }) + '\n'; + fs.writeFileSync(file, line1 + line2); + + watcher = new StatusWatcher({ transcriptDir: tmpDir, idleTimeoutS: 600 }); + + // Start with offset at start of line2 (after line1) + watcher.addSession('test:key', file, { lastOffset: line1.length }); + + const state = watcher.getSessionState('test:key'); + // Should only have parsed line2 + assert.ok(state.lines.some((l) => l.includes('Second'))); + assert.ok(!state.lines.some((l) => l.includes('First'))); + }); + }); +});