feat: Phase 2 — session monitor, lifecycle, watcher manager

Phase 2 (Session Monitor + Lifecycle):
- src/session-monitor.js: polls sessions.json every 2s for new/ended sessions
  - Detects agents via transcriptDir subdirectory scan
  - Resolves channelId/rootPostId from session key format
  - Emits session-added/session-removed events
  - Handles multi-agent environments
  - Falls back to defaultChannel for non-MM sessions
- src/watcher-manager.js: top-level orchestrator
  - Starts session-monitor, status-watcher, health-server
  - Creates/updates Mattermost status posts on session events
  - Sub-agent linking: children embedded in parent status
  - Offset persistence (save/restore lastOffset on restart)
  - Post recovery on restart (search channel history for marker)
  - SIGTERM/SIGINT graceful shutdown: mark all boxes interrupted
  - CLI: node watcher-manager.js start|stop|status
  - MAX_ACTIVE_SESSIONS enforcement

Integration tests:
- test/integration/session-monitor.test.js: 14 tests
  - Session detection, removal, multi-agent, malformed JSON handling
- test/integration/status-watcher.test.js: 13 tests
  - JSONL parsing, tool_call/result pairs, idle detection, offset recovery

All 86 tests pass (59 unit + 27 integration). make check clean.
This commit is contained in:
sol
2026-03-07 17:32:28 +00:00
parent 43cfebee96
commit e3bd6c52dd
7 changed files with 1339 additions and 3 deletions

281
src/session-monitor.js Normal file
View File

