fix(batch3): HTTP timeout, transient 429 handling, lock file poll fallback

This commit is contained in:
Xen
2026-03-09 19:48:27 +00:00
parent 897abf0a9a
commit 888e8af784
3 changed files with 92 additions and 5 deletions

View File

@@ -311,6 +311,10 @@ class StatusBox extends EventEmitter {
});
});
req.setTimeout(30000, () => {
req.destroy(new Error('HTTP request timed out after 30s'));
});
req.on('error', reject);
if (bodyStr) req.write(bodyStr);
req.end();

View File

@@ -83,6 +83,9 @@ class StatusWatcher extends EventEmitter {
children: [],
idleTimer: null,
_lineBuffer: '',
_lockActive: false,
_lockExists: undefined,
_lockPollTimer: null,
};
this.sessions.set(sessionKey, state);
@@ -99,6 +102,9 @@ class StatusWatcher extends EventEmitter {
// Start file polling as fallback (fs.watch may not work on bind mounts in Docker)
this._startFilePoll(sessionKey, state);
// Start lock file poll fallback (inotify misses on Docker bind mounts)
this._startLockPoll(sessionKey, state);
}
/**
@@ -131,6 +137,60 @@ class StatusWatcher extends EventEmitter {
}
}
/**
* Start polling a lock file for changes (fallback for fs.watch on Docker bind mounts).
* Idempotent with fs.watch — if fs.watch already fired, _lockActive is already updated
* and no duplicate events are emitted.
* @private
*/
_startLockPoll(sessionKey, state) {
var self = this;
var lockFile = state.transcriptFile + '.lock';
state._lockPollTimer = setInterval(function () {
try {
var exists = fs.existsSync(lockFile);
// First poll: just initialize state, don't emit
if (state._lockExists === undefined) {
state._lockExists = exists;
return;
}
var changed = exists !== state._lockExists;
state._lockExists = exists;
if (!changed) return;
if (exists) {
// Lock file appeared — session activated by user message
if (!state._lockActive) {
state._lockActive = true;
if (self.logger) {
self.logger.info({ sessionKey }, 'Lock poll: lock file appeared — emitting session-lock');
}
self.emit('session-lock', sessionKey);
}
} else {
// Lock file disappeared — turn complete
if (state._lockActive) {
state._lockActive = false;
if (self.logger) {
self.logger.info({ sessionKey }, 'Lock poll: lock file removed — emitting session-lock-released');
}
self.emit('session-lock-released', sessionKey);
}
}
} catch (_e) {
// Ignore stat errors (file/dir may not exist yet)
}
}, 1000);
if (self.logger) {
self.logger.info({ sessionKey }, 'Lock poll timer started');
}
}
/**
* Remove a session from watching.
* @param {string} sessionKey
@@ -141,6 +201,7 @@ class StatusWatcher extends EventEmitter {
if (state.idleTimer) clearTimeout(state.idleTimer);
if (state._filePollTimer) clearInterval(state._filePollTimer);
if (state._lockPollTimer) clearInterval(state._lockPollTimer);
// Keep fileToSession mapping alive so fs.watch still fires for this file.
// Mark it as a "ghost" — changes trigger 'session-file-changed' so the
@@ -212,6 +273,10 @@ class StatusWatcher extends EventEmitter {
clearInterval(state._filePollTimer);
state._filePollTimer = null;
}
if (state._lockPollTimer) {
clearInterval(state._lockPollTimer);
state._lockPollTimer = null;
}
}
if (this.logger) this.logger.info('StatusWatcher stopped');
}
@@ -237,6 +302,12 @@ class StatusWatcher extends EventEmitter {
// Lock file CREATED — gateway started processing user message.
// Earliest possible signal: fires before any JSONL write.
if (sessionKey) {
const sessionState = this.sessions.get(sessionKey);
// Sync _lockActive so poll fallback stays idempotent
if (sessionState && !sessionState._lockActive) {
sessionState._lockActive = true;
sessionState._lockExists = true;
}
if (this.logger) this.logger.info({ sessionKey }, 'Lock file created — session active, triggering early reactivation');
this.emit('session-lock', sessionKey);
} else {
@@ -246,6 +317,12 @@ class StatusWatcher extends EventEmitter {
// Lock file DELETED — gateway finished the turn and sent final reply.
// Immediate idle signal: no need to wait for cache-ttl or 60s timeout.
if (sessionKey) {
const sessionState = this.sessions.get(sessionKey);
// Sync _lockActive so poll fallback stays idempotent
if (sessionState && sessionState._lockActive) {
sessionState._lockActive = false;
sessionState._lockExists = false;
}
if (this.logger) this.logger.info({ sessionKey }, 'Lock file deleted — turn complete, marking session done immediately');
this.emit('session-lock-released', sessionKey);
} else {

View File

@@ -462,11 +462,17 @@ async function startDaemon() {
}),
start_time_ms: state.startTime,
}).catch(function (err) {
// If plugin rejects (e.g. session not found), fall back to REST for this session
logger.warn({ sessionKey, err: err.message }, 'Plugin update failed — falling back to REST');
box.usePlugin = false;
var fallbackText = buildStatusText(box, state, activeBoxes, watcher, sessionKey);
sharedStatusBox.updatePost(box.postId, fallbackText).catch(function () {});
// Only permanently disable plugin mode for hard failures (non-retryable errors).
// 429 and 5xx are transient — keep plugin mode and retry on next update.
if (err.statusCode === 429 || (err.statusCode >= 500 && err.statusCode < 600)) {
logger.warn({ sessionKey, statusCode: err.statusCode }, 'Plugin API transient error — keeping plugin mode, will retry next update');
// do NOT set box.usePlugin = false
} else {
logger.warn({ sessionKey, err: err.message }, 'Plugin API hard failure — falling back to REST');
box.usePlugin = false;
var fallbackText = buildStatusText(box, state, activeBoxes, watcher, sessionKey);
sharedStatusBox.updatePost(box.postId, fallbackText).catch(function () {});
}
});
} else {
// REST API fallback: format text and PUT update post