From 897abf0a9ab25f3519acdcfd46830938b5121f61 Mon Sep 17 00:00:00 2001 From: Xen Date: Mon, 9 Mar 2026 19:43:43 +0000 Subject: [PATCH] fix(batch2): JSONL line buffering, session-idle race guard, ghost watch deferred cleanup --- src/status-watcher.js | 23 ++++++++++++++++++----- src/watcher-manager.js | 4 +++- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/status-watcher.js b/src/status-watcher.js index 358e25e..ce7e8e5 100644 --- a/src/status-watcher.js +++ b/src/status-watcher.js @@ -82,6 +82,7 @@ class StatusWatcher extends EventEmitter { tokenCount: 0, children: [], idleTimer: null, + _lineBuffer: '', }; this.sessions.set(sessionKey, state); @@ -263,12 +264,11 @@ class StatusWatcher extends EventEmitter { // Ghost watch: file changed for a completed session — signal immediate re-detection if (sessionKey.startsWith('\x00ghost:')) { const originalKey = sessionKey.slice(7); - // Remove ghost so we don't fire repeatedly - this.fileToSession.delete(fullPath); + // Do NOT delete ghost entry here — let caller clean up after pollNow confirms the session if (this.logger) { this.logger.info({ sessionKey: originalKey }, 'fs.watch: file change on completed session — triggering reactivation'); } - this.emit('session-reactivate', originalKey); + this.emit('session-reactivate', originalKey, fullPath); return; } @@ -320,9 +320,11 @@ class StatusWatcher extends EventEmitter { state.lastOffset += bytesRead; - // Parse JSONL lines + // Parse JSONL lines — handle partial lines at chunk boundary const chunk = buffer.toString('utf8', 0, bytesRead); - const lines = chunk.split('\n').filter((l) => l.trim()); + const raw = (state._lineBuffer || '') + chunk; + state._lineBuffer = raw.endsWith('\n') ? '' : raw.split('\n').pop(); + const lines = raw.split('\n').slice(0, raw.endsWith('\n') ? undefined : -1).filter((l) => l.trim()); for (const line of lines) { this._parseLine(sessionKey, state, line); @@ -539,6 +541,17 @@ class StatusWatcher extends EventEmitter { const elapsed = Date.now() - state.lastActivityAt; const idleMs = this.idleTimeoutS * 1000; + // Safeguard: if pendingToolCalls is stuck > 0 for more than 30s, clamp to 0 + if (state.pendingToolCalls > 0 && elapsed > 30000) { + if (this.logger) { + this.logger.warn( + { sessionKey, pendingToolCalls: state.pendingToolCalls, elapsedS: Math.floor(elapsed / 1000) }, + '_checkIdle: pendingToolCalls stuck > 30s — clamping to 0 to unblock idle detection', + ); + } + state.pendingToolCalls = 0; + } + if (elapsed >= idleMs && state.pendingToolCalls === 0) { if (this.logger) { this.logger.info({ sessionKey, elapsedS: Math.floor(elapsed / 1000) }, 'Session idle'); diff --git a/src/watcher-manager.js b/src/watcher-manager.js index 3ce7dac..01b38e2 100644 --- a/src/watcher-manager.js +++ b/src/watcher-manager.js @@ -396,11 +396,13 @@ async function startDaemon() { // ---- Ghost reactivation (from watcher fs.watch on completed session file) ---- // Fires immediately when the transcript file changes after a session completes. // Clears the completedSessions cooldown so the next monitor poll re-detects instantly. - watcher.on('session-reactivate', (sessionKey) => { + watcher.on('session-reactivate', (sessionKey, ghostPath) => { logger.info({ sessionKey }, 'Ghost watch triggered reactivation — clearing completed cooldown'); monitor.clearCompleted(sessionKey); // Force an immediate poll so the session is re-added without waiting 2s monitor.pollNow(); + // Clean up ghost entry now — clearCompleted+pollNow is sufficient, ghost served its purpose + if (ghostPath) watcher.fileToSession.delete(ghostPath); }); // ---- Lock file reactivation (earliest possible trigger) ----