Compare commits
8 Commits
backup/pre
...
backup-202
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
07c6fd701b | ||
|
|
5c4a665fb9 | ||
|
|
f1d3ae9c4c | ||
|
|
3fbd46c2d2 | ||
|
|
e483b0bc42 | ||
|
|
888e8af784 | ||
|
|
897abf0a9a | ||
|
|
0b39b39f3b |
@@ -122,6 +122,16 @@ func (p *Plugin) handleCreateSession(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// Validate KV key length — Mattermost enforces a 50-char limit.
|
||||
// Encoded key = kvPrefix (11 chars) + url.PathEscape(sessionKey).
|
||||
// Exceeding the limit causes KVSet to silently succeed but never store data.
|
||||
if len(encodeKey(req.SessionKey)) > 50 {
|
||||
writeJSON(w, http.StatusBadRequest, map[string]string{
|
||||
"error": fmt.Sprintf("session_key too long: encoded key length %d exceeds 50-char KV limit", len(encodeKey(req.SessionKey))),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Check max active sessions
|
||||
config := p.getConfiguration()
|
||||
allSessions, _ := p.store.ListAllSessions()
|
||||
|
||||
@@ -21,6 +21,9 @@ type Plugin struct {
|
||||
// store wraps KV store operations for session persistence.
|
||||
store *Store
|
||||
|
||||
// botUserIDLock synchronizes access to botUserID.
|
||||
botUserIDLock sync.RWMutex
|
||||
|
||||
// botUserID is the plugin's bot user ID (created on activation).
|
||||
botUserID string
|
||||
|
||||
@@ -41,7 +44,9 @@ func (p *Plugin) OnActivate() error {
|
||||
if appErr != nil {
|
||||
p.API.LogWarn("Failed to ensure bot user", "error", appErr.Error())
|
||||
} else {
|
||||
p.botUserIDLock.Lock()
|
||||
p.botUserID = botID
|
||||
p.botUserIDLock.Unlock()
|
||||
p.API.LogInfo("Plugin bot user ensured", "botUserID", botID)
|
||||
}
|
||||
|
||||
@@ -75,8 +80,10 @@ func (p *Plugin) sessionCleanupLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
// getBotUserID returns the plugin's bot user ID.
|
||||
// getBotUserID returns the plugin's bot user ID (thread-safe).
|
||||
func (p *Plugin) getBotUserID() string {
|
||||
p.botUserIDLock.RLock()
|
||||
defer p.botUserIDLock.RUnlock()
|
||||
return p.botUserID
|
||||
}
|
||||
|
||||
@@ -93,6 +100,10 @@ func (p *Plugin) OnDeactivate() error {
|
||||
p.API.LogWarn("Failed to list sessions on deactivate", "error", err.Error())
|
||||
} else {
|
||||
for _, s := range sessions {
|
||||
// Skip sessions already in a terminal state — do not overwrite done/error
|
||||
if s.Status == "done" || s.Status == "error" {
|
||||
continue
|
||||
}
|
||||
s.Status = "interrupted"
|
||||
_ = p.store.SaveSession(s.SessionKey, s)
|
||||
p.broadcastUpdate(s.ChannelID, s)
|
||||
|
||||
@@ -61,6 +61,13 @@ class LiveStatusPlugin {
|
||||
|
||||
window.__livestatus_updates[data.post_id] = update;
|
||||
|
||||
// Evict completed sessions from the update cache after 60s to prevent unbounded growth
|
||||
if (data.status === 'done' || data.status === 'error' || data.status === 'interrupted') {
|
||||
setTimeout(() => {
|
||||
delete window.__livestatus_updates[data.post_id];
|
||||
}, 60000);
|
||||
}
|
||||
|
||||
// Notify post-specific listeners
|
||||
const listeners = window.__livestatus_listeners[data.post_id];
|
||||
if (listeners) {
|
||||
@@ -82,7 +89,16 @@ class LiveStatusPlugin {
|
||||
}
|
||||
|
||||
uninitialize(): void {
|
||||
// Cleanup handled by Mattermost plugin framework
|
||||
// Clear global listener and update stores to prevent accumulation across reloads
|
||||
window.__livestatus_listeners = {};
|
||||
window.__livestatus_updates = {};
|
||||
|
||||
// Unregister the custom post type component if it was registered
|
||||
if (this.postTypeComponentId) {
|
||||
// registry is not available here — Mattermost framework cleans up on deactivate.
|
||||
// Clearing postTypeComponentId prevents stale references.
|
||||
this.postTypeComponentId = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -311,6 +311,10 @@ class StatusBox extends EventEmitter {
|
||||
});
|
||||
});
|
||||
|
||||
req.setTimeout(30000, () => {
|
||||
req.destroy(new Error('HTTP request timed out after 30s'));
|
||||
});
|
||||
|
||||
req.on('error', reject);
|
||||
if (bodyStr) req.write(bodyStr);
|
||||
req.end();
|
||||
|
||||
@@ -82,6 +82,10 @@ class StatusWatcher extends EventEmitter {
|
||||
tokenCount: 0,
|
||||
children: [],
|
||||
idleTimer: null,
|
||||
_lineBuffer: '',
|
||||
_lockActive: false,
|
||||
_lockExists: undefined,
|
||||
_lockPollTimer: null,
|
||||
};
|
||||
|
||||
this.sessions.set(sessionKey, state);
|
||||
@@ -98,6 +102,9 @@ class StatusWatcher extends EventEmitter {
|
||||
|
||||
// Start file polling as fallback (fs.watch may not work on bind mounts in Docker)
|
||||
this._startFilePoll(sessionKey, state);
|
||||
|
||||
// Start lock file poll fallback (inotify misses on Docker bind mounts)
|
||||
this._startLockPoll(sessionKey, state);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -130,6 +137,60 @@ class StatusWatcher extends EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start polling a lock file for changes (fallback for fs.watch on Docker bind mounts).
|
||||
* Idempotent with fs.watch — if fs.watch already fired, _lockActive is already updated
|
||||
* and no duplicate events are emitted.
|
||||
* @private
|
||||
*/
|
||||
_startLockPoll(sessionKey, state) {
|
||||
var self = this;
|
||||
var lockFile = state.transcriptFile + '.lock';
|
||||
|
||||
state._lockPollTimer = setInterval(function () {
|
||||
try {
|
||||
var exists = fs.existsSync(lockFile);
|
||||
|
||||
// First poll: just initialize state, don't emit
|
||||
if (state._lockExists === undefined) {
|
||||
state._lockExists = exists;
|
||||
return;
|
||||
}
|
||||
|
||||
var changed = exists !== state._lockExists;
|
||||
state._lockExists = exists;
|
||||
|
||||
if (!changed) return;
|
||||
|
||||
if (exists) {
|
||||
// Lock file appeared — session activated by user message
|
||||
if (!state._lockActive) {
|
||||
state._lockActive = true;
|
||||
if (self.logger) {
|
||||
self.logger.info({ sessionKey }, 'Lock poll: lock file appeared — emitting session-lock');
|
||||
}
|
||||
self.emit('session-lock', sessionKey);
|
||||
}
|
||||
} else {
|
||||
// Lock file disappeared — turn complete
|
||||
if (state._lockActive) {
|
||||
state._lockActive = false;
|
||||
if (self.logger) {
|
||||
self.logger.info({ sessionKey }, 'Lock poll: lock file removed — emitting session-lock-released');
|
||||
}
|
||||
self.emit('session-lock-released', sessionKey);
|
||||
}
|
||||
}
|
||||
} catch (_e) {
|
||||
// Ignore stat errors (file/dir may not exist yet)
|
||||
}
|
||||
}, 1000);
|
||||
|
||||
if (self.logger) {
|
||||
self.logger.info({ sessionKey }, 'Lock poll timer started');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a session from watching.
|
||||
* @param {string} sessionKey
|
||||
@@ -140,6 +201,7 @@ class StatusWatcher extends EventEmitter {
|
||||
|
||||
if (state.idleTimer) clearTimeout(state.idleTimer);
|
||||
if (state._filePollTimer) clearInterval(state._filePollTimer);
|
||||
if (state._lockPollTimer) clearInterval(state._lockPollTimer);
|
||||
|
||||
// Keep fileToSession mapping alive so fs.watch still fires for this file.
|
||||
// Mark it as a "ghost" — changes trigger 'session-file-changed' so the
|
||||
@@ -211,6 +273,10 @@ class StatusWatcher extends EventEmitter {
|
||||
clearInterval(state._filePollTimer);
|
||||
state._filePollTimer = null;
|
||||
}
|
||||
if (state._lockPollTimer) {
|
||||
clearInterval(state._lockPollTimer);
|
||||
state._lockPollTimer = null;
|
||||
}
|
||||
}
|
||||
if (this.logger) this.logger.info('StatusWatcher stopped');
|
||||
}
|
||||
@@ -236,6 +302,12 @@ class StatusWatcher extends EventEmitter {
|
||||
// Lock file CREATED — gateway started processing user message.
|
||||
// Earliest possible signal: fires before any JSONL write.
|
||||
if (sessionKey) {
|
||||
const sessionState = this.sessions.get(sessionKey);
|
||||
// Sync _lockActive so poll fallback stays idempotent
|
||||
if (sessionState && !sessionState._lockActive) {
|
||||
sessionState._lockActive = true;
|
||||
sessionState._lockExists = true;
|
||||
}
|
||||
if (this.logger) this.logger.info({ sessionKey }, 'Lock file created — session active, triggering early reactivation');
|
||||
this.emit('session-lock', sessionKey);
|
||||
} else {
|
||||
@@ -245,6 +317,12 @@ class StatusWatcher extends EventEmitter {
|
||||
// Lock file DELETED — gateway finished the turn and sent final reply.
|
||||
// Immediate idle signal: no need to wait for cache-ttl or 60s timeout.
|
||||
if (sessionKey) {
|
||||
const sessionState = this.sessions.get(sessionKey);
|
||||
// Sync _lockActive so poll fallback stays idempotent
|
||||
if (sessionState && sessionState._lockActive) {
|
||||
sessionState._lockActive = false;
|
||||
sessionState._lockExists = false;
|
||||
}
|
||||
if (this.logger) this.logger.info({ sessionKey }, 'Lock file deleted — turn complete, marking session done immediately');
|
||||
this.emit('session-lock-released', sessionKey);
|
||||
} else {
|
||||
@@ -263,12 +341,11 @@ class StatusWatcher extends EventEmitter {
|
||||
// Ghost watch: file changed for a completed session — signal immediate re-detection
|
||||
if (sessionKey.startsWith('\x00ghost:')) {
|
||||
const originalKey = sessionKey.slice(7);
|
||||
// Remove ghost so we don't fire repeatedly
|
||||
this.fileToSession.delete(fullPath);
|
||||
// Do NOT delete ghost entry here — let caller clean up after pollNow confirms the session
|
||||
if (this.logger) {
|
||||
this.logger.info({ sessionKey: originalKey }, 'fs.watch: file change on completed session — triggering reactivation');
|
||||
}
|
||||
this.emit('session-reactivate', originalKey);
|
||||
this.emit('session-reactivate', originalKey, fullPath);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -320,9 +397,11 @@ class StatusWatcher extends EventEmitter {
|
||||
|
||||
state.lastOffset += bytesRead;
|
||||
|
||||
// Parse JSONL lines
|
||||
// Parse JSONL lines — handle partial lines at chunk boundary
|
||||
const chunk = buffer.toString('utf8', 0, bytesRead);
|
||||
const lines = chunk.split('\n').filter((l) => l.trim());
|
||||
const raw = (state._lineBuffer || '') + chunk;
|
||||
state._lineBuffer = raw.endsWith('\n') ? '' : raw.split('\n').pop();
|
||||
const lines = raw.split('\n').slice(0, raw.endsWith('\n') ? undefined : -1).filter((l) => l.trim());
|
||||
|
||||
for (const line of lines) {
|
||||
this._parseLine(sessionKey, state, line);
|
||||
@@ -539,6 +618,17 @@ class StatusWatcher extends EventEmitter {
|
||||
const elapsed = Date.now() - state.lastActivityAt;
|
||||
const idleMs = this.idleTimeoutS * 1000;
|
||||
|
||||
// Safeguard: if pendingToolCalls is stuck > 0 for more than 30s, clamp to 0
|
||||
if (state.pendingToolCalls > 0 && elapsed > 30000) {
|
||||
if (this.logger) {
|
||||
this.logger.warn(
|
||||
{ sessionKey, pendingToolCalls: state.pendingToolCalls, elapsedS: Math.floor(elapsed / 1000) },
|
||||
'_checkIdle: pendingToolCalls stuck > 30s — clamping to 0 to unblock idle detection',
|
||||
);
|
||||
}
|
||||
state.pendingToolCalls = 0;
|
||||
}
|
||||
|
||||
if (elapsed >= idleMs && state.pendingToolCalls === 0) {
|
||||
if (this.logger) {
|
||||
this.logger.info({ sessionKey, elapsedS: Math.floor(elapsed / 1000) }, 'Session idle');
|
||||
|
||||
@@ -47,8 +47,19 @@ if (cmd === 'start') {
|
||||
|
||||
// ---- PID File helpers ----
|
||||
function writePidFile(pidFile) {
|
||||
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||
fs.writeFileSync(pidFile, String(process.pid), 'utf8');
|
||||
try {
|
||||
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||
const fd = fs.openSync(pidFile, 'wx');
|
||||
fs.writeSync(fd, String(process.pid));
|
||||
fs.closeSync(fd);
|
||||
} catch (err) {
|
||||
if (err.code === 'EEXIST') {
|
||||
// Another process won the race — bail out
|
||||
console.error('PID file already exists — another daemon may be running');
|
||||
process.exit(1);
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
function readPidFile(pidFile) {
|
||||
@@ -100,8 +111,10 @@ function saveOffsets(offsetFile, sessions) {
|
||||
try {
|
||||
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||
fs.writeFileSync(offsetFile, JSON.stringify(offsets, null, 2), 'utf8');
|
||||
} catch (_e) {
|
||||
/* ignore write error */
|
||||
} catch (writeErr) {
|
||||
// Log disk-full / permission errors so they are visible in daemon logs
|
||||
// eslint-disable-next-line no-console
|
||||
console.warn('[saveOffsets] Failed to write offset file:', writeErr.message);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,6 +128,16 @@ async function startDaemon() {
|
||||
const config = getConfig();
|
||||
const logger = getLogger();
|
||||
|
||||
// Global error handlers — prevent silent daemon death from unhandled rejections
|
||||
process.on('unhandledRejection', (reason) => {
|
||||
logger.error({ reason }, 'Unhandled promise rejection — shutting down');
|
||||
shutdown().finally(() => process.exit(1));
|
||||
});
|
||||
process.on('uncaughtException', (err) => {
|
||||
logger.error({ err }, 'Uncaught exception — shutting down');
|
||||
shutdown().finally(() => process.exit(1));
|
||||
});
|
||||
|
||||
// Check if already running
|
||||
const existingPid = readPidFile(config.pidFile);
|
||||
if (existingPid && isProcessRunning(existingPid)) {
|
||||
@@ -137,6 +160,8 @@ async function startDaemon() {
|
||||
// Shared state
|
||||
// Map<sessionKey, { postId, agentId, channelId, rootPostId, children: Map }>
|
||||
const activeBoxes = new Map();
|
||||
// Guard against concurrent session-added events for the same key (e.g. lock + ghost fire simultaneously)
|
||||
const sessionAddInProgress = new Set();
|
||||
|
||||
// Completed sessions: Map<sessionKey, { postId, lastOffset }>
|
||||
// Tracks sessions that went idle so we can reuse their post on reactivation
|
||||
@@ -233,12 +258,59 @@ async function startDaemon() {
|
||||
monitor.on('session-added', async (info) => {
|
||||
const { sessionKey, transcriptFile, spawnedBy, channelId, rootPostId, agentId } = info;
|
||||
|
||||
// Guard: prevent duplicate concurrent session-added for same key.
|
||||
// Happens when lock file event + ghost watch both fire simultaneously,
|
||||
// both call pollNow(), and both session-added events land before activeBoxes is updated.
|
||||
if (sessionAddInProgress.has(sessionKey)) {
|
||||
logger.debug({ sessionKey }, 'session-added already in progress — dedup skip');
|
||||
return;
|
||||
}
|
||||
if (activeBoxes.has(sessionKey)) {
|
||||
logger.debug({ sessionKey }, 'session-added for already-active session — skip');
|
||||
return;
|
||||
}
|
||||
sessionAddInProgress.add(sessionKey);
|
||||
|
||||
// Skip if no channel
|
||||
if (!channelId) {
|
||||
logger.debug({ sessionKey }, 'No channel for session — skipping');
|
||||
return;
|
||||
}
|
||||
|
||||
// Skip heartbeat/internal sessions (agent:main:main, agent:main:cli, etc.)
|
||||
// These have no real Mattermost conversation context and produce spurious status boxes.
|
||||
// A heartbeat session key ends with ':main' and has no channel/thread suffix.
|
||||
if (/^agent:[^:]+:main$/.test(sessionKey) || /^agent:[^:]+:cli$/.test(sessionKey)) {
|
||||
logger.debug({ sessionKey }, 'Heartbeat/internal session — skipping status box');
|
||||
return;
|
||||
}
|
||||
|
||||
// Dedup: skip if another active session already owns this channel.
|
||||
// This prevents duplicate status boxes when a threadless parent session and a
|
||||
// thread-specific child session both resolve to the same MM channel/DM.
|
||||
// Thread sessions (containing ':thread:') take priority over bare channel sessions.
|
||||
const isThreadSession = sessionKey.includes(':thread:');
|
||||
for (const [existingKey, existingBox] of activeBoxes) {
|
||||
if (existingBox.channelId === channelId) {
|
||||
const existingIsThread = existingKey.includes(':thread:');
|
||||
if (isThreadSession && !existingIsThread) {
|
||||
// New session is a thread — it takes priority. Remove the parent box.
|
||||
logger.info({ sessionKey, displaced: existingKey }, 'Thread session displacing bare channel session');
|
||||
activeBoxes.delete(existingKey);
|
||||
watcher.removeSession(existingKey);
|
||||
} else if (!isThreadSession && existingIsThread) {
|
||||
// Existing session is a thread — skip this bare channel session.
|
||||
logger.debug({ sessionKey, existingKey }, 'Bare channel session skipped — thread session already owns this channel');
|
||||
return;
|
||||
} else {
|
||||
// Same type — skip the newcomer, first-in wins.
|
||||
logger.debug({ sessionKey, existingKey }, 'Duplicate channel session skipped');
|
||||
return;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Enforce MAX_ACTIVE_SESSIONS
|
||||
if (activeBoxes.size >= config.maxActiveSessions) {
|
||||
logger.warn(
|
||||
@@ -317,6 +389,7 @@ async function startDaemon() {
|
||||
} catch (err) {
|
||||
logger.error({ sessionKey, err }, 'Failed to create status post');
|
||||
globalMetrics.lastError = err.message;
|
||||
sessionAddInProgress.delete(sessionKey);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -329,6 +402,7 @@ async function startDaemon() {
|
||||
usePlugin: usePlugin && !!pluginClient, // track which mode this session uses
|
||||
children: new Map(),
|
||||
});
|
||||
sessionAddInProgress.delete(sessionKey);
|
||||
globalMetrics.activeSessions = activeBoxes.size;
|
||||
|
||||
// Register in watcher.
|
||||
@@ -373,11 +447,13 @@ async function startDaemon() {
|
||||
// ---- Ghost reactivation (from watcher fs.watch on completed session file) ----
|
||||
// Fires immediately when the transcript file changes after a session completes.
|
||||
// Clears the completedSessions cooldown so the next monitor poll re-detects instantly.
|
||||
watcher.on('session-reactivate', (sessionKey) => {
|
||||
watcher.on('session-reactivate', (sessionKey, ghostPath) => {
|
||||
logger.info({ sessionKey }, 'Ghost watch triggered reactivation — clearing completed cooldown');
|
||||
monitor.clearCompleted(sessionKey);
|
||||
// Force an immediate poll so the session is re-added without waiting 2s
|
||||
monitor.pollNow();
|
||||
// Clean up ghost entry now — clearCompleted+pollNow is sufficient, ghost served its purpose
|
||||
if (ghostPath) watcher.fileToSession.delete(ghostPath);
|
||||
});
|
||||
|
||||
// ---- Lock file reactivation (earliest possible trigger) ----
|
||||
@@ -437,11 +513,17 @@ async function startDaemon() {
|
||||
}),
|
||||
start_time_ms: state.startTime,
|
||||
}).catch(function (err) {
|
||||
// If plugin rejects (e.g. session not found), fall back to REST for this session
|
||||
logger.warn({ sessionKey, err: err.message }, 'Plugin update failed — falling back to REST');
|
||||
box.usePlugin = false;
|
||||
var fallbackText = buildStatusText(box, state, activeBoxes, watcher, sessionKey);
|
||||
sharedStatusBox.updatePost(box.postId, fallbackText).catch(function () {});
|
||||
// Only permanently disable plugin mode for hard failures (non-retryable errors).
|
||||
// 429 and 5xx are transient — keep plugin mode and retry on next update.
|
||||
if (err.statusCode === 429 || (err.statusCode >= 500 && err.statusCode < 600)) {
|
||||
logger.warn({ sessionKey, statusCode: err.statusCode }, 'Plugin API transient error — keeping plugin mode, will retry next update');
|
||||
// do NOT set box.usePlugin = false
|
||||
} else {
|
||||
logger.warn({ sessionKey, err: err.message }, 'Plugin API hard failure — falling back to REST');
|
||||
box.usePlugin = false;
|
||||
var fallbackText = buildStatusText(box, state, activeBoxes, watcher, sessionKey);
|
||||
sharedStatusBox.updatePost(box.postId, fallbackText).catch(function () {});
|
||||
}
|
||||
});
|
||||
} else {
|
||||
// REST API fallback: format text and PUT update post
|
||||
@@ -459,7 +541,7 @@ async function startDaemon() {
|
||||
|
||||
// ---- Session Idle (from watcher) ----
|
||||
watcher.on('session-idle', async (sessionKey, state) => {
|
||||
const box = activeBoxes.get(sessionKey);
|
||||
const box = activeBoxes.get(sessionKey); // snapshot at entry
|
||||
if (!box) {
|
||||
// Sub-agent completed
|
||||
updateParentWithChild(activeBoxes, watcher, sharedStatusBox, pluginClient, sessionKey, state, logger);
|
||||
@@ -492,12 +574,23 @@ async function startDaemon() {
|
||||
logger.error({ sessionKey, err }, 'Failed to update final status');
|
||||
}
|
||||
|
||||
// Guard: if the box was replaced during the awaits above (new session reactivated),
|
||||
// skip cleanup to avoid killing the newly re-added session.
|
||||
if (activeBoxes.get(sessionKey) !== box) {
|
||||
logger.info({ sessionKey }, 'session-idle: box replaced during await — skipping cleanup (new session active)');
|
||||
return;
|
||||
}
|
||||
|
||||
// Save to completedBoxes so we can reuse the post ID if the session reactivates
|
||||
completedBoxes.set(sessionKey, {
|
||||
postId: box.postId,
|
||||
lastOffset: state.lastOffset || 0,
|
||||
});
|
||||
|
||||
// Persist final offsets BEFORE removing session from watcher so the
|
||||
// completed session's last offset is captured in the snapshot.
|
||||
saveOffsets(config.offsetFile, watcher.sessions);
|
||||
|
||||
// Clean up active tracking
|
||||
activeBoxes.delete(sessionKey);
|
||||
watcher.removeSession(sessionKey);
|
||||
@@ -505,9 +598,6 @@ async function startDaemon() {
|
||||
// ensures we reuse the existing post instead of creating a new one.
|
||||
monitor.forgetSession(sessionKey);
|
||||
globalMetrics.activeSessions = activeBoxes.size;
|
||||
|
||||
// Persist final offsets
|
||||
saveOffsets(config.offsetFile, watcher.sessions);
|
||||
});
|
||||
|
||||
// ---- Start all subsystems ----
|
||||
@@ -534,6 +624,9 @@ async function startDaemon() {
|
||||
|
||||
logger.info({ signal }, 'Shutting down gracefully');
|
||||
|
||||
// Persist offsets for all active sessions before stopping the watcher
|
||||
saveOffsets(config.offsetFile, watcher.sessions);
|
||||
|
||||
// Stop accepting new sessions
|
||||
monitor.stop();
|
||||
watcher.stop();
|
||||
|
||||
Reference in New Issue
Block a user