feat: Mattermost plugin + daemon integration (Phases 2-5)

Plugin (Go server + React webapp):
- Custom post type 'custom_livestatus' with terminal-style rendering
- WebSocket broadcasts for real-time updates (no PUT, no '(edited)')
- KV store for session persistence across reconnects
- Shared secret auth for daemon-to-plugin communication
- Auto-scroll terminal with user scroll override
- Collapsible sub-agent sections
- Theme-compatible CSS (light/dark)

Daemon integration:
- PluginClient for structured data push to plugin
- Auto-detection: GET /health on startup + periodic re-check
- Graceful fallback: if plugin unavailable, uses REST API (PUT)
- Per-session mode tracking: sessions created via plugin stay on plugin
- Mid-session fallback: if plugin update fails, auto-switch to REST

Plugin deployed and active on Mattermost v11.4.0.
This commit is contained in:
sol
2026-03-07 22:11:06 +00:00
parent 868574d939
commit c724e57276
9 changed files with 282 additions and 1958 deletions

2
.gitignore vendored
View File

@@ -1,3 +1,5 @@
node_modules/ node_modules/
coverage/ coverage/
*.log *.log
plugin/server/dist/
plugin/webapp/node_modules/

View File

@@ -77,7 +77,7 @@ Sub-agent lifecycle hooks (`subagent_spawned`, `subagent_ended`) are channel plu
## Discovery 4: Mattermost API ## Discovery 4: Mattermost API
- `PostEditTimeLimit = -1` — unlimited edits on this server - `PostEditTimeLimit = -1` — unlimited edits on this server
- Bot token: `n73636eit7bg3rgmpsj693mwno` (default/main bot account) - Bot token: `<redacted>` (default/main bot account, set via MM_BOT_TOKEN env var)
- Multiple bot accounts available per agent (see openclaw.json `accounts`) - Multiple bot accounts available per agent (see openclaw.json `accounts`)
- API base: `https://slack.solio.tech/api/v4` - API base: `https://slack.solio.tech/api/v4`
- Post update: `PUT /api/v4/posts/{id}` — no time limit, no count limit - Post update: `PUT /api/v4/posts/{id}` — no time limit, no count limit

3
plugin/assets/icon.svg Normal file
View File

@@ -0,0 +1,3 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" fill="#166de0">
<path d="M20 4H4c-1.1 0-2 .9-2 2v12c0 1.1.9 2 2 2h16c1.1 0 2-.9 2-2V6c0-1.1-.9-2-2-2zm-1 14H5V6h14v12zM7 9h2v2H7zm0 4h2v2H7zm4-4h6v2h-6zm0 4h6v2h-6z"/>
</svg>

After

Width:  |  Height:  |  Size: 237 B

Binary file not shown.

1
plugin/webapp/dist/main.js vendored Normal file

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

View File

@@ -82,6 +82,15 @@ function buildConfig() {
// Feature flags // Feature flags
enableFsWatch: getEnvBool('ENABLE_FS_WATCH', true), enableFsWatch: getEnvBool('ENABLE_FS_WATCH', true),
// Mattermost plugin integration (optional)
// When configured, updates are sent to the plugin instead of using PUT on posts
plugin: {
url: getEnv('PLUGIN_URL', null), // e.g. https://slack.solio.tech/plugins/com.openclaw.livestatus
secret: getEnv('PLUGIN_SECRET', null),
enabled: getEnvBool('PLUGIN_ENABLED', true),
detectIntervalMs: getEnvInt('PLUGIN_DETECT_INTERVAL_MS', 60000),
},
}; };
// Validate MM base URL // Validate MM base URL

161
src/plugin-client.js Normal file
View File

