Files
MATTERMOST_OPENCLAW_LIVESTATUS/src/session-monitor.js
sol 0d0e6e9d90 fix: resolve DM channel for agent:main:main sessions
The main agent session uses key 'agent:main:main' which doesn't
contain a channel ID. The session monitor now falls back to reading
deliveryContext/lastTo from sessions.json and resolves 'user:XXXX'
format via the Mattermost direct channel API.

Fixes: status watcher not tracking the main agent's active transcript
2026-03-07 22:35:40 +00:00

464 lines
14 KiB
JavaScript

'use strict';
/**
* session-monitor.js — Polls sessions.json every 2s to detect new/ended sessions.
*
* sessions.json format (per agent):
* {
* "agent:main:mattermost:channel:abc123:thread:xyz": {
* "sessionId": "uuid",
* "spawnedBy": null | "agent:main:...",
* "spawnDepth": 0,
* "label": "proj035-planner",
* "channel": "mattermost"
* }
* }
*
* Emits:
* 'session-added' ({ sessionKey, transcriptFile, spawnedBy, channelId, rootPostId, agentId })
* 'session-removed' (sessionKey)
*/
const fs = require('fs');
const path = require('path');
const { EventEmitter } = require('events');
class SessionMonitor extends EventEmitter {
/**
* @param {object} opts
* @param {string} opts.transcriptDir - Base /home/node/.openclaw/agents directory
* @param {number} [opts.pollMs] - Poll interval in ms (default 2000)
* @param {string|null} [opts.defaultChannel] - Fallback channel ID for non-MM sessions
* @param {string|null} [opts.mmToken] - Mattermost bot token (for DM channel resolution)
* @param {string|null} [opts.mmUrl] - Mattermost base URL
* @param {string|null} [opts.botUserId] - Bot's own Mattermost user ID
* @param {object} [opts.logger] - pino logger
*/
constructor(opts) {
super();
this.transcriptDir = opts.transcriptDir;
this.pollMs = opts.pollMs || 500;
this.defaultChannel = opts.defaultChannel || null;
this.mmToken = opts.mmToken || null;
this.mmUrl = opts.mmUrl || null;
this.botUserId = opts.botUserId || null;
this.logger = opts.logger || null;
// Map<sessionKey, sessionEntry>
this._knownSessions = new Map();
// Cache: "user:XXXX" -> channelId (resolved DM channels)
this._dmChannelCache = new Map();
this._pollTimer = null;
this._running = false;
}
start() {
if (this._running) return;
this._running = true;
// Initial scan
this._poll();
this._pollTimer = setInterval(() => {
this._poll();
}, this.pollMs);
if (this.logger) this.logger.info({ pollMs: this.pollMs }, 'SessionMonitor started');
}
stop() {
this._running = false;
if (this._pollTimer) {
clearInterval(this._pollTimer);
this._pollTimer = null;
}
if (this.logger) this.logger.info('SessionMonitor stopped');
}
/**
* Remove a session from known sessions so it can be re-detected on next poll.
* Called when the watcher marks a session as idle/done.
* @param {string} sessionKey
*/
forgetSession(sessionKey) {
this._knownSessions.delete(sessionKey);
}
/**
* Get all agent directories under transcriptDir.
* @private
* @returns {string[]} Agent IDs
*/
_getAgentDirs() {
try {
// eslint-disable-next-line security/detect-non-literal-fs-filename
return fs.readdirSync(this.transcriptDir).filter((name) => {
try {
// eslint-disable-next-line security/detect-non-literal-fs-filename
return fs.statSync(path.join(this.transcriptDir, name)).isDirectory();
} catch (_e) {
return false;
}
});
} catch (_e) {
return [];
}
}
/**
* Read sessions.json for a given agent.
* @private
* @param {string} agentId
* @returns {object} Sessions map
*/
_readSessionsJson(agentId) {
const sessionsPath = path.join(this.transcriptDir, agentId, 'sessions', 'sessions.json');
try {
// eslint-disable-next-line security/detect-non-literal-fs-filename
const raw = fs.readFileSync(sessionsPath, 'utf8');
return JSON.parse(raw);
} catch (_e) {
return {};
}
}
/**
* Resolve the transcript file path for a session.
* @private
* @param {string} agentId
* @param {string} sessionId - UUID
* @returns {string}
*/
_transcriptPath(agentId, sessionId) {
const sessionsDir = path.join(this.transcriptDir, agentId, 'sessions');
const directPath = path.join(sessionsDir, `${sessionId}.jsonl`);
// OpenClaw may use timestamp-prefixed filenames: {ISO}_{sessionId}.jsonl
// Check direct path first, then glob for *_{sessionId}.jsonl
if (fs.existsSync(directPath)) {
return directPath;
}
try {
const files = fs.readdirSync(sessionsDir);
const suffix = `${sessionId}.jsonl`;
const match = files.find(
(f) => f.endsWith(suffix) && f !== suffix && !f.endsWith('.deleted'),
);
if (match) {
return path.join(sessionsDir, match);
}
} catch (_e) {
// Directory doesn't exist or unreadable
}
// Fallback to direct path (will fail with ENOENT, which is handled upstream)
return directPath;
}
/**
* Parse channel ID from session key.
* Session key format: "agent:main:mattermost:channel:{channelId}:thread:{threadId}"
* or: "agent:main:mattermost:dm:{userId}"
* @param {string} sessionKey
* @returns {string|null}
*/
static parseChannelId(sessionKey) {
const parts = sessionKey.split(':');
// agent:main:mattermost:channel:CHANNEL_ID:...
const chanIdx = parts.indexOf('channel');
if (chanIdx >= 0 && parts[chanIdx + 1]) {
return parts[chanIdx + 1]; // eslint-disable-line security/detect-object-injection
}
// agent:main:mattermost:dm:USER_ID (use as channel)
const dmIdx = parts.indexOf('dm');
if (dmIdx >= 0 && parts[dmIdx + 1]) {
return parts[dmIdx + 1]; // eslint-disable-line security/detect-object-injection
}
return null;
}
/**
* Parse root post ID (thread ID) from session key.
* @param {string} sessionKey
* @returns {string|null}
*/
static parseRootPostId(sessionKey) {
const parts = sessionKey.split(':');
const threadIdx = parts.indexOf('thread');
if (threadIdx >= 0 && parts[threadIdx + 1]) {
return parts[threadIdx + 1]; // eslint-disable-line security/detect-object-injection
}
return null;
}
/**
* Extract agent ID from session key.
* @param {string} sessionKey
* @returns {string}
*/
static parseAgentId(sessionKey) {
const parts = sessionKey.split(':');
if (parts[0] === 'agent' && parts[1]) return parts[1];
return parts[0] || 'unknown';
}
/**
* Determine if a session is a Mattermost session.
* @param {string} sessionKey
* @param {object} [sessionEntry] - Session entry from sessions.json
* @returns {boolean}
*/
static isMattermostSession(sessionKey, sessionEntry) {
if (sessionKey.includes(':mattermost:') || sessionKey.includes(':mm:')) return true;
// Check deliveryContext/channel/lastChannel in session entry
if (sessionEntry) {
if (sessionEntry.channel === 'mattermost' || sessionEntry.lastChannel === 'mattermost') return true;
const dc = sessionEntry.deliveryContext;
if (dc && dc.channel === 'mattermost') return true;
}
return false;
}
/**
* Resolve channel ID from session entry's deliveryContext/lastTo/origin.
* Handles "user:XXXX" format by resolving DM channel via Mattermost API.
* @param {object} entry - Session entry from sessions.json
* @returns {Promise<string|null>} Channel ID
*/
async resolveChannelFromEntry(entry) {
// Try deliveryContext.to first, then lastTo, then origin.to
const to = (entry.deliveryContext && entry.deliveryContext.to) || entry.lastTo || (entry.origin && entry.origin.to);
if (!to) return null;
// If it's a channel:XXXX format, extract directly
if (to.startsWith('channel:')) {
return to.slice(8);
}
// If it's a user:XXXX format, resolve the DM channel
if (to.startsWith('user:')) {
const userId = to.slice(5);
return this._resolveDmChannel(userId);
}
return null;
}
/**
* Resolve DM channel ID between the bot and a user via Mattermost API.
* @private
* @param {string} userId
* @returns {Promise<string|null>}
*/
async _resolveDmChannel(userId) {
const cacheKey = `user:${userId}`;
if (this._dmChannelCache.has(cacheKey)) {
return this._dmChannelCache.get(cacheKey);
}
if (!this.mmToken || !this.mmUrl || !this.botUserId) {
if (this.logger) this.logger.warn({ userId }, 'Cannot resolve DM channel — missing mmToken/mmUrl/botUserId');
return null;
}
try {
const url = new URL('/api/v4/channels/direct', this.mmUrl);
const https = require('https');
const http = require('http');
const transport = url.protocol === 'https:' ? https : http;
const channelId = await new Promise((resolve, reject) => {
const body = JSON.stringify([this.botUserId, userId]);
const req = transport.request({
hostname: url.hostname,
port: url.port || (url.protocol === 'https:' ? 443 : 80),
path: url.pathname,
method: 'POST',
headers: {
'Authorization': `Bearer ${this.mmToken}`,
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(body),
},
rejectUnauthorized: false,
}, (res) => {
let data = '';
res.on('data', (chunk) => { data += chunk; });
res.on('end', () => {
try {
const parsed = JSON.parse(data);
resolve(parsed.id || null);
} catch (_e) {
resolve(null);
}
});
});
req.on('error', (e) => { reject(e); });
req.write(body);
req.end();
});
if (channelId) {
this._dmChannelCache.set(cacheKey, channelId);
if (this.logger) this.logger.info({ userId, channelId }, 'Resolved DM channel');
}
return channelId;
} catch (err) {
if (this.logger) this.logger.warn({ userId, err: err.message }, 'Failed to resolve DM channel');
return null;
}
}
/**
* Poll all agents' sessions.json files for changes.
* @private
*/
_poll() {
if (!this._running) return;
const agentDirs = this._getAgentDirs();
const currentSessions = new Map();
for (const agentId of agentDirs) {
const sessions = this._readSessionsJson(agentId);
for (const [sessionKey, entry] of Object.entries(sessions)) {
const sessionId = entry.sessionId || entry.uuid;
if (!sessionId) continue;
currentSessions.set(sessionKey, {
agentId,
sessionKey,
sessionId,
spawnedBy: entry.spawnedBy || null,
spawnDepth: entry.spawnDepth || 0,
label: entry.label || null,
channel: entry.channel || null,
// Preserve full entry for channel resolution
deliveryContext: entry.deliveryContext || null,
lastTo: entry.lastTo || null,
lastChannel: entry.lastChannel || null,
origin: entry.origin || null,
sessionFile: entry.sessionFile || null,
});
}
}
// Detect added sessions
for (const [sessionKey, entry] of currentSessions) {
if (!this._knownSessions.has(sessionKey)) {
this._onSessionAdded(entry);
}
}
// Detect removed sessions
for (const [sessionKey] of this._knownSessions) {
if (!currentSessions.has(sessionKey)) {
this._onSessionRemoved(sessionKey);
}
}
this._knownSessions = currentSessions;
}
/**
* Handle a newly detected session.
* @private
*/
async _onSessionAdded(entry) {
const { agentId, sessionKey, sessionId, spawnedBy, spawnDepth, label } = entry;
// Use sessionFile from sessions.json if available, otherwise resolve
let transcriptFile = entry.sessionFile || null;
if (!transcriptFile || !fs.existsSync(transcriptFile)) {
transcriptFile = this._transcriptPath(agentId, sessionId);
}
// Skip stale sessions — only track if transcript was modified in last 5 minutes
// This prevents creating status boxes for every old session in sessions.json
try {
// eslint-disable-next-line security/detect-non-literal-fs-filename
const stat = fs.statSync(transcriptFile);
const ageMs = Date.now() - stat.mtimeMs;
const STALE_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes
if (ageMs > STALE_THRESHOLD_MS) {
if (this.logger) {
this.logger.debug(
{ sessionKey, ageS: Math.floor(ageMs / 1000) },
'Skipping stale session (transcript not recently modified)',
);
}
return;
}
} catch (_e) {
// File doesn't exist — skip silently
if (this.logger) {
this.logger.debug(
{ sessionKey, transcriptFile },
'Skipping session (transcript not found)',
);
}
return;
}
// Sub-agents always pass through — they inherit parent channel via watcher-manager
const isSubAgent = !!spawnedBy;
// Resolve channel ID — try session key first, then deliveryContext/lastTo
let channelId = SessionMonitor.parseChannelId(sessionKey);
// If session key doesn't contain channel, resolve from session entry metadata
if (!channelId) {
channelId = await this.resolveChannelFromEntry(entry);
}
// Fall back to default channel for non-MM sessions
if (!channelId && !isSubAgent && !SessionMonitor.isMattermostSession(sessionKey, entry)) {
channelId = this.defaultChannel;
if (!channelId) {
if (this.logger) {
this.logger.debug({ sessionKey }, 'Skipping non-MM session (no channel, no default)');
}
return;
}
}
const rootPostId = SessionMonitor.parseRootPostId(sessionKey);
const parsedAgentId = SessionMonitor.parseAgentId(sessionKey);
if (this.logger) {
this.logger.info({ sessionKey, agentId, channelId, spawnedBy }, 'Session detected');
}
this.emit('session-added', {
sessionKey,
transcriptFile,
spawnedBy,
spawnDepth,
channelId,
rootPostId,
agentId: label || parsedAgentId,
});
}
/**
* Handle a removed session.
* @private
*/
_onSessionRemoved(sessionKey) {
if (this.logger) {
this.logger.info({ sessionKey }, 'Session ended');
}
this.emit('session-removed', sessionKey);
}
/**
* Get list of currently known sessions.
* @returns {Map}
*/
getKnownSessions() {
return new Map(this._knownSessions);
}
}
module.exports = { SessionMonitor };