9 Commits

Author SHA1 Message Date
Xen
f4a66f9644 v5.0: fix TRANSCRIPT_DIR default, disable custom post type
Root cause: TRANSCRIPT_DIR was /home/node/.openclaw/agents but the actual
sessions live at /root/.openclaw/agents. The daemon started, watched an empty
directory, and never detected any sessions.

Changes:
- config.js: default TRANSCRIPT_DIR -> /root/.openclaw/agents
- start-daemon.sh: same fix for fallback default
- .env.daemon (local): TRANSCRIPT_DIR fixed, PLUGIN_ENABLED=false

The custom_livestatus post type requires the Mattermost plugin webapp React
bundle to render. Disabled by default — now uses plain REST API posts with
markdown formatting, which render reliably everywhere (desktop, mobile, web).

Previous version preserved as git tag v4.1.
2026-03-15 10:20:48 +00:00
Xen
07c6fd701b Revert "fix: ghost watch false-positive reactivation on trailing writes"
This reverts commit 5c4a665fb9.
2026-03-10 08:38:41 +00:00
Xen
5c4a665fb9 fix: ghost watch false-positive reactivation on trailing writes
When the lock file is deleted (turn complete) and triggerIdle fires,
the transcript file continues receiving writes (the agent's own reply
being appended). The ghost watch was firing session-reactivate on these
trailing writes, causing an immediate complete→reactivate→complete loop
within the same turn.

Fix: only emit session-reactivate from ghost watch if the lock file
currently exists. A JSONL write without a lock file is a trailing write
from the completed turn, not a new user message.
2026-03-10 08:38:24 +00:00
Xen
f1d3ae9c4c fix: concurrent session-added dedup via sessionAddInProgress set
Root cause of double status boxes: lock file event + ghost watch both fire
at the same time on reactivation. Both call clearCompleted+pollNow, both
session-added events reach the handler before activeBoxes.has() returns true
for either, so two status boxes are created.

Fix: sessionAddInProgress Set gates the handler. First caller proceeds,
second caller sees the key in-progress and returns immediately. Cleared
on success (after activeBoxes.set) and on error (before return).
2026-03-09 21:13:18 +00:00
Xen
3fbd46c2d2 fix: dedup status boxes — one per channel, thread sessions take priority
When a bare channel session (no 🧵 suffix) and a thread-specific session
both resolve to the same MM channel/DM, two status boxes appeared simultaneously.

Fix: in session-added handler, before creating a box, check if any existing
active session already owns that channelId. Thread sessions displace bare channel
sessions (and take priority). Bare channel sessions are skipped if a thread
session already exists. First-in wins for same-type duplicates.
2026-03-09 21:10:41 +00:00
Xen
e483b0bc42 fix: skip heartbeat/internal sessions (agent:main:main) from status box creation
Heartbeat sessions (key pattern agent:<agent>:main) have no real Mattermost
conversation context. The daemon was resolving them to the DM fallback channel
and creating a new status box on every heartbeat cycle (~every 30min but firing
rapidly during active work). Each one appeared as a separate live status post.

Fix: in session-added handler, skip any session key matching /^agent:[^:]+:main$/
or /^agent:[^:]+:cli$/ before creating a status box.
2026-03-09 20:01:06 +00:00
Xen
888e8af784 fix(batch3): HTTP timeout, transient 429 handling, lock file poll fallback 2026-03-09 19:48:27 +00:00
Xen
897abf0a9a fix(batch2): JSONL line buffering, session-idle race guard, ghost watch deferred cleanup 2026-03-09 19:43:43 +00:00
Xen
0b39b39f3b fix(batch1): safe fixes — saveOffsets ordering, pid atomic, error handlers, go mutex, frontend cleanup 2026-03-09 19:43:30 +00:00
8 changed files with 247 additions and 23 deletions

View File

