Files
MATTERMOST_OPENCLAW_LIVESTATUS/src/session-monitor.js
sol 6df3278e91 feat: Phase 3 — sub-agent detection, nested status, cascade completion
Phase 3 (Sub-Agent Support):
- session-monitor.js: sub-agents always passed through (inherit parent channel)
- watcher-manager.js enhancements:
  - Pending sub-agent queue: child sessions that arrive before parent are queued
    and processed when parent is registered (no dropped sub-agents)
  - linkSubAgent(): extracted helper for clean parent-child linking
  - Cascade completion: parent stays active until all children complete
  - Sub-agents embedded in parent status post (no separate top-level post)
- status-formatter.js: recursive nested rendering at configurable depth

Integration tests - test/integration/sub-agent.test.js (9 tests):
  3.1 Sub-agent detection via spawnedBy (monitor level)
  3.2 Nested status rendering (depth indentation, multiple children, deep nesting)
  3.3 Cascade completion (pending tool call tracking across sessions)
  3.4 Sub-agent JSONL parsing (usage events, error tool results)

All 95 tests pass (59 unit + 36 integration). make check clean.
2026-03-07 17:36:11 +00:00

285 lines
7.7 KiB
JavaScript

'use strict';
/**
* session-monitor.js — Polls each agent's sessions.json (every 2s by default, see opts.pollMs) to detect new/ended sessions.
*
* sessions.json format (per agent):
* {
* "agent:main:mattermost:channel:abc123:thread:xyz": {
* "sessionId": "uuid",
* "spawnedBy": null | "agent:main:...",
* "spawnDepth": 0,
* "label": "proj035-planner",
* "channel": "mattermost"
* }
* }
*
* Emits:
* 'session-added' ({ sessionKey, transcriptFile, spawnedBy, spawnDepth, channelId, rootPostId, agentId })
* 'session-removed' (sessionKey)
*/
const fs = require('fs');
const path = require('path');
const { EventEmitter } = require('events');
/**
 * Watches every agent directory under `transcriptDir` and diffs the contents of
 * each agent's `sessions/sessions.json` between polls.
 *
 * Emits:
 *   'session-added'   ({ sessionKey, transcriptFile, spawnedBy, spawnDepth,
 *                        channelId, rootPostId, agentId })
 *   'session-removed' (sessionKey)
 *
 * A session key whose sessionId (or owning agent directory) changes between two
 * polls is reported as 'session-removed' followed by 'session-added', because the
 * transcript file derived from those values is no longer the one announced before.
 */
class SessionMonitor extends EventEmitter {
  /**
   * @param {object} opts
   * @param {string} opts.transcriptDir - Base /home/node/.openclaw/agents directory
   * @param {number} [opts.pollMs] - Poll interval in ms (default 2000)
   * @param {string|null} [opts.defaultChannel] - Fallback channel ID for non-MM sessions
   * @param {object} [opts.logger] - pino logger
   */
  constructor(opts) {
    super();
    this.transcriptDir = opts.transcriptDir;
    this.pollMs = opts.pollMs || 2000;
    this.defaultChannel = opts.defaultChannel || null;
    this.logger = opts.logger || null;
    // Map<sessionKey, sessionEntry> as seen by the most recent poll
    this._knownSessions = new Map();
    this._pollTimer = null;
    this._running = false;
  }

  /**
   * Start polling. Performs one scan immediately, then one every `pollMs`.
   * Calling start() while already running is a no-op.
   */
  start() {
    if (this._running) return;
    this._running = true;
    // Initial scan
    this._poll();
    this._pollTimer = setInterval(() => {
      this._poll();
    }, this.pollMs);
    if (this.logger) this.logger.info({ pollMs: this.pollMs }, 'SessionMonitor started');
  }

  /**
   * Stop polling and clear the interval timer. Known sessions are kept.
   */
  stop() {
    this._running = false;
    if (this._pollTimer) {
      clearInterval(this._pollTimer);
      this._pollTimer = null;
    }
    if (this.logger) this.logger.info('SessionMonitor stopped');
  }

