Compare commits
9 Commits
backup/wor
...
v5.0
| Author | SHA1 | Date | |
|---|---|---|---|
| | f4a66f9644 | | |
| | 07c6fd701b | | |
| | 5c4a665fb9 | | |
| | f1d3ae9c4c | | |
| | 3fbd46c2d2 | | |
| | e483b0bc42 | | |
| | 888e8af784 | | |
| | 897abf0a9a | | |
| | 0b39b39f3b | | |
@@ -122,6 +122,16 @@ func (p *Plugin) handleCreateSession(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Validate KV key length — Mattermost enforces a 50-char limit.
|
||||||
|
// Encoded key = kvPrefix (11 chars) + url.PathEscape(sessionKey).
|
||||||
|
// Exceeding the limit causes KVSet to silently succeed but never store data.
|
||||||
|
if len(encodeKey(req.SessionKey)) > 50 {
|
||||||
|
writeJSON(w, http.StatusBadRequest, map[string]string{
|
||||||
|
"error": fmt.Sprintf("session_key too long: encoded key length %d exceeds 50-char KV limit", len(encodeKey(req.SessionKey))),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Check max active sessions
|
// Check max active sessions
|
||||||
config := p.getConfiguration()
|
config := p.getConfiguration()
|
||||||
allSessions, _ := p.store.ListAllSessions()
|
allSessions, _ := p.store.ListAllSessions()
|
||||||
|
|||||||
@@ -21,6 +21,9 @@ type Plugin struct {
|
|||||||
// store wraps KV store operations for session persistence.
|
// store wraps KV store operations for session persistence.
|
||||||
store *Store
|
store *Store
|
||||||
|
|
||||||
|
// botUserIDLock synchronizes access to botUserID.
|
||||||
|
botUserIDLock sync.RWMutex
|
||||||
|
|
||||||
// botUserID is the plugin's bot user ID (created on activation).
|
// botUserID is the plugin's bot user ID (created on activation).
|
||||||
botUserID string
|
botUserID string
|
||||||
|
|
||||||
@@ -41,7 +44,9 @@ func (p *Plugin) OnActivate() error {
|
|||||||
if appErr != nil {
|
if appErr != nil {
|
||||||
p.API.LogWarn("Failed to ensure bot user", "error", appErr.Error())
|
p.API.LogWarn("Failed to ensure bot user", "error", appErr.Error())
|
||||||
} else {
|
} else {
|
||||||
|
p.botUserIDLock.Lock()
|
||||||
p.botUserID = botID
|
p.botUserID = botID
|
||||||
|
p.botUserIDLock.Unlock()
|
||||||
p.API.LogInfo("Plugin bot user ensured", "botUserID", botID)
|
p.API.LogInfo("Plugin bot user ensured", "botUserID", botID)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -75,8 +80,10 @@ func (p *Plugin) sessionCleanupLoop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getBotUserID returns the plugin's bot user ID.
|
// getBotUserID returns the plugin's bot user ID (thread-safe).
|
||||||
func (p *Plugin) getBotUserID() string {
|
func (p *Plugin) getBotUserID() string {
|
||||||
|
p.botUserIDLock.RLock()
|
||||||
|
defer p.botUserIDLock.RUnlock()
|
||||||
return p.botUserID
|
return p.botUserID
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -93,6 +100,10 @@ func (p *Plugin) OnDeactivate() error {
|
|||||||
p.API.LogWarn("Failed to list sessions on deactivate", "error", err.Error())
|
p.API.LogWarn("Failed to list sessions on deactivate", "error", err.Error())
|
||||||
} else {
|
} else {
|
||||||
for _, s := range sessions {
|
for _, s := range sessions {
|
||||||
|
// Skip sessions already in a terminal state — do not overwrite done/error
|
||||||
|
if s.Status == "done" || s.Status == "error" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
s.Status = "interrupted"
|
s.Status = "interrupted"
|
||||||
_ = p.store.SaveSession(s.SessionKey, s)
|
_ = p.store.SaveSession(s.SessionKey, s)
|
||||||
p.broadcastUpdate(s.ChannelID, s)
|
p.broadcastUpdate(s.ChannelID, s)
|
||||||
|
|||||||
@@ -61,6 +61,13 @@ class LiveStatusPlugin {
|
|||||||
|
|
||||||
window.__livestatus_updates[data.post_id] = update;
|
window.__livestatus_updates[data.post_id] = update;
|
||||||
|
|
||||||
|
// Evict completed sessions from the update cache after 60s to prevent unbounded growth
|
||||||
|
if (data.status === 'done' || data.status === 'error' || data.status === 'interrupted') {
|
||||||
|
setTimeout(() => {
|
||||||
|
delete window.__livestatus_updates[data.post_id];
|
||||||
|
}, 60000);
|
||||||
|
}
|
||||||
|
|
||||||
// Notify post-specific listeners
|
// Notify post-specific listeners
|
||||||
const listeners = window.__livestatus_listeners[data.post_id];
|
const listeners = window.__livestatus_listeners[data.post_id];
|
||||||
if (listeners) {
|
if (listeners) {
|
||||||
@@ -82,7 +89,16 @@ class LiveStatusPlugin {
|
|||||||
}
|
}
|
||||||
|
|
||||||
uninitialize(): void {
|
uninitialize(): void {
|
||||||
// Cleanup handled by Mattermost plugin framework
|
// Clear global listener and update stores to prevent accumulation across reloads
|
||||||
|
window.__livestatus_listeners = {};
|
||||||
|
window.__livestatus_updates = {};
|
||||||
|
|
||||||
|
// Unregister the custom post type component if it was registered
|
||||||
|
if (this.postTypeComponentId) {
|
||||||
|
// registry is not available here — Mattermost framework cleans up on deactivate.
|
||||||
|
// Clearing postTypeComponentId prevents stale references.
|
||||||
|
this.postTypeComponentId = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -46,7 +46,7 @@ function buildConfig() {
|
|||||||
},
|
},
|
||||||
|
|
||||||
// Transcript directory (OpenClaw agents)
|
// Transcript directory (OpenClaw agents)
|
||||||
transcriptDir: getEnv('TRANSCRIPT_DIR', '/home/node/.openclaw/agents'),
|
transcriptDir: getEnv('TRANSCRIPT_DIR', '/root/.openclaw/agents'),
|
||||||
|
|
||||||
// Timing
|
// Timing
|
||||||
throttleMs: getEnvInt('THROTTLE_MS', 500),
|
throttleMs: getEnvInt('THROTTLE_MS', 500),
|
||||||
|
|||||||
@@ -311,6 +311,10 @@ class StatusBox extends EventEmitter {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
req.setTimeout(30000, () => {
|
||||||
|
req.destroy(new Error('HTTP request timed out after 30s'));
|
||||||
|
});
|
||||||
|
|
||||||
req.on('error', reject);
|
req.on('error', reject);
|
||||||
if (bodyStr) req.write(bodyStr);
|
if (bodyStr) req.write(bodyStr);
|
||||||
req.end();
|
req.end();
|
||||||
|
|||||||
@@ -82,6 +82,10 @@ class StatusWatcher extends EventEmitter {
|
|||||||
tokenCount: 0,
|
tokenCount: 0,
|
||||||
children: [],
|
children: [],
|
||||||
idleTimer: null,
|
idleTimer: null,
|
||||||
|
_lineBuffer: '',
|
||||||
|
_lockActive: false,
|
||||||
|
_lockExists: undefined,
|
||||||
|
_lockPollTimer: null,
|
||||||
};
|
};
|
||||||
|
|
||||||
this.sessions.set(sessionKey, state);
|
this.sessions.set(sessionKey, state);
|
||||||
@@ -98,6 +102,9 @@ class StatusWatcher extends EventEmitter {
|
|||||||
|
|
||||||
// Start file polling as fallback (fs.watch may not work on bind mounts in Docker)
|
// Start file polling as fallback (fs.watch may not work on bind mounts in Docker)
|
||||||
this._startFilePoll(sessionKey, state);
|
this._startFilePoll(sessionKey, state);
|
||||||
|
|
||||||
|
// Start lock file poll fallback (inotify misses on Docker bind mounts)
|
||||||
|
this._startLockPoll(sessionKey, state);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -130,6 +137,60 @@ class StatusWatcher extends EventEmitter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start polling a lock file for changes (fallback for fs.watch on Docker bind mounts).
|
||||||
|
* Idempotent with fs.watch — if fs.watch already fired, _lockActive is already updated
|
||||||
|
* and no duplicate events are emitted.
|
||||||
|
* @private
|
||||||
|
*/
|
||||||
|
_startLockPoll(sessionKey, state) {
|
||||||
|
var self = this;
|
||||||
|
var lockFile = state.transcriptFile + '.lock';
|
||||||
|
|
||||||
|
state._lockPollTimer = setInterval(function () {
|
||||||
|
try {
|
||||||
|
var exists = fs.existsSync(lockFile);
|
||||||
|
|
||||||
|
// First poll: just initialize state, don't emit
|
||||||
|
if (state._lockExists === undefined) {
|
||||||
|
state._lockExists = exists;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var changed = exists !== state._lockExists;
|
||||||
|
state._lockExists = exists;
|
||||||
|
|
||||||
|
if (!changed) return;
|
||||||
|
|
||||||
|
if (exists) {
|
||||||
|
// Lock file appeared — session activated by user message
|
||||||
|
if (!state._lockActive) {
|
||||||
|
state._lockActive = true;
|
||||||
|
if (self.logger) {
|
||||||
|
self.logger.info({ sessionKey }, 'Lock poll: lock file appeared — emitting session-lock');
|
||||||
|
}
|
||||||
|
self.emit('session-lock', sessionKey);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Lock file disappeared — turn complete
|
||||||
|
if (state._lockActive) {
|
||||||
|
state._lockActive = false;
|
||||||
|
if (self.logger) {
|
||||||
|
self.logger.info({ sessionKey }, 'Lock poll: lock file removed — emitting session-lock-released');
|
||||||
|
}
|
||||||
|
self.emit('session-lock-released', sessionKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (_e) {
|
||||||
|
// Ignore stat errors (file/dir may not exist yet)
|
||||||
|
}
|
||||||
|
}, 1000);
|
||||||
|
|
||||||
|
if (self.logger) {
|
||||||
|
self.logger.info({ sessionKey }, 'Lock poll timer started');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove a session from watching.
|
* Remove a session from watching.
|
||||||
* @param {string} sessionKey
|
* @param {string} sessionKey
|
||||||
@@ -140,6 +201,7 @@ class StatusWatcher extends EventEmitter {
|
|||||||
|
|
||||||
if (state.idleTimer) clearTimeout(state.idleTimer);
|
if (state.idleTimer) clearTimeout(state.idleTimer);
|
||||||
if (state._filePollTimer) clearInterval(state._filePollTimer);
|
if (state._filePollTimer) clearInterval(state._filePollTimer);
|
||||||
|
if (state._lockPollTimer) clearInterval(state._lockPollTimer);
|
||||||
|
|
||||||
// Keep fileToSession mapping alive so fs.watch still fires for this file.
|
// Keep fileToSession mapping alive so fs.watch still fires for this file.
|
||||||
// Mark it as a "ghost" — changes trigger 'session-file-changed' so the
|
// Mark it as a "ghost" — changes trigger 'session-file-changed' so the
|
||||||
@@ -211,6 +273,10 @@ class StatusWatcher extends EventEmitter {
|
|||||||
clearInterval(state._filePollTimer);
|
clearInterval(state._filePollTimer);
|
||||||
state._filePollTimer = null;
|
state._filePollTimer = null;
|
||||||
}
|
}
|
||||||
|
if (state._lockPollTimer) {
|
||||||
|
clearInterval(state._lockPollTimer);
|
||||||
|
state._lockPollTimer = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (this.logger) this.logger.info('StatusWatcher stopped');
|
if (this.logger) this.logger.info('StatusWatcher stopped');
|
||||||
}
|
}
|
||||||
@@ -236,6 +302,12 @@ class StatusWatcher extends EventEmitter {
|
|||||||
// Lock file CREATED — gateway started processing user message.
|
// Lock file CREATED — gateway started processing user message.
|
||||||
// Earliest possible signal: fires before any JSONL write.
|
// Earliest possible signal: fires before any JSONL write.
|
||||||
if (sessionKey) {
|
if (sessionKey) {
|
||||||
|
const sessionState = this.sessions.get(sessionKey);
|
||||||
|
// Sync _lockActive so poll fallback stays idempotent
|
||||||
|
if (sessionState && !sessionState._lockActive) {
|
||||||
|
sessionState._lockActive = true;
|
||||||
|
sessionState._lockExists = true;
|
||||||
|
}
|
||||||
if (this.logger) this.logger.info({ sessionKey }, 'Lock file created — session active, triggering early reactivation');
|
if (this.logger) this.logger.info({ sessionKey }, 'Lock file created — session active, triggering early reactivation');
|
||||||
this.emit('session-lock', sessionKey);
|
this.emit('session-lock', sessionKey);
|
||||||
} else {
|
} else {
|
||||||
@@ -245,6 +317,12 @@ class StatusWatcher extends EventEmitter {
|
|||||||
// Lock file DELETED — gateway finished the turn and sent final reply.
|
// Lock file DELETED — gateway finished the turn and sent final reply.
|
||||||
// Immediate idle signal: no need to wait for cache-ttl or 60s timeout.
|
// Immediate idle signal: no need to wait for cache-ttl or 60s timeout.
|
||||||
if (sessionKey) {
|
if (sessionKey) {
|
||||||
|
const sessionState = this.sessions.get(sessionKey);
|
||||||
|
// Sync _lockActive so poll fallback stays idempotent
|
||||||
|
if (sessionState && sessionState._lockActive) {
|
||||||
|
sessionState._lockActive = false;
|
||||||
|
sessionState._lockExists = false;
|
||||||
|
}
|
||||||
if (this.logger) this.logger.info({ sessionKey }, 'Lock file deleted — turn complete, marking session done immediately');
|
if (this.logger) this.logger.info({ sessionKey }, 'Lock file deleted — turn complete, marking session done immediately');
|
||||||
this.emit('session-lock-released', sessionKey);
|
this.emit('session-lock-released', sessionKey);
|
||||||
} else {
|
} else {
|
||||||
@@ -263,12 +341,11 @@ class StatusWatcher extends EventEmitter {
|
|||||||
// Ghost watch: file changed for a completed session — signal immediate re-detection
|
// Ghost watch: file changed for a completed session — signal immediate re-detection
|
||||||
if (sessionKey.startsWith('\x00ghost:')) {
|
if (sessionKey.startsWith('\x00ghost:')) {
|
||||||
const originalKey = sessionKey.slice(7);
|
const originalKey = sessionKey.slice(7);
|
||||||
// Remove ghost so we don't fire repeatedly
|
// Do NOT delete ghost entry here — let caller clean up after pollNow confirms the session
|
||||||
this.fileToSession.delete(fullPath);
|
|
||||||
if (this.logger) {
|
if (this.logger) {
|
||||||
this.logger.info({ sessionKey: originalKey }, 'fs.watch: file change on completed session — triggering reactivation');
|
this.logger.info({ sessionKey: originalKey }, 'fs.watch: file change on completed session — triggering reactivation');
|
||||||
}
|
}
|
||||||
this.emit('session-reactivate', originalKey);
|
this.emit('session-reactivate', originalKey, fullPath);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -320,9 +397,11 @@ class StatusWatcher extends EventEmitter {
|
|||||||
|
|
||||||
state.lastOffset += bytesRead;
|
state.lastOffset += bytesRead;
|
||||||
|
|
||||||
// Parse JSONL lines
|
// Parse JSONL lines — handle partial lines at chunk boundary
|
||||||
const chunk = buffer.toString('utf8', 0, bytesRead);
|
const chunk = buffer.toString('utf8', 0, bytesRead);
|
||||||
const lines = chunk.split('\n').filter((l) => l.trim());
|
const raw = (state._lineBuffer || '') + chunk;
|
||||||
|
state._lineBuffer = raw.endsWith('\n') ? '' : raw.split('\n').pop();
|
||||||
|
const lines = raw.split('\n').slice(0, raw.endsWith('\n') ? undefined : -1).filter((l) => l.trim());
|
||||||
|
|
||||||
for (const line of lines) {
|
for (const line of lines) {
|
||||||
this._parseLine(sessionKey, state, line);
|
this._parseLine(sessionKey, state, line);
|
||||||
@@ -539,6 +618,17 @@ class StatusWatcher extends EventEmitter {
|
|||||||
const elapsed = Date.now() - state.lastActivityAt;
|
const elapsed = Date.now() - state.lastActivityAt;
|
||||||
const idleMs = this.idleTimeoutS * 1000;
|
const idleMs = this.idleTimeoutS * 1000;
|
||||||
|
|
||||||
|
// Safeguard: if pendingToolCalls is stuck > 0 for more than 30s, clamp to 0
|
||||||
|
if (state.pendingToolCalls > 0 && elapsed > 30000) {
|
||||||
|
if (this.logger) {
|
||||||
|
this.logger.warn(
|
||||||
|
{ sessionKey, pendingToolCalls: state.pendingToolCalls, elapsedS: Math.floor(elapsed / 1000) },
|
||||||
|
'_checkIdle: pendingToolCalls stuck > 30s — clamping to 0 to unblock idle detection',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
state.pendingToolCalls = 0;
|
||||||
|
}
|
||||||
|
|
||||||
if (elapsed >= idleMs && state.pendingToolCalls === 0) {
|
if (elapsed >= idleMs && state.pendingToolCalls === 0) {
|
||||||
if (this.logger) {
|
if (this.logger) {
|
||||||
this.logger.info({ sessionKey, elapsedS: Math.floor(elapsed / 1000) }, 'Session idle');
|
this.logger.info({ sessionKey, elapsedS: Math.floor(elapsed / 1000) }, 'Session idle');
|
||||||
|
|||||||
@@ -47,8 +47,19 @@ if (cmd === 'start') {
|
|||||||
|
|
||||||
// ---- PID File helpers ----
|
// ---- PID File helpers ----
|
||||||
function writePidFile(pidFile) {
|
function writePidFile(pidFile) {
|
||||||
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
try {
|
||||||
fs.writeFileSync(pidFile, String(process.pid), 'utf8');
|
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||||
|
const fd = fs.openSync(pidFile, 'wx');
|
||||||
|
fs.writeSync(fd, String(process.pid));
|
||||||
|
fs.closeSync(fd);
|
||||||
|
} catch (err) {
|
||||||
|
if (err.code === 'EEXIST') {
|
||||||
|
// Another process won the race — bail out
|
||||||
|
console.error('PID file already exists — another daemon may be running');
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function readPidFile(pidFile) {
|
function readPidFile(pidFile) {
|
||||||
@@ -100,8 +111,10 @@ function saveOffsets(offsetFile, sessions) {
|
|||||||
try {
|
try {
|
||||||
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||||
fs.writeFileSync(offsetFile, JSON.stringify(offsets, null, 2), 'utf8');
|
fs.writeFileSync(offsetFile, JSON.stringify(offsets, null, 2), 'utf8');
|
||||||
} catch (_e) {
|
} catch (writeErr) {
|
||||||
/* ignore write error */
|
// Log disk-full / permission errors so they are visible in daemon logs
|
||||||
|
// eslint-disable-next-line no-console
|
||||||
|
console.warn('[saveOffsets] Failed to write offset file:', writeErr.message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -115,6 +128,16 @@ async function startDaemon() {
|
|||||||
const config = getConfig();
|
const config = getConfig();
|
||||||
const logger = getLogger();
|
const logger = getLogger();
|
||||||
|
|
||||||
|
// Global error handlers — prevent silent daemon death from unhandled rejections
|
||||||
|
process.on('unhandledRejection', (reason) => {
|
||||||
|
logger.error({ reason }, 'Unhandled promise rejection — shutting down');
|
||||||
|
shutdown().finally(() => process.exit(1));
|
||||||
|
});
|
||||||
|
process.on('uncaughtException', (err) => {
|
||||||
|
logger.error({ err }, 'Uncaught exception — shutting down');
|
||||||
|
shutdown().finally(() => process.exit(1));
|
||||||
|
});
|
||||||
|
|
||||||
// Check if already running
|
// Check if already running
|
||||||
const existingPid = readPidFile(config.pidFile);
|
const existingPid = readPidFile(config.pidFile);
|
||||||
if (existingPid && isProcessRunning(existingPid)) {
|
if (existingPid && isProcessRunning(existingPid)) {
|
||||||
@@ -137,6 +160,8 @@ async function startDaemon() {
|
|||||||
// Shared state
|
// Shared state
|
||||||
// Map<sessionKey, { postId, agentId, channelId, rootPostId, children: Map }>
|
// Map<sessionKey, { postId, agentId, channelId, rootPostId, children: Map }>
|
||||||
const activeBoxes = new Map();
|
const activeBoxes = new Map();
|
||||||
|
// Guard against concurrent session-added events for the same key (e.g. lock + ghost fire simultaneously)
|
||||||
|
const sessionAddInProgress = new Set();
|
||||||
|
|
||||||
// Completed sessions: Map<sessionKey, { postId, lastOffset }>
|
// Completed sessions: Map<sessionKey, { postId, lastOffset }>
|
||||||
// Tracks sessions that went idle so we can reuse their post on reactivation
|
// Tracks sessions that went idle so we can reuse their post on reactivation
|
||||||
@@ -233,12 +258,59 @@ async function startDaemon() {
|
|||||||
monitor.on('session-added', async (info) => {
|
monitor.on('session-added', async (info) => {
|
||||||
const { sessionKey, transcriptFile, spawnedBy, channelId, rootPostId, agentId } = info;
|
const { sessionKey, transcriptFile, spawnedBy, channelId, rootPostId, agentId } = info;
|
||||||
|
|
||||||
|
// Guard: prevent duplicate concurrent session-added for same key.
|
||||||
|
// Happens when lock file event + ghost watch both fire simultaneously,
|
||||||
|
// both call pollNow(), and both session-added events land before activeBoxes is updated.
|
||||||
|
if (sessionAddInProgress.has(sessionKey)) {
|
||||||
|
logger.debug({ sessionKey }, 'session-added already in progress — dedup skip');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (activeBoxes.has(sessionKey)) {
|
||||||
|
logger.debug({ sessionKey }, 'session-added for already-active session — skip');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
sessionAddInProgress.add(sessionKey);
|
||||||
|
|
||||||
// Skip if no channel
|
// Skip if no channel
|
||||||
if (!channelId) {
|
if (!channelId) {
|
||||||
logger.debug({ sessionKey }, 'No channel for session — skipping');
|
logger.debug({ sessionKey }, 'No channel for session — skipping');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Skip heartbeat/internal sessions (agent:main:main, agent:main:cli, etc.)
|
||||||
|
// These have no real Mattermost conversation context and produce spurious status boxes.
|
||||||
|
// A heartbeat/internal session key ends with ':main' or ':cli' and has no channel/thread suffix.
|
||||||
|
if (/^agent:[^:]+:main$/.test(sessionKey) || /^agent:[^:]+:cli$/.test(sessionKey)) {
|
||||||
|
logger.debug({ sessionKey }, 'Heartbeat/internal session — skipping status box');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dedup: skip if another active session already owns this channel.
|
||||||
|
// This prevents duplicate status boxes when a threadless parent session and a
|
||||||
|
// thread-specific child session both resolve to the same MM channel/DM.
|
||||||
|
// Thread sessions (containing ':thread:') take priority over bare channel sessions.
|
||||||
|
const isThreadSession = sessionKey.includes(':thread:');
|
||||||
|
for (const [existingKey, existingBox] of activeBoxes) {
|
||||||
|
if (existingBox.channelId === channelId) {
|
||||||
|
const existingIsThread = existingKey.includes(':thread:');
|
||||||
|
if (isThreadSession && !existingIsThread) {
|
||||||
|
// New session is a thread — it takes priority. Remove the parent box.
|
||||||
|
logger.info({ sessionKey, displaced: existingKey }, 'Thread session displacing bare channel session');
|
||||||
|
activeBoxes.delete(existingKey);
|
||||||
|
watcher.removeSession(existingKey);
|
||||||
|
} else if (!isThreadSession && existingIsThread) {
|
||||||
|
// Existing session is a thread — skip this bare channel session.
|
||||||
|
logger.debug({ sessionKey, existingKey }, 'Bare channel session skipped — thread session already owns this channel');
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
// Same type — skip the newcomer, first-in wins.
|
||||||
|
logger.debug({ sessionKey, existingKey }, 'Duplicate channel session skipped');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Enforce MAX_ACTIVE_SESSIONS
|
// Enforce MAX_ACTIVE_SESSIONS
|
||||||
if (activeBoxes.size >= config.maxActiveSessions) {
|
if (activeBoxes.size >= config.maxActiveSessions) {
|
||||||
logger.warn(
|
logger.warn(
|
||||||
@@ -317,6 +389,7 @@ async function startDaemon() {
|
|||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.error({ sessionKey, err }, 'Failed to create status post');
|
logger.error({ sessionKey, err }, 'Failed to create status post');
|
||||||
globalMetrics.lastError = err.message;
|
globalMetrics.lastError = err.message;
|
||||||
|
sessionAddInProgress.delete(sessionKey);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -329,6 +402,7 @@ async function startDaemon() {
|
|||||||
usePlugin: usePlugin && !!pluginClient, // track which mode this session uses
|
usePlugin: usePlugin && !!pluginClient, // track which mode this session uses
|
||||||
children: new Map(),
|
children: new Map(),
|
||||||
});
|
});
|
||||||
|
sessionAddInProgress.delete(sessionKey);
|
||||||
globalMetrics.activeSessions = activeBoxes.size;
|
globalMetrics.activeSessions = activeBoxes.size;
|
||||||
|
|
||||||
// Register in watcher.
|
// Register in watcher.
|
||||||
@@ -373,11 +447,13 @@ async function startDaemon() {
|
|||||||
// ---- Ghost reactivation (from watcher fs.watch on completed session file) ----
|
// ---- Ghost reactivation (from watcher fs.watch on completed session file) ----
|
||||||
// Fires immediately when the transcript file changes after a session completes.
|
// Fires immediately when the transcript file changes after a session completes.
|
||||||
// Clears the completedSessions cooldown so the next monitor poll re-detects instantly.
|
// Clears the completedSessions cooldown so the next monitor poll re-detects instantly.
|
||||||
watcher.on('session-reactivate', (sessionKey) => {
|
watcher.on('session-reactivate', (sessionKey, ghostPath) => {
|
||||||
logger.info({ sessionKey }, 'Ghost watch triggered reactivation — clearing completed cooldown');
|
logger.info({ sessionKey }, 'Ghost watch triggered reactivation — clearing completed cooldown');
|
||||||
monitor.clearCompleted(sessionKey);
|
monitor.clearCompleted(sessionKey);
|
||||||
// Force an immediate poll so the session is re-added without waiting 2s
|
// Force an immediate poll so the session is re-added without waiting 2s
|
||||||
monitor.pollNow();
|
monitor.pollNow();
|
||||||
|
// Clean up ghost entry now — clearCompleted+pollNow is sufficient, ghost served its purpose
|
||||||
|
if (ghostPath) watcher.fileToSession.delete(ghostPath);
|
||||||
});
|
});
|
||||||
|
|
||||||
// ---- Lock file reactivation (earliest possible trigger) ----
|
// ---- Lock file reactivation (earliest possible trigger) ----
|
||||||
@@ -437,11 +513,17 @@ async function startDaemon() {
|
|||||||
}),
|
}),
|
||||||
start_time_ms: state.startTime,
|
start_time_ms: state.startTime,
|
||||||
}).catch(function (err) {
|
}).catch(function (err) {
|
||||||
// If plugin rejects (e.g. session not found), fall back to REST for this session
|
// Only permanently disable plugin mode for hard failures (non-retryable errors).
|
||||||
logger.warn({ sessionKey, err: err.message }, 'Plugin update failed — falling back to REST');
|
// 429 and 5xx are transient — keep plugin mode and retry on next update.
|
||||||
box.usePlugin = false;
|
if (err.statusCode === 429 || (err.statusCode >= 500 && err.statusCode < 600)) {
|
||||||
var fallbackText = buildStatusText(box, state, activeBoxes, watcher, sessionKey);
|
logger.warn({ sessionKey, statusCode: err.statusCode }, 'Plugin API transient error — keeping plugin mode, will retry next update');
|
||||||
sharedStatusBox.updatePost(box.postId, fallbackText).catch(function () {});
|
// do NOT set box.usePlugin = false
|
||||||
|
} else {
|
||||||
|
logger.warn({ sessionKey, err: err.message }, 'Plugin API hard failure — falling back to REST');
|
||||||
|
box.usePlugin = false;
|
||||||
|
var fallbackText = buildStatusText(box, state, activeBoxes, watcher, sessionKey);
|
||||||
|
sharedStatusBox.updatePost(box.postId, fallbackText).catch(function () {});
|
||||||
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
// REST API fallback: format text and PUT update post
|
// REST API fallback: format text and PUT update post
|
||||||
@@ -459,7 +541,7 @@ async function startDaemon() {
|
|||||||
|
|
||||||
// ---- Session Idle (from watcher) ----
|
// ---- Session Idle (from watcher) ----
|
||||||
watcher.on('session-idle', async (sessionKey, state) => {
|
watcher.on('session-idle', async (sessionKey, state) => {
|
||||||
const box = activeBoxes.get(sessionKey);
|
const box = activeBoxes.get(sessionKey); // snapshot at entry
|
||||||
if (!box) {
|
if (!box) {
|
||||||
// Sub-agent completed
|
// Sub-agent completed
|
||||||
updateParentWithChild(activeBoxes, watcher, sharedStatusBox, pluginClient, sessionKey, state, logger);
|
updateParentWithChild(activeBoxes, watcher, sharedStatusBox, pluginClient, sessionKey, state, logger);
|
||||||
@@ -492,12 +574,23 @@ async function startDaemon() {
|
|||||||
logger.error({ sessionKey, err }, 'Failed to update final status');
|
logger.error({ sessionKey, err }, 'Failed to update final status');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Guard: if the box was replaced during the awaits above (new session reactivated),
|
||||||
|
// skip cleanup to avoid killing the newly re-added session.
|
||||||
|
if (activeBoxes.get(sessionKey) !== box) {
|
||||||
|
logger.info({ sessionKey }, 'session-idle: box replaced during await — skipping cleanup (new session active)');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Save to completedBoxes so we can reuse the post ID if the session reactivates
|
// Save to completedBoxes so we can reuse the post ID if the session reactivates
|
||||||
completedBoxes.set(sessionKey, {
|
completedBoxes.set(sessionKey, {
|
||||||
postId: box.postId,
|
postId: box.postId,
|
||||||
lastOffset: state.lastOffset || 0,
|
lastOffset: state.lastOffset || 0,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Persist final offsets BEFORE removing session from watcher so the
|
||||||
|
// completed session's last offset is captured in the snapshot.
|
||||||
|
saveOffsets(config.offsetFile, watcher.sessions);
|
||||||
|
|
||||||
// Clean up active tracking
|
// Clean up active tracking
|
||||||
activeBoxes.delete(sessionKey);
|
activeBoxes.delete(sessionKey);
|
||||||
watcher.removeSession(sessionKey);
|
watcher.removeSession(sessionKey);
|
||||||
@@ -505,9 +598,6 @@ async function startDaemon() {
|
|||||||
// ensures we reuse the existing post instead of creating a new one.
|
// ensures we reuse the existing post instead of creating a new one.
|
||||||
monitor.forgetSession(sessionKey);
|
monitor.forgetSession(sessionKey);
|
||||||
globalMetrics.activeSessions = activeBoxes.size;
|
globalMetrics.activeSessions = activeBoxes.size;
|
||||||
|
|
||||||
// Persist final offsets
|
|
||||||
saveOffsets(config.offsetFile, watcher.sessions);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// ---- Start all subsystems ----
|
// ---- Start all subsystems ----
|
||||||
@@ -534,6 +624,9 @@ async function startDaemon() {
|
|||||||
|
|
||||||
logger.info({ signal }, 'Shutting down gracefully');
|
logger.info({ signal }, 'Shutting down gracefully');
|
||||||
|
|
||||||
|
// Persist offsets for all active sessions before stopping the watcher
|
||||||
|
saveOffsets(config.offsetFile, watcher.sessions);
|
||||||
|
|
||||||
// Stop accepting new sessions
|
// Stop accepting new sessions
|
||||||
monitor.stop();
|
monitor.stop();
|
||||||
watcher.stop();
|
watcher.stop();
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ fi
|
|||||||
export MM_BOT_TOKEN="${MM_BOT_TOKEN:?MM_BOT_TOKEN is required}"
|
export MM_BOT_TOKEN="${MM_BOT_TOKEN:?MM_BOT_TOKEN is required}"
|
||||||
export MM_BASE_URL="${MM_BASE_URL:-https://slack.solio.tech}"
|
export MM_BASE_URL="${MM_BASE_URL:-https://slack.solio.tech}"
|
||||||
export MM_BOT_USER_ID="${MM_BOT_USER_ID:-eqtkymoej7rw7dp8xbh7hywzrr}"
|
export MM_BOT_USER_ID="${MM_BOT_USER_ID:-eqtkymoej7rw7dp8xbh7hywzrr}"
|
||||||
export TRANSCRIPT_DIR="${TRANSCRIPT_DIR:-/home/node/.openclaw/agents}"
|
export TRANSCRIPT_DIR="${TRANSCRIPT_DIR:-/root/.openclaw/agents}"
|
||||||
export LOG_LEVEL="${LOG_LEVEL:-info}"
|
export LOG_LEVEL="${LOG_LEVEL:-info}"
|
||||||
export IDLE_TIMEOUT_S="${IDLE_TIMEOUT_S:-60}"
|
export IDLE_TIMEOUT_S="${IDLE_TIMEOUT_S:-60}"
|
||||||
export SESSION_POLL_MS="${SESSION_POLL_MS:-2000}"
|
export SESSION_POLL_MS="${SESSION_POLL_MS:-2000}"
|
||||||
|
|||||||
Reference in New Issue
Block a user