@@ -0,0 +1,161 @@
'use strict';
/**
* plugin-client.js — HTTP client for the OpenClaw Live Status Mattermost plugin.
*
* When the plugin is available, the daemon sends structured data to the plugin
* instead of using PUT to update Mattermost posts directly. This eliminates
* the "(edited)" label, enables WebSocket real-time rendering, and avoids
* Mattermost's post API rate limits during streaming updates.
*
* Fallback: if the plugin is unavailable, watcher-manager falls back to the
* standard REST API (PUT post updates via StatusBox).
*/
var https = require('https');
var http = require('http');
var DEFAULT_TIMEOUT_MS = 5000;
var DEFAULT_MAX_SOCKETS = 4;
function PluginClient(opts) {
this.pluginUrl = opts.pluginUrl; // e.g. https://slack.solio.tech/plugins/com.openclaw.livestatus
this.secret = opts.secret;
this.logger = opts.logger || null;
this.timeoutMs = opts.timeoutMs || DEFAULT_TIMEOUT_MS;
var parsedUrl = new URL(this.pluginUrl);
this.hostname = parsedUrl.hostname;
this.port = parsedUrl.port
? parseInt(parsedUrl.port, 10)
: parsedUrl.protocol === 'https:' ? 443 : 80;
this.basePath = parsedUrl.pathname.replace(/\/$/, '');
this.isHttps = parsedUrl.protocol === 'https:';
this.agent = new (this.isHttps ? https : http).Agent({
keepAlive: true,
maxSockets: opts.maxSockets || DEFAULT_MAX_SOCKETS,
});
}
/**
* Check if the plugin is healthy and available.
* @returns {Promise<boolean>}
*/
PluginClient.prototype.isHealthy = function () {
var self = this;
return this._request('GET', '/api/v1/health', null)
.then(function (data) {
return data && data.status === 'healthy';
})
.catch(function () {
return false;
});
};
/**
* Create a new session via the plugin (returns post_id).
* @param {string} sessionKey
* @param {string} channelId
* @param {string} rootId
* @param {string} agentId
* @returns {Promise<string>} post_id
*/
PluginClient.prototype.createSession = function (sessionKey, channelId, rootId, agentId) {
return this._request('POST', '/api/v1/sessions', {
session_key: sessionKey,
channel_id: channelId,
root_id: rootId || '',
agent_id: agentId,
}).then(function (data) {
return data.post_id;
});
};
/**
* Update a session with new status data (WebSocket broadcast, no post edit).
* @param {string} sessionKey
* @param {object} data - { status, lines, elapsed_ms, token_count, children, start_time_ms }
* @returns {Promise<void>}
*/
PluginClient.prototype.updateSession = function (sessionKey, data) {
var encodedKey = encodeURIComponent(sessionKey);
return this._request('PUT', '/api/v1/sessions/' + encodedKey, data).then(function () {});
};
/**
* Complete/delete a session (marks as done, broadcasts final state).
* @param {string} sessionKey
* @returns {Promise<void>}
*/
PluginClient.prototype.deleteSession = function (sessionKey) {
var encodedKey = encodeURIComponent(sessionKey);
return this._request('DELETE', '/api/v1/sessions/' + encodedKey, null).then(function () {});
};
/**
* Low-level HTTP request to the plugin.
* @private
*/
PluginClient.prototype._request = function (method, apiPath, body) {
var self = this;
var transport = this.isHttps ? https : http;
var bodyStr = body ? JSON.stringify(body) : null;
return new Promise(function (resolve, reject) {
var reqOpts = {
hostname: self.hostname,
port: self.port,
path: self.basePath + apiPath,
method: method,
agent: self.agent,
headers: {
'Authorization': 'Bearer ' + self.secret,
'Content-Type': 'application/json',
},
timeout: self.timeoutMs,
};
if (bodyStr) {
reqOpts.headers['Content-Length'] = Buffer.byteLength(bodyStr);
}
var req = transport.request(reqOpts, function (res) {
var data = '';
res.on('data', function (chunk) { data += chunk; });
res.on('end', function () {
if (res.statusCode >= 200 && res.statusCode < 300) {
try {
resolve(data ? JSON.parse(data) : {});
} catch (_e) {
resolve({});
}
} else {
var msg = 'HTTP ' + res.statusCode;
try { msg = JSON.parse(data).error || msg; } catch (_e) {}
var err = new Error(msg);
err.statusCode = res.statusCode;
reject(err);
}
});
});
req.on('timeout', function () {
req.destroy();
reject(new Error('Plugin request timed out'));
});
req.on('error', reject);
if (bodyStr) req.write(bodyStr);
req.end();
});
};
/**
* Destroy the HTTP agent (cleanup).
*/
PluginClient.prototype.destroy = function () {
this.agent.destroy();
};
module.exports = { PluginClient };

View File

