'use strict';

/**
 * session-monitor.js — Polls every agent's sessions.json on a timer to detect
 * new and ended sessions.
 *
 * sessions.json format (per agent):
 * {
 *   "agent:main:mattermost:channel:abc123:thread:xyz": {
 *     "sessionId": "uuid",
 *     "spawnedBy": null | "agent:main:...",
 *     "spawnDepth": 0,
 *     "label": "proj035-planner",
 *     "channel": "mattermost"
 *   }
 * }
 *
 * Emits:
 *   'session-added'   ({ sessionKey, transcriptFile, spawnedBy, spawnDepth,
 *                        channelId, rootPostId, agentId })
 *   'session-removed' (sessionKey)
 */

const fs = require('fs');
const http = require('http');
const https = require('https');
const path = require('path');
const { EventEmitter } = require('events');

// A session whose transcript has not been modified within this window is
// treated as stale and is not announced until the transcript changes again.
const STALE_THRESHOLD_MS = 5 * 60 * 1000;

// Upper bound on socket inactivity for the DM-channel lookup, so a stalled
// server cannot leave a session pending forever.
const DM_RESOLVE_TIMEOUT_MS = 10 * 1000;

class SessionMonitor extends EventEmitter {
  /**
   * @param {object} opts
   * @param {string} opts.transcriptDir - Base directory holding one sub-directory per agent
   * @param {number} [opts.pollMs] - Poll interval in ms (default 500)
   * @param {string|null} [opts.defaultChannel] - Fallback channel ID for non-MM sessions
   * @param {string|null} [opts.mmToken] - Mattermost bot token (for DM channel resolution)
   * @param {string|null} [opts.mmUrl] - Mattermost base URL
   * @param {string|null} [opts.botUserId] - Bot's own Mattermost user ID
   * @param {object} [opts.logger] - pino-style logger (debug/info/warn/error)
   */
  constructor(opts) {
    super();
    this.transcriptDir = opts.transcriptDir;
    // `||` is deliberate: a pollMs of 0 would turn the interval into a busy loop.
    this.pollMs = opts.pollMs || 500;
    this.defaultChannel = opts.defaultChannel || null;
    this.mmToken = opts.mmToken || null;
    this.mmUrl = opts.mmUrl || null;
    this.botUserId = opts.botUserId || null;
    this.logger = opts.logger || null;

    // sessionKey -> normalized entry, as seen on the most recent poll
    this._knownSessions = new Map();
    // sessionKeys skipped as stale/missing-transcript; re-checked on every poll
    this._staleSessions = new Set();
    // "user:XXXX" -> channelId (resolved DM channels)
    this._dmChannelCache = new Map();

    this._pollTimer = null;
    this._running = false;
  }

  /**
   * Start polling. Performs one scan immediately, then one every `pollMs`.
   * Calling start() while already running is a no-op.
   */
  start() {
    if (this._running) return;
    this._running = true;

    // Initial scan
    this._poll();

    this._pollTimer = setInterval(() => {
      this._poll();
    }, this.pollMs);

    if (this.logger) this.logger.info({ pollMs: this.pollMs }, 'SessionMonitor started');
  }

  /**
   * Stop polling and clear the interval timer.
   */
  stop() {
    this._running = false;
    if (this._pollTimer) {
      clearInterval(this._pollTimer);
      this._pollTimer = null;
    }
    if (this.logger) this.logger.info('SessionMonitor stopped');
  }

  /**
   * Remove a session from known sessions so it can be re-detected on next poll.
   * Called when the watcher marks a session as idle/done.
   * @param {string} sessionKey
   */
  forgetSession(sessionKey) {
    this._knownSessions.delete(sessionKey);
  }

  /**
   * Get all agent directories under transcriptDir.
   * @private
   * @returns {string[]} Agent IDs (empty when transcriptDir is unreadable)
   */
  _getAgentDirs() {
    try {
      // eslint-disable-next-line security/detect-non-literal-fs-filename
      return fs.readdirSync(this.transcriptDir).filter((name) => {
        try {
          // eslint-disable-next-line security/detect-non-literal-fs-filename
          return fs.statSync(path.join(this.transcriptDir, name)).isDirectory();
        } catch (_e) {
          return false;
        }
      });
    } catch (_e) {
      return [];
    }
  }