@@ -122,6 +122,16 @@ func (p *Plugin) handleCreateSession(w http.ResponseWriter, r *http.Request) {
return return
} }
// Validate KV key length — Mattermost enforces a 50-char limit.
// Encoded key = kvPrefix (11 chars) + url.PathEscape(sessionKey).
// Exceeding the limit causes KVSet to silently succeed but never store data.
if len(encodeKey(req.SessionKey)) > 50 {
writeJSON(w, http.StatusBadRequest, map[string]string{
"error": fmt.Sprintf("session_key too long: encoded key length %d exceeds 50-char KV limit", len(encodeKey(req.SessionKey))),
})
return
}
// Check max active sessions // Check max active sessions
config := p.getConfiguration() config := p.getConfiguration()
allSessions, _ := p.store.ListAllSessions() allSessions, _ := p.store.ListAllSessions()

View File

@@ -21,6 +21,9 @@ type Plugin struct {
// store wraps KV store operations for session persistence. // store wraps KV store operations for session persistence.
store *Store store *Store
// botUserIDLock synchronizes access to botUserID.
botUserIDLock sync.RWMutex
// botUserID is the plugin's bot user ID (created on activation). // botUserID is the plugin's bot user ID (created on activation).
botUserID string botUserID string
@@ -41,7 +44,9 @@ func (p *Plugin) OnActivate() error {
if appErr != nil { if appErr != nil {
p.API.LogWarn("Failed to ensure bot user", "error", appErr.Error()) p.API.LogWarn("Failed to ensure bot user", "error", appErr.Error())
} else { } else {
p.botUserIDLock.Lock()
p.botUserID = botID p.botUserID = botID
p.botUserIDLock.Unlock()
p.API.LogInfo("Plugin bot user ensured", "botUserID", botID) p.API.LogInfo("Plugin bot user ensured", "botUserID", botID)
} }
@@ -75,8 +80,10 @@ func (p *Plugin) sessionCleanupLoop() {
} }
} }
// getBotUserID returns the plugin's bot user ID. // getBotUserID returns the plugin's bot user ID (thread-safe).
func (p *Plugin) getBotUserID() string { func (p *Plugin) getBotUserID() string {
p.botUserIDLock.RLock()
defer p.botUserIDLock.RUnlock()
return p.botUserID return p.botUserID
} }
@@ -93,6 +100,10 @@ func (p *Plugin) OnDeactivate() error {
p.API.LogWarn("Failed to list sessions on deactivate", "error", err.Error()) p.API.LogWarn("Failed to list sessions on deactivate", "error", err.Error())
} else { } else {
for _, s := range sessions { for _, s := range sessions {
// Skip sessions already in a terminal state — do not overwrite done/error
if s.Status == "done" || s.Status == "error" {
continue
}
s.Status = "interrupted" s.Status = "interrupted"
_ = p.store.SaveSession(s.SessionKey, s) _ = p.store.SaveSession(s.SessionKey, s)
p.broadcastUpdate(s.ChannelID, s) p.broadcastUpdate(s.ChannelID, s)

View File

@@ -61,6 +61,13 @@ class LiveStatusPlugin {
window.__livestatus_updates[data.post_id] = update; window.__livestatus_updates[data.post_id] = update;
// Evict completed sessions from the update cache after 60s to prevent unbounded growth
if (data.status === 'done' || data.status === 'error' || data.status === 'interrupted') {
setTimeout(() => {
delete window.__livestatus_updates[data.post_id];
}, 60000);
}
// Notify post-specific listeners // Notify post-specific listeners
const listeners = window.__livestatus_listeners[data.post_id]; const listeners = window.__livestatus_listeners[data.post_id];
if (listeners) { if (listeners) {
@@ -82,7 +89,16 @@ class LiveStatusPlugin {
} }
uninitialize(): void { uninitialize(): void {
// Cleanup handled by Mattermost plugin framework // Clear global listener and update stores to prevent accumulation across reloads
window.__livestatus_listeners = {};
window.__livestatus_updates = {};
// Unregister the custom post type component if it was registered
if (this.postTypeComponentId) {
// registry is not available here — Mattermost framework cleans up on deactivate.
// Clearing postTypeComponentId prevents stale references.
this.postTypeComponentId = null;
}
} }
} }

View File

@@ -46,7 +46,7 @@ function buildConfig() {
}, },
// Transcript directory (OpenClaw agents) // Transcript directory (OpenClaw agents)
transcriptDir: getEnv('TRANSCRIPT_DIR', '/home/node/.openclaw/agents'), transcriptDir: getEnv('TRANSCRIPT_DIR', '/root/.openclaw/agents'),
// Timing // Timing
throttleMs: getEnvInt('THROTTLE_MS', 500), throttleMs: getEnvInt('THROTTLE_MS', 500),

View File

@@ -311,6 +311,10 @@ class StatusBox extends EventEmitter {
}); });
}); });
req.setTimeout(30000, () => {
req.destroy(new Error('HTTP request timed out after 30s'));
});
req.on('error', reject); req.on('error', reject);
if (bodyStr) req.write(bodyStr); if (bodyStr) req.write(bodyStr);
req.end(); req.end();