@@ -0,0 +1,281 @@
'use strict';
/**
* session-monitor.js — Polls sessions.json every 2s to detect new/ended sessions.
*
* sessions.json format (per agent):
* {
* "agent:main:mattermost:channel:abc123:thread:xyz": {
* "sessionId": "uuid",
* "spawnedBy": null | "agent:main:...",
* "spawnDepth": 0,
* "label": "proj035-planner",
* "channel": "mattermost"
* }
* }
*
* Emits:
* 'session-added' ({ sessionKey, transcriptFile, spawnedBy, channelId, rootPostId, agentId })
* 'session-removed' (sessionKey)
*/
const fs = require('fs');
const path = require('path');
const { EventEmitter } = require('events');
class SessionMonitor extends EventEmitter {
/**
* @param {object} opts
* @param {string} opts.transcriptDir - Base /home/node/.openclaw/agents directory
* @param {number} [opts.pollMs] - Poll interval in ms (default 2000)
* @param {string|null} [opts.defaultChannel] - Fallback channel ID for non-MM sessions
* @param {object} [opts.logger] - pino logger
*/
constructor(opts) {
super();
this.transcriptDir = opts.transcriptDir;
this.pollMs = opts.pollMs || 2000;
this.defaultChannel = opts.defaultChannel || null;
this.logger = opts.logger || null;
// Map<sessionKey, sessionEntry>
this._knownSessions = new Map();
this._pollTimer = null;
this._running = false;
}
start() {
if (this._running) return;
this._running = true;
// Initial scan
this._poll();
this._pollTimer = setInterval(() => {
this._poll();
}, this.pollMs);
if (this.logger) this.logger.info({ pollMs: this.pollMs }, 'SessionMonitor started');
}
stop() {
this._running = false;
if (this._pollTimer) {
clearInterval(this._pollTimer);
this._pollTimer = null;
}
if (this.logger) this.logger.info('SessionMonitor stopped');
}
/**
* Get all agent directories under transcriptDir.
* @private
* @returns {string[]} Agent IDs
*/
_getAgentDirs() {
try {
// eslint-disable-next-line security/detect-non-literal-fs-filename
return fs.readdirSync(this.transcriptDir).filter((name) => {
try {
// eslint-disable-next-line security/detect-non-literal-fs-filename
return fs.statSync(path.join(this.transcriptDir, name)).isDirectory();
} catch (_e) {
return false;
}
});
} catch (_e) {
return [];
}
}
/**
* Read sessions.json for a given agent.
* @private
* @param {string} agentId
* @returns {object} Sessions map
*/
_readSessionsJson(agentId) {
const sessionsPath = path.join(this.transcriptDir, agentId, 'sessions', 'sessions.json');
try {
// eslint-disable-next-line security/detect-non-literal-fs-filename
const raw = fs.readFileSync(sessionsPath, 'utf8');
return JSON.parse(raw);
} catch (_e) {
return {};
}
}
/**
* Resolve the transcript file path for a session.
* @private
* @param {string} agentId
* @param {string} sessionId - UUID
* @returns {string}
*/
_transcriptPath(agentId, sessionId) {
return path.join(this.transcriptDir, agentId, 'sessions', `${sessionId}.jsonl`);
}
/**
* Parse channel ID from session key.
* Session key format: "agent:main:mattermost:channel:{channelId}:thread:{threadId}"
* or: "agent:main:mattermost:dm:{userId}"
* @param {string} sessionKey
* @returns {string|null}
*/
static parseChannelId(sessionKey) {
const parts = sessionKey.split(':');
// agent:main:mattermost:channel:CHANNEL_ID:...
const chanIdx = parts.indexOf('channel');
if (chanIdx >= 0 && parts[chanIdx + 1]) {
return parts[chanIdx + 1]; // eslint-disable-line security/detect-object-injection
}
// agent:main:mattermost:dm:USER_ID (use as channel)
const dmIdx = parts.indexOf('dm');
if (dmIdx >= 0 && parts[dmIdx + 1]) {
return parts[dmIdx + 1]; // eslint-disable-line security/detect-object-injection
}
return null;
}
/**
* Parse root post ID (thread ID) from session key.
* @param {string} sessionKey
* @returns {string|null}
*/
static parseRootPostId(sessionKey) {
const parts = sessionKey.split(':');
const threadIdx = parts.indexOf('thread');
if (threadIdx >= 0 && parts[threadIdx + 1]) {
return parts[threadIdx + 1]; // eslint-disable-line security/detect-object-injection
}
return null;
}
/**
* Extract agent ID from session key.
* @param {string} sessionKey
* @returns {string}
*/
static parseAgentId(sessionKey) {
const parts = sessionKey.split(':');
if (parts[0] === 'agent' && parts[1]) return parts[1];
return parts[0] || 'unknown';
}
/**
* Determine if a session is a Mattermost session.
* @param {string} sessionKey
* @returns {boolean}
*/
static isMattermostSession(sessionKey) {
return sessionKey.includes(':mattermost:') || sessionKey.includes(':mm:');
}
/**
* Poll all agents' sessions.json files for changes.
* @private
*/
_poll() {
if (!this._running) return;
const agentDirs = this._getAgentDirs();
const currentSessions = new Map();
for (const agentId of agentDirs) {
const sessions = this._readSessionsJson(agentId);
for (const [sessionKey, entry] of Object.entries(sessions)) {
const sessionId = entry.sessionId || entry.uuid;
if (!sessionId) continue;
currentSessions.set(sessionKey, {
agentId,
sessionKey,
sessionId,
spawnedBy: entry.spawnedBy || null,
spawnDepth: entry.spawnDepth || 0,
label: entry.label || null,
channel: entry.channel || null,
});
}
}
// Detect added sessions
for (const [sessionKey, entry] of currentSessions) {
if (!this._knownSessions.has(sessionKey)) {
this._onSessionAdded(entry);
}
}
// Detect removed sessions
for (const [sessionKey] of this._knownSessions) {
if (!currentSessions.has(sessionKey)) {
this._onSessionRemoved(sessionKey);
}
}
this._knownSessions = currentSessions;
}
/**
* Handle a newly detected session.
* @private
*/
_onSessionAdded(entry) {
const { agentId, sessionKey, sessionId, spawnedBy, spawnDepth, label } = entry;
const transcriptFile = this._transcriptPath(agentId, sessionId);
// Resolve channel ID from session key
let channelId = SessionMonitor.parseChannelId(sessionKey);
// Fall back to default channel for non-MM sessions
if (!channelId && !SessionMonitor.isMattermostSession(sessionKey)) {
channelId = this.defaultChannel;
if (!channelId) {
if (this.logger) {
this.logger.debug({ sessionKey }, 'Skipping non-MM session (no channel, no default)');
}
return;
}
}
const rootPostId = SessionMonitor.parseRootPostId(sessionKey);
const parsedAgentId = SessionMonitor.parseAgentId(sessionKey);
if (this.logger) {
this.logger.info({ sessionKey, agentId, channelId, spawnedBy }, 'Session detected');
}
this.emit('session-added', {
sessionKey,
transcriptFile,
spawnedBy,
spawnDepth,
channelId,
rootPostId,
agentId: label || parsedAgentId,
});
}
/**
* Handle a removed session.
* @private
*/
_onSessionRemoved(sessionKey) {
if (this.logger) {
this.logger.info({ sessionKey }, 'Session ended');
}
this.emit('session-removed', sessionKey);
}
/**
* Get list of currently known sessions.
* @returns {Map}
*/
getKnownSessions() {
return new Map(this._knownSessions);
}
}
module.exports = { SessionMonitor };

