fix: resolve DM channel for agent:main:main sessions
The main agent session uses key 'agent:main:main' which doesn't contain a channel ID. The session monitor now falls back to reading deliveryContext/lastTo from sessions.json and resolves 'user:XXXX' format via the Mattermost direct channel API. Fixes: status watcher not tracking the main agent's active transcript
This commit is contained in:
@@ -42,6 +42,7 @@ function buildConfig() {
|
|||||||
token: getEnv('MM_BOT_TOKEN', null, true),
|
token: getEnv('MM_BOT_TOKEN', null, true),
|
||||||
baseUrl: getEnv('MM_BASE_URL', 'https://slack.solio.tech'),
|
baseUrl: getEnv('MM_BASE_URL', 'https://slack.solio.tech'),
|
||||||
maxSockets: getEnvInt('MM_MAX_SOCKETS', 4),
|
maxSockets: getEnvInt('MM_MAX_SOCKETS', 4),
|
||||||
|
botUserId: getEnv('MM_BOT_USER_ID', null),
|
||||||
},
|
},
|
||||||
|
|
||||||
// Transcript directory (OpenClaw agents)
|
// Transcript directory (OpenClaw agents)
|
||||||
|
|||||||
@@ -29,6 +29,9 @@ class SessionMonitor extends EventEmitter {
|
|||||||
* @param {string} opts.transcriptDir - Base /home/node/.openclaw/agents directory
|
* @param {string} opts.transcriptDir - Base /home/node/.openclaw/agents directory
|
||||||
* @param {number} [opts.pollMs] - Poll interval in ms (default 2000)
|
* @param {number} [opts.pollMs] - Poll interval in ms (default 2000)
|
||||||
* @param {string|null} [opts.defaultChannel] - Fallback channel ID for non-MM sessions
|
* @param {string|null} [opts.defaultChannel] - Fallback channel ID for non-MM sessions
|
||||||
|
* @param {string|null} [opts.mmToken] - Mattermost bot token (for DM channel resolution)
|
||||||
|
* @param {string|null} [opts.mmUrl] - Mattermost base URL
|
||||||
|
* @param {string|null} [opts.botUserId] - Bot's own Mattermost user ID
|
||||||
* @param {object} [opts.logger] - pino logger
|
* @param {object} [opts.logger] - pino logger
|
||||||
*/
|
*/
|
||||||
constructor(opts) {
|
constructor(opts) {
|
||||||
@@ -36,10 +39,15 @@ class SessionMonitor extends EventEmitter {
|
|||||||
this.transcriptDir = opts.transcriptDir;
|
this.transcriptDir = opts.transcriptDir;
|
||||||
this.pollMs = opts.pollMs || 500;
|
this.pollMs = opts.pollMs || 500;
|
||||||
this.defaultChannel = opts.defaultChannel || null;
|
this.defaultChannel = opts.defaultChannel || null;
|
||||||
|
this.mmToken = opts.mmToken || null;
|
||||||
|
this.mmUrl = opts.mmUrl || null;
|
||||||
|
this.botUserId = opts.botUserId || null;
|
||||||
this.logger = opts.logger || null;
|
this.logger = opts.logger || null;
|
||||||
|
|
||||||
// Map<sessionKey, sessionEntry>
|
// Map<sessionKey, sessionEntry>
|
||||||
this._knownSessions = new Map();
|
this._knownSessions = new Map();
|
||||||
|
// Cache: "user:XXXX" -> channelId (resolved DM channels)
|
||||||
|
this._dmChannelCache = new Map();
|
||||||
this._pollTimer = null;
|
this._pollTimer = null;
|
||||||
this._running = false;
|
this._running = false;
|
||||||
}
|
}
|
||||||
@@ -198,10 +206,107 @@ class SessionMonitor extends EventEmitter {
|
|||||||
/**
|
/**
|
||||||
* Determine if a session is a Mattermost session.
|
* Determine if a session is a Mattermost session.
|
||||||
* @param {string} sessionKey
|
* @param {string} sessionKey
|
||||||
|
* @param {object} [sessionEntry] - Session entry from sessions.json
|
||||||
* @returns {boolean}
|
* @returns {boolean}
|
||||||
*/
|
*/
|
||||||
static isMattermostSession(sessionKey) {
|
static isMattermostSession(sessionKey, sessionEntry) {
|
||||||
return sessionKey.includes(':mattermost:') || sessionKey.includes(':mm:');
|
if (sessionKey.includes(':mattermost:') || sessionKey.includes(':mm:')) return true;
|
||||||
|
// Check deliveryContext/channel/lastChannel in session entry
|
||||||
|
if (sessionEntry) {
|
||||||
|
if (sessionEntry.channel === 'mattermost' || sessionEntry.lastChannel === 'mattermost') return true;
|
||||||
|
const dc = sessionEntry.deliveryContext;
|
||||||
|
if (dc && dc.channel === 'mattermost') return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resolve channel ID from session entry's deliveryContext/lastTo/origin.
|
||||||
|
* Handles "user:XXXX" format by resolving DM channel via Mattermost API.
|
||||||
|
* @param {object} entry - Session entry from sessions.json
|
||||||
|
* @returns {Promise<string|null>} Channel ID
|
||||||
|
*/
|
||||||
|
async resolveChannelFromEntry(entry) {
|
||||||
|
// Try deliveryContext.to first, then lastTo, then origin.to
|
||||||
|
const to = (entry.deliveryContext && entry.deliveryContext.to) || entry.lastTo || (entry.origin && entry.origin.to);
|
||||||
|
if (!to) return null;
|
||||||
|
|
||||||
|
// If it's a channel:XXXX format, extract directly
|
||||||
|
if (to.startsWith('channel:')) {
|
||||||
|
return to.slice(8);
|
||||||
|
}
|
||||||
|
|
||||||
|
// If it's a user:XXXX format, resolve the DM channel
|
||||||
|
if (to.startsWith('user:')) {
|
||||||
|
const userId = to.slice(5);
|
||||||
|
return this._resolveDmChannel(userId);
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resolve DM channel ID between the bot and a user via Mattermost API.
|
||||||
|
* @private
|
||||||
|
* @param {string} userId
|
||||||
|
* @returns {Promise<string|null>}
|
||||||
|
*/
|
||||||
|
async _resolveDmChannel(userId) {
|
||||||
|
const cacheKey = `user:${userId}`;
|
||||||
|
if (this._dmChannelCache.has(cacheKey)) {
|
||||||
|
return this._dmChannelCache.get(cacheKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this.mmToken || !this.mmUrl || !this.botUserId) {
|
||||||
|
if (this.logger) this.logger.warn({ userId }, 'Cannot resolve DM channel — missing mmToken/mmUrl/botUserId');
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const url = new URL('/api/v4/channels/direct', this.mmUrl);
|
||||||
|
const https = require('https');
|
||||||
|
const http = require('http');
|
||||||
|
const transport = url.protocol === 'https:' ? https : http;
|
||||||
|
|
||||||
|
const channelId = await new Promise((resolve, reject) => {
|
||||||
|
const body = JSON.stringify([this.botUserId, userId]);
|
||||||
|
const req = transport.request({
|
||||||
|
hostname: url.hostname,
|
||||||
|
port: url.port || (url.protocol === 'https:' ? 443 : 80),
|
||||||
|
path: url.pathname,
|
||||||
|
method: 'POST',
|
||||||
|
headers: {
|
||||||
|
'Authorization': `Bearer ${this.mmToken}`,
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
'Content-Length': Buffer.byteLength(body),
|
||||||
|
},
|
||||||
|
rejectUnauthorized: false,
|
||||||
|
}, (res) => {
|
||||||
|
let data = '';
|
||||||
|
res.on('data', (chunk) => { data += chunk; });
|
||||||
|
res.on('end', () => {
|
||||||
|
try {
|
||||||
|
const parsed = JSON.parse(data);
|
||||||
|
resolve(parsed.id || null);
|
||||||
|
} catch (_e) {
|
||||||
|
resolve(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
req.on('error', (e) => { reject(e); });
|
||||||
|
req.write(body);
|
||||||
|
req.end();
|
||||||
|
});
|
||||||
|
|
||||||
|
if (channelId) {
|
||||||
|
this._dmChannelCache.set(cacheKey, channelId);
|
||||||
|
if (this.logger) this.logger.info({ userId, channelId }, 'Resolved DM channel');
|
||||||
|
}
|
||||||
|
return channelId;
|
||||||
|
} catch (err) {
|
||||||
|
if (this.logger) this.logger.warn({ userId, err: err.message }, 'Failed to resolve DM channel');
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -228,6 +333,12 @@ class SessionMonitor extends EventEmitter {
|
|||||||
spawnDepth: entry.spawnDepth || 0,
|
spawnDepth: entry.spawnDepth || 0,
|
||||||
label: entry.label || null,
|
label: entry.label || null,
|
||||||
channel: entry.channel || null,
|
channel: entry.channel || null,
|
||||||
|
// Preserve full entry for channel resolution
|
||||||
|
deliveryContext: entry.deliveryContext || null,
|
||||||
|
lastTo: entry.lastTo || null,
|
||||||
|
lastChannel: entry.lastChannel || null,
|
||||||
|
origin: entry.origin || null,
|
||||||
|
sessionFile: entry.sessionFile || null,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -253,14 +364,19 @@ class SessionMonitor extends EventEmitter {
|
|||||||
* Handle a newly detected session.
|
* Handle a newly detected session.
|
||||||
* @private
|
* @private
|
||||||
*/
|
*/
|
||||||
_onSessionAdded(entry) {
|
async _onSessionAdded(entry) {
|
||||||
const { agentId, sessionKey, sessionId, spawnedBy, spawnDepth, label } = entry;
|
const { agentId, sessionKey, sessionId, spawnedBy, spawnDepth, label } = entry;
|
||||||
|
|
||||||
const transcriptFile = this._transcriptPath(agentId, sessionId);
|
// Use sessionFile from sessions.json if available, otherwise resolve
|
||||||
|
let transcriptFile = entry.sessionFile || null;
|
||||||
|
if (!transcriptFile || !fs.existsSync(transcriptFile)) {
|
||||||
|
transcriptFile = this._transcriptPath(agentId, sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
// Skip stale sessions — only track if transcript was modified in last 5 minutes
|
// Skip stale sessions — only track if transcript was modified in last 5 minutes
|
||||||
// This prevents creating status boxes for every old session in sessions.json
|
// This prevents creating status boxes for every old session in sessions.json
|
||||||
try {
|
try {
|
||||||
|
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||||
const stat = fs.statSync(transcriptFile);
|
const stat = fs.statSync(transcriptFile);
|
||||||
const ageMs = Date.now() - stat.mtimeMs;
|
const ageMs = Date.now() - stat.mtimeMs;
|
||||||
const STALE_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes
|
const STALE_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes
|
||||||
@@ -287,11 +403,16 @@ class SessionMonitor extends EventEmitter {
|
|||||||
// Sub-agents always pass through — they inherit parent channel via watcher-manager
|
// Sub-agents always pass through — they inherit parent channel via watcher-manager
|
||||||
const isSubAgent = !!spawnedBy;
|
const isSubAgent = !!spawnedBy;
|
||||||
|
|
||||||
// Resolve channel ID from session key
|
// Resolve channel ID — try session key first, then deliveryContext/lastTo
|
||||||
let channelId = SessionMonitor.parseChannelId(sessionKey);
|
let channelId = SessionMonitor.parseChannelId(sessionKey);
|
||||||
|
|
||||||
|
// If session key doesn't contain channel, resolve from session entry metadata
|
||||||
|
if (!channelId) {
|
||||||
|
channelId = await this.resolveChannelFromEntry(entry);
|
||||||
|
}
|
||||||
|
|
||||||
// Fall back to default channel for non-MM sessions
|
// Fall back to default channel for non-MM sessions
|
||||||
if (!channelId && !isSubAgent && !SessionMonitor.isMattermostSession(sessionKey)) {
|
if (!channelId && !isSubAgent && !SessionMonitor.isMattermostSession(sessionKey, entry)) {
|
||||||
channelId = this.defaultChannel;
|
channelId = this.defaultChannel;
|
||||||
if (!channelId) {
|
if (!channelId) {
|
||||||
if (this.logger) {
|
if (this.logger) {
|
||||||
|
|||||||
@@ -206,6 +206,9 @@ async function startDaemon() {
|
|||||||
transcriptDir: config.transcriptDir,
|
transcriptDir: config.transcriptDir,
|
||||||
pollMs: config.sessionPollMs,
|
pollMs: config.sessionPollMs,
|
||||||
defaultChannel: config.defaultChannel,
|
defaultChannel: config.defaultChannel,
|
||||||
|
mmToken: config.mm.token,
|
||||||
|
mmUrl: config.mm.baseUrl,
|
||||||
|
botUserId: config.mm.botUserId,
|
||||||
logger: logger.child({ module: 'session-monitor' }),
|
logger: logger.child({ module: 'session-monitor' }),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user