View File

@@ -82,6 +82,10 @@ class StatusWatcher extends EventEmitter {
tokenCount: 0, tokenCount: 0,
children: [], children: [],
idleTimer: null, idleTimer: null,
_lineBuffer: '',
_lockActive: false,
_lockExists: undefined,
_lockPollTimer: null,
}; };
this.sessions.set(sessionKey, state); this.sessions.set(sessionKey, state);
@@ -98,6 +102,9 @@ class StatusWatcher extends EventEmitter {
// Start file polling as fallback (fs.watch may not work on bind mounts in Docker) // Start file polling as fallback (fs.watch may not work on bind mounts in Docker)
this._startFilePoll(sessionKey, state); this._startFilePoll(sessionKey, state);
// Start lock file poll fallback (inotify misses on Docker bind mounts)
this._startLockPoll(sessionKey, state);
} }
/** /**
@@ -130,6 +137,60 @@ class StatusWatcher extends EventEmitter {
} }
} }
/**
* Start polling a lock file for changes (fallback for fs.watch on Docker bind mounts).
* Idempotent with fs.watch — if fs.watch already fired, _lockActive is already updated
* and no duplicate events are emitted.
* @private
*/
_startLockPoll(sessionKey, state) {
var self = this;
var lockFile = state.transcriptFile + '.lock';
state._lockPollTimer = setInterval(function () {
try {
var exists = fs.existsSync(lockFile);
// First poll: just initialize state, don't emit
if (state._lockExists === undefined) {
state._lockExists = exists;
return;
}
var changed = exists !== state._lockExists;
state._lockExists = exists;
if (!changed) return;
if (exists) {
// Lock file appeared — session activated by user message
if (!state._lockActive) {
state._lockActive = true;
if (self.logger) {
self.logger.info({ sessionKey }, 'Lock poll: lock file appeared — emitting session-lock');
}
self.emit('session-lock', sessionKey);
}
} else {
// Lock file disappeared — turn complete
if (state._lockActive) {
state._lockActive = false;
if (self.logger) {
self.logger.info({ sessionKey }, 'Lock poll: lock file removed — emitting session-lock-released');
}
self.emit('session-lock-released', sessionKey);
}
}
} catch (_e) {
// Ignore stat errors (file/dir may not exist yet)
}
}, 1000);
if (self.logger) {
self.logger.info({ sessionKey }, 'Lock poll timer started');
}
}
/** /**
* Remove a session from watching. * Remove a session from watching.
* @param {string} sessionKey * @param {string} sessionKey
@@ -140,6 +201,7 @@ class StatusWatcher extends EventEmitter {
if (state.idleTimer) clearTimeout(state.idleTimer); if (state.idleTimer) clearTimeout(state.idleTimer);
if (state._filePollTimer) clearInterval(state._filePollTimer); if (state._filePollTimer) clearInterval(state._filePollTimer);
if (state._lockPollTimer) clearInterval(state._lockPollTimer);
// Keep fileToSession mapping alive so fs.watch still fires for this file. // Keep fileToSession mapping alive so fs.watch still fires for this file.
// Mark it as a "ghost" — changes trigger 'session-file-changed' so the // Mark it as a "ghost" — changes trigger 'session-file-changed' so the
@@ -211,6 +273,10 @@ class StatusWatcher extends EventEmitter {
clearInterval(state._filePollTimer); clearInterval(state._filePollTimer);
state._filePollTimer = null; state._filePollTimer = null;
} }
if (state._lockPollTimer) {
clearInterval(state._lockPollTimer);
state._lockPollTimer = null;
}
} }
if (this.logger) this.logger.info('StatusWatcher stopped'); if (this.logger) this.logger.info('StatusWatcher stopped');
} }
@@ -236,6 +302,12 @@ class StatusWatcher extends EventEmitter {
// Lock file CREATED — gateway started processing user message. // Lock file CREATED — gateway started processing user message.
// Earliest possible signal: fires before any JSONL write. // Earliest possible signal: fires before any JSONL write.
if (sessionKey) { if (sessionKey) {
const sessionState = this.sessions.get(sessionKey);
// Sync _lockActive so poll fallback stays idempotent
if (sessionState && !sessionState._lockActive) {
sessionState._lockActive = true;
sessionState._lockExists = true;
}
if (this.logger) this.logger.info({ sessionKey }, 'Lock file created — session active, triggering early reactivation'); if (this.logger) this.logger.info({ sessionKey }, 'Lock file created — session active, triggering early reactivation');
this.emit('session-lock', sessionKey); this.emit('session-lock', sessionKey);
} else { } else {
@@ -245,6 +317,12 @@ class StatusWatcher extends EventEmitter {
// Lock file DELETED — gateway finished the turn and sent final reply. // Lock file DELETED — gateway finished the turn and sent final reply.
// Immediate idle signal: no need to wait for cache-ttl or 60s timeout. // Immediate idle signal: no need to wait for cache-ttl or 60s timeout.
if (sessionKey) { if (sessionKey) {
const sessionState = this.sessions.get(sessionKey);
// Sync _lockActive so poll fallback stays idempotent
if (sessionState && sessionState._lockActive) {
sessionState._lockActive = false;
sessionState._lockExists = false;
}
if (this.logger) this.logger.info({ sessionKey }, 'Lock file deleted — turn complete, marking session done immediately'); if (this.logger) this.logger.info({ sessionKey }, 'Lock file deleted — turn complete, marking session done immediately');
this.emit('session-lock-released', sessionKey); this.emit('session-lock-released', sessionKey);
} else { } else {
@@ -263,12 +341,11 @@ class StatusWatcher extends EventEmitter {
// Ghost watch: file changed for a completed session — signal immediate re-detection // Ghost watch: file changed for a completed session — signal immediate re-detection
if (sessionKey.startsWith('\x00ghost:')) { if (sessionKey.startsWith('\x00ghost:')) {
const originalKey = sessionKey.slice(7); const originalKey = sessionKey.slice(7);
// Remove ghost so we don't fire repeatedly // Do NOT delete ghost entry here — let caller clean up after pollNow confirms the session
this.fileToSession.delete(fullPath);
if (this.logger) { if (this.logger) {
this.logger.info({ sessionKey: originalKey }, 'fs.watch: file change on completed session — triggering reactivation'); this.logger.info({ sessionKey: originalKey }, 'fs.watch: file change on completed session — triggering reactivation');
} }
this.emit('session-reactivate', originalKey); this.emit('session-reactivate', originalKey, fullPath);
return; return;
} }
@@ -320,9 +397,11 @@ class StatusWatcher extends EventEmitter {
state.lastOffset += bytesRead; state.lastOffset += bytesRead;
// Parse JSONL lines // Parse JSONL lines — handle partial lines at chunk boundary
const chunk = buffer.toString('utf8', 0, bytesRead); const chunk = buffer.toString('utf8', 0, bytesRead);
const lines = chunk.split('\n').filter((l) => l.trim()); const raw = (state._lineBuffer || '') + chunk;
state._lineBuffer = raw.endsWith('\n') ? '' : raw.split('\n').pop();
const lines = raw.split('\n').slice(0, raw.endsWith('\n') ? undefined : -1).filter((l) => l.trim());
for (const line of lines) { for (const line of lines) {
this._parseLine(sessionKey, state, line); this._parseLine(sessionKey, state, line);
@@ -539,6 +618,17 @@ class StatusWatcher extends EventEmitter {
const elapsed = Date.now() - state.lastActivityAt; const elapsed = Date.now() - state.lastActivityAt;
const idleMs = this.idleTimeoutS * 1000; const idleMs = this.idleTimeoutS * 1000;
// Safeguard: if pendingToolCalls is stuck > 0 for more than 30s, clamp to 0
if (state.pendingToolCalls > 0 && elapsed > 30000) {
if (this.logger) {
this.logger.warn(
{ sessionKey, pendingToolCalls: state.pendingToolCalls, elapsedS: Math.floor(elapsed / 1000) },
'_checkIdle: pendingToolCalls stuck > 30s — clamping to 0 to unblock idle detection',
);
}
state.pendingToolCalls = 0;
}
if (elapsed >= idleMs && state.pendingToolCalls === 0) { if (elapsed >= idleMs && state.pendingToolCalls === 0) {
if (this.logger) { if (this.logger) {
this.logger.info({ sessionKey, elapsedS: Math.floor(elapsed / 1000) }, 'Session idle'); this.logger.info({ sessionKey, elapsedS: Math.floor(elapsed / 1000) }, 'Session idle');

View File

@@ -47,8 +47,19 @@ if (cmd === 'start') {
// ---- PID File helpers ---- // ---- PID File helpers ----
function writePidFile(pidFile) { function writePidFile(pidFile) {
// eslint-disable-next-line security/detect-non-literal-fs-filename try {
fs.writeFileSync(pidFile, String(process.pid), 'utf8'); // eslint-disable-next-line security/detect-non-literal-fs-filename
const fd = fs.openSync(pidFile, 'wx');
fs.writeSync(fd, String(process.pid));
fs.closeSync(fd);
} catch (err) {
if (err.code === 'EEXIST') {
// Another process won the race — bail out
console.error('PID file already exists — another daemon may be running');
process.exit(1);
}
throw err;
}
} }
function readPidFile(pidFile) { function readPidFile(pidFile) {
@@ -100,8 +111,10 @@ function saveOffsets(offsetFile, sessions) {
try { try {
// eslint-disable-next-line security/detect-non-literal-fs-filename // eslint-disable-next-line security/detect-non-literal-fs-filename
fs.writeFileSync(offsetFile, JSON.stringify(offsets, null, 2), 'utf8'); fs.writeFileSync(offsetFile, JSON.stringify(offsets, null, 2), 'utf8');
} catch (_e) { } catch (writeErr) {
/* ignore write error */ // Log disk-full / permission errors so they are visible in daemon logs
// eslint-disable-next-line no-console
console.warn('[saveOffsets] Failed to write offset file:', writeErr.message);
} }
} }
@@ -115,6 +128,16 @@ async function startDaemon() {
const config = getConfig(); const config = getConfig();
const logger = getLogger(); const logger = getLogger();
// Global error handlers — prevent silent daemon death from unhandled rejections
process.on('unhandledRejection', (reason) => {
logger.error({ reason }, 'Unhandled promise rejection — shutting down');
shutdown().finally(() => process.exit(1));
});
process.on('uncaughtException', (err) => {
logger.error({ err }, 'Uncaught exception — shutting down');
shutdown().finally(() => process.exit(1));
});
// Check if already running // Check if already running
const existingPid = readPidFile(config.pidFile); const existingPid = readPidFile(config.pidFile);
if (existingPid && isProcessRunning(existingPid)) { if (existingPid && isProcessRunning(existingPid)) {
@@ -137,6 +160,8 @@ async function startDaemon() {
// Shared state // Shared state
// Map<sessionKey, { postId, agentId, channelId, rootPostId, children: Map }> // Map<sessionKey, { postId, agentId, channelId, rootPostId, children: Map }>
const activeBoxes = new Map(); const activeBoxes = new Map();
// Guard against concurrent session-added events for the same key (e.g. lock + ghost fire simultaneously)
const sessionAddInProgress = new Set();
// Completed sessions: Map<sessionKey, { postId, lastOffset }> // Completed sessions: Map<sessionKey, { postId, lastOffset }>
// Tracks sessions that went idle so we can reuse their post on reactivation // Tracks sessions that went idle so we can reuse their post on reactivation
@@ -233,12 +258,59 @@ async function startDaemon() {
monitor.on('session-added', async (info) => { monitor.on('session-added', async (info) => {
const { sessionKey, transcriptFile, spawnedBy, channelId, rootPostId, agentId } = info; const { sessionKey, transcriptFile, spawnedBy, channelId, rootPostId, agentId } = info;
// Guard: prevent duplicate concurrent session-added for same key.
// Happens when lock file event + ghost watch both fire simultaneously,
// both call pollNow(), and both session-added events land before activeBoxes is updated.
if (sessionAddInProgress.has(sessionKey)) {
logger.debug({ sessionKey }, 'session-added already in progress — dedup skip');
return;
}
if (activeBoxes.has(sessionKey)) {
logger.debug({ sessionKey }, 'session-added for already-active session — skip');
return;
}
sessionAddInProgress.add(sessionKey);
// Skip if no channel // Skip if no channel
if (!channelId) { if (!channelId) {
logger.debug({ sessionKey }, 'No channel for session — skipping'); logger.debug({ sessionKey }, 'No channel for session — skipping');
return; return;
} }
// Skip heartbeat/internal sessions (agent:main:main, agent:main:cli, etc.)
// These have no real Mattermost conversation context and produce spurious status boxes.
// A heartbeat session key ends with ':main' and has no channel/thread suffix.
if (/^agent:[^:]+:main$/.test(sessionKey) || /^agent:[^:]+:cli$/.test(sessionKey)) {
logger.debug({ sessionKey }, 'Heartbeat/internal session — skipping status box');
return;
}
// Dedup: skip if another active session already owns this channel.
// This prevents duplicate status boxes when a threadless parent session and a
// thread-specific child session both resolve to the same MM channel/DM.
// Thread sessions (containing ':thread:') take priority over bare channel sessions.
const isThreadSession = sessionKey.includes(':thread:');
for (const [existingKey, existingBox] of activeBoxes) {
if (existingBox.channelId === channelId) {
const existingIsThread = existingKey.includes(':thread:');
if (isThreadSession && !existingIsThread) {
// New session is a thread — it takes priority. Remove the parent box.
logger.info({ sessionKey, displaced: existingKey }, 'Thread session displacing bare channel session');
activeBoxes.delete(existingKey);
watcher.removeSession(existingKey);
} else if (!isThreadSession && existingIsThread) {
// Existing session is a thread — skip this bare channel session.
logger.debug({ sessionKey, existingKey }, 'Bare channel session skipped — thread session already owns this channel');
return;
} else {
// Same type — skip the newcomer, first-in wins.
logger.debug({ sessionKey, existingKey }, 'Duplicate channel session skipped');
return;
}
break;
}
}
// Enforce MAX_ACTIVE_SESSIONS // Enforce MAX_ACTIVE_SESSIONS
if (activeBoxes.size >= config.maxActiveSessions) { if (activeBoxes.size >= config.maxActiveSessions) {
logger.warn( logger.warn(
@@ -317,6 +389,7 @@ async function startDaemon() {
} catch (err) { } catch (err) {
logger.error({ sessionKey, err }, 'Failed to create status post'); logger.error({ sessionKey, err }, 'Failed to create status post');
globalMetrics.lastError = err.message; globalMetrics.lastError = err.message;
sessionAddInProgress.delete(sessionKey);
return; return;
} }
} }
@@ -329,6 +402,7 @@ async function startDaemon() {
usePlugin: usePlugin && !!pluginClient, // track which mode this session uses usePlugin: usePlugin && !!pluginClient, // track which mode this session uses
children: new Map(), children: new Map(),
}); });
sessionAddInProgress.delete(sessionKey);
globalMetrics.activeSessions = activeBoxes.size; globalMetrics.activeSessions = activeBoxes.size;
// Register in watcher. // Register in watcher.
@@ -373,11 +447,13 @@ async function startDaemon() {
// ---- Ghost reactivation (from watcher fs.watch on completed session file) ---- // ---- Ghost reactivation (from watcher fs.watch on completed session file) ----
// Fires immediately when the transcript file changes after a session completes. // Fires immediately when the transcript file changes after a session completes.
// Clears the completedSessions cooldown so the next monitor poll re-detects instantly. // Clears the completedSessions cooldown so the next monitor poll re-detects instantly.
watcher.on('session-reactivate', (sessionKey) => { watcher.on('session-reactivate', (sessionKey, ghostPath) => {
logger.info({ sessionKey }, 'Ghost watch triggered reactivation — clearing completed cooldown'); logger.info({ sessionKey }, 'Ghost watch triggered reactivation — clearing completed cooldown');
monitor.clearCompleted(sessionKey); monitor.clearCompleted(sessionKey);
// Force an immediate poll so the session is re-added without waiting 2s // Force an immediate poll so the session is re-added without waiting 2s
monitor.pollNow(); monitor.pollNow();
// Clean up ghost entry now — clearCompleted+pollNow is sufficient, ghost served its purpose
if (ghostPath) watcher.fileToSession.delete(ghostPath);
}); });
// ---- Lock file reactivation (earliest possible trigger) ---- // ---- Lock file reactivation (earliest possible trigger) ----
@@ -437,11 +513,17 @@ async function startDaemon() {
}), }),
start_time_ms: state.startTime, start_time_ms: state.startTime,
}).catch(function (err) { }).catch(function (err) {
// If plugin rejects (e.g. session not found), fall back to REST for this session // Only permanently disable plugin mode for hard failures (non-retryable errors).
logger.warn({ sessionKey, err: err.message }, 'Plugin update failed — falling back to REST'); // 429 and 5xx are transient — keep plugin mode and retry on next update.
box.usePlugin = false; if (err.statusCode === 429 || (err.statusCode >= 500 && err.statusCode < 600)) {
var fallbackText = buildStatusText(box, state, activeBoxes, watcher, sessionKey); logger.warn({ sessionKey, statusCode: err.statusCode }, 'Plugin API transient error — keeping plugin mode, will retry next update');
sharedStatusBox.updatePost(box.postId, fallbackText).catch(function () {}); // do NOT set box.usePlugin = false
} else {
logger.warn({ sessionKey, err: err.message }, 'Plugin API hard failure — falling back to REST');
box.usePlugin = false;
var fallbackText = buildStatusText(box, state, activeBoxes, watcher, sessionKey);
sharedStatusBox.updatePost(box.postId, fallbackText).catch(function () {});
}
}); });
} else { } else {
// REST API fallback: format text and PUT update post // REST API fallback: format text and PUT update post
@@ -459,7 +541,7 @@ async function startDaemon() {
// ---- Session Idle (from watcher) ---- // ---- Session Idle (from watcher) ----
watcher.on('session-idle', async (sessionKey, state) => { watcher.on('session-idle', async (sessionKey, state) => {
const box = activeBoxes.get(sessionKey); const box = activeBoxes.get(sessionKey); // snapshot at entry
if (!box) { if (!box) {
// Sub-agent completed // Sub-agent completed
updateParentWithChild(activeBoxes, watcher, sharedStatusBox, pluginClient, sessionKey, state, logger); updateParentWithChild(activeBoxes, watcher, sharedStatusBox, pluginClient, sessionKey, state, logger);
@@ -492,12 +574,23 @@ async function startDaemon() {
logger.error({ sessionKey, err }, 'Failed to update final status'); logger.error({ sessionKey, err }, 'Failed to update final status');
} }
// Guard: if the box was replaced during the awaits above (new session reactivated),
// skip cleanup to avoid killing the newly re-added session.
if (activeBoxes.get(sessionKey) !== box) {
logger.info({ sessionKey }, 'session-idle: box replaced during await — skipping cleanup (new session active)');
return;
}
// Save to completedBoxes so we can reuse the post ID if the session reactivates // Save to completedBoxes so we can reuse the post ID if the session reactivates
completedBoxes.set(sessionKey, { completedBoxes.set(sessionKey, {
postId: box.postId, postId: box.postId,
lastOffset: state.lastOffset || 0, lastOffset: state.lastOffset || 0,
}); });
// Persist final offsets BEFORE removing session from watcher so the
// completed session's last offset is captured in the snapshot.
saveOffsets(config.offsetFile, watcher.sessions);
// Clean up active tracking // Clean up active tracking
activeBoxes.delete(sessionKey); activeBoxes.delete(sessionKey);
watcher.removeSession(sessionKey); watcher.removeSession(sessionKey);
@@ -505,9 +598,6 @@ async function startDaemon() {
// ensures we reuse the existing post instead of creating a new one. // ensures we reuse the existing post instead of creating a new one.
monitor.forgetSession(sessionKey); monitor.forgetSession(sessionKey);
globalMetrics.activeSessions = activeBoxes.size; globalMetrics.activeSessions = activeBoxes.size;
// Persist final offsets
saveOffsets(config.offsetFile, watcher.sessions);
}); });
// ---- Start all subsystems ---- // ---- Start all subsystems ----
@@ -534,6 +624,9 @@ async function startDaemon() {
logger.info({ signal }, 'Shutting down gracefully'); logger.info({ signal }, 'Shutting down gracefully');
// Persist offsets for all active sessions before stopping the watcher
saveOffsets(config.offsetFile, watcher.sessions);
// Stop accepting new sessions // Stop accepting new sessions
monitor.stop(); monitor.stop();
watcher.stop(); watcher.stop();

View File

@@ -22,7 +22,7 @@ fi
export MM_BOT_TOKEN="${MM_BOT_TOKEN:?MM_BOT_TOKEN is required}" export MM_BOT_TOKEN="${MM_BOT_TOKEN:?MM_BOT_TOKEN is required}"
export MM_BASE_URL="${MM_BASE_URL:-https://slack.solio.tech}" export MM_BASE_URL="${MM_BASE_URL:-https://slack.solio.tech}"
export MM_BOT_USER_ID="${MM_BOT_USER_ID:-eqtkymoej7rw7dp8xbh7hywzrr}" export MM_BOT_USER_ID="${MM_BOT_USER_ID:-eqtkymoej7rw7dp8xbh7hywzrr}"
export TRANSCRIPT_DIR="${TRANSCRIPT_DIR:-/home/node/.openclaw/agents}" export TRANSCRIPT_DIR="${TRANSCRIPT_DIR:-/root/.openclaw/agents}"
export LOG_LEVEL="${LOG_LEVEL:-info}" export LOG_LEVEL="${LOG_LEVEL:-info}"
export IDLE_TIMEOUT_S="${IDLE_TIMEOUT_S:-60}" export IDLE_TIMEOUT_S="${IDLE_TIMEOUT_S:-60}"
export SESSION_POLL_MS="${SESSION_POLL_MS:-2000}" export SESSION_POLL_MS="${SESSION_POLL_MS:-2000}"