fix: concurrent session-added dedup via sessionAddInProgress set
Root cause of double status boxes: lock file event + ghost watch both fire at the same time on reactivation. Both call clearCompleted+pollNow, both session-added events reach the handler before activeBoxes.has() returns true for either, so two status boxes are created. Fix: sessionAddInProgress Set gates the handler. First caller proceeds, second caller sees the key in-progress and returns immediately. Cleared on success (after activeBoxes.set) and on error (before return).
This commit is contained in:
@@ -160,6 +160,8 @@ async function startDaemon() {
|
|||||||
// Shared state
|
// Shared state
|
||||||
// Map<sessionKey, { postId, agentId, channelId, rootPostId, children: Map }>
|
// Map<sessionKey, { postId, agentId, channelId, rootPostId, children: Map }>
|
||||||
const activeBoxes = new Map();
|
const activeBoxes = new Map();
|
||||||
|
// Guard against concurrent session-added events for the same key (e.g. lock + ghost fire simultaneously)
|
||||||
|
const sessionAddInProgress = new Set();
|
||||||
|
|
||||||
// Completed sessions: Map<sessionKey, { postId, lastOffset }>
|
// Completed sessions: Map<sessionKey, { postId, lastOffset }>
|
||||||
// Tracks sessions that went idle so we can reuse their post on reactivation
|
// Tracks sessions that went idle so we can reuse their post on reactivation
|
||||||
@@ -256,6 +258,19 @@ async function startDaemon() {
|
|||||||
monitor.on('session-added', async (info) => {
|
monitor.on('session-added', async (info) => {
|
||||||
const { sessionKey, transcriptFile, spawnedBy, channelId, rootPostId, agentId } = info;
|
const { sessionKey, transcriptFile, spawnedBy, channelId, rootPostId, agentId } = info;
|
||||||
|
|
||||||
|
// Guard: prevent duplicate concurrent session-added for same key.
|
||||||
|
// Happens when lock file event + ghost watch both fire simultaneously,
|
||||||
|
// both call pollNow(), and both session-added events land before activeBoxes is updated.
|
||||||
|
if (sessionAddInProgress.has(sessionKey)) {
|
||||||
|
logger.debug({ sessionKey }, 'session-added already in progress — dedup skip');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (activeBoxes.has(sessionKey)) {
|
||||||
|
logger.debug({ sessionKey }, 'session-added for already-active session — skip');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
sessionAddInProgress.add(sessionKey);
|
||||||
|
|
||||||
// Skip if no channel
|
// Skip if no channel
|
||||||
if (!channelId) {
|
if (!channelId) {
|
||||||
logger.debug({ sessionKey }, 'No channel for session — skipping');
|
logger.debug({ sessionKey }, 'No channel for session — skipping');
|
||||||
@@ -374,6 +389,7 @@ async function startDaemon() {
|
|||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.error({ sessionKey, err }, 'Failed to create status post');
|
logger.error({ sessionKey, err }, 'Failed to create status post');
|
||||||
globalMetrics.lastError = err.message;
|
globalMetrics.lastError = err.message;
|
||||||
|
sessionAddInProgress.delete(sessionKey);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -386,6 +402,7 @@ async function startDaemon() {
|
|||||||
usePlugin: usePlugin && !!pluginClient, // track which mode this session uses
|
usePlugin: usePlugin && !!pluginClient, // track which mode this session uses
|
||||||
children: new Map(),
|
children: new Map(),
|
||||||
});
|
});
|
||||||
|
sessionAddInProgress.delete(sessionKey);
|
||||||
globalMetrics.activeSessions = activeBoxes.size;
|
globalMetrics.activeSessions = activeBoxes.size;
|
||||||
|
|
||||||
// Register in watcher.
|
// Register in watcher.
|
||||||
|
|||||||
Reference in New Issue
Block a user