fix(batch1): safe fixes — saveOffsets ordering, pid atomic, error handlers, go mutex, frontend cleanup
This commit is contained in:
@@ -122,6 +122,16 @@ func (p *Plugin) handleCreateSession(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Validate KV key length — Mattermost enforces a 50-char limit.
|
||||||
|
// Encoded key = kvPrefix (11 chars) + url.PathEscape(sessionKey).
|
||||||
|
// Exceeding the limit causes KVSet to silently succeed but never store data.
|
||||||
|
if len(encodeKey(req.SessionKey)) > 50 {
|
||||||
|
writeJSON(w, http.StatusBadRequest, map[string]string{
|
||||||
|
"error": fmt.Sprintf("session_key too long: encoded key length %d exceeds 50-char KV limit", len(encodeKey(req.SessionKey))),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Check max active sessions
|
// Check max active sessions
|
||||||
config := p.getConfiguration()
|
config := p.getConfiguration()
|
||||||
allSessions, _ := p.store.ListAllSessions()
|
allSessions, _ := p.store.ListAllSessions()
|
||||||
|
|||||||
@@ -21,6 +21,9 @@ type Plugin struct {
|
|||||||
// store wraps KV store operations for session persistence.
|
// store wraps KV store operations for session persistence.
|
||||||
store *Store
|
store *Store
|
||||||
|
|
||||||
|
// botUserIDLock synchronizes access to botUserID.
|
||||||
|
botUserIDLock sync.RWMutex
|
||||||
|
|
||||||
// botUserID is the plugin's bot user ID (created on activation).
|
// botUserID is the plugin's bot user ID (created on activation).
|
||||||
botUserID string
|
botUserID string
|
||||||
|
|
||||||
@@ -41,7 +44,9 @@ func (p *Plugin) OnActivate() error {
|
|||||||
if appErr != nil {
|
if appErr != nil {
|
||||||
p.API.LogWarn("Failed to ensure bot user", "error", appErr.Error())
|
p.API.LogWarn("Failed to ensure bot user", "error", appErr.Error())
|
||||||
} else {
|
} else {
|
||||||
|
p.botUserIDLock.Lock()
|
||||||
p.botUserID = botID
|
p.botUserID = botID
|
||||||
|
p.botUserIDLock.Unlock()
|
||||||
p.API.LogInfo("Plugin bot user ensured", "botUserID", botID)
|
p.API.LogInfo("Plugin bot user ensured", "botUserID", botID)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -75,8 +80,10 @@ func (p *Plugin) sessionCleanupLoop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getBotUserID returns the plugin's bot user ID.
|
// getBotUserID returns the plugin's bot user ID (thread-safe).
|
||||||
func (p *Plugin) getBotUserID() string {
|
func (p *Plugin) getBotUserID() string {
|
||||||
|
p.botUserIDLock.RLock()
|
||||||
|
defer p.botUserIDLock.RUnlock()
|
||||||
return p.botUserID
|
return p.botUserID
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -93,6 +100,10 @@ func (p *Plugin) OnDeactivate() error {
|
|||||||
p.API.LogWarn("Failed to list sessions on deactivate", "error", err.Error())
|
p.API.LogWarn("Failed to list sessions on deactivate", "error", err.Error())
|
||||||
} else {
|
} else {
|
||||||
for _, s := range sessions {
|
for _, s := range sessions {
|
||||||
|
// Skip sessions already in a terminal state — do not overwrite done/error
|
||||||
|
if s.Status == "done" || s.Status == "error" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
s.Status = "interrupted"
|
s.Status = "interrupted"
|
||||||
_ = p.store.SaveSession(s.SessionKey, s)
|
_ = p.store.SaveSession(s.SessionKey, s)
|
||||||
p.broadcastUpdate(s.ChannelID, s)
|
p.broadcastUpdate(s.ChannelID, s)
|
||||||
|
|||||||
@@ -61,6 +61,13 @@ class LiveStatusPlugin {
|
|||||||
|
|
||||||
window.__livestatus_updates[data.post_id] = update;
|
window.__livestatus_updates[data.post_id] = update;
|
||||||
|
|
||||||
|
// Evict completed sessions from the update cache after 60s to prevent unbounded growth
|
||||||
|
if (data.status === 'done' || data.status === 'error' || data.status === 'interrupted') {
|
||||||
|
setTimeout(() => {
|
||||||
|
delete window.__livestatus_updates[data.post_id];
|
||||||
|
}, 60000);
|
||||||
|
}
|
||||||
|
|
||||||
// Notify post-specific listeners
|
// Notify post-specific listeners
|
||||||
const listeners = window.__livestatus_listeners[data.post_id];
|
const listeners = window.__livestatus_listeners[data.post_id];
|
||||||
if (listeners) {
|
if (listeners) {
|
||||||
@@ -82,7 +89,16 @@ class LiveStatusPlugin {
|
|||||||
}
|
}
|
||||||
|
|
||||||
uninitialize(): void {
|
uninitialize(): void {
|
||||||
// Cleanup handled by Mattermost plugin framework
|
// Clear global listener and update stores to prevent accumulation across reloads
|
||||||
|
window.__livestatus_listeners = {};
|
||||||
|
window.__livestatus_updates = {};
|
||||||
|
|
||||||
|
// Unregister the custom post type component if it was registered
|
||||||
|
if (this.postTypeComponentId) {
|
||||||
|
// registry is not available here — Mattermost framework cleans up on deactivate.
|
||||||
|
// Clearing postTypeComponentId prevents stale references.
|
||||||
|
this.postTypeComponentId = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -47,8 +47,19 @@ if (cmd === 'start') {
|
|||||||
|
|
||||||
// ---- PID File helpers ----
|
// ---- PID File helpers ----
|
||||||
function writePidFile(pidFile) {
|
function writePidFile(pidFile) {
|
||||||
|
try {
|
||||||
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||||
fs.writeFileSync(pidFile, String(process.pid), 'utf8');
|
const fd = fs.openSync(pidFile, 'wx');
|
||||||
|
fs.writeSync(fd, String(process.pid));
|
||||||
|
fs.closeSync(fd);
|
||||||
|
} catch (err) {
|
||||||
|
if (err.code === 'EEXIST') {
|
||||||
|
// Another process won the race — bail out
|
||||||
|
console.error('PID file already exists — another daemon may be running');
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function readPidFile(pidFile) {
|
function readPidFile(pidFile) {
|
||||||
@@ -100,8 +111,10 @@ function saveOffsets(offsetFile, sessions) {
|
|||||||
try {
|
try {
|
||||||
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||||
fs.writeFileSync(offsetFile, JSON.stringify(offsets, null, 2), 'utf8');
|
fs.writeFileSync(offsetFile, JSON.stringify(offsets, null, 2), 'utf8');
|
||||||
} catch (_e) {
|
} catch (writeErr) {
|
||||||
/* ignore write error */
|
// Log disk-full / permission errors so they are visible in daemon logs
|
||||||
|
// eslint-disable-next-line no-console
|
||||||
|
console.warn('[saveOffsets] Failed to write offset file:', writeErr.message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -115,6 +128,16 @@ async function startDaemon() {
|
|||||||
const config = getConfig();
|
const config = getConfig();
|
||||||
const logger = getLogger();
|
const logger = getLogger();
|
||||||
|
|
||||||
|
// Global error handlers — prevent silent daemon death from unhandled rejections
|
||||||
|
process.on('unhandledRejection', (reason) => {
|
||||||
|
logger.error({ reason }, 'Unhandled promise rejection — shutting down');
|
||||||
|
shutdown().finally(() => process.exit(1));
|
||||||
|
});
|
||||||
|
process.on('uncaughtException', (err) => {
|
||||||
|
logger.error({ err }, 'Uncaught exception — shutting down');
|
||||||
|
shutdown().finally(() => process.exit(1));
|
||||||
|
});
|
||||||
|
|
||||||
// Check if already running
|
// Check if already running
|
||||||
const existingPid = readPidFile(config.pidFile);
|
const existingPid = readPidFile(config.pidFile);
|
||||||
if (existingPid && isProcessRunning(existingPid)) {
|
if (existingPid && isProcessRunning(existingPid)) {
|
||||||
@@ -459,7 +482,7 @@ async function startDaemon() {
|
|||||||
|
|
||||||
// ---- Session Idle (from watcher) ----
|
// ---- Session Idle (from watcher) ----
|
||||||
watcher.on('session-idle', async (sessionKey, state) => {
|
watcher.on('session-idle', async (sessionKey, state) => {
|
||||||
const box = activeBoxes.get(sessionKey);
|
const box = activeBoxes.get(sessionKey); // snapshot at entry
|
||||||
if (!box) {
|
if (!box) {
|
||||||
// Sub-agent completed
|
// Sub-agent completed
|
||||||
updateParentWithChild(activeBoxes, watcher, sharedStatusBox, pluginClient, sessionKey, state, logger);
|
updateParentWithChild(activeBoxes, watcher, sharedStatusBox, pluginClient, sessionKey, state, logger);
|
||||||
@@ -492,12 +515,23 @@ async function startDaemon() {
|
|||||||
logger.error({ sessionKey, err }, 'Failed to update final status');
|
logger.error({ sessionKey, err }, 'Failed to update final status');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Guard: if the box was replaced during the awaits above (new session reactivated),
|
||||||
|
// skip cleanup to avoid killing the newly re-added session.
|
||||||
|
if (activeBoxes.get(sessionKey) !== box) {
|
||||||
|
logger.info({ sessionKey }, 'session-idle: box replaced during await — skipping cleanup (new session active)');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Save to completedBoxes so we can reuse the post ID if the session reactivates
|
// Save to completedBoxes so we can reuse the post ID if the session reactivates
|
||||||
completedBoxes.set(sessionKey, {
|
completedBoxes.set(sessionKey, {
|
||||||
postId: box.postId,
|
postId: box.postId,
|
||||||
lastOffset: state.lastOffset || 0,
|
lastOffset: state.lastOffset || 0,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Persist final offsets BEFORE removing session from watcher so the
|
||||||
|
// completed session's last offset is captured in the snapshot.
|
||||||
|
saveOffsets(config.offsetFile, watcher.sessions);
|
||||||
|
|
||||||
// Clean up active tracking
|
// Clean up active tracking
|
||||||
activeBoxes.delete(sessionKey);
|
activeBoxes.delete(sessionKey);
|
||||||
watcher.removeSession(sessionKey);
|
watcher.removeSession(sessionKey);
|
||||||
@@ -505,9 +539,6 @@ async function startDaemon() {
|
|||||||
// ensures we reuse the existing post instead of creating a new one.
|
// ensures we reuse the existing post instead of creating a new one.
|
||||||
monitor.forgetSession(sessionKey);
|
monitor.forgetSession(sessionKey);
|
||||||
globalMetrics.activeSessions = activeBoxes.size;
|
globalMetrics.activeSessions = activeBoxes.size;
|
||||||
|
|
||||||
// Persist final offsets
|
|
||||||
saveOffsets(config.offsetFile, watcher.sessions);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// ---- Start all subsystems ----
|
// ---- Start all subsystems ----
|
||||||
@@ -534,6 +565,9 @@ async function startDaemon() {
|
|||||||
|
|
||||||
logger.info({ signal }, 'Shutting down gracefully');
|
logger.info({ signal }, 'Shutting down gracefully');
|
||||||
|
|
||||||
|
// Persist offsets for all active sessions before stopping the watcher
|
||||||
|
saveOffsets(config.offsetFile, watcher.sessions);
|
||||||
|
|
||||||
// Stop accepting new sessions
|
// Stop accepting new sessions
|
||||||
monitor.stop();
|
monitor.stop();
|
||||||
watcher.stop();
|
watcher.stop();
|
||||||
|
|||||||
Reference in New Issue
Block a user