  /**
   * Read sessions.json for a given agent.
   * @private
   * @param {string} agentId
   * @returns {object} Sessions map; `{}` when the file is missing, unreadable,
   *   not valid JSON, or valid JSON that is not a plain object.
   */
  _readSessionsJson(agentId) {
    const sessionsPath = path.join(this.transcriptDir, agentId, 'sessions', 'sessions.json');
    try {
      // eslint-disable-next-line security/detect-non-literal-fs-filename
      const parsed = JSON.parse(fs.readFileSync(sessionsPath, 'utf8'));
      // `null`, arrays and scalars are valid JSON but would break the
      // Object.entries() walk in _poll(), so treat them as "no sessions".
      if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
        return {};
      }
      return parsed;
    } catch (_e) {
      return {};
    }
  }

  /**
   * Resolve the transcript file path for a session.
   * @private
   * @param {string} agentId
   * @param {string} sessionId - UUID
   * @returns {string} An existing transcript path when one is found, otherwise
   *   the direct `{sessionId}.jsonl` path (which may not exist).
   */
  _transcriptPath(agentId, sessionId) {
    const sessionsDir = path.join(this.transcriptDir, agentId, 'sessions');
    const suffix = `${sessionId}.jsonl`;
    const directPath = path.join(sessionsDir, suffix);

    // OpenClaw may use timestamp-prefixed filenames: {ISO}_{sessionId}.jsonl
    // Check the direct path first, then look for any file ending in {sessionId}.jsonl
    if (fs.existsSync(directPath)) {
      return directPath;
    }

    try {
      const match = fs
        .readdirSync(sessionsDir)
        .find((f) => f.endsWith(suffix) && f !== suffix);
      if (match) {
        return path.join(sessionsDir, match);
      }
    } catch (_e) {
      // Directory doesn't exist or is unreadable
    }

    // Fallback to direct path (will fail with ENOENT, which is handled upstream)
    return directPath;
  }

  /**
   * Parse channel ID from session key.
   * Session key format: "agent:main:mattermost:channel:{channelId}:thread:{threadId}"
   *                 or: "agent:main:mattermost:dm:{userId}"
   * @param {string} sessionKey
   * @returns {string|null}
   */
  static parseChannelId(sessionKey) {
    const parts = sessionKey.split(':');

    // agent:main:mattermost:channel:CHANNEL_ID:...
    const chanIdx = parts.indexOf('channel');
    if (chanIdx >= 0 && parts[chanIdx + 1]) {
      return parts[chanIdx + 1]; // eslint-disable-line security/detect-object-injection
    }

    // agent:main:mattermost:dm:USER_ID (use as channel)
    const dmIdx = parts.indexOf('dm');
    if (dmIdx >= 0 && parts[dmIdx + 1]) {
      return parts[dmIdx + 1]; // eslint-disable-line security/detect-object-injection
    }

    // agent:main:mattermost:direct:USER_ID — DM sessions use "direct" prefix
    // Channel ID must be resolved via API (returns null here; resolveChannelFromEntry handles it)
    return null;
  }

  /**
   * Parse root post ID (thread ID) from session key.
   * @param {string} sessionKey
   * @returns {string|null}
   */
  static parseRootPostId(sessionKey) {
    const parts = sessionKey.split(':');
    const threadIdx = parts.indexOf('thread');
    if (threadIdx >= 0 && parts[threadIdx + 1]) {
      return parts[threadIdx + 1]; // eslint-disable-line security/detect-object-injection
    }
    return null;
  }

  /**
   * Extract agent ID from session key.
   * @param {string} sessionKey
   * @returns {string}
   */
  static parseAgentId(sessionKey) {
    const parts = sessionKey.split(':');
    if (parts[0] === 'agent' && parts[1]) return parts[1];
    return parts[0] || 'unknown';
  }

  /**
   * Determine if a session is a Mattermost session.
   * @param {string} sessionKey
   * @param {object} [sessionEntry] - Session entry from sessions.json
   * @returns {boolean}
   */
  static isMattermostSession(sessionKey, sessionEntry) {
    if (sessionKey.includes(':mattermost:') || sessionKey.includes(':mm:')) return true;

    // Check deliveryContext/channel/lastChannel in session entry
    if (sessionEntry) {
      if (sessionEntry.channel === 'mattermost' || sessionEntry.lastChannel === 'mattermost') {
        return true;
      }
      const dc = sessionEntry.deliveryContext;
      if (dc && dc.channel === 'mattermost') return true;
    }
    return false;
  }

  /**
   * Extract the channel ID from a "create direct channel" HTTP response.
   *
   * The status code must be checked: Mattermost error payloads also carry an
   * `id` field (an error code string), so trusting `id` alone would cache an
   * error code as if it were a channel ID.
   * @private
   * @param {number} statusCode - HTTP status of the response
   * @param {string} body - Raw response body
   * @returns {string|null} Channel ID, or null for non-2xx / unparsable responses
   */
  static _parseDmChannelResponse(statusCode, body) {
    if (typeof statusCode !== 'number' || statusCode < 200 || statusCode >= 300) {
      return null;
    }
    try {
      const parsed = JSON.parse(body);
      if (parsed && typeof parsed.id === 'string' && parsed.id) {
        return parsed.id;
      }
      return null;
    } catch (_e) {
      return null;
    }
  }

  /**
   * Resolve channel ID from session entry's deliveryContext/lastTo/origin.
   * Handles "user:XXXX" format by resolving DM channel via Mattermost API.
   * @param {object} entry - Session entry from sessions.json
   * @returns {Promise<string|null>} Channel ID, or null when it cannot be determined
   */
  async resolveChannelFromEntry(entry) {
    if (!entry) return null;

    // Try deliveryContext.to first, then lastTo, then origin.to
    const to =
      (entry.deliveryContext && entry.deliveryContext.to) ||
      entry.lastTo ||
      (entry.origin && entry.origin.to);

    // sessions.json is external input; anything but a non-empty string is unusable.
    if (typeof to !== 'string' || !to) return null;

    // If it's a channel:XXXX format, extract directly
    if (to.startsWith('channel:')) {
      return to.slice('channel:'.length);
    }

    // If it's a user:XXXX format, resolve the DM channel
    if (to.startsWith('user:')) {
      return this._resolveDmChannel(to.slice('user:'.length));
    }

    return null;
  }

  /**
   * POST the bot/user pair to Mattermost's direct-channel endpoint.
   * @private
   * @param {string} userId
   * @returns {Promise<{statusCode: number, body: string}>} Rejects on transport
   *   errors and on socket inactivity beyond DM_RESOLVE_TIMEOUT_MS.
   */
  _requestDmChannel(userId) {
    const url = new URL('/api/v4/channels/direct', this.mmUrl);
    const isHttps = url.protocol === 'https:';
    const transport = isHttps ? https : http;
    const payload = JSON.stringify([this.botUserId, userId]);

    return new Promise((resolve, reject) => {
      const req = transport.request(
        {
          hostname: url.hostname,
          port: url.port || (isHttps ? 443 : 80),
          path: url.pathname,
          method: 'POST',
          headers: {
            'Authorization': `Bearer ${this.mmToken}`,
            'Content-Type': 'application/json',
            'Content-Length': Buffer.byteLength(payload),
          },
          // NOTE(review): certificate verification is disabled, so the bot
          // token is sent over unauthenticated TLS. Kept as-is because existing
          // deployments may rely on self-signed certificates — confirm and
          // make this opt-in.
          rejectUnauthorized: false,
          timeout: DM_RESOLVE_TIMEOUT_MS,
        },
        (res) => {
          // Decode as a stream so multi-byte characters split across chunks survive.
          res.setEncoding('utf8');
          let body = '';
          res.on('data', (chunk) => {
            body += chunk;
          });
          res.on('end', () => {
            resolve({ statusCode: res.statusCode, body });
          });
          res.on('error', reject);
        },
      );

      // The 'timeout' event does not abort the request by itself.
      req.on('timeout', () => {
        req.destroy(new Error(`DM channel request timed out after ${DM_RESOLVE_TIMEOUT_MS}ms`));
      });
      req.on('error', reject);
      req.write(payload);
      req.end();
    });
  }

  /**
   * Resolve DM channel ID between the bot and a user via Mattermost API.
   * Successful lookups are cached per user; failures are not cached.
   * @private
   * @param {string} userId
   * @returns {Promise<string|null>}
   */
  async _resolveDmChannel(userId) {
    const cacheKey = `user:${userId}`;
    if (this._dmChannelCache.has(cacheKey)) {
      return this._dmChannelCache.get(cacheKey);
    }

    if (!this.mmToken || !this.mmUrl || !this.botUserId) {
      if (this.logger) {
        this.logger.warn({ userId }, 'Cannot resolve DM channel — missing mmToken/mmUrl/botUserId');
      }
      return null;
    }

    try {
      const { statusCode, body } = await this._requestDmChannel(userId);
      const channelId = SessionMonitor._parseDmChannelResponse(statusCode, body);

      if (!channelId) {
        if (this.logger) {
          this.logger.warn({ userId, statusCode }, 'Mattermost did not return a DM channel');
        }
        return null;
      }

      this._dmChannelCache.set(cacheKey, channelId);
      if (this.logger) this.logger.info({ userId, channelId }, 'Resolved DM channel');
      return channelId;
    } catch (err) {
      if (this.logger) {
        this.logger.warn({ userId, err: err.message }, 'Failed to resolve DM channel');
      }
      return null;
    }
  }

  /**
   * Poll all agents' sessions.json files for changes.
   * Announces new (or previously stale) sessions, reports sessions that
   * disappeared, then replaces the known-session snapshot.
   * @private
   */
  _poll() {
    if (!this._running) return;

    const agentDirs = this._getAgentDirs();
    const currentSessions = new Map();

    for (const agentId of agentDirs) {
      const sessions = this._readSessionsJson(agentId);
      for (const [sessionKey, entry] of Object.entries(sessions)) {
        // Skip malformed values (null, strings, ...) instead of throwing.
        if (entry === null || typeof entry !== 'object') continue;

        const sessionId = entry.sessionId || entry.uuid;
        if (!sessionId) continue;

        currentSessions.set(sessionKey, {
          agentId,
          sessionKey,
          sessionId,
          spawnedBy: entry.spawnedBy || null,
          spawnDepth: entry.spawnDepth || 0,
          label: entry.label || null,
          channel: entry.channel || null,
          // Preserve full entry for channel resolution
          deliveryContext: entry.deliveryContext || null,
          lastTo: entry.lastTo || null,
          lastChannel: entry.lastChannel || null,
          origin: entry.origin || null,
          sessionFile: entry.sessionFile || null,
        });
      }
    }

    // Detect new or previously-stale sessions
    for (const [sessionKey, entry] of currentSessions) {
      if (!this._knownSessions.has(sessionKey) || this._staleSessions.has(sessionKey)) {
        // _onSessionAdded is async and intentionally not awaited (polling must
        // not block on network I/O). Its rejection still has to be handled,
        // otherwise a throwing listener becomes an unhandled rejection.
        this._onSessionAdded(entry).catch((err) => {
          if (this.logger) {
            this.logger.error(
              { sessionKey, err: err && err.message },
              'Failed to process new session',
            );
          }
        });
      }
    }

    // Detect removed sessions
    for (const [sessionKey] of this._knownSessions) {
      if (!currentSessions.has(sessionKey)) {
        this._onSessionRemoved(sessionKey);
        this._staleSessions.delete(sessionKey);
      }
    }

    // Clean up stale entries for sessions no longer in sessions.json
    for (const sessionKey of this._staleSessions) {
      if (!currentSessions.has(sessionKey)) {
        this._staleSessions.delete(sessionKey);
      }
    }

    this._knownSessions = currentSessions;
  }

  /**
   * Handle a newly detected session: locate its transcript, skip it while the
   * transcript is stale or missing, resolve its channel, and emit 'session-added'.
   * @private
   * @param {object} entry - Normalized entry built by _poll()
   * @returns {Promise<void>}
   */
  async _onSessionAdded(entry) {
    const { agentId, sessionKey, sessionId, spawnedBy, spawnDepth, label } = entry;

    // Use sessionFile from sessions.json if available, otherwise resolve
    let transcriptFile = entry.sessionFile || null;
    if (!transcriptFile || !fs.existsSync(transcriptFile)) {
      transcriptFile = this._transcriptPath(agentId, sessionId);
    }

    // Skip stale sessions — only track if the transcript was modified recently.
    // This prevents creating status boxes for every old session in sessions.json.
    // Stale sessions are tracked in _staleSessions and re-checked on every poll
    // so they get picked up as soon as the transcript becomes active again.
    try {
      // eslint-disable-next-line security/detect-non-literal-fs-filename
      const stat = fs.statSync(transcriptFile);
      const ageMs = Date.now() - stat.mtimeMs;
      if (ageMs > STALE_THRESHOLD_MS) {
        this._staleSessions.add(sessionKey);
        if (this.logger) {
          this.logger.debug(
            { sessionKey, ageS: Math.floor(ageMs / 1000) },
            'Skipping stale session (transcript not recently modified)',
          );
        }
        return;
      }
    } catch (_e) {
      // File doesn't exist — skip but track as stale for re-check
      this._staleSessions.add(sessionKey);
      if (this.logger) {
        this.logger.debug(
          { sessionKey, transcriptFile },
          'Skipping session (transcript not found)',
        );
      }
      return;
    }

    // Session is fresh — remove from stale tracking
    this._staleSessions.delete(sessionKey);

    // Sub-agents always pass through — they inherit parent channel via watcher-manager
    const isSubAgent = !!spawnedBy;

    // Resolve channel ID — try session key first, then deliveryContext/lastTo
    let channelId = SessionMonitor.parseChannelId(sessionKey);

    // If session key doesn't contain channel, resolve from session entry metadata
    if (!channelId) {
      channelId = await this.resolveChannelFromEntry(entry);
    }

    // Fall back to default channel for non-MM sessions
    if (!channelId && !isSubAgent && !SessionMonitor.isMattermostSession(sessionKey, entry)) {
      channelId = this.defaultChannel;
      if (!channelId) {
        if (this.logger) {
          this.logger.debug({ sessionKey }, 'Skipping non-MM session (no channel, no default)');
        }
        return;
      }
    }

    const rootPostId = SessionMonitor.parseRootPostId(sessionKey);
    const parsedAgentId = SessionMonitor.parseAgentId(sessionKey);

    if (this.logger) {
      this.logger.info({ sessionKey, agentId, channelId, spawnedBy }, 'Session detected');
    }

    this.emit('session-added', {
      sessionKey,
      transcriptFile,
      spawnedBy,
      spawnDepth,
      channelId,
      rootPostId,
      agentId: label || parsedAgentId,
    });
  }

  /**
   * Handle a removed session.
   * @private
   * @param {string} sessionKey
   */
  _onSessionRemoved(sessionKey) {
    if (this.logger) {
      this.logger.info({ sessionKey }, 'Session ended');
    }
    this.emit('session-removed', sessionKey);
  }

  /**
   * Get list of currently known sessions.
   * @returns {Map<string, object>} Shallow copy keyed by sessionKey
   */
  getKnownSessions() {
    return new Map(this._knownSessions);
  }
}

module.exports = { SessionMonitor };