533
src/watcher-manager.js Normal file
View File

@@ -0,0 +1,533 @@
#!/usr/bin/env node
'use strict';
/* eslint-disable no-console */
/**
* watcher-manager.js — Top-level orchestrator for the Live Status v4 daemon.
*
* CLI: node watcher-manager.js start|stop|status
*
* Architecture:
* - SessionMonitor polls sessions.json every 2s for new/ended sessions
* - StatusWatcher watches transcript files via fs.watch (inotify)
* - StatusBox manages Mattermost posts (throttle, circuit breaker, retry)
* - HealthServer exposes /health endpoint
* - Offset persistence: save/restore last read positions on restart
* - Graceful shutdown: SIGTERM/SIGINT -> mark all boxes interrupted -> exit 0
*/
const fs = require('fs');
const { getConfig } = require('./config');
const { getLogger } = require('./logger');
const { SessionMonitor } = require('./session-monitor');
const { StatusWatcher } = require('./status-watcher');
const { StatusBox } = require('./status-box');
// status-formatter is used inline via require() in helpers
const { HealthServer } = require('./health');
const { loadLabels } = require('./tool-labels');
// ---- CLI Router ----
const cmd = process.argv[2];
if (cmd === 'start') {
startDaemon().catch((err) => {
console.error('Failed to start:', err.message);
process.exit(1);
});
} else if (cmd === 'stop') {
stopDaemon();
} else if (cmd === 'status') {
daemonStatus();
} else {
console.log('Usage: node watcher-manager.js <start|stop|status>');
process.exit(1);
}
// ---- PID File helpers ----
function writePidFile(pidFile) {
// eslint-disable-next-line security/detect-non-literal-fs-filename
fs.writeFileSync(pidFile, String(process.pid), 'utf8');
}
function readPidFile(pidFile) {
try {
// eslint-disable-next-line security/detect-non-literal-fs-filename
return parseInt(fs.readFileSync(pidFile, 'utf8').trim(), 10);
} catch (_e) {
return null;
}
}
function removePidFile(pidFile) {
try {
// eslint-disable-next-line security/detect-non-literal-fs-filename
fs.unlinkSync(pidFile);
} catch (_e) {
/* ignore */
}
}
function isProcessRunning(pid) {
try {
process.kill(pid, 0);
return true;
} catch (_e) {
return false;
}
}
// ---- Offset persistence ----
function loadOffsets(offsetFile) {
try {
// eslint-disable-next-line security/detect-non-literal-fs-filename
return JSON.parse(fs.readFileSync(offsetFile, 'utf8'));
} catch (_e) {
return {};
}
}
function saveOffsets(offsetFile, sessions) {
const offsets = {};
for (const [key, state] of sessions) {
offsets[key] = {
lastOffset: state.lastOffset || 0,
transcriptFile: state.transcriptFile,
startTime: state.startTime,
};
}
try {
// eslint-disable-next-line security/detect-non-literal-fs-filename
fs.writeFileSync(offsetFile, JSON.stringify(offsets, null, 2), 'utf8');
} catch (_e) {
/* ignore write error */
}
}
// ---- Status box marker for post recovery ----
function makeMarker(sessionKey) {
return `<!-- sw:${sessionKey} -->`;
}
// ---- Main daemon ----
async function startDaemon() {
const config = getConfig();
const logger = getLogger();
// Check if already running
const existingPid = readPidFile(config.pidFile);
if (existingPid && isProcessRunning(existingPid)) {
console.error(`Daemon already running (PID ${existingPid})`);
process.exit(1);
}
removePidFile(config.pidFile);
// Write PID file
writePidFile(config.pidFile);
logger.info({ pid: process.pid, pidFile: config.pidFile }, 'Status watcher daemon starting');
// Load tool labels
loadLabels(config.toolLabelsFile);
// Load persisted offsets for restart recovery
const savedOffsets = loadOffsets(config.offsetFile);
logger.info({ count: Object.keys(savedOffsets).length }, 'Loaded persisted session offsets');
// Shared state
// Map<sessionKey, { postId, statusBox }>
const activeBoxes = new Map();
let globalMetrics = {
activeSessions: 0,
updatesSent: 0,
updatesFailed: 0,
queueDepth: 0,
lastError: null,
circuit: { state: 'unknown' },
};
// Shared StatusBox instance (single http.Agent pool)
const sharedStatusBox = new StatusBox({
baseUrl: config.mm.baseUrl,
token: config.mm.token,
logger: logger.child({ module: 'status-box' }),
throttleMs: config.throttleMs,
maxMessageChars: config.maxMessageChars,
maxRetries: config.maxRetries,
maxSockets: config.mm.maxSockets,
});
// StatusWatcher
const watcher = new StatusWatcher({
transcriptDir: config.transcriptDir,
idleTimeoutS: config.idleTimeoutS,
logger: logger.child({ module: 'status-watcher' }),
});
// SessionMonitor
const monitor = new SessionMonitor({
transcriptDir: config.transcriptDir,
pollMs: config.sessionPollMs,
defaultChannel: config.defaultChannel,
logger: logger.child({ module: 'session-monitor' }),
});
// Health server
const healthServer = new HealthServer({
port: config.healthPort,
logger: logger.child({ module: 'health' }),
getMetrics: () => ({
...globalMetrics,
...sharedStatusBox.getMetrics(),
activeSessions: activeBoxes.size,
}),
});
// ---- Session Added ----
monitor.on('session-added', async (info) => {
const { sessionKey, transcriptFile, spawnedBy, channelId, rootPostId, agentId } = info;
// Skip if no channel
if (!channelId) {
logger.debug({ sessionKey }, 'No channel for session — skipping');
return;
}
// Enforce MAX_ACTIVE_SESSIONS
if (activeBoxes.size >= config.maxActiveSessions) {
logger.warn(
{ sessionKey, maxActiveSessions: config.maxActiveSessions },
'Max active sessions reached — dropping session',
);
return;
}
// Sub-agent: skip creating own post (embedded in parent)
if (spawnedBy && activeBoxes.has(spawnedBy)) {
const parentBox = activeBoxes.get(spawnedBy);
// Link child to parent's session state
const childState = {
sessionKey,
transcriptFile,
spawnedBy,
parentPostId: parentBox.postId,
channelId,
depth: info.spawnDepth || 0,
agentId,
};
parentBox.children = parentBox.children || new Map();
parentBox.children.set(sessionKey, childState);
// Register in watcher
watcher.addSession(sessionKey, transcriptFile, {
agentId,
depth: info.spawnDepth || 1,
});
logger.info({ sessionKey, parent: spawnedBy }, 'Sub-agent linked to parent');
return;
}
let postId;
// Check for existing post (restart recovery)
const saved = savedOffsets[sessionKey]; // eslint-disable-line security/detect-object-injection
if (saved) {
// Try to find existing post in channel history
postId = await findExistingPost(sharedStatusBox, channelId, sessionKey, logger);
}
// Create new post if none found
if (!postId) {
try {
const initialText = buildInitialText(agentId, sessionKey);
postId = await sharedStatusBox.createPost(channelId, initialText, rootPostId);
logger.info({ sessionKey, postId, channelId }, 'Created status box');
} catch (err) {
logger.error({ sessionKey, err }, 'Failed to create status post');
globalMetrics.lastError = err.message;
return;
}
}
activeBoxes.set(sessionKey, {
postId,
channelId,
agentId,
rootPostId,
children: new Map(),
});
globalMetrics.activeSessions = activeBoxes.size;
// Register in watcher
const initialState = saved
? { lastOffset: saved.lastOffset, startTime: saved.startTime, agentId }
: { agentId };
watcher.addSession(sessionKey, transcriptFile, initialState);
});
// ---- Session Removed ----
monitor.on('session-removed', (sessionKey) => {
// Don't immediately remove — let idle detection handle final flush
logger.debug({ sessionKey }, 'Session removed from sessions.json');
});
// ---- Session Update (from watcher) ----
watcher.on('session-update', (sessionKey, state) => {
const box = activeBoxes.get(sessionKey);
if (!box) {
// Sub-agent: update parent
updateParentWithChild(activeBoxes, watcher, sharedStatusBox, sessionKey, state, logger);
return;
}
// Build status text
const text = buildStatusText(box, state, activeBoxes, watcher, sessionKey);
sharedStatusBox.updatePost(box.postId, text).catch((err) => {
logger.error({ sessionKey, err }, 'Failed to update status post');
globalMetrics.lastError = err.message;
globalMetrics.updatesFailed++;
});
// Persist offsets periodically
saveOffsets(config.offsetFile, watcher.sessions);
});
// ---- Session Idle (from watcher) ----
watcher.on('session-idle', async (sessionKey, state) => {
const box = activeBoxes.get(sessionKey);
if (!box) {
// Sub-agent completed
updateParentWithChild(activeBoxes, watcher, sharedStatusBox, sessionKey, state, logger);
return;
}
// Check all children are complete before marking done
const allChildrenDone = checkChildrenComplete(box, watcher);
if (!allChildrenDone) {
logger.debug({ sessionKey }, 'Parent waiting for child sessions to complete');
return;
}
// Final update with done status
const doneState = { ...state, status: 'done' };
const text = buildStatusText(box, doneState, activeBoxes, watcher, sessionKey);
try {
await sharedStatusBox.forceFlush(box.postId);
await sharedStatusBox.updatePost(box.postId, text);
logger.info({ sessionKey, postId: box.postId }, 'Session complete — status box updated');
} catch (err) {
logger.error({ sessionKey, err }, 'Failed to update final status');
}
// Clean up
activeBoxes.delete(sessionKey);
watcher.removeSession(sessionKey);
globalMetrics.activeSessions = activeBoxes.size;
// Persist final offsets
saveOffsets(config.offsetFile, watcher.sessions);
});
// ---- Start all subsystems ----
await healthServer.start();
try {
watcher.start();
} catch (err) {
logger.warn({ err }, 'fs.watch failed — creating transcript dir');
// eslint-disable-next-line security/detect-non-literal-fs-filename
fs.mkdirSync(config.transcriptDir, { recursive: true });
watcher.start();
}
monitor.start();
logger.info('Status watcher daemon ready');
// ---- Graceful shutdown ----
let shuttingDown = false;
async function shutdown(signal) {
if (shuttingDown) return;
shuttingDown = true;
logger.info({ signal }, 'Shutting down gracefully');
// Stop accepting new sessions
monitor.stop();
watcher.stop();
// Mark all active boxes as interrupted
const updates = [];
for (const [sessionKey, box] of activeBoxes) {
const state = watcher.getSessionState(sessionKey) || {
sessionKey,
status: 'interrupted',
startTime: Date.now(),
lines: [],
agentId: box.agentId,
depth: 0,
tokenCount: 0,
children: [],
};
const intState = { ...state, status: 'interrupted' };
const text = buildStatusText(box, intState, activeBoxes, watcher, sessionKey);
updates.push(
sharedStatusBox
.updatePost(box.postId, text)
.catch((e) => logger.error({ sessionKey, e }, 'Shutdown update failed')),
);
}
await Promise.allSettled(updates);
await sharedStatusBox.flushAll();
// Cleanup
await healthServer.stop();
sharedStatusBox.destroy();
removePidFile(config.pidFile);
logger.info('Shutdown complete');
process.exit(0);
}
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
// Offset persistence interval (every 30s)
setInterval(() => {
saveOffsets(config.offsetFile, watcher.sessions);
}, 30000);
}
// ---- Helper functions ----
function buildInitialText(agentId, sessionKey) {
const { format } = require('./status-formatter');
return format({
sessionKey,
status: 'active',
startTime: Date.now(),
lines: [],
agentId,
depth: 0,
tokenCount: 0,
children: [],
});
}
function buildStatusText(box, state, activeBoxes, watcher, _sessionKey) {
const { format } = require('./status-formatter');
// Build child states for nesting
const childStates = [];
if (box.children && box.children.size > 0) {
for (const [childKey] of box.children) {
const childWatcherState = watcher.getSessionState(childKey);
if (childWatcherState) {
childStates.push({ ...childWatcherState, depth: 1 });
}
}
}
return format({ ...state, children: childStates });
}
function updateParentWithChild(activeBoxes, watcher, statusBox, childKey, childState, logger) {
// Find parent
for (const [parentKey, box] of activeBoxes) {
if (box.children && box.children.has(childKey)) {
const parentState = watcher.getSessionState(parentKey);
if (!parentState) return;
const text = buildStatusText(box, parentState, activeBoxes, watcher, parentKey);
statusBox
.updatePost(box.postId, text)
.catch((err) => logger.error({ parentKey, childKey, err }, 'Failed to update parent'));
return;
}
}
}
function checkChildrenComplete(box, watcher) {
if (!box.children || box.children.size === 0) return true;
for (const [childKey] of box.children) {
const childState = watcher.getSessionState(childKey);
if (childState && childState.status === 'active') return false;
}
return true;
}
async function findExistingPost(statusBox, channelId, sessionKey, logger) {
// Search channel history for a post with our marker
// This uses the Mattermost search API
const marker = makeMarker(sessionKey);
try {
// Use the internal _apiCall method to search posts
const result = await statusBox._apiCallWithRetry('POST', '/api/v4/posts/search', {
channel_id: channelId,
terms: marker,
is_or_search: false,
});
if (result && result.posts) {
for (const post of Object.values(result.posts)) {
if (post.message && post.message.includes(marker)) {
logger.info(
{ sessionKey, postId: post.id },
'Found existing status post (restart recovery)',
);
return post.id;
}
}
}
} catch (_e) {
/* search failed — create new post */
}
return null;
}
// ---- Stop command ----
function stopDaemon() {
// Need to read config for pidFile location
process.env.MM_BOT_TOKEN = process.env.MM_BOT_TOKEN || 'placeholder';
let config;
try {
config = getConfig();
} catch (_e) {
config = { pidFile: '/tmp/status-watcher.pid' };
}
const pid = readPidFile(config.pidFile);
if (!pid) {
console.log('Daemon not running (no PID file)');
return;
}
if (!isProcessRunning(pid)) {
console.log(`Daemon not running (PID ${pid} not found)`);
removePidFile(config.pidFile);
return;
}
console.log(`Stopping daemon (PID ${pid})...`);
process.kill(pid, 'SIGTERM');
}
// ---- Status command ----
function daemonStatus() {
process.env.MM_BOT_TOKEN = process.env.MM_BOT_TOKEN || 'placeholder';
let config;
try {
config = getConfig();
} catch (_e) {
config = { pidFile: '/tmp/status-watcher.pid', healthPort: 9090 };
}
const pid = readPidFile(config.pidFile);
if (!pid) {
console.log('Status: stopped');
return;
}
if (!isProcessRunning(pid)) {
console.log(`Status: stopped (stale PID ${pid})`);
return;
}
console.log(`Status: running (PID ${pid})`);
console.log(`Health: http://localhost:${config.healthPort}/health`);
}