  /**
   * Get all agent directories under transcriptDir.
   * Entries that cannot be stat'ed are ignored; an unreadable base directory
   * yields an empty list.
   * @private
   * @returns {string[]} Agent IDs
   */
  _getAgentDirs() {
    try {
      // eslint-disable-next-line security/detect-non-literal-fs-filename
      return fs.readdirSync(this.transcriptDir).filter((name) => {
        try {
          // eslint-disable-next-line security/detect-non-literal-fs-filename
          return fs.statSync(path.join(this.transcriptDir, name)).isDirectory();
        } catch (_e) {
          return false;
        }
      });
    } catch (_e) {
      return [];
    }
  }

  /**
   * Read sessions.json for a given agent, distinguishing "no sessions" from
   * "could not tell".
   * @private
   * @param {string} agentId
   * @returns {object|null} Sessions map; `{}` when the file does not exist;
   *   `null` when the file exists but could not be read, is not valid JSON
   *   (e.g. caught mid-write), or does not contain a JSON object.
   */
  _tryReadSessionsJson(agentId) {
    const sessionsPath = path.join(this.transcriptDir, agentId, 'sessions', 'sessions.json');
    let parsed;
    try {
      // eslint-disable-next-line security/detect-non-literal-fs-filename
      const raw = fs.readFileSync(sessionsPath, 'utf8');
      parsed = JSON.parse(raw);
    } catch (err) {
      // A missing file (or missing sessions/ directory) genuinely means "no sessions".
      if (err && (err.code === 'ENOENT' || err.code === 'ENOTDIR')) return {};
      if (this.logger) {
        this.logger.debug({ agentId, error: err && err.message }, 'Failed to read sessions.json');
      }
      return null;
    }
    if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
      if (this.logger) {
        this.logger.debug({ agentId }, 'sessions.json does not contain a JSON object');
      }
      return null;
    }
    return parsed;
  }

  /**
   * Read sessions.json for a given agent.
   * Never throws: any failure is reported as an empty map.
   * @private
   * @param {string} agentId
   * @returns {object} Sessions map
   */
  _readSessionsJson(agentId) {
    return this._tryReadSessionsJson(agentId) || {};
  }

  /**
   * Resolve the transcript file path for a session.
   * @private
   * @param {string} agentId
   * @param {string} sessionId - UUID
   * @returns {string}
   */
  _transcriptPath(agentId, sessionId) {
    return path.join(this.transcriptDir, agentId, 'sessions', `${sessionId}.jsonl`);
  }

  /**
   * Return the ':'-separated segment that follows the first occurrence of
   * `marker` in a session key.
   * @private
   * @param {string} sessionKey
   * @param {string} marker - Segment to look for (e.g. 'channel', 'thread')
   * @returns {string|null} The following segment, or null if the marker is
   *   absent or the following segment is missing/empty.
   */
  static _segmentAfter(sessionKey, marker) {
    const parts = sessionKey.split(':');
    const idx = parts.indexOf(marker);
    if (idx < 0) return null;
    return parts[idx + 1] || null; // eslint-disable-line security/detect-object-injection
  }

  /**
   * Parse channel ID from session key.
   * Session key format: "agent:main:mattermost:channel:{channelId}:thread:{threadId}"
   * or: "agent:main:mattermost:dm:{userId}" (the user ID is used as the channel).
   * @param {string} sessionKey
   * @returns {string|null}
   */
  static parseChannelId(sessionKey) {
    return (
      SessionMonitor._segmentAfter(sessionKey, 'channel') ||
      SessionMonitor._segmentAfter(sessionKey, 'dm')
    );
  }

  /**
   * Parse root post ID (thread ID) from session key.
   * @param {string} sessionKey
   * @returns {string|null}
   */
  static parseRootPostId(sessionKey) {
    return SessionMonitor._segmentAfter(sessionKey, 'thread');
  }

  /**
   * Extract agent ID from session key.
   * For "agent:{id}:..." keys this is the second segment; otherwise the first
   * segment, or 'unknown' when that is empty.
   * @param {string} sessionKey
   * @returns {string}
   */
  static parseAgentId(sessionKey) {
    const parts = sessionKey.split(':');
    if (parts[0] === 'agent' && parts[1]) return parts[1];
    return parts[0] || 'unknown';
  }

  /**
   * Determine if a session is a Mattermost session.
   * @param {string} sessionKey
   * @returns {boolean}
   */
  static isMattermostSession(sessionKey) {
    return sessionKey.includes(':mattermost:') || sessionKey.includes(':mm:');
  }

  /**
   * Poll all agents' sessions.json files for changes and emit the resulting
   * 'session-added' / 'session-removed' events.
   * @private
   */
  _poll() {
    if (!this._running) return;
    const currentSessions = new Map();
    for (const agentId of this._getAgentDirs()) {
      const sessions = this._tryReadSessionsJson(agentId);
      if (sessions === null) {
        // The file exists but could not be interpreted this time (typically a
        // partial write). Carry this agent's sessions over unchanged rather than
        // reporting them all as ended and then as new again on the next poll.
        for (const [sessionKey, known] of this._knownSessions) {
          if (known.agentId === agentId) currentSessions.set(sessionKey, known);
        }
        continue;
      }
      for (const [sessionKey, entry] of Object.entries(sessions)) {
        // Skip malformed entries instead of throwing inside the timer callback.
        if (entry === null || typeof entry !== 'object') continue;
        const sessionId = entry.sessionId || entry.uuid;
        if (!sessionId) continue;
        currentSessions.set(sessionKey, {
          agentId,
          sessionKey,
          sessionId,
          spawnedBy: entry.spawnedBy || null,
          spawnDepth: entry.spawnDepth || 0,
          label: entry.label || null,
          channel: entry.channel || null,
        });
      }
    }
    // Detect added sessions, and sessions whose transcript has been replaced
    for (const [sessionKey, entry] of currentSessions) {
      const known = this._knownSessions.get(sessionKey);
      if (!known) {
        this._onSessionAdded(entry);
      } else if (known.sessionId !== entry.sessionId || known.agentId !== entry.agentId) {
        // Same key, different transcript file: end the old session before
        // announcing the new one so listeners stop following the stale file.
        this._onSessionRemoved(sessionKey);
        this._onSessionAdded(entry);
      }
    }
    // Detect removed sessions
    for (const [sessionKey] of this._knownSessions) {
      if (!currentSessions.has(sessionKey)) {
        this._onSessionRemoved(sessionKey);
      }
    }
    this._knownSessions = currentSessions;
  }

  /**
   * Handle a newly detected session: resolve its channel and emit 'session-added'.
   * Non-Mattermost, non-sub-agent sessions without a channel fall back to
   * `defaultChannel`, and are skipped (no event) when no default is configured.
   * @private
   * @param {object} entry - Session entry built by _poll()
   */
  _onSessionAdded(entry) {
    const { agentId, sessionKey, sessionId, spawnedBy, spawnDepth, label } = entry;
    const transcriptFile = this._transcriptPath(agentId, sessionId);
    // Sub-agents always pass through — they inherit parent channel via watcher-manager
    const isSubAgent = !!spawnedBy;
    // Resolve channel ID from session key
    let channelId = SessionMonitor.parseChannelId(sessionKey);
    // Fall back to default channel for non-MM sessions
    if (!channelId && !isSubAgent && !SessionMonitor.isMattermostSession(sessionKey)) {
      channelId = this.defaultChannel;
      if (!channelId) {
        if (this.logger) {
          this.logger.debug({ sessionKey }, 'Skipping non-MM session (no channel, no default)');
        }
        return;
      }
    }
    const rootPostId = SessionMonitor.parseRootPostId(sessionKey);
    const parsedAgentId = SessionMonitor.parseAgentId(sessionKey);
    if (this.logger) {
      this.logger.info({ sessionKey, agentId, channelId, spawnedBy }, 'Session detected');
    }
    this.emit('session-added', {
      sessionKey,
      transcriptFile,
      spawnedBy,
      spawnDepth,
      channelId,
      rootPostId,
      // The session's label, when present, takes precedence over the key-derived ID
      agentId: label || parsedAgentId,
    });
  }

  /**
   * Handle a removed session: log it and emit 'session-removed'.
   * @private
   * @param {string} sessionKey
   */
  _onSessionRemoved(sessionKey) {
    if (this.logger) {
      this.logger.info({ sessionKey }, 'Session ended');
    }
    this.emit('session-removed', sessionKey);
  }

  /**
   * Get list of currently known sessions.
   * @returns {Map} A shallow copy of the sessionKey -> entry map from the last poll
   */
  getKnownSessions() {
    return new Map(this._knownSessions);
  }
}
// Public API of this module: the SessionMonitor class (named export).
module.exports = { SessionMonitor };