From bbafdaf2d8c3de51aa08e437563506549398984b Mon Sep 17 00:00:00 2001 From: sol Date: Sat, 7 Mar 2026 19:07:01 +0000 Subject: [PATCH] fix: delete+recreate status post, file polling fallback - StatusBox: delete+recreate instead of PUT to keep post at thread bottom (Mattermost clears pin on PUT and doesn't bump edited posts) - StatusBox: extends EventEmitter, emits 'post-replaced' events - StatusWatcher: 500ms file polling fallback (fs.watch unreliable on Docker bind mounts / overlay fs) - WatcherManager: handles post-replaced events to update activeBoxes - SessionMonitor: forgetSession() for idle session re-detection --- src/status-box.js | 57 +++++++++++++++++++++++++++++++----------- src/status-watcher.js | 29 ++++++++++++++++++++- src/watcher-manager.js | 12 +++++++++ 3 files changed, 83 insertions(+), 15 deletions(-) diff --git a/src/status-box.js b/src/status-box.js index b1dc86e..d372192 100644 --- a/src/status-box.js +++ b/src/status-box.js @@ -14,6 +14,7 @@ const https = require('https'); const http = require('http'); +const { EventEmitter } = require('events'); const { CircuitBreaker } = require('./circuit-breaker'); const DEFAULT_THROTTLE_MS = 500; @@ -21,7 +22,7 @@ const DEFAULT_MAX_CHARS = 15000; const DEFAULT_MAX_RETRIES = 3; const DEFAULT_MAX_SOCKETS = 4; -class StatusBox { +class StatusBox extends EventEmitter { /** * @param {object} opts * @param {string} opts.baseUrl - Mattermost base URL @@ -34,6 +35,7 @@ class StatusBox { * @param {CircuitBreaker} [opts.circuitBreaker] */ constructor(opts) { + super(); this.baseUrl = opts.baseUrl; this.token = opts.token; this.logger = opts.logger || null; @@ -74,6 +76,10 @@ class StatusBox { // Throttle state per postId // Map this._throttleState = new Map(); + + // Track post metadata for delete+recreate + // Map + this._postInfo = new Map(); } /** @@ -91,13 +97,8 @@ class StatusBox { if (this.logger) this.logger.debug({ postId: post.id, channelId }, 'Created status post'); this.metrics.updatesSent++; - // Pin the status post so it's always visible - try { - await this._apiCall('POST', `/api/v4/posts/${post.id}/pin`, null); - if (this.logger) this.logger.debug({ postId: post.id }, 'Status post pinned'); - } catch (pinErr) { - if (this.logger) this.logger.warn({ postId: post.id, err: pinErr }, 'Failed to pin status post'); - } + // Track post info for delete+recreate + this._postInfo.set(post.id, { channelId, rootId: rootId || null }); return post.id; } @@ -162,12 +163,40 @@ class StatusBox { this.metrics.queueDepth = Math.max(0, this.metrics.queueDepth - 1); try { - await this._apiCallWithRetry('PUT', `/api/v4/posts/${postId}`, { - id: postId, - message: this._truncate(text), - }); - this.metrics.updatesSent++; - resolvers.forEach(({ resolve }) => resolve()); + // Delete + recreate to keep status post at the bottom of the thread + // Mattermost clears pin on PUT, and edited posts stay at their original position + const postInfo = this._postInfo.get(postId); + if (postInfo) { + try { + await this._httpRequest('DELETE', `/api/v4/posts/${postId}`, null); + } catch (_delErr) { + // If delete fails, fall back to PUT + } + const body = { channel_id: postInfo.channelId, message: this._truncate(text) }; + if (postInfo.rootId) body.root_id = postInfo.rootId; + const newPost = await this._httpRequest('POST', '/api/v4/posts', body); + // Update tracking + this._postInfo.set(newPost.id, postInfo); + this._postInfo.delete(postId); + // Update throttle state key + const newState = this._throttleState.get(postId); + if (newState) { + this._throttleState.delete(postId); + this._throttleState.set(newPost.id, { pending: null, timer: null, lastFiredAt: Date.now(), resolvers: [] }); + } + // Notify caller of new post ID + this.emit('post-replaced', postId, newPost.id); + this.metrics.updatesSent++; + resolvers.forEach(({ resolve }) => resolve()); + } else { + // Fallback: regular PUT update + await this._apiCallWithRetry('PUT', `/api/v4/posts/${postId}`, { + id: postId, + message: this._truncate(text), + }); + this.metrics.updatesSent++; + resolvers.forEach(({ resolve }) => resolve()); + } } catch (err) { this.metrics.updatesFailed++; resolvers.forEach(({ reject }) => reject(err)); diff --git a/src/status-watcher.js b/src/status-watcher.js index 32a353d..e218f79 100644 --- a/src/status-watcher.js +++ b/src/status-watcher.js @@ -90,6 +90,28 @@ class StatusWatcher extends EventEmitter { if (initialState.lastOffset) { this._readFile(sessionKey, state); } + + // Start file polling as fallback (fs.watch may not work on bind mounts in Docker) + this._startFilePoll(sessionKey, state); + } + + /** + * Start polling a transcript file for changes (fallback for fs.watch). + * @private + */ + _startFilePoll(sessionKey, state) { + var self = this; + var pollInterval = 500; // 500ms poll + state._filePollTimer = setInterval(function () { + try { + var stat = fs.statSync(state.transcriptFile); + if (stat.size > state.lastOffset) { + self._readFile(sessionKey, state); + } + } catch (_e) { + // File might not exist yet or was deleted + } + }, pollInterval); } /** @@ -101,6 +123,7 @@ class StatusWatcher extends EventEmitter { if (!state) return; if (state.idleTimer) clearTimeout(state.idleTimer); + if (state._filePollTimer) clearInterval(state._filePollTimer); this.fileToSession.delete(state.transcriptFile); this.sessions.delete(sessionKey); @@ -158,12 +181,16 @@ class StatusWatcher extends EventEmitter { this._watcher.close(); this._watcher = null; } - // Clear all idle timers + // Clear all timers for (const [, state] of this.sessions) { if (state.idleTimer) { clearTimeout(state.idleTimer); state.idleTimer = null; } + if (state._filePollTimer) { + clearInterval(state._filePollTimer); + state._filePollTimer = null; + } } if (this.logger) this.logger.info('StatusWatcher stopped'); } diff --git a/src/watcher-manager.js b/src/watcher-manager.js index b334bf7..bb0cdd8 100644 --- a/src/watcher-manager.js +++ b/src/watcher-manager.js @@ -282,6 +282,18 @@ async function startDaemon() { logger.debug({ sessionKey }, 'Session removed from sessions.json'); }); + // ---- Post Replaced (delete+recreate) ---- + sharedStatusBox.on('post-replaced', (oldPostId, newPostId) => { + // Update activeBoxes to point to new post ID + for (var entry of activeBoxes.entries()) { + if (entry[1].postId === oldPostId) { + entry[1].postId = newPostId; + logger.debug({ sessionKey: entry[0], oldPostId, newPostId }, 'Post replaced (delete+recreate)'); + break; + } + } + }); + // ---- Session Update (from watcher) ---- watcher.on('session-update', (sessionKey, state) => { const box = activeBoxes.get(sessionKey);