diff --git a/src/config.js b/src/config.js index 60cf5a5..2f1c588 100644 --- a/src/config.js +++ b/src/config.js @@ -42,6 +42,7 @@ function buildConfig() { token: getEnv('MM_BOT_TOKEN', null, true), baseUrl: getEnv('MM_BASE_URL', 'https://slack.solio.tech'), maxSockets: getEnvInt('MM_MAX_SOCKETS', 4), + botUserId: getEnv('MM_BOT_USER_ID', null), }, // Transcript directory (OpenClaw agents) diff --git a/src/session-monitor.js b/src/session-monitor.js index 7c5122c..0390c41 100644 --- a/src/session-monitor.js +++ b/src/session-monitor.js @@ -29,6 +29,9 @@ class SessionMonitor extends EventEmitter { * @param {string} opts.transcriptDir - Base /home/node/.openclaw/agents directory * @param {number} [opts.pollMs] - Poll interval in ms (default 2000) * @param {string|null} [opts.defaultChannel] - Fallback channel ID for non-MM sessions + * @param {string|null} [opts.mmToken] - Mattermost bot token (for DM channel resolution) + * @param {string|null} [opts.mmUrl] - Mattermost base URL + * @param {string|null} [opts.botUserId] - Bot's own Mattermost user ID * @param {object} [opts.logger] - pino logger */ constructor(opts) { @@ -36,10 +39,15 @@ class SessionMonitor extends EventEmitter { this.transcriptDir = opts.transcriptDir; this.pollMs = opts.pollMs || 500; this.defaultChannel = opts.defaultChannel || null; + this.mmToken = opts.mmToken || null; + this.mmUrl = opts.mmUrl || null; + this.botUserId = opts.botUserId || null; this.logger = opts.logger || null; // Map this._knownSessions = new Map(); + // Cache: "user:XXXX" -> channelId (resolved DM channels) + this._dmChannelCache = new Map(); this._pollTimer = null; this._running = false; } @@ -198,10 +206,107 @@ class SessionMonitor extends EventEmitter { /** * Determine if a session is a Mattermost session. * @param {string} sessionKey + * @param {object} [sessionEntry] - Session entry from sessions.json * @returns {boolean} */ - static isMattermostSession(sessionKey) { - return sessionKey.includes(':mattermost:') || sessionKey.includes(':mm:'); + static isMattermostSession(sessionKey, sessionEntry) { + if (sessionKey.includes(':mattermost:') || sessionKey.includes(':mm:')) return true; + // Check deliveryContext/channel/lastChannel in session entry + if (sessionEntry) { + if (sessionEntry.channel === 'mattermost' || sessionEntry.lastChannel === 'mattermost') return true; + const dc = sessionEntry.deliveryContext; + if (dc && dc.channel === 'mattermost') return true; + } + return false; + } + + /** + * Resolve channel ID from session entry's deliveryContext/lastTo/origin. + * Handles "user:XXXX" format by resolving DM channel via Mattermost API. + * @param {object} entry - Session entry from sessions.json + * @returns {Promise} Channel ID + */ + async resolveChannelFromEntry(entry) { + // Try deliveryContext.to first, then lastTo, then origin.to + const to = (entry.deliveryContext && entry.deliveryContext.to) || entry.lastTo || (entry.origin && entry.origin.to); + if (!to) return null; + + // If it's a channel:XXXX format, extract directly + if (to.startsWith('channel:')) { + return to.slice(8); + } + + // If it's a user:XXXX format, resolve the DM channel + if (to.startsWith('user:')) { + const userId = to.slice(5); + return this._resolveDmChannel(userId); + } + + return null; + } + + /** + * Resolve DM channel ID between the bot and a user via Mattermost API. + * @private + * @param {string} userId + * @returns {Promise} + */ + async _resolveDmChannel(userId) { + const cacheKey = `user:${userId}`; + if (this._dmChannelCache.has(cacheKey)) { + return this._dmChannelCache.get(cacheKey); + } + + if (!this.mmToken || !this.mmUrl || !this.botUserId) { + if (this.logger) this.logger.warn({ userId }, 'Cannot resolve DM channel — missing mmToken/mmUrl/botUserId'); + return null; + } + + try { + const url = new URL('/api/v4/channels/direct', this.mmUrl); + const https = require('https'); + const http = require('http'); + const transport = url.protocol === 'https:' ? https : http; + + const channelId = await new Promise((resolve, reject) => { + const body = JSON.stringify([this.botUserId, userId]); + const req = transport.request({ + hostname: url.hostname, + port: url.port || (url.protocol === 'https:' ? 443 : 80), + path: url.pathname, + method: 'POST', + headers: { + 'Authorization': `Bearer ${this.mmToken}`, + 'Content-Type': 'application/json', + 'Content-Length': Buffer.byteLength(body), + }, + rejectUnauthorized: false, + }, (res) => { + let data = ''; + res.on('data', (chunk) => { data += chunk; }); + res.on('end', () => { + try { + const parsed = JSON.parse(data); + resolve(parsed.id || null); + } catch (_e) { + resolve(null); + } + }); + }); + req.on('error', (e) => { reject(e); }); + req.write(body); + req.end(); + }); + + if (channelId) { + this._dmChannelCache.set(cacheKey, channelId); + if (this.logger) this.logger.info({ userId, channelId }, 'Resolved DM channel'); + } + return channelId; + } catch (err) { + if (this.logger) this.logger.warn({ userId, err: err.message }, 'Failed to resolve DM channel'); + return null; + } } /** @@ -228,6 +333,12 @@ class SessionMonitor extends EventEmitter { spawnDepth: entry.spawnDepth || 0, label: entry.label || null, channel: entry.channel || null, + // Preserve full entry for channel resolution + deliveryContext: entry.deliveryContext || null, + lastTo: entry.lastTo || null, + lastChannel: entry.lastChannel || null, + origin: entry.origin || null, + sessionFile: entry.sessionFile || null, }); } } @@ -253,14 +364,19 @@ class SessionMonitor extends EventEmitter { * Handle a newly detected session. * @private */ - _onSessionAdded(entry) { + async _onSessionAdded(entry) { const { agentId, sessionKey, sessionId, spawnedBy, spawnDepth, label } = entry; - const transcriptFile = this._transcriptPath(agentId, sessionId); + // Use sessionFile from sessions.json if available, otherwise resolve + let transcriptFile = entry.sessionFile || null; + if (!transcriptFile || !fs.existsSync(transcriptFile)) { + transcriptFile = this._transcriptPath(agentId, sessionId); + } // Skip stale sessions — only track if transcript was modified in last 5 minutes // This prevents creating status boxes for every old session in sessions.json try { + // eslint-disable-next-line security/detect-non-literal-fs-filename const stat = fs.statSync(transcriptFile); const ageMs = Date.now() - stat.mtimeMs; const STALE_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes @@ -287,11 +403,16 @@ class SessionMonitor extends EventEmitter { // Sub-agents always pass through — they inherit parent channel via watcher-manager const isSubAgent = !!spawnedBy; - // Resolve channel ID from session key + // Resolve channel ID — try session key first, then deliveryContext/lastTo let channelId = SessionMonitor.parseChannelId(sessionKey); + // If session key doesn't contain channel, resolve from session entry metadata + if (!channelId) { + channelId = await this.resolveChannelFromEntry(entry); + } + // Fall back to default channel for non-MM sessions - if (!channelId && !isSubAgent && !SessionMonitor.isMattermostSession(sessionKey)) { + if (!channelId && !isSubAgent && !SessionMonitor.isMattermostSession(sessionKey, entry)) { channelId = this.defaultChannel; if (!channelId) { if (this.logger) { diff --git a/src/watcher-manager.js b/src/watcher-manager.js index 2ce2210..8a4d494 100644 --- a/src/watcher-manager.js +++ b/src/watcher-manager.js @@ -206,6 +206,9 @@ async function startDaemon() { transcriptDir: config.transcriptDir, pollMs: config.sessionPollMs, defaultChannel: config.defaultChannel, + mmToken: config.mm.token, + mmUrl: config.mm.baseUrl, + botUserId: config.mm.botUserId, logger: logger.child({ module: 'session-monitor' }), });