fix: delete+recreate status post, file polling fallback

- StatusBox: delete+recreate instead of PUT to keep post at thread bottom
  (Mattermost clears pin on PUT and doesn't bump edited posts)
- StatusBox: extends EventEmitter, emits 'post-replaced' events
- StatusWatcher: 500ms file polling fallback (fs.watch unreliable on
  Docker bind mounts / overlay fs)
- WatcherManager: handles post-replaced events to update activeBoxes
- SessionMonitor: forgetSession() for idle session re-detection
This commit is contained in:
sol
2026-03-07 19:07:01 +00:00
parent 3a8532bb30
commit bbafdaf2d8
3 changed files with 83 additions and 15 deletions

View File

@@ -14,6 +14,7 @@
const https = require('https'); const https = require('https');
const http = require('http'); const http = require('http');
const { EventEmitter } = require('events');
const { CircuitBreaker } = require('./circuit-breaker'); const { CircuitBreaker } = require('./circuit-breaker');
const DEFAULT_THROTTLE_MS = 500; const DEFAULT_THROTTLE_MS = 500;
@@ -21,7 +22,7 @@ const DEFAULT_MAX_CHARS = 15000;
const DEFAULT_MAX_RETRIES = 3; const DEFAULT_MAX_RETRIES = 3;
const DEFAULT_MAX_SOCKETS = 4; const DEFAULT_MAX_SOCKETS = 4;
class StatusBox { class StatusBox extends EventEmitter {
/** /**
* @param {object} opts * @param {object} opts
* @param {string} opts.baseUrl - Mattermost base URL * @param {string} opts.baseUrl - Mattermost base URL
@@ -34,6 +35,7 @@ class StatusBox {
* @param {CircuitBreaker} [opts.circuitBreaker] * @param {CircuitBreaker} [opts.circuitBreaker]
*/ */
constructor(opts) { constructor(opts) {
super();
this.baseUrl = opts.baseUrl; this.baseUrl = opts.baseUrl;
this.token = opts.token; this.token = opts.token;
this.logger = opts.logger || null; this.logger = opts.logger || null;
@@ -74,6 +76,10 @@ class StatusBox {
// Throttle state per postId // Throttle state per postId
// Map<postId, { pending: string|null, timer: NodeJS.Timeout|null, lastFiredAt: number }> // Map<postId, { pending: string|null, timer: NodeJS.Timeout|null, lastFiredAt: number }>
this._throttleState = new Map(); this._throttleState = new Map();
// Track post metadata for delete+recreate
// Map<postId, { channelId, rootId }>
this._postInfo = new Map();
} }
/** /**
@@ -91,13 +97,8 @@ class StatusBox {
if (this.logger) this.logger.debug({ postId: post.id, channelId }, 'Created status post'); if (this.logger) this.logger.debug({ postId: post.id, channelId }, 'Created status post');
this.metrics.updatesSent++; this.metrics.updatesSent++;
// Pin the status post so it's always visible // Track post info for delete+recreate
try { this._postInfo.set(post.id, { channelId, rootId: rootId || null });
await this._apiCall('POST', `/api/v4/posts/${post.id}/pin`, null);
if (this.logger) this.logger.debug({ postId: post.id }, 'Status post pinned');
} catch (pinErr) {
if (this.logger) this.logger.warn({ postId: post.id, err: pinErr }, 'Failed to pin status post');
}
return post.id; return post.id;
} }
@@ -162,12 +163,40 @@ class StatusBox {
this.metrics.queueDepth = Math.max(0, this.metrics.queueDepth - 1); this.metrics.queueDepth = Math.max(0, this.metrics.queueDepth - 1);
try { try {
await this._apiCallWithRetry('PUT', `/api/v4/posts/${postId}`, { // Delete + recreate to keep status post at the bottom of the thread
id: postId, // Mattermost clears pin on PUT, and edited posts stay at their original position
message: this._truncate(text), const postInfo = this._postInfo.get(postId);
}); if (postInfo) {
this.metrics.updatesSent++; try {
resolvers.forEach(({ resolve }) => resolve()); await this._httpRequest('DELETE', `/api/v4/posts/${postId}`, null);
} catch (_delErr) {
// If delete fails, fall back to PUT
}
const body = { channel_id: postInfo.channelId, message: this._truncate(text) };
if (postInfo.rootId) body.root_id = postInfo.rootId;
const newPost = await this._httpRequest('POST', '/api/v4/posts', body);
// Update tracking
this._postInfo.set(newPost.id, postInfo);
this._postInfo.delete(postId);
// Update throttle state key
const newState = this._throttleState.get(postId);
if (newState) {
this._throttleState.delete(postId);
this._throttleState.set(newPost.id, { pending: null, timer: null, lastFiredAt: Date.now(), resolvers: [] });
}
// Notify caller of new post ID
this.emit('post-replaced', postId, newPost.id);
this.metrics.updatesSent++;
resolvers.forEach(({ resolve }) => resolve());
} else {
// Fallback: regular PUT update
await this._apiCallWithRetry('PUT', `/api/v4/posts/${postId}`, {
id: postId,
message: this._truncate(text),
});
this.metrics.updatesSent++;
resolvers.forEach(({ resolve }) => resolve());
}
} catch (err) { } catch (err) {
this.metrics.updatesFailed++; this.metrics.updatesFailed++;
resolvers.forEach(({ reject }) => reject(err)); resolvers.forEach(({ reject }) => reject(err));

View File

@@ -90,6 +90,28 @@ class StatusWatcher extends EventEmitter {
if (initialState.lastOffset) { if (initialState.lastOffset) {
this._readFile(sessionKey, state); this._readFile(sessionKey, state);
} }
// Start file polling as fallback (fs.watch may not work on bind mounts in Docker)
this._startFilePoll(sessionKey, state);
}
/**
* Start polling a transcript file for changes (fallback for fs.watch).
* @private
*/
_startFilePoll(sessionKey, state) {
var self = this;
var pollInterval = 500; // 500ms poll
state._filePollTimer = setInterval(function () {
try {
var stat = fs.statSync(state.transcriptFile);
if (stat.size > state.lastOffset) {
self._readFile(sessionKey, state);
}
} catch (_e) {
// File might not exist yet or was deleted
}
}, pollInterval);
} }
/** /**
@@ -101,6 +123,7 @@ class StatusWatcher extends EventEmitter {
if (!state) return; if (!state) return;
if (state.idleTimer) clearTimeout(state.idleTimer); if (state.idleTimer) clearTimeout(state.idleTimer);
if (state._filePollTimer) clearInterval(state._filePollTimer);
this.fileToSession.delete(state.transcriptFile); this.fileToSession.delete(state.transcriptFile);
this.sessions.delete(sessionKey); this.sessions.delete(sessionKey);
@@ -158,12 +181,16 @@ class StatusWatcher extends EventEmitter {
this._watcher.close(); this._watcher.close();
this._watcher = null; this._watcher = null;
} }
// Clear all idle timers // Clear all timers
for (const [, state] of this.sessions) { for (const [, state] of this.sessions) {
if (state.idleTimer) { if (state.idleTimer) {
clearTimeout(state.idleTimer); clearTimeout(state.idleTimer);
state.idleTimer = null; state.idleTimer = null;
} }
if (state._filePollTimer) {
clearInterval(state._filePollTimer);
state._filePollTimer = null;
}
} }
if (this.logger) this.logger.info('StatusWatcher stopped'); if (this.logger) this.logger.info('StatusWatcher stopped');
} }

View File

@@ -282,6 +282,18 @@ async function startDaemon() {
logger.debug({ sessionKey }, 'Session removed from sessions.json'); logger.debug({ sessionKey }, 'Session removed from sessions.json');
}); });
// ---- Post Replaced (delete+recreate) ----
sharedStatusBox.on('post-replaced', (oldPostId, newPostId) => {
// Update activeBoxes to point to new post ID
for (var entry of activeBoxes.entries()) {
if (entry[1].postId === oldPostId) {
entry[1].postId = newPostId;
logger.debug({ sessionKey: entry[0], oldPostId, newPostId }, 'Post replaced (delete+recreate)');
break;
}
}
});
// ---- Session Update (from watcher) ---- // ---- Session Update (from watcher) ----
watcher.on('session-update', (sessionKey, state) => { watcher.on('session-update', (sessionKey, state) => {
const box = activeBoxes.get(sessionKey); const box = activeBoxes.get(sessionKey);