@@ -24,6 +24,7 @@ const { getLogger } = require('./logger');
const { SessionMonitor } = require('./session-monitor'); const { SessionMonitor } = require('./session-monitor');
const { StatusWatcher } = require('./status-watcher'); const { StatusWatcher } = require('./status-watcher');
const { StatusBox } = require('./status-box'); const { StatusBox } = require('./status-box');
const { PluginClient } = require('./plugin-client');
// status-formatter is used inline via require() in helpers // status-formatter is used inline via require() in helpers
const { HealthServer } = require('./health'); const { HealthServer } = require('./health');
const { loadLabels } = require('./tool-labels'); const { loadLabels } = require('./tool-labels');
@@ -159,6 +160,36 @@ async function startDaemon() {
maxSockets: config.mm.maxSockets, maxSockets: config.mm.maxSockets,
}); });
// Plugin client (optional — enhances rendering via WebSocket instead of PUT)
var pluginClient = null;
var usePlugin = false;
if (config.plugin && config.plugin.enabled && config.plugin.url && config.plugin.secret) {
pluginClient = new PluginClient({
pluginUrl: config.plugin.url,
secret: config.plugin.secret,
logger: logger.child({ module: 'plugin-client' }),
});
// Initial plugin detection
pluginClient.isHealthy().then(function (healthy) {
usePlugin = healthy;
logger.info({ usePlugin, url: config.plugin.url }, healthy
? 'Plugin detected — using WebSocket rendering mode'
: 'Plugin not available — using REST API fallback');
});
// Periodic re-detection
setInterval(function () {
pluginClient.isHealthy().then(function (healthy) {
if (healthy !== usePlugin) {
usePlugin = healthy;
logger.info({ usePlugin }, healthy ? 'Plugin came online' : 'Plugin went offline — fallback to REST API');
}
});
}, config.plugin.detectIntervalMs || 60000);
}
// StatusWatcher // StatusWatcher
const watcher = new StatusWatcher({ const watcher = new StatusWatcher({
transcriptDir: config.transcriptDir, transcriptDir: config.transcriptDir,
@@ -182,6 +213,7 @@ async function startDaemon() {
...globalMetrics, ...globalMetrics,
...sharedStatusBox.getMetrics(), ...sharedStatusBox.getMetrics(),
activeSessions: activeBoxes.size, activeSessions: activeBoxes.size,
pluginEnabled: usePlugin,
}), }),
}); });
@@ -237,9 +269,16 @@ async function startDaemon() {
// Create new post if none found // Create new post if none found
if (!postId) { if (!postId) {
try { try {
const initialText = buildInitialText(agentId, sessionKey); if (usePlugin && pluginClient) {
// Plugin mode: create custom_livestatus post via plugin
postId = await pluginClient.createSession(sessionKey, channelId, rootPostId, agentId);
logger.info({ sessionKey, postId, channelId, mode: 'plugin' }, 'Created status box via plugin');
} else {
// REST API fallback: create regular post with formatted text
var initialText = buildInitialText(agentId, sessionKey);
postId = await sharedStatusBox.createPost(channelId, initialText, rootPostId); postId = await sharedStatusBox.createPost(channelId, initialText, rootPostId);
logger.info({ sessionKey, postId, channelId }, 'Created status box'); logger.info({ sessionKey, postId, channelId, mode: 'rest' }, 'Created status box via REST API');
}
} catch (err) { } catch (err) {
logger.error({ sessionKey, err }, 'Failed to create status post'); logger.error({ sessionKey, err }, 'Failed to create status post');
globalMetrics.lastError = err.message; globalMetrics.lastError = err.message;
@@ -252,6 +291,7 @@ async function startDaemon() {
channelId, channelId,
agentId, agentId,
rootPostId, rootPostId,
usePlugin: usePlugin && !!pluginClient, // track which mode this session uses
children: new Map(), children: new Map(),
}); });
globalMetrics.activeSessions = activeBoxes.size; globalMetrics.activeSessions = activeBoxes.size;
@@ -287,17 +327,37 @@ async function startDaemon() {
const box = activeBoxes.get(sessionKey); const box = activeBoxes.get(sessionKey);
if (!box) { if (!box) {
// Sub-agent: update parent // Sub-agent: update parent
updateParentWithChild(activeBoxes, watcher, sharedStatusBox, sessionKey, state, logger); updateParentWithChild(activeBoxes, watcher, sharedStatusBox, pluginClient, sessionKey, state, logger);
return; return;
} }
// Build status text if (box.usePlugin && pluginClient) {
const text = buildStatusText(box, state, activeBoxes, watcher, sessionKey); // Plugin mode: send structured data (WebSocket broadcast, no post edit)
sharedStatusBox.updatePost(box.postId, text).catch((err) => { pluginClient.updateSession(sessionKey, {
status: state.status,
lines: state.lines,
elapsed_ms: Date.now() - state.startTime,
token_count: state.tokenCount || 0,
children: (state.children || []).map(function (c) {
return { session_key: c.sessionKey, agent_id: c.agentId, status: c.status, lines: c.lines || [], elapsed_ms: Date.now() - (c.startTime || Date.now()), token_count: c.tokenCount || 0 };
}),
start_time_ms: state.startTime,
}).catch(function (err) {
// If plugin rejects (e.g. session not found), fall back to REST for this session
logger.warn({ sessionKey, err: err.message }, 'Plugin update failed — falling back to REST');
box.usePlugin = false;
var fallbackText = buildStatusText(box, state, activeBoxes, watcher, sessionKey);
sharedStatusBox.updatePost(box.postId, fallbackText).catch(function () {});
});
} else {
// REST API fallback: format text and PUT update post
var text = buildStatusText(box, state, activeBoxes, watcher, sessionKey);
sharedStatusBox.updatePost(box.postId, text).catch(function (err) {
logger.error({ sessionKey, err }, 'Failed to update status post'); logger.error({ sessionKey, err }, 'Failed to update status post');
globalMetrics.lastError = err.message; globalMetrics.lastError = err.message;
globalMetrics.updatesFailed++; globalMetrics.updatesFailed++;
}); });
}
// Persist offsets periodically // Persist offsets periodically
saveOffsets(config.offsetFile, watcher.sessions); saveOffsets(config.offsetFile, watcher.sessions);
@@ -308,7 +368,7 @@ async function startDaemon() {
const box = activeBoxes.get(sessionKey); const box = activeBoxes.get(sessionKey);
if (!box) { if (!box) {
// Sub-agent completed // Sub-agent completed
updateParentWithChild(activeBoxes, watcher, sharedStatusBox, sessionKey, state, logger); updateParentWithChild(activeBoxes, watcher, sharedStatusBox, pluginClient, sessionKey, state, logger);
return; return;
} }
@@ -321,12 +381,19 @@ async function startDaemon() {
// Final update with done status // Final update with done status
const doneState = { ...state, status: 'done' }; const doneState = { ...state, status: 'done' };
const text = buildStatusText(box, doneState, activeBoxes, watcher, sessionKey);
try { try {
if (box.usePlugin && pluginClient) {
// Plugin mode: mark session complete
await pluginClient.deleteSession(sessionKey);
logger.info({ sessionKey, postId: box.postId, mode: 'plugin' }, 'Session complete via plugin');
} else {
// REST API fallback
var text = buildStatusText(box, doneState, activeBoxes, watcher, sessionKey);
await sharedStatusBox.forceFlush(box.postId); await sharedStatusBox.forceFlush(box.postId);
await sharedStatusBox.updatePost(box.postId, text); await sharedStatusBox.updatePost(box.postId, text);
logger.info({ sessionKey, postId: box.postId }, 'Session complete — status box updated'); logger.info({ sessionKey, postId: box.postId, mode: 'rest' }, 'Session complete — status box updated');
}
} catch (err) { } catch (err) {
logger.error({ sessionKey, err }, 'Failed to update final status'); logger.error({ sessionKey, err }, 'Failed to update final status');
} }
@@ -396,6 +463,7 @@ async function startDaemon() {
// Cleanup // Cleanup
await healthServer.stop(); await healthServer.stop();
sharedStatusBox.destroy(); sharedStatusBox.destroy();
if (pluginClient) pluginClient.destroy();
removePidFile(config.pidFile); removePidFile(config.pidFile);
logger.info('Shutdown complete'); logger.info('Shutdown complete');
@@ -461,17 +529,31 @@ function buildStatusText(box, state, activeBoxes, watcher, _sessionKey) {
return format({ ...state, children: childStates }); return format({ ...state, children: childStates });
} }
function updateParentWithChild(activeBoxes, watcher, statusBox, childKey, childState, logger) { function updateParentWithChild(activeBoxes, watcher, statusBox, pluginClient, childKey, childState, logger) {
// Find parent // Find parent
for (const [parentKey, box] of activeBoxes) { for (var entry of activeBoxes.entries()) {
var parentKey = entry[0];
var box = entry[1];
if (box.children && box.children.has(childKey)) { if (box.children && box.children.has(childKey)) {
const parentState = watcher.getSessionState(parentKey); var parentState = watcher.getSessionState(parentKey);
if (!parentState) return; if (!parentState) return;
const text = buildStatusText(box, parentState, activeBoxes, watcher, parentKey); if (box.usePlugin && pluginClient) {
// Plugin mode: send structured update for parent
pluginClient.updateSession(parentKey, {
status: parentState.status,
lines: parentState.lines,
elapsed_ms: Date.now() - parentState.startTime,
token_count: parentState.tokenCount || 0,
children: [],
start_time_ms: parentState.startTime,
}).catch(function (err) { logger.error({ parentKey, childKey, err: err.message }, 'Failed to update parent via plugin'); });
} else {
var text = buildStatusText(box, parentState, activeBoxes, watcher, parentKey);
statusBox statusBox
.updatePost(box.postId, text) .updatePost(box.postId, text)
.catch((err) => logger.error({ parentKey, childKey, err }, 'Failed to update parent')); .catch(function (err) { logger.error({ parentKey, childKey, err }, 'Failed to update parent'); });
}
return; return;
} }
} }