fix(batch2): JSONL line buffering, session-idle race guard, ghost watch deferred cleanup

This commit is contained in:
Xen
2026-03-09 19:43:43 +00:00
parent 0b39b39f3b
commit 897abf0a9a
2 changed files with 21 additions and 6 deletions

View File

@@ -82,6 +82,7 @@ class StatusWatcher extends EventEmitter {
tokenCount: 0, tokenCount: 0,
children: [], children: [],
idleTimer: null, idleTimer: null,
_lineBuffer: '',
}; };
this.sessions.set(sessionKey, state); this.sessions.set(sessionKey, state);
@@ -263,12 +264,11 @@ class StatusWatcher extends EventEmitter {
// Ghost watch: file changed for a completed session — signal immediate re-detection // Ghost watch: file changed for a completed session — signal immediate re-detection
if (sessionKey.startsWith('\x00ghost:')) { if (sessionKey.startsWith('\x00ghost:')) {
const originalKey = sessionKey.slice(7); const originalKey = sessionKey.slice(7);
// Remove ghost so we don't fire repeatedly // Do NOT delete ghost entry here — let caller clean up after pollNow confirms the session
this.fileToSession.delete(fullPath);
if (this.logger) { if (this.logger) {
this.logger.info({ sessionKey: originalKey }, 'fs.watch: file change on completed session — triggering reactivation'); this.logger.info({ sessionKey: originalKey }, 'fs.watch: file change on completed session — triggering reactivation');
} }
this.emit('session-reactivate', originalKey); this.emit('session-reactivate', originalKey, fullPath);
return; return;
} }
@@ -320,9 +320,11 @@ class StatusWatcher extends EventEmitter {
state.lastOffset += bytesRead; state.lastOffset += bytesRead;
// Parse JSONL lines // Parse JSONL lines — handle partial lines at chunk boundary
const chunk = buffer.toString('utf8', 0, bytesRead); const chunk = buffer.toString('utf8', 0, bytesRead);
const lines = chunk.split('\n').filter((l) => l.trim()); const raw = (state._lineBuffer || '') + chunk;
state._lineBuffer = raw.endsWith('\n') ? '' : raw.split('\n').pop();
const lines = raw.split('\n').slice(0, raw.endsWith('\n') ? undefined : -1).filter((l) => l.trim());
for (const line of lines) { for (const line of lines) {
this._parseLine(sessionKey, state, line); this._parseLine(sessionKey, state, line);
@@ -539,6 +541,17 @@ class StatusWatcher extends EventEmitter {
const elapsed = Date.now() - state.lastActivityAt; const elapsed = Date.now() - state.lastActivityAt;
const idleMs = this.idleTimeoutS * 1000; const idleMs = this.idleTimeoutS * 1000;
// Safeguard: if pendingToolCalls is stuck > 0 for more than 30s, clamp to 0
if (state.pendingToolCalls > 0 && elapsed > 30000) {
if (this.logger) {
this.logger.warn(
{ sessionKey, pendingToolCalls: state.pendingToolCalls, elapsedS: Math.floor(elapsed / 1000) },
'_checkIdle: pendingToolCalls stuck > 30s — clamping to 0 to unblock idle detection',
);
}
state.pendingToolCalls = 0;
}
if (elapsed >= idleMs && state.pendingToolCalls === 0) { if (elapsed >= idleMs && state.pendingToolCalls === 0) {
if (this.logger) { if (this.logger) {
this.logger.info({ sessionKey, elapsedS: Math.floor(elapsed / 1000) }, 'Session idle'); this.logger.info({ sessionKey, elapsedS: Math.floor(elapsed / 1000) }, 'Session idle');

View File

@@ -396,11 +396,13 @@ async function startDaemon() {
// ---- Ghost reactivation (from watcher fs.watch on completed session file) ---- // ---- Ghost reactivation (from watcher fs.watch on completed session file) ----
// Fires immediately when the transcript file changes after a session completes. // Fires immediately when the transcript file changes after a session completes.
// Clears the completedSessions cooldown so the next monitor poll re-detects instantly. // Clears the completedSessions cooldown so the next monitor poll re-detects instantly.
watcher.on('session-reactivate', (sessionKey) => { watcher.on('session-reactivate', (sessionKey, ghostPath) => {
logger.info({ sessionKey }, 'Ghost watch triggered reactivation — clearing completed cooldown'); logger.info({ sessionKey }, 'Ghost watch triggered reactivation — clearing completed cooldown');
monitor.clearCompleted(sessionKey); monitor.clearCompleted(sessionKey);
// Force an immediate poll so the session is re-added without waiting 2s // Force an immediate poll so the session is re-added without waiting 2s
monitor.pollNow(); monitor.pollNow();
// Clean up ghost entry now — clearCompleted+pollNow is sufficient, ghost served its purpose
if (ghostPath) watcher.fileToSession.delete(ghostPath);
}); });
// ---- Lock file reactivation (earliest possible trigger) ---- // ---- Lock file reactivation (earliest possible trigger) ----