diff --git a/Makefile b/Makefile index 9fd0f41..f5176a1 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,5 @@ export NODE_ENV := development +export NODE_PATH := /usr/local/lib/node_modules .PHONY: check install test lint fmt fmt-check secret-scan @@ -8,16 +9,16 @@ install: npm install test: - @echo "[SKIP] No tests found" + node --test test/unit/*.test.js lint: - npx eslint . + eslint . fmt: - npx prettier --write . + prettier --write . fmt-check: - npx prettier --check . + prettier --check . secret-scan: bash tools/secret-scan.sh . diff --git a/PLAN.md b/PLAN.md index a61910a..1d086a7 100644 --- a/PLAN.md +++ b/PLAN.md @@ -1,4 +1,5 @@ # Implementation Plan: Live Status v4 (Production-Grade) + > Generated: 2026-03-07 | Agent: planner:proj035-v2 | Status: DRAFT > Revised: Incorporates production-grade changes from scalability/efficiency review (comment #11402) @@ -65,16 +66,16 @@ OpenClaw Gateway ## 3. Tech Stack -| Layer | Technology | Version | Reason | -|-------|-----------|---------|--------| -| Runtime | Node.js | 22.x (system) | Already installed; inotify recursive fs.watch supported | -| File watching | fs.watch recursive | built-in | inotify on Linux/Node22; efficient, no polling | -| Session discovery | setInterval poll | built-in | sessions.json polling for new session detection | -| HTTP client | http.Agent | built-in | keepAlive, maxSockets; no extra dependency | -| Structured logging | pino | ^9.x | Fast JSON logging; single new dependency | -| Config | process.env | built-in | 12-factor; validated at startup | -| Health check | http.createServer | built-in | Lightweight health endpoint | -| Process management | PID file + signals | built-in | Simple, no supervisor dependency | +| Layer | Technology | Version | Reason | +| ------------------ | ------------------ | ------------- | ------------------------------------------------------- | +| Runtime | Node.js | 22.x (system) | Already installed; inotify recursive fs.watch supported | +| File watching | fs.watch recursive | built-in | inotify on Linux/Node22; efficient, no polling | +| Session discovery | setInterval poll | built-in | sessions.json polling for new session detection | +| HTTP client | http.Agent | built-in | keepAlive, maxSockets; no extra dependency | +| Structured logging | pino | ^9.x | Fast JSON logging; single new dependency | +| Config | process.env | built-in | 12-factor; validated at startup | +| Health check | http.createServer | built-in | Lightweight health endpoint | +| Process management | PID file + signals | built-in | Simple, no supervisor dependency | **New npm dependencies:** `pino` only. Everything else uses Node.js built-ins. @@ -124,17 +125,18 @@ MATTERMOST_OPENCLAW_LIVESTATUS/ ## 5. Dependencies -| Package | Version | Purpose | New/Existing | -|---------|---------|---------|-------------| -| pino | ^9.x | Structured JSON logging | NEW | -| node.js | 22.x | Runtime | Existing (system) | -| http, fs, path, child_process | built-in | All other functionality | Existing | +| Package | Version | Purpose | New/Existing | +| ----------------------------- | -------- | ----------------------- | ----------------- | +| pino | ^9.x | Structured JSON logging | NEW | +| node.js | 22.x | Runtime | Existing (system) | +| http, fs, path, child_process | built-in | All other functionality | Existing | One new npm dependency only. Minimal footprint. ## 6. Data Model ### sessions.json entry (relevant fields) + ```json { "agent:main:subagent:uuid": { @@ -150,6 +152,7 @@ One new npm dependency only. Minimal footprint. ``` ### JSONL event schema + ``` type=session -> id (UUID), version (3), cwd — first line only type=message -> role=user|assistant|toolResult; content[]=text|toolCall|toolResult|thinking @@ -158,6 +161,7 @@ type=model_change -> provider, modelId ``` ### SessionState (in-memory per active session) + ```json { "sessionKey": "agent:main:subagent:uuid", @@ -177,6 +181,7 @@ type=model_change -> provider, modelId ``` ### Configuration (env vars) + ``` MM_TOKEN (required) Mattermost bot token MM_URL (required) Mattermost base URL @@ -198,6 +203,7 @@ DEFAULT_CHANNEL null Fallback channel for non-MM sessions (null = sk ``` ### Status box format (rendered Mattermost text) + ``` [ACTIVE] main | 38s Reading live-status source code... @@ -216,7 +222,9 @@ Plan ready. Awaiting approval. ## 7. Task Checklist ### Phase 0: Repo Sync + Environment Verification ⏱️ 30min + > Parallelizable: no | Dependencies: none + - [ ] 0.1: Sync workspace live-status.js (283-line v2) to remote repo — git push → remote matches workspace copy - [ ] 0.2: Fix existing lint errors in live-status.js (43 issues: empty catch blocks, console statements) — replace empty catches with error logging, add eslint-disable comments for intentional console.log → make lint passes - [ ] 0.3: Run `make check` — verify all Makefile targets pass (lint/fmt/test/secret-scan) → clean run, zero failures @@ -225,6 +233,7 @@ Plan ready. Awaiting approval. - [ ] 0.6: Document exact transcript directory path and sessions.json path from the running gateway → constants confirmed for config.js (transcript dir: /home/node/.openclaw/agents/{agent}/sessions/, sessions.json: same path) ### Phase 1: Core Components ⏱️ 8-12h + > Parallelizable: partially (config/logger/circuit-breaker are independent) | Dependencies: Phase 0 - [ ] 1.1: Create `src/config.js` — reads all env vars with validation; throws clear error on missing required vars; exports typed config object → unit testable, fails fast @@ -240,7 +249,7 @@ Plan ready. Awaiting approval. - Circuit breaker wrapping all API calls - Retry with exponential backoff on 429/5xx (up to 3 retries) - Structured logs for every API call - → unit tested with mock HTTP server + → unit tested with mock HTTP server - [ ] 1.6: Create `src/status-formatter.js` — pure function; input: SessionState; output: formatted Mattermost text string (compact, MAX_STATUS_LINES, sub-agent nesting, status prefix, timestamps) → unit tested with varied inputs - [ ] 1.7: Create `src/health.js` — HTTP server on HEALTH_PORT; GET /health returns JSON {status, activeSessions, uptime, lastError, metrics: {updates_sent, updates_failed, circuit_state, queue_depth}} → manually tested with curl - [ ] 1.8: Create `src/status-watcher.js` — core JSONL watcher: @@ -255,10 +264,11 @@ Plan ready. Awaiting approval. - Detect file truncation (stat.size < bytesRead) -> reset offset, log warning - Debounce updates via status-box.js throttle - Idle detection: when pendingToolCalls==0 and no new lines for IDLE_TIMEOUT_S - → integration tested with real JSONL sample files + → integration tested with real JSONL sample files - [ ] 1.9: Unit test suite (`test/unit/`) — parser, tool-labels, circuit-breaker, throttle, status-formatter → `npm test` green ### Phase 2: Session Monitor + Lifecycle ⏱️ 4-6h + > Parallelizable: no | Dependencies: Phase 1 - [ ] 2.1: Create `src/session-monitor.js` — polls sessions.json every 2s: @@ -268,7 +278,7 @@ Plan ready. Awaiting approval. - Resolves channelId from session key (format: `agent:main:mattermost:channel:{id}:...`) - Resolves rootPostId from session key (format: `...thread:{id}`) - Falls back to DEFAULT_CHANNEL for non-MM sessions (or null to skip) - → integration tested with mock sessions.json writes + → integration tested with mock sessions.json writes - [ ] 2.2: Persist session offsets to disk — on each status update, write { sessionKey: bytesRead } to `/tmp/status-watcher-offsets.json`; on startup, load and restore existing sessions → restart recovery working - [ ] 2.3: Post recovery on restart — on startup, for each restored session, search channel history for status post with marker comment ``; if found, resume updating it; if not, create new post → tested by killing and restarting daemon mid-session - [ ] 2.4: Create `src/watcher-manager.js` — top-level orchestrator: @@ -280,10 +290,11 @@ Plan ready. Awaiting approval. - Registers SIGTERM/SIGINT handlers: - On signal: mark all active status boxes "interrupted", flush all pending updates, remove PID file, exit 0 - CLI: `node watcher-manager.js start|stop|status` → process management - → smoke tested end-to-end + → smoke tested end-to-end - [ ] 2.5: Integration test suite (`test/integration/`) — lifecycle events, restart recovery → `npm run test:integration` green ### Phase 3: Sub-Agent Support ⏱️ 3-4h + > Parallelizable: no | Dependencies: Phase 2 - [ ] 3.1: Sub-agent detection — session-monitor detects entries with `spawnedBy` field; links child SessionState to parent via `parentSessionKey` → linked correctly @@ -293,6 +304,7 @@ Plan ready. Awaiting approval. - [ ] 3.5: Integration test — spawn mock sub-agent transcript, verify parent status box shows nested child progress → manual verification in Mattermost ### Phase 4: Hook Integration ⏱️ 1h + > Parallelizable: no | Dependencies: Phase 2 (watcher-manager CLI working) - [ ] 4.1: Create `hooks/status-watcher-hook/HOOK.md` — events: ["gateway:startup"], description, required env vars listed → OpenClaw discovers hook @@ -301,6 +313,7 @@ Plan ready. Awaiting approval. - [ ] 4.4: Test: gateway restart -> watcher starts, PID file written, health endpoint responds → verified ### Phase 5: Polish + Deployment ⏱️ 3-4h + > Parallelizable: yes (docs, deploy scripts, skill rewrite are independent) | Dependencies: Phase 4 - [ ] 5.1: Rewrite `skill/SKILL.md` — 10-line file: "Live status updates are automatic. You do not need to call live-status manually. Focus on your task." → no protocol injection @@ -314,6 +327,7 @@ Plan ready. Awaiting approval. - [ ] 5.9: Run `make check` → zero lint/format errors; `npm test` → green ### Phase 6: Remove v1 Injection from AGENTS.md ⏱️ 30min + > Parallelizable: no | Dependencies: Phase 5 fully verified + watcher confirmed running > SAFETY: Do not execute this phase until watcher has been running successfully for at least 1 hour @@ -324,67 +338,67 @@ Plan ready. Awaiting approval. ## 8. Testing Strategy -| What | Type | How | Success Criteria | -|------|------|-----|-----------------| -| config.js | Unit | Env var injection, missing var detection | Throws on missing required vars; correct defaults | -| logger.js | Unit | Log output format | JSON output, levels respected | -| circuit-breaker.js | Unit | Simulate N failures, verify state transitions | open after threshold, half-open after cooldown | -| tool-labels.js | Unit | 30+ tool names (exact, prefix, regex, unmapped) | Correct labels returned; default for unknown | -| status-formatter.js | Unit | Various SessionState inputs | Correct compact output; MAX_LINES enforced | -| status-box.js | Unit | Mock HTTP server | create/update called correctly; throttle works; circuit fires | -| session-monitor.js | Integration | Write test sessions.json; verify events emitted | session-added/removed within 2s | -| status-watcher.js | Integration | Append to JSONL file; verify Mattermost update | Update within 1.5s of new line | -| Idle detection | Integration | Stop writing; verify complete after IDLE_TIMEOUT+5s | Status box marked done | -| Session compaction | Integration | Truncate JSONL file mid-session | No crash; offset reset; no duplicate events | -| Restart recovery | Integration | Kill daemon mid-session; restart | Existing post updated, not new post created | -| Sub-agent nesting | Integration | Mock parent + child transcripts | Child visible in parent status box | -| Cascade completion | Integration | Child completes; verify parent waits | Parent marks done after last child | -| Health endpoint | Manual | curl localhost:9090/health | JSON with correct metrics | -| E2E smoke test | Manual | Real agent task in Mattermost | Real-time updates; no spam; done on completion | +| What | Type | How | Success Criteria | +| ------------------- | ----------- | --------------------------------------------------- | ------------------------------------------------------------- | +| config.js | Unit | Env var injection, missing var detection | Throws on missing required vars; correct defaults | +| logger.js | Unit | Log output format | JSON output, levels respected | +| circuit-breaker.js | Unit | Simulate N failures, verify state transitions | open after threshold, half-open after cooldown | +| tool-labels.js | Unit | 30+ tool names (exact, prefix, regex, unmapped) | Correct labels returned; default for unknown | +| status-formatter.js | Unit | Various SessionState inputs | Correct compact output; MAX_LINES enforced | +| status-box.js | Unit | Mock HTTP server | create/update called correctly; throttle works; circuit fires | +| session-monitor.js | Integration | Write test sessions.json; verify events emitted | session-added/removed within 2s | +| status-watcher.js | Integration | Append to JSONL file; verify Mattermost update | Update within 1.5s of new line | +| Idle detection | Integration | Stop writing; verify complete after IDLE_TIMEOUT+5s | Status box marked done | +| Session compaction | Integration | Truncate JSONL file mid-session | No crash; offset reset; no duplicate events | +| Restart recovery | Integration | Kill daemon mid-session; restart | Existing post updated, not new post created | +| Sub-agent nesting | Integration | Mock parent + child transcripts | Child visible in parent status box | +| Cascade completion | Integration | Child completes; verify parent waits | Parent marks done after last child | +| Health endpoint | Manual | curl localhost:9090/health | JSON with correct metrics | +| E2E smoke test | Manual | Real agent task in Mattermost | Real-time updates; no spam; done on completion | ## 9. Risks & Mitigations -| Risk | Impact | Mitigation | -|------|--------|-----------| -| fs.watch recursive not reliable on this kernel | High | Detect at startup; fall back to polling if watch fails (setInterval 2s on directory listing) | -| sessions.json write race causes parse error | Medium | Try/catch on JSON.parse; retry next poll cycle; log warning | -| Mattermost rate limit (10 req/s default) | Medium | Throttle to max 2 req/s per session; circuit breaker; exponential backoff on 429 | -| Session compaction truncates JSONL | Medium | Detect stat.size < bytesRead on each read; reset offset; dedup by tracking last processed line index | -| Multiple gateway restarts create duplicate watchers | Medium | PID file check + SIGTERM old process before spawning new one | -| Non-MM sessions (hook, cron) generate noise | Low | Channel resolver returns null; watcher skips session gracefully | -| pino dependency unavailable | Low | If npm install fails, fallback to console.log (degrade gracefully, log warning) | -| Status box exceeds Mattermost post size limit | Low | Hard truncate at MAX_MESSAGE_CHARS (15000); tested with message size guard | -| JSONL format changes in future OpenClaw | Medium | Abstract parser behind EventParser interface; version check on session record | -| Daemon crashes mid-session | Medium | Health check via systemd/Docker; restart policy; offset persistence enables recovery | +| Risk | Impact | Mitigation | +| --------------------------------------------------- | ------ | ---------------------------------------------------------------------------------------------------- | +| fs.watch recursive not reliable on this kernel | High | Detect at startup; fall back to polling if watch fails (setInterval 2s on directory listing) | +| sessions.json write race causes parse error | Medium | Try/catch on JSON.parse; retry next poll cycle; log warning | +| Mattermost rate limit (10 req/s default) | Medium | Throttle to max 2 req/s per session; circuit breaker; exponential backoff on 429 | +| Session compaction truncates JSONL | Medium | Detect stat.size < bytesRead on each read; reset offset; dedup by tracking last processed line index | +| Multiple gateway restarts create duplicate watchers | Medium | PID file check + SIGTERM old process before spawning new one | +| Non-MM sessions (hook, cron) generate noise | Low | Channel resolver returns null; watcher skips session gracefully | +| pino dependency unavailable | Low | If npm install fails, fallback to console.log (degrade gracefully, log warning) | +| Status box exceeds Mattermost post size limit | Low | Hard truncate at MAX_MESSAGE_CHARS (15000); tested with message size guard | +| JSONL format changes in future OpenClaw | Medium | Abstract parser behind EventParser interface; version check on session record | +| Daemon crashes mid-session | Medium | Health check via systemd/Docker; restart policy; offset persistence enables recovery | ## 10. Effort Estimate -| Phase | Time | Can Parallelize? | Depends On | -|-------|------|-------------------|-----------| -| Phase 0: Repo + Env Verification | 15min | No | — | -| Phase 1: Core Components | 8-12h | Partially (config/logger/circuit-breaker) | Phase 0 | -| Phase 2: Session Monitor + Lifecycle | 4-6h | No | Phase 1 | -| Phase 3: Sub-Agent Support | 3-4h | No | Phase 2 | -| Phase 4: Hook Integration | 1h | No | Phase 2+3 | -| Phase 5: Polish + Deployment | 3-4h | Yes (docs, deploy, skill) | Phase 4 | -| Phase 6: Remove v1 AGENTS.md Injection | 30min | No | Phase 5 verified | -| **Total** | **20-28h** | | | +| Phase | Time | Can Parallelize? | Depends On | +| -------------------------------------- | ---------- | ----------------------------------------- | ---------------- | +| Phase 0: Repo + Env Verification | 15min | No | — | +| Phase 1: Core Components | 8-12h | Partially (config/logger/circuit-breaker) | Phase 0 | +| Phase 2: Session Monitor + Lifecycle | 4-6h | No | Phase 1 | +| Phase 3: Sub-Agent Support | 3-4h | No | Phase 2 | +| Phase 4: Hook Integration | 1h | No | Phase 2+3 | +| Phase 5: Polish + Deployment | 3-4h | Yes (docs, deploy, skill) | Phase 4 | +| Phase 6: Remove v1 AGENTS.md Injection | 30min | No | Phase 5 verified | +| **Total** | **20-28h** | | | ## 11. Open Questions All questions have defaults that allow execution to proceed without answers. - [ ] **Q1 (informational): Idle timeout tuning.** 60s default may still cause premature completion for very long exec calls (e.g., a 3-minute build). Smart heuristic (pendingToolCalls tracking) should handle this correctly, but production data may reveal edge cases. - **Default:** Use smart heuristic (pendingToolCalls + IDLE_TIMEOUT_S=60). Log false-positives for tuning. + **Default:** Use smart heuristic (pendingToolCalls + IDLE_TIMEOUT_S=60). Log false-positives for tuning. - [ ] **Q2 (informational): Non-MM session behavior.** Hook sessions, cron sessions, and xen sessions don't have a Mattermost channel. Currently skipped. - **Default:** Skip non-MM sessions (no status box). Log at debug level. Can revisit for Phase 7. + **Default:** Skip non-MM sessions (no status box). Log at debug level. Can revisit for Phase 7. - [ ] **Q3 (informational): Status box per-request vs per-session.** Currently: one status box per user message (reset on new user turn). This is the most natural UX. - **Default:** Per-request. New user message starts new status cycle. Works correctly with smart idle detection. + **Default:** Per-request. New user message starts new status cycle. Works correctly with smart idle detection. - [ ] **Q4 (informational): Compaction dedup strategy.** When JSONL is truncated, we reset offset and re-read. We may re-process events already posted to Mattermost. - **Default:** Track last processed line count (not just byte offset). Skip lines already processed on re-read. OR: detect compaction and do not re-append old events (since they were already shown). Simplest: mark box as "session compacted - continuing" and reset the visible lines in the status box. + **Default:** Track last processed line count (not just byte offset). Skip lines already processed on re-read. OR: detect compaction and do not re-append old events (since they were already shown). Simplest: mark box as "session compacted - continuing" and reset the visible lines in the status box. - [ ] **Q5 (blocking if no): AGENTS.md modification scope.** Phase 6 removes Live Status Protocol section from all agent AGENTS.md files. Confirm Rooh wants all instances removed (not just main agent). - **Default if not answered:** Remove from all agents. This is the stated goal — removing v1 injection everywhere. + **Default if not answered:** Remove from all agents. This is the stated goal — removing v1 injection everywhere. diff --git a/discoveries/README.md b/discoveries/README.md index fed75f1..ab667a1 100644 --- a/discoveries/README.md +++ b/discoveries/README.md @@ -9,6 +9,7 @@ Planner sub-agent (proj035-planner) conducted inline discovery before drafting t **Confirmed format (JSONL, version 3):** Each line is a JSON object with `type` field: + - `session` — First line only. Contains `id` (UUID), `version: 3`, `cwd` - `model_change` — `provider`, `modelId` change events - `thinking_level_change` — thinking on/off @@ -16,6 +17,7 @@ Each line is a JSON object with `type` field: - `message` — Main event type. `role` = `user`, `assistant`, or `toolResult` Message content types: + - `{type: "text", text: "..."}` — plain text from any role - `{type: "toolCall", id, name, arguments: {...}}` — tool invocations in assistant messages - `{type: "thinking", thinking: "..."}` — internal reasoning (thinking mode) @@ -25,6 +27,7 @@ Assistant messages carry extra fields: `api`, `provider`, `model`, `usage`, `sto ToolResult messages carry: `toolCallId`, `toolName`, `isError`, `content: [{type, text}]` **Key signals for watcher:** + - `stopReason: "stop"` + no new lines → agent turn complete → idle - `stopReason: "toolUse"` → agent waiting for tool results → NOT idle - `custom.customType: "openclaw.cache-ttl"` → turn boundary marker @@ -34,6 +37,7 @@ ToolResult messages carry: `toolCallId`, `toolName`, `isError`, `content: [{type Session keys in sessions.json follow the pattern: `agent:{agentId}:{context}` Examples: + - `agent:main:main` — direct session - `agent:main:mattermost:channel:{channelId}` — channel session - `agent:main:mattermost:channel:{channelId}:thread:{threadId}` — thread session @@ -42,6 +46,7 @@ Examples: - `agent:main:cron:{name}` — cron session Sub-agent entry fields relevant to watcher: + - `sessionId` — maps to `{sessionId}.jsonl` filename - `spawnedBy` — parent session key (for nesting) - `spawnDepth` — nesting depth (1 = direct child of main) @@ -49,6 +54,7 @@ Sub-agent entry fields relevant to watcher: - `channel` — delivery channel (mattermost, etc.) Sessions files: `/home/node/.openclaw/agents/{agentId}/sessions/` + - `sessions.json` — registry (updated on every message) - `{uuid}.jsonl` — transcript files - `{uuid}-topic-{topicId}.jsonl` — topic-scoped transcripts @@ -56,6 +62,7 @@ Sessions files: `/home/node/.openclaw/agents/{agentId}/sessions/` ## Discovery 3: OpenClaw Hook Events Available internal hook events (confirmed from source): + - `command:new`, `command:reset`, `command:stop` — user commands - `command` — all commands - `agent:bootstrap` — before workspace files injected diff --git a/package-lock.json b/package-lock.json index 93682c9..e20a819 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,15 +1,18 @@ { "name": "mattermost-openclaw-livestatus", - "version": "1.0.0", + "version": "4.0.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "mattermost-openclaw-livestatus", - "version": "1.0.0", + "version": "4.0.0", + "dependencies": { + "pino": "^9.14.0" + }, "devDependencies": { - "eslint": "^8.56.0", - "eslint-plugin-security": "^2.1.0", + "eslint": "^8.57.1", + "eslint-plugin-security": "^2.1.1", "prettier": "^3.2.0" } }, @@ -152,6 +155,12 @@ "node": ">= 8" } }, + "node_modules/@pinojs/redact": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/@pinojs/redact/-/redact-0.4.0.tgz", + "integrity": "sha512-k2ENnmBugE/rzQfEcdWHcCY+/FM3VLzH9cYEsbdsoqrvzAKRhUZeRNhAZvB8OitQJ1TBed3yqWtdjzS6wJKBwg==", + "license": "MIT" + }, "node_modules/@ungap/structured-clone": { "version": "1.3.0", "resolved": "https://registry.npmjs.org/@ungap/structured-clone/-/structured-clone-1.3.0.tgz", @@ -232,6 +241,15 @@ "dev": true, "license": "Python-2.0" }, + "node_modules/atomic-sleep": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/atomic-sleep/-/atomic-sleep-1.0.0.tgz", + "integrity": "sha512-kNOjDqAh7px0XWNI+4QbzoiR/nTkHAWNud2uvnJquD1/x5a7EQZMJT0AczqK0Qn67oY/TTQ1LbUKajZpp3I9tQ==", + "license": "MIT", + "engines": { + "node": ">=8.0.0" + } + }, "node_modules/balanced-match": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.2.tgz", @@ -608,9 +626,9 @@ } }, "node_modules/flatted": { - "version": "3.3.3", - "resolved": "https://registry.npmjs.org/flatted/-/flatted-3.3.3.tgz", - "integrity": "sha512-GX+ysw4PBCz0PzosHDepZGANEuFCMLrnRTiEy9McGjmkCQYwRq4A/X786G/fjM/+OjsWSU1ZrY5qyARZmO/uwg==", + "version": "3.3.4", + "resolved": "https://registry.npmjs.org/flatted/-/flatted-3.3.4.tgz", + "integrity": "sha512-3+mMldrTAPdta5kjX2G2J7iX4zxtnwpdA8Tr2ZSjkyPSanvbZAcy6flmtnXbEybHrDcU9641lxrMfFuUxVz9vA==", "dev": true, "license": "ISC" }, @@ -893,6 +911,15 @@ "dev": true, "license": "MIT" }, + "node_modules/on-exit-leak-free": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/on-exit-leak-free/-/on-exit-leak-free-2.1.2.tgz", + "integrity": "sha512-0eJJY6hXLGf1udHwfNftBqH+g73EU4B504nZeKpz1sYRKafAghwxEJunB2O7rDZkL4PGfsMVnTXZ2EjibbqcsA==", + "license": "MIT", + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/once": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", @@ -996,6 +1023,43 @@ "node": ">=8" } }, + "node_modules/pino": { + "version": "9.14.0", + "resolved": "https://registry.npmjs.org/pino/-/pino-9.14.0.tgz", + "integrity": "sha512-8OEwKp5juEvb/MjpIc4hjqfgCNysrS94RIOMXYvpYCdm/jglrKEiAYmiumbmGhCvs+IcInsphYDFwqrjr7398w==", + "license": "MIT", + "dependencies": { + "@pinojs/redact": "^0.4.0", + "atomic-sleep": "^1.0.0", + "on-exit-leak-free": "^2.1.0", + "pino-abstract-transport": "^2.0.0", + "pino-std-serializers": "^7.0.0", + "process-warning": "^5.0.0", + "quick-format-unescaped": "^4.0.3", + "real-require": "^0.2.0", + "safe-stable-stringify": "^2.3.1", + "sonic-boom": "^4.0.1", + "thread-stream": "^3.0.0" + }, + "bin": { + "pino": "bin.js" + } + }, + "node_modules/pino-abstract-transport": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/pino-abstract-transport/-/pino-abstract-transport-2.0.0.tgz", + "integrity": "sha512-F63x5tizV6WCh4R6RHyi2Ml+M70DNRXt/+HANowMflpgGFMAym/VKm6G7ZOQRjqN7XbGxK1Lg9t6ZrtzOaivMw==", + "license": "MIT", + "dependencies": { + "split2": "^4.0.0" + } + }, + "node_modules/pino-std-serializers": { + "version": "7.1.0", + "resolved": "https://registry.npmjs.org/pino-std-serializers/-/pino-std-serializers-7.1.0.tgz", + "integrity": "sha512-BndPH67/JxGExRgiX1dX0w1FvZck5Wa4aal9198SrRhZjH3GxKQUKIBnYJTdj2HDN3UQAS06HlfcSbQj2OHmaw==", + "license": "MIT" + }, "node_modules/prelude-ls": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/prelude-ls/-/prelude-ls-1.2.1.tgz", @@ -1007,9 +1071,9 @@ } }, "node_modules/prettier": { - "version": "3.8.1", - "resolved": "https://registry.npmjs.org/prettier/-/prettier-3.8.1.tgz", - "integrity": "sha512-UOnG6LftzbdaHZcKoPFtOcCKztrQ57WkHDeRD9t/PTQtmT0NHSeWWepj6pS0z/N7+08BHFDQVUrfmfMRcZwbMg==", + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/prettier/-/prettier-3.2.0.tgz", + "integrity": "sha512-/vBUecTGaPlRVwyZVROVC58bYIScqaoEJzZmzQXXrZOzqn0TwWz0EnOozOlFO/YAImRnb7XsKpTCd3m1SjS2Ww==", "dev": true, "license": "MIT", "bin": { @@ -1022,6 +1086,22 @@ "url": "https://github.com/prettier/prettier?sponsor=1" } }, + "node_modules/process-warning": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/process-warning/-/process-warning-5.0.0.tgz", + "integrity": "sha512-a39t9ApHNx2L4+HBnQKqxxHNs1r7KF+Intd8Q/g1bUh6q0WIp9voPXJ/x0j+ZL45KF1pJd9+q2jLIRMfvEshkA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/fastify" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fastify" + } + ], + "license": "MIT" + }, "node_modules/punycode": { "version": "2.3.1", "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.3.1.tgz", @@ -1053,6 +1133,21 @@ ], "license": "MIT" }, + "node_modules/quick-format-unescaped": { + "version": "4.0.4", + "resolved": "https://registry.npmjs.org/quick-format-unescaped/-/quick-format-unescaped-4.0.4.tgz", + "integrity": "sha512-tYC1Q1hgyRuHgloV/YXs2w15unPVh8qfu/qCTfhTYamaw7fyhumKa2yGpdSo87vY32rIclj+4fWYQXUMs9EHvg==", + "license": "MIT" + }, + "node_modules/real-require": { + "version": "0.2.0", + "resolved": "https://registry.npmjs.org/real-require/-/real-require-0.2.0.tgz", + "integrity": "sha512-57frrGM/OCTLqLOAh0mhVA9VBMHd+9U7Zb2THMGdBUoZVOtGbJzjxsYGDJ3A9AYYCP4hn6y1TVbaOfzWtm5GFg==", + "license": "MIT", + "engines": { + "node": ">= 12.13.0" + } + }, "node_modules/regexp-tree": { "version": "0.1.27", "resolved": "https://registry.npmjs.org/regexp-tree/-/regexp-tree-0.1.27.tgz", @@ -1135,6 +1230,15 @@ "regexp-tree": "~0.1.1" } }, + "node_modules/safe-stable-stringify": { + "version": "2.5.0", + "resolved": "https://registry.npmjs.org/safe-stable-stringify/-/safe-stable-stringify-2.5.0.tgz", + "integrity": "sha512-b3rppTKm9T+PsVCBEOUR46GWI7fdOs00VKZ1+9c1EWDaDMvjQc6tUwuFyIprgGgTcWoVHSKrU8H31ZHA2e0RHA==", + "license": "MIT", + "engines": { + "node": ">=10" + } + }, "node_modules/shebang-command": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/shebang-command/-/shebang-command-2.0.0.tgz", @@ -1158,6 +1262,24 @@ "node": ">=8" } }, + "node_modules/sonic-boom": { + "version": "4.2.1", + "resolved": "https://registry.npmjs.org/sonic-boom/-/sonic-boom-4.2.1.tgz", + "integrity": "sha512-w6AxtubXa2wTXAUsZMMWERrsIRAdrK0Sc+FUytWvYAhBJLyuI4llrMIC1DtlNSdI99EI86KZum2MMq3EAZlF9Q==", + "license": "MIT", + "dependencies": { + "atomic-sleep": "^1.0.0" + } + }, + "node_modules/split2": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/split2/-/split2-4.2.0.tgz", + "integrity": "sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==", + "license": "ISC", + "engines": { + "node": ">= 10.x" + } + }, "node_modules/strip-ansi": { "version": "6.0.1", "resolved": "https://registry.npmjs.org/strip-ansi/-/strip-ansi-6.0.1.tgz", @@ -1204,6 +1326,15 @@ "dev": true, "license": "MIT" }, + "node_modules/thread-stream": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/thread-stream/-/thread-stream-3.1.0.tgz", + "integrity": "sha512-OqyPZ9u96VohAyMfJykzmivOrY2wfMSf3C5TtFJVgN+Hm6aj+voFhlK+kZEIv2FBh1X6Xp3DlnCOfEQ3B2J86A==", + "license": "MIT", + "dependencies": { + "real-require": "^0.2.0" + } + }, "node_modules/type-check": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/type-check/-/type-check-0.4.0.tgz", diff --git a/package.json b/package.json index 315f676..79c72f3 100644 --- a/package.json +++ b/package.json @@ -1,11 +1,22 @@ { "name": "mattermost-openclaw-livestatus", - "version": "1.0.0", + "version": "4.0.0", "private": true, "description": "OpenClaw Live Status Tool for Mattermost", + "main": "src/watcher-manager.js", + "scripts": { + "start": "node src/watcher-manager.js start", + "stop": "node src/watcher-manager.js stop", + "status": "node src/watcher-manager.js status", + "test": "node --test test/unit/*.test.js", + "test:integration": "node --test test/integration/*.test.js" + }, + "dependencies": { + "pino": "^9.14.0" + }, "devDependencies": { - "eslint": "^8.56.0", - "eslint-plugin-security": "^2.1.0", + "eslint": "^8.57.1", + "eslint-plugin-security": "^2.1.1", "prettier": "^3.2.0" } } diff --git a/skill/SKILL.md b/skill/SKILL.md index 96071d7..ecbd76a 100644 --- a/skill/SKILL.md +++ b/skill/SKILL.md @@ -6,17 +6,21 @@ Creates a single "status box" post and updates it repeatedly — no chat spam. ## Usage ### Create a status box + ```bash live-status --channel create "🚀 **Task Started:** Initializing..." ``` + Returns the `POST_ID` (26-char string). **Capture it.** ### Create in a thread + ```bash live-status --channel --reply-to create "🚀 Starting..." ``` ### Update the status box + ```bash live-status update "🚀 **Task Running** \`\`\` @@ -26,6 +30,7 @@ live-status update "🚀 **Task Running** ``` ### Mark complete + ```bash live-status update "✅ **Task Complete** \`\`\` @@ -36,6 +41,7 @@ live-status update "✅ **Task Complete** ``` ### Delete a status box + ```bash live-status delete ``` @@ -43,32 +49,36 @@ live-status delete ## Multi-Agent Support When multiple agents share a channel, each creates its **own** status box: + ```bash # Agent A BOX_A=$(live-status --channel $CH --agent god-agent create "🤖 God Agent working...") # Agent B BOX_B=$(live-status --channel $CH --agent nutrition-agent create "🥗 Nutrition Agent working...") ``` + Each agent updates only its own box by ID. No conflicts. ## Options -| Flag | Purpose | -|---|---| -| `--channel ID` | Target channel (or set `MM_CHANNEL_ID`) | -| `--reply-to ID` | Post as thread reply (sets `root_id`) | -| `--agent NAME` | Use bot token mapped to this agent in openclaw.json | -| `--token TOKEN` | Explicit bot token (overrides everything) | -| `--host HOST` | Mattermost hostname | +| Flag | Purpose | +| --------------- | --------------------------------------------------- | +| `--channel ID` | Target channel (or set `MM_CHANNEL_ID`) | +| `--reply-to ID` | Post as thread reply (sets `root_id`) | +| `--agent NAME` | Use bot token mapped to this agent in openclaw.json | +| `--token TOKEN` | Explicit bot token (overrides everything) | +| `--host HOST` | Mattermost hostname | ## Auto-Detection The tool reads `openclaw.json` automatically for: + - **Host** — from `mattermost.baseUrl` - **Token** — from `mattermost.accounts` (mapped via `--agent` or defaults) - No env vars or manual config needed in most cases. ## Protocol + 1. **Always** capture the `POST_ID` from `create`. 2. **Always** append to previous log (maintain full history in the message). 3. **Use code blocks** for technical logs. diff --git a/src/circuit-breaker.js b/src/circuit-breaker.js new file mode 100644 index 0000000..5357d96 --- /dev/null +++ b/src/circuit-breaker.js @@ -0,0 +1,143 @@ +'use strict'; + +/** + * circuit-breaker.js — Circuit breaker for API resilience. + * + * States: + * CLOSED — Normal operation. Failures tracked. + * OPEN — Too many failures. Calls rejected immediately. + * HALF_OPEN — Cooldown expired. One probe call allowed. + * + * Transition rules: + * CLOSED -> OPEN: failures >= threshold + * OPEN -> HALF_OPEN: cooldown expired + * HALF_OPEN -> CLOSED: probe succeeds + * HALF_OPEN -> OPEN: probe fails + */ + +const STATE = { + CLOSED: 'closed', + OPEN: 'open', + HALF_OPEN: 'half_open', +}; + +class CircuitBreaker { + /** + * @param {object} opts + * @param {number} opts.threshold - Consecutive failures to open (default 5) + * @param {number} opts.cooldownMs - Milliseconds before half-open probe (default 30000) + * @param {Function} [opts.onStateChange] - Called with (newState, oldState) + * @param {object} [opts.logger] - Optional logger + */ + constructor(opts = {}) { + this.threshold = opts.threshold || 5; + this.cooldownMs = opts.cooldownMs || 30000; + this.onStateChange = opts.onStateChange || null; + this.logger = opts.logger || null; + + this.state = STATE.CLOSED; + this.failures = 0; + this.openedAt = null; + this.lastError = null; + } + + /** + * Execute a function through the circuit breaker. + * Throws CircuitOpenError if the circuit is open. + * @param {Function} fn - Async function to execute + * @returns {Promise<*>} + */ + async execute(fn) { + if (this.state === STATE.OPEN) { + const elapsed = Date.now() - this.openedAt; + if (elapsed >= this.cooldownMs) { + this._transition(STATE.HALF_OPEN); + } else { + throw new CircuitOpenError( + `Circuit open (${Math.ceil((this.cooldownMs - elapsed) / 1000)}s remaining)`, + this.lastError, + ); + } + } + + try { + const result = await fn(); + this._onSuccess(); + return result; + } catch (err) { + this._onFailure(err); + throw err; + } + } + + _onSuccess() { + if (this.state === STATE.HALF_OPEN) { + this._transition(STATE.CLOSED); + } + this.failures = 0; + this.lastError = null; + } + + _onFailure(err) { + this.lastError = err; + this.failures++; + + if (this.state === STATE.HALF_OPEN) { + // Probe failed — reopen + this.openedAt = Date.now(); + this._transition(STATE.OPEN); + } else if (this.state === STATE.CLOSED && this.failures >= this.threshold) { + this.openedAt = Date.now(); + this._transition(STATE.OPEN); + } + } + + _transition(newState) { + const oldState = this.state; + this.state = newState; + + if (newState === STATE.CLOSED) { + this.failures = 0; + this.openedAt = null; + } + + if (this.logger) { + this.logger.warn({ from: oldState, to: newState }, 'Circuit breaker state change'); + } + + if (this.onStateChange) { + this.onStateChange(newState, oldState); + } + } + + getState() { + return this.state; + } + + getMetrics() { + return { + state: this.state, + failures: this.failures, + threshold: this.threshold, + openedAt: this.openedAt, + lastError: this.lastError ? this.lastError.message : null, + }; + } + + reset() { + this.state = STATE.CLOSED; + this.failures = 0; + this.openedAt = null; + this.lastError = null; + } +} + +class CircuitOpenError extends Error { + constructor(message, cause) { + super(message); + this.name = 'CircuitOpenError'; + this.cause = cause; + } +} + +module.exports = { CircuitBreaker, CircuitOpenError, STATE }; diff --git a/src/config.js b/src/config.js new file mode 100644 index 0000000..cabdd1e --- /dev/null +++ b/src/config.js @@ -0,0 +1,112 @@ +'use strict'; + +/** + * config.js — Centralized env-var config with validation. + * All config is read from environment variables. + * Throws on missing required variables at startup. + */ + +function getEnv(name, defaultValue, required = false) { + const val = process.env[name]; + if (val === undefined || val === '') { + if (required) { + throw new Error(`Required environment variable ${name} is not set`); + } + return defaultValue; + } + return val; +} + +function getEnvInt(name, defaultValue, required = false) { + const val = getEnv(name, undefined, required); + if (val === undefined) return defaultValue; + const n = parseInt(val, 10); + if (isNaN(n)) throw new Error(`Environment variable ${name} must be an integer, got: ${val}`); + return n; +} + +function getEnvBool(name, defaultValue) { + const val = process.env[name]; + if (val === undefined || val === '') return defaultValue; + return val === '1' || val.toLowerCase() === 'true' || val.toLowerCase() === 'yes'; +} + +/** + * Build and validate the config object. + * Called once at startup; throws on invalid config. + */ +function buildConfig() { + const config = { + // Mattermost API + mm: { + token: getEnv('MM_BOT_TOKEN', null, true), + baseUrl: getEnv('MM_BASE_URL', 'https://slack.solio.tech'), + maxSockets: getEnvInt('MM_MAX_SOCKETS', 4), + }, + + // Transcript directory (OpenClaw agents) + transcriptDir: getEnv('TRANSCRIPT_DIR', '/home/node/.openclaw/agents'), + + // Timing + throttleMs: getEnvInt('THROTTLE_MS', 500), + idleTimeoutS: getEnvInt('IDLE_TIMEOUT_S', 60), + sessionPollMs: getEnvInt('SESSION_POLL_MS', 2000), + + // Limits + maxActiveSessions: getEnvInt('MAX_ACTIVE_SESSIONS', 20), + maxMessageChars: getEnvInt('MAX_MESSAGE_CHARS', 15000), + maxStatusLines: getEnvInt('MAX_STATUS_LINES', 20), + maxRetries: getEnvInt('MAX_RETRIES', 3), + + // Circuit breaker + circuitBreakerThreshold: getEnvInt('CIRCUIT_BREAKER_THRESHOLD', 5), + circuitBreakerCooldownS: getEnvInt('CIRCUIT_BREAKER_COOLDOWN_S', 30), + + // Health check + healthPort: getEnvInt('HEALTH_PORT', 9090), + + // Logging + logLevel: getEnv('LOG_LEVEL', 'info'), + + // PID file + pidFile: getEnv('PID_FILE', '/tmp/status-watcher.pid'), + + // Offset persistence + offsetFile: getEnv('OFFSET_FILE', '/tmp/status-watcher-offsets.json'), + + // Optional external tool labels override + toolLabelsFile: getEnv('TOOL_LABELS_FILE', null), + + // Fallback channel for non-MM sessions (null = skip) + defaultChannel: getEnv('DEFAULT_CHANNEL', null), + + // Feature flags + enableFsWatch: getEnvBool('ENABLE_FS_WATCH', true), + }; + + // Validate MM base URL + try { + new URL(config.mm.baseUrl); + } catch (_e) { + throw new Error(`MM_BASE_URL is not a valid URL: ${config.mm.baseUrl}`); + } + + return config; +} + +// Singleton — built once, exported +let _config = null; + +function getConfig() { + if (!_config) { + _config = buildConfig(); + } + return _config; +} + +// Allow resetting config in tests +function resetConfig() { + _config = null; +} + +module.exports = { getConfig, resetConfig, buildConfig }; diff --git a/src/health.js b/src/health.js new file mode 100644 index 0000000..8816d4e --- /dev/null +++ b/src/health.js @@ -0,0 +1,118 @@ +'use strict'; + +/** + * health.js — HTTP health endpoint + metrics. + * + * GET /health -> JSON { status, activeSessions, uptime, lastError, metrics } + * GET /metrics -> JSON { detailed metrics } + */ + +/* eslint-disable no-console */ + +const http = require('http'); + +class HealthServer { + /** + * @param {object} opts + * @param {number} opts.port - Port to listen on (0 = disabled) + * @param {Function} opts.getMetrics - Callback that returns metrics object + * @param {object} [opts.logger] - pino logger + */ + constructor(opts) { + this.port = opts.port; + this.getMetrics = opts.getMetrics; + this.logger = opts.logger || null; + this.server = null; + this.startTime = Date.now(); + } + + start() { + if (this.port === 0) { + if (this.logger) this.logger.info('Health server disabled (port=0)'); + return Promise.resolve(); + } + + return new Promise((resolve, reject) => { + this.server = http.createServer((req, res) => { + this._handleRequest(req, res); + }); + + this.server.on('error', (err) => { + if (this.logger) { + this.logger.error({ err }, 'Health server error'); + } else { + console.error('Health server error:', err.message); + } + reject(err); + }); + + this.server.listen(this.port, '127.0.0.1', () => { + if (this.logger) { + this.logger.info({ port: this.port }, 'Health server listening'); + } + resolve(); + }); + }); + } + + stop() { + return new Promise((resolve) => { + if (!this.server) { + resolve(); + return; + } + this.server.close(() => resolve()); + }); + } + + _handleRequest(req, res) { + const url = new URL(req.url, `http://localhost:${this.port}`); + + if (req.method !== 'GET') { + res.writeHead(405, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Method not allowed' })); + return; + } + + let body; + switch (url.pathname) { + case '/health': + body = this._buildHealthResponse(); + break; + case '/metrics': + body = this._buildMetricsResponse(); + break; + default: + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Not found' })); + return; + } + + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify(body, null, 2)); + } + + _buildHealthResponse() { + const metrics = this.getMetrics(); + const status = metrics.circuit && metrics.circuit.state === 'open' ? 'degraded' : 'healthy'; + + return { + status, + uptime: Math.floor((Date.now() - this.startTime) / 1000), + activeSessions: metrics.activeSessions || 0, + lastError: metrics.lastError || null, + metrics: { + updates_sent: metrics.updatesSent || 0, + updates_failed: metrics.updatesFailed || 0, + circuit_state: metrics.circuit ? metrics.circuit.state : 'unknown', + queue_depth: metrics.queueDepth || 0, + }, + }; + } + + _buildMetricsResponse() { + return this.getMetrics(); + } +} + +module.exports = { HealthServer }; diff --git a/src/live-status.js b/src/live-status.js index f721118..211d222 100755 --- a/src/live-status.js +++ b/src/live-status.js @@ -1,5 +1,7 @@ #!/usr/bin/env node +/* eslint-disable no-console */ + const https = require('https'); const http = require('http'); const fs = require('fs'); @@ -12,272 +14,315 @@ const options = {}; const otherArgs = []; for (let i = 0; i < args.length; i++) { - const arg = args[i]; - const next = args[i + 1]; - if (arg === '--channel' && next) { options.channel = next; i++; } - else if (arg === '--reply-to' && next) { options.replyTo = next; i++; } - else if (arg === '--agent' && next) { options.agent = next; i++; } - else if (arg === '--token' && next) { options.token = next; i++; } - else if (arg === '--host' && next) { options.host = next; i++; } - else if (arg === '--rich') { options.rich = true; } - else if (!command && ['create', 'update', 'complete', 'error', 'delete'].includes(arg)) { - command = arg; - } else { - otherArgs.push(arg); - } + const arg = args[i]; // eslint-disable-line security/detect-object-injection + const next = args[i + 1]; // eslint-disable-line security/detect-object-injection + if (arg === '--channel' && next) { + options.channel = next; + i++; + } else if (arg === '--reply-to' && next) { + options.replyTo = next; + i++; + } else if (arg === '--agent' && next) { + options.agent = next; + i++; + } else if (arg === '--token' && next) { + options.token = next; + i++; + } else if (arg === '--host' && next) { + options.host = next; + i++; + } else if (arg === '--rich') { + options.rich = true; + } else if (!command && ['create', 'update', 'complete', 'error', 'delete'].includes(arg)) { + command = arg; + } else { + otherArgs.push(arg); + } } // --- LOAD CONFIG --- function loadConfig() { - const searchPaths = [ - process.env.OPENCLAW_CONFIG_DIR && path.join(process.env.OPENCLAW_CONFIG_DIR, 'openclaw.json'), - process.env.XDG_CONFIG_HOME && path.join(process.env.XDG_CONFIG_HOME, 'openclaw.json'), - path.join(process.env.HOME || '/root', '.openclaw', 'openclaw.json'), - '/home/node/.openclaw/openclaw.json' - ].filter(Boolean); + const searchPaths = [ + process.env.OPENCLAW_CONFIG_DIR && path.join(process.env.OPENCLAW_CONFIG_DIR, 'openclaw.json'), + process.env.XDG_CONFIG_HOME && path.join(process.env.XDG_CONFIG_HOME, 'openclaw.json'), + path.join(process.env.HOME || '/root', '.openclaw', 'openclaw.json'), + '/home/node/.openclaw/openclaw.json', + ].filter(Boolean); - for (const p of searchPaths) { - try { return JSON.parse(fs.readFileSync(p, 'utf8')); } - catch (_) {} + for (const p of searchPaths) { + try { + // eslint-disable-next-line security/detect-non-literal-fs-filename + return JSON.parse(fs.readFileSync(p, 'utf8')); + } catch (_e) { + /* file not found or invalid JSON — try next path */ } - return null; + } + return null; } function resolveToken(config) { - if (options.token) return options.token; - if (process.env.MM_BOT_TOKEN) return process.env.MM_BOT_TOKEN; - if (!config) return null; + if (options.token) return options.token; + if (process.env.MM_BOT_TOKEN) return process.env.MM_BOT_TOKEN; + if (!config) return null; - const mm = config.mattermost || (config.channels && config.channels.mattermost) || {}; - const accounts = mm.accounts || {}; + const mm = config.mattermost || (config.channels && config.channels.mattermost) || {}; + const accounts = mm.accounts || {}; - if (options.agent) { - try { - const mapPath = path.join(__dirname, 'agent-accounts.json'); - const agentMap = JSON.parse(fs.readFileSync(mapPath, 'utf8')); - const accName = agentMap[options.agent]; - if (accName && accounts[accName] && accounts[accName].botToken) { - return accounts[accName].botToken; - } - } catch (_) {} + if (options.agent) { + try { + const mapPath = path.join(__dirname, 'agent-accounts.json'); + const agentMap = JSON.parse(fs.readFileSync(mapPath, 'utf8')); + // eslint-disable-next-line security/detect-object-injection + const accName = agentMap[options.agent]; + // eslint-disable-next-line security/detect-object-injection + if (accName && accounts[accName] && accounts[accName].botToken) { + // eslint-disable-next-line security/detect-object-injection + return accounts[accName].botToken; + } + } catch (_e) { + /* agent-accounts.json not found or agent not mapped */ } + } - if (accounts.default && accounts.default.botToken) return accounts.default.botToken; - for (const acc of Object.values(accounts)) { - if (acc.botToken) return acc.botToken; - } - return null; + if (accounts.default && accounts.default.botToken) return accounts.default.botToken; + for (const acc of Object.values(accounts)) { + if (acc.botToken) return acc.botToken; + } + return null; } function resolveHost(config) { - if (options.host) return options.host; - if (process.env.MM_HOST) return process.env.MM_HOST; - if (config) { - const mm = config.mattermost || (config.channels && config.channels.mattermost) || {}; - const baseUrl = mm.baseUrl || ''; - if (baseUrl) { try { return new URL(baseUrl).hostname; } catch (_) {} } + if (options.host) return options.host; + if (process.env.MM_HOST) return process.env.MM_HOST; + if (config) { + const mm = config.mattermost || (config.channels && config.channels.mattermost) || {}; + const baseUrl = mm.baseUrl || ''; + if (baseUrl) { + try { + return new URL(baseUrl).hostname; + } catch (_e) { + /* invalid URL */ + } } - return 'localhost'; + } + return 'localhost'; } function resolvePort(config) { - if (process.env.MM_PORT) return parseInt(process.env.MM_PORT, 10); - if (config) { - const mm = config.mattermost || (config.channels && config.channels.mattermost) || {}; - const baseUrl = mm.baseUrl || ''; - if (baseUrl) { - try { - const url = new URL(baseUrl); - return url.port ? parseInt(url.port, 10) : (url.protocol === 'https:' ? 443 : 80); - } catch (_) {} - } + if (process.env.MM_PORT) return parseInt(process.env.MM_PORT, 10); + if (config) { + const mm = config.mattermost || (config.channels && config.channels.mattermost) || {}; + const baseUrl = mm.baseUrl || ''; + if (baseUrl) { + try { + const url = new URL(baseUrl); + return url.port ? parseInt(url.port, 10) : url.protocol === 'https:' ? 443 : 80; + } catch (_e) { + /* invalid URL — use default port */ + } } - return 443; + } + return 443; } function resolveProtocol(config) { - if (config) { - const mm = config.mattermost || (config.channels && config.channels.mattermost) || {}; - if ((mm.baseUrl || '').startsWith('http://')) return 'http'; - } - return 'https'; + if (config) { + const mm = config.mattermost || (config.channels && config.channels.mattermost) || {}; + if ((mm.baseUrl || '').startsWith('http://')) return 'http'; + } + return 'https'; } // --- BUILD CONFIG --- const ocConfig = loadConfig(); const CONFIG = { - host: resolveHost(ocConfig), - port: resolvePort(ocConfig), - protocol: resolveProtocol(ocConfig), - token: resolveToken(ocConfig), - channel_id: options.channel || process.env.MM_CHANNEL_ID || process.env.CHANNEL_ID + host: resolveHost(ocConfig), + port: resolvePort(ocConfig), + protocol: resolveProtocol(ocConfig), + token: resolveToken(ocConfig), + channel_id: options.channel || process.env.MM_CHANNEL_ID || process.env.CHANNEL_ID, }; if (!CONFIG.token) { - console.error('Error: No bot token found.'); - console.error(' Set MM_BOT_TOKEN, use --token, or configure openclaw.json'); - process.exit(1); + console.error('Error: No bot token found.'); + console.error(' Set MM_BOT_TOKEN, use --token, or configure openclaw.json'); + process.exit(1); } // --- HTTP REQUEST --- function request(method, apiPath, data) { - const transport = CONFIG.protocol === 'https' ? https : http; - return new Promise((resolve, reject) => { - const req = transport.request({ - hostname: CONFIG.host, port: CONFIG.port, - path: '/api/v4' + apiPath, method, - headers: { 'Authorization': `Bearer ${CONFIG.token}`, 'Content-Type': 'application/json' } - }, (res) => { - let body = ''; - res.on('data', (chunk) => body += chunk); - res.on('end', () => { - if (res.statusCode >= 200 && res.statusCode < 300) { - try { resolve(JSON.parse(body)); } catch (_) { resolve(body); } - } else { - let msg = `HTTP ${res.statusCode}`; - try { msg = JSON.parse(body).message || msg; } catch (_) {} - reject(new Error(msg)); - } - }); + const transport = CONFIG.protocol === 'https' ? https : http; + return new Promise((resolve, reject) => { + const req = transport.request( + { + hostname: CONFIG.host, + port: CONFIG.port, + path: '/api/v4' + apiPath, + method, + headers: { Authorization: `Bearer ${CONFIG.token}`, 'Content-Type': 'application/json' }, + }, + (res) => { + let body = ''; + res.on('data', (chunk) => (body += chunk)); + res.on('end', () => { + if (res.statusCode >= 200 && res.statusCode < 300) { + try { + resolve(JSON.parse(body)); + } catch (_e) { + resolve(body); + } + } else { + let msg = `HTTP ${res.statusCode}`; + try { + msg = JSON.parse(body).message || msg; + } catch (_e) { + /* use default msg */ + } + reject(new Error(msg)); + } }); - req.on('error', (e) => reject(e)); - if (data) req.write(JSON.stringify(data)); - req.end(); - }); + }, + ); + req.on('error', (e) => reject(e)); + if (data) req.write(JSON.stringify(data)); + req.end(); + }); } // --- RICH ATTACHMENT HELPERS --- const RICH_STYLES = { - create: { color: '#FFA500', prefix: '⏳', status: '🔄 Running' }, - update: { color: '#FFA500', prefix: '🔄', status: '🔄 Running' }, - complete: { color: '#36A64F', prefix: '✅', status: '✅ Complete' }, - error: { color: '#DC3545', prefix: '❌', status: '❌ Error' } + create: { color: '#FFA500', prefix: '⏳', status: '🔄 Running' }, + update: { color: '#FFA500', prefix: '🔄', status: '🔄 Running' }, + complete: { color: '#36A64F', prefix: '✅', status: '✅ Complete' }, + error: { color: '#DC3545', prefix: '❌', status: '❌ Error' }, }; function buildAttachment(cmd, text) { - const style = RICH_STYLES[cmd] || RICH_STYLES.update; - const agentName = options.agent || 'unknown'; + const style = RICH_STYLES[cmd] || RICH_STYLES.update; // eslint-disable-line security/detect-object-injection + const agentName = options.agent || 'unknown'; - // Split text: first line = title, rest = log body - const lines = text.split('\n'); - const title = `${style.prefix} ${lines[0]}`; - const body = lines.length > 1 ? lines.slice(1).join('\n') : ''; + // Split text: first line = title, rest = log body + const lines = text.split('\n'); + const title = `${style.prefix} ${lines[0]}`; + const body = lines.length > 1 ? lines.slice(1).join('\n') : ''; - return { - color: style.color, - title: title, - text: body || undefined, - fields: [ - { short: true, title: 'Agent', value: agentName }, - { short: true, title: 'Status', value: style.status } - ] - }; + return { + color: style.color, + title: title, + text: body || undefined, + fields: [ + { short: true, title: 'Agent', value: agentName }, + { short: true, title: 'Status', value: style.status }, + ], + }; } // --- COMMANDS --- async function createPost(text, cmd) { - if (!CONFIG.channel_id) { - console.error('Error: Channel ID required. Use --channel or set MM_CHANNEL_ID.'); - process.exit(1); - } - if (!text || !text.trim()) { - console.error('Error: Message text is required for create.'); - process.exit(1); - } - try { - let payload; - if (options.rich) { - payload = { - channel_id: CONFIG.channel_id, - message: '', - props: { attachments: [buildAttachment(cmd || 'create', text)] } - }; - } else { - payload = { channel_id: CONFIG.channel_id, message: text }; - } - if (options.replyTo) payload.root_id = options.replyTo; - const result = await request('POST', '/posts', payload); - console.log(result.id); - } catch (e) { - console.error('Error (create):', e.message); - process.exit(1); + if (!CONFIG.channel_id) { + console.error('Error: Channel ID required. Use --channel or set MM_CHANNEL_ID.'); + process.exit(1); + } + if (!text || !text.trim()) { + console.error('Error: Message text is required for create.'); + process.exit(1); + } + try { + let payload; + if (options.rich) { + payload = { + channel_id: CONFIG.channel_id, + message: '', + props: { attachments: [buildAttachment(cmd || 'create', text)] }, + }; + } else { + payload = { channel_id: CONFIG.channel_id, message: text }; } + if (options.replyTo) payload.root_id = options.replyTo; + const result = await request('POST', '/posts', payload); + console.log(result.id); + } catch (e) { + console.error('Error (create):', e.message); + process.exit(1); + } } async function updatePost(postId, text, cmd) { - if (!postId) { - console.error('Error: Post ID is required for update.'); - process.exit(1); - } - if (!text || !text.trim()) { - console.error('Error: Message text is required for update.'); - process.exit(1); - } - try { - if (options.rich) { - await request('PUT', `/posts/${postId}`, { - id: postId, - message: '', - props: { attachments: [buildAttachment(cmd || 'update', text)] } - }); - } else { - const current = await request('GET', `/posts/${postId}`); - await request('PUT', `/posts/${postId}`, { - id: postId, message: text, props: current.props - }); - } - console.log('updated'); - } catch (e) { - console.error('Error (update):', e.message); - process.exit(1); + if (!postId) { + console.error('Error: Post ID is required for update.'); + process.exit(1); + } + if (!text || !text.trim()) { + console.error('Error: Message text is required for update.'); + process.exit(1); + } + try { + if (options.rich) { + await request('PUT', `/posts/${postId}`, { + id: postId, + message: '', + props: { attachments: [buildAttachment(cmd || 'update', text)] }, + }); + } else { + const current = await request('GET', `/posts/${postId}`); + await request('PUT', `/posts/${postId}`, { + id: postId, + message: text, + props: current.props, + }); } + console.log('updated'); + } catch (e) { + console.error('Error (update):', e.message); + process.exit(1); + } } async function deletePost(postId) { - if (!postId) { - console.error('Error: Post ID is required for delete.'); - process.exit(1); - } - try { - await request('DELETE', `/posts/${postId}`); - console.log('deleted'); - } catch (e) { - console.error('Error (delete):', e.message); - process.exit(1); - } + if (!postId) { + console.error('Error: Post ID is required for delete.'); + process.exit(1); + } + try { + await request('DELETE', `/posts/${postId}`); + console.log('deleted'); + } catch (e) { + console.error('Error (delete):', e.message); + process.exit(1); + } } // --- CLI ROUTER --- if (command === 'create') { - createPost(otherArgs.join(' '), 'create'); + createPost(otherArgs.join(' '), 'create'); } else if (command === 'update') { - updatePost(otherArgs[0], otherArgs.slice(1).join(' '), 'update'); + updatePost(otherArgs[0], otherArgs.slice(1).join(' '), 'update'); } else if (command === 'complete') { - updatePost(otherArgs[0], otherArgs.slice(1).join(' '), 'complete'); + updatePost(otherArgs[0], otherArgs.slice(1).join(' '), 'complete'); } else if (command === 'error') { - updatePost(otherArgs[0], otherArgs.slice(1).join(' '), 'error'); + updatePost(otherArgs[0], otherArgs.slice(1).join(' '), 'error'); } else if (command === 'delete') { - deletePost(otherArgs[0]); + deletePost(otherArgs[0]); } else { - console.log('Usage:'); - console.log(' live-status [options] create '); - console.log(' live-status [options] update '); - console.log(' live-status [options] complete '); - console.log(' live-status [options] error '); - console.log(' live-status [options] delete '); - console.log(''); - console.log('Options:'); - console.log(' --rich Use rich message attachments (colored cards)'); - console.log(' --channel ID Target channel'); - console.log(' --reply-to ID Post as thread reply'); - console.log(' --agent NAME Use bot token mapped to this agent'); - console.log(' --token TOKEN Explicit bot token (overrides all)'); - console.log(' --host HOST Mattermost hostname'); - console.log(''); - console.log('Rich mode colors:'); - console.log(' create/update → Orange (running)'); - console.log(' complete → Green (done)'); - console.log(' error → Red (failed)'); - process.exit(1); + console.log('Usage:'); + console.log(' live-status [options] create '); + console.log(' live-status [options] update '); + console.log(' live-status [options] complete '); + console.log(' live-status [options] error '); + console.log(' live-status [options] delete '); + console.log(''); + console.log('Options:'); + console.log(' --rich Use rich message attachments (colored cards)'); + console.log(' --channel ID Target channel'); + console.log(' --reply-to ID Post as thread reply'); + console.log(' --agent NAME Use bot token mapped to this agent'); + console.log(' --token TOKEN Explicit bot token (overrides all)'); + console.log(' --host HOST Mattermost hostname'); + console.log(''); + console.log('Rich mode colors:'); + console.log(' create/update → Orange (running)'); + console.log(' complete → Green (done)'); + console.log(' error → Red (failed)'); + process.exit(1); } diff --git a/src/logger.js b/src/logger.js new file mode 100644 index 0000000..bbaa3f4 --- /dev/null +++ b/src/logger.js @@ -0,0 +1,43 @@ +'use strict'; + +/** + * logger.js — pino wrapper with default config. + * Singleton logger; supports session-scoped child loggers. + */ + +const pino = require('pino'); + +let _logger = null; + +function getLogger() { + if (!_logger) { + // Get log level from env directly (avoid circular dep with config.js) + const rawLevel = process.env.LOG_LEVEL; + const validLevels = ['trace', 'debug', 'info', 'warn', 'error', 'fatal', 'silent']; + const level = rawLevel && validLevels.includes(rawLevel) ? rawLevel : 'info'; + _logger = pino({ + level, + base: { pid: process.pid }, + timestamp: pino.stdTimeFunctions.isoTime, + }); + } + return _logger; +} + +/** + * Create a child logger scoped to a session. + * @param {string} sessionKey + * @returns {pino.Logger} + */ +function sessionLogger(sessionKey) { + return getLogger().child({ sessionKey }); +} + +/** + * Reset the logger singleton (for tests). + */ +function resetLogger() { + _logger = null; +} + +module.exports = { getLogger, sessionLogger, resetLogger }; diff --git a/src/status-box.js b/src/status-box.js new file mode 100644 index 0000000..26eb1e4 --- /dev/null +++ b/src/status-box.js @@ -0,0 +1,328 @@ +'use strict'; + +/** + * status-box.js — Mattermost post manager. + * + * Features: + * - Shared http.Agent (keepAlive, maxSockets) + * - createPost / updatePost with circuit breaker + * - Throttle: leading-edge fires immediately, trailing flush after THROTTLE_MS + * - Message size guard (truncate to MAX_MESSAGE_CHARS) + * - Retry with exponential backoff on 429/5xx (up to MAX_RETRIES) + * - Structured logs for every API call + */ + +const https = require('https'); +const http = require('http'); +const { CircuitBreaker } = require('./circuit-breaker'); + +const DEFAULT_THROTTLE_MS = 500; +const DEFAULT_MAX_CHARS = 15000; +const DEFAULT_MAX_RETRIES = 3; +const DEFAULT_MAX_SOCKETS = 4; + +class StatusBox { + /** + * @param {object} opts + * @param {string} opts.baseUrl - Mattermost base URL + * @param {string} opts.token - Bot token + * @param {object} [opts.logger] - pino logger + * @param {number} [opts.throttleMs] + * @param {number} [opts.maxMessageChars] + * @param {number} [opts.maxRetries] + * @param {number} [opts.maxSockets] + * @param {CircuitBreaker} [opts.circuitBreaker] + */ + constructor(opts) { + this.baseUrl = opts.baseUrl; + this.token = opts.token; + this.logger = opts.logger || null; + this.throttleMs = opts.throttleMs || DEFAULT_THROTTLE_MS; + this.maxMessageChars = opts.maxMessageChars || DEFAULT_MAX_CHARS; + this.maxRetries = opts.maxRetries || DEFAULT_MAX_RETRIES; + + const parsedUrl = new URL(this.baseUrl); + this.hostname = parsedUrl.hostname; + this.port = parsedUrl.port + ? parseInt(parsedUrl.port, 10) + : parsedUrl.protocol === 'https:' + ? 443 + : 80; + this.isHttps = parsedUrl.protocol === 'https:'; + + const maxSockets = opts.maxSockets || DEFAULT_MAX_SOCKETS; + this.agent = new (this.isHttps ? https : http).Agent({ + keepAlive: true, + maxSockets, + }); + + this.circuitBreaker = + opts.circuitBreaker || + new CircuitBreaker({ + threshold: 5, + cooldownMs: 30000, + logger: this.logger, + }); + + // Metrics + this.metrics = { + updatesSent: 0, + updatesFailed: 0, + queueDepth: 0, + }; + + // Throttle state per postId + // Map + this._throttleState = new Map(); + } + + /** + * Create a new Mattermost post. + * @param {string} channelId + * @param {string} text + * @param {string} [rootId] - Thread root post ID + * @returns {Promise} Post ID + */ + async createPost(channelId, text, rootId) { + const body = { channel_id: channelId, message: this._truncate(text) }; + if (rootId) body.root_id = rootId; + + const post = await this._apiCall('POST', '/api/v4/posts', body); + if (this.logger) this.logger.debug({ postId: post.id, channelId }, 'Created status post'); + this.metrics.updatesSent++; + return post.id; + } + + /** + * Update a Mattermost post (throttled). + * Leading edge fires immediately; subsequent calls within throttleMs are batched. + * Guaranteed trailing flush when activity stops. + * + * @param {string} postId + * @param {string} text + * @returns {Promise} + */ + updatePost(postId, text) { + return new Promise((resolve, reject) => { + let state = this._throttleState.get(postId); + if (!state) { + state = { pending: null, timer: null, lastFiredAt: 0, resolvers: [] }; + this._throttleState.set(postId, state); + } + + state.resolvers.push({ resolve, reject }); + state.pending = text; + this.metrics.queueDepth = this._throttleState.size; + + const now = Date.now(); + const elapsed = now - state.lastFiredAt; + + if (elapsed >= this.throttleMs && !state.timer) { + // Leading edge: fire immediately + this._flushUpdate(postId); + } else { + // Trailing flush: schedule if not already scheduled + if (!state.timer) { + state.timer = setTimeout(() => { + this._flushUpdate(postId); + }, this.throttleMs - elapsed); + } + // If timer already scheduled, pending text was updated above — it will flush latest text + } + }); + } + + /** + * Flush the pending update for a postId. + * @private + */ + async _flushUpdate(postId) { + const state = this._throttleState.get(postId); + if (!state || state.pending === null) return; + + const text = state.pending; + const resolvers = [...state.resolvers]; + state.pending = null; + state.resolvers = []; + state.lastFiredAt = Date.now(); + if (state.timer) { + clearTimeout(state.timer); + state.timer = null; + } + + this.metrics.queueDepth = Math.max(0, this.metrics.queueDepth - 1); + + try { + await this._apiCallWithRetry('PUT', `/api/v4/posts/${postId}`, { + id: postId, + message: this._truncate(text), + }); + this.metrics.updatesSent++; + resolvers.forEach(({ resolve }) => resolve()); + } catch (err) { + this.metrics.updatesFailed++; + resolvers.forEach(({ reject }) => reject(err)); + } + } + + /** + * Force-flush any pending update for a postId (used on shutdown). + * @param {string} postId + * @returns {Promise} + */ + async forceFlush(postId) { + const state = this._throttleState.get(postId); + if (!state) return; + if (state.timer) { + clearTimeout(state.timer); + state.timer = null; + } + if (state.pending !== null) { + await this._flushUpdate(postId); + } + } + + /** + * Force-flush all pending updates. + * @returns {Promise} + */ + async flushAll() { + const postIds = [...this._throttleState.keys()]; + await Promise.allSettled(postIds.map((id) => this.forceFlush(id))); + } + + /** + * Delete a post. + * @param {string} postId + * @returns {Promise} + */ + async deletePost(postId) { + await this._apiCall('DELETE', `/api/v4/posts/${postId}`, null); + } + + /** + * Truncate text to maxMessageChars. + * @private + */ + _truncate(text) { + if (text.length <= this.maxMessageChars) return text; + const suffix = '\n...(truncated)'; + return text.slice(0, this.maxMessageChars - suffix.length) + suffix; + } + + /** + * Make an API call through the circuit breaker with retries. + * @private + */ + async _apiCallWithRetry(method, path, body) { + return this.circuitBreaker.execute(() => this._retryApiCall(method, path, body)); + } + + /** + * Make an API call directly (no circuit breaker, for createPost). + * @private + */ + async _apiCall(method, apiPath, body) { + return this.circuitBreaker.execute(() => this._retryApiCall(method, apiPath, body)); + } + + /** + * Retry logic for API calls. + * @private + */ + async _retryApiCall(method, apiPath, body, attempt = 0) { + try { + return await this._httpRequest(method, apiPath, body); + } catch (err) { + const isRetryable = err.statusCode === 429 || (err.statusCode >= 500 && err.statusCode < 600); + if (isRetryable && attempt < this.maxRetries) { + const delayMs = Math.min(1000 * Math.pow(2, attempt), 10000); + if (this.logger) { + this.logger.warn( + { attempt, delayMs, statusCode: err.statusCode }, + 'API call failed, retrying', + ); + } + await sleep(delayMs); + return this._retryApiCall(method, apiPath, body, attempt + 1); + } + throw err; + } + } + + /** + * Low-level HTTP request. + * @private + */ + _httpRequest(method, apiPath, body) { + const transport = this.isHttps ? https : http; + const bodyStr = body ? JSON.stringify(body) : null; + + return new Promise((resolve, reject) => { + const reqOpts = { + hostname: this.hostname, + port: this.port, + path: apiPath, + method, + agent: this.agent, + headers: { + Authorization: `Bearer ${this.token}`, + 'Content-Type': 'application/json', + }, + }; + if (bodyStr) { + reqOpts.headers['Content-Length'] = Buffer.byteLength(bodyStr); + } + + const req = transport.request(reqOpts, (res) => { + let data = ''; + res.on('data', (chunk) => (data += chunk)); + res.on('end', () => { + if (res.statusCode >= 200 && res.statusCode < 300) { + try { + resolve(data ? JSON.parse(data) : {}); + } catch (_e) { + resolve({}); + } + } else { + let msg = `HTTP ${res.statusCode}`; + try { + msg = JSON.parse(data).message || msg; + } catch (_e) { + /* use default */ + } + const err = new Error(msg); + err.statusCode = res.statusCode; + reject(err); + } + }); + }); + + req.on('error', reject); + if (bodyStr) req.write(bodyStr); + req.end(); + }); + } + + getMetrics() { + return { + ...this.metrics, + circuit: this.circuitBreaker.getMetrics(), + }; + } + + destroy() { + this.agent.destroy(); + // Clear all throttle timers + for (const [, state] of this._throttleState) { + if (state.timer) clearTimeout(state.timer); + } + this._throttleState.clear(); + } +} + +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +module.exports = { StatusBox }; diff --git a/src/status-formatter.js b/src/status-formatter.js new file mode 100644 index 0000000..fc0d038 --- /dev/null +++ b/src/status-formatter.js @@ -0,0 +1,142 @@ +'use strict'; + +/** + * status-formatter.js — Pure function: SessionState -> formatted Mattermost text. + * + * Output format: + * [ACTIVE] main | 38s + * Reading live-status source code... + * exec: ls /agents/sessions [OK] + * Analyzing agent configurations... + * Sub-agent: proj035-planner + * Reading protocol... + * [DONE] 28s + * [DONE] 53s | 12.4k tokens + */ + +const MAX_STATUS_LINES = parseInt(process.env.MAX_STATUS_LINES, 10) || 20; +const MAX_LINE_CHARS = 120; + +/** + * Format a SessionState into a Mattermost text string. + * + * @param {object} sessionState + * @param {string} sessionState.sessionKey + * @param {string} sessionState.status - 'active' | 'done' | 'error' | 'interrupted' + * @param {number} sessionState.startTime - ms since epoch + * @param {Array} sessionState.lines - Status lines (most recent activity) + * @param {Array} [sessionState.children] - Child session states + * @param {number} [sessionState.tokenCount] - Token count if available + * @param {string} [sessionState.agentId] - Agent ID (e.g. "main") + * @param {number} [sessionState.depth] - Nesting depth (0 = top-level) + * @returns {string} + */ +function format(sessionState, opts = {}) { + const maxLines = opts.maxLines || MAX_STATUS_LINES; + const depth = sessionState.depth || 0; + const indent = ' '.repeat(depth); + + const lines = []; + + // Header line + const elapsed = formatElapsed(Date.now() - sessionState.startTime); + const agentId = sessionState.agentId || extractAgentId(sessionState.sessionKey); + const statusPrefix = statusIcon(sessionState.status); + lines.push(`${indent}${statusPrefix} ${agentId} | ${elapsed}`); + + // Status lines (trimmed to maxLines, most recent) + const statusLines = (sessionState.lines || []).slice(-maxLines); + for (const line of statusLines) { + lines.push(`${indent} ${truncateLine(line)}`); + } + + // Child sessions (sub-agents) + if (sessionState.children && sessionState.children.length > 0) { + for (const child of sessionState.children) { + const childLines = format(child, { maxLines: Math.floor(maxLines / 2), ...opts }).split('\n'); + lines.push(...childLines); + } + } + + // Footer line (only for done/error/interrupted) + if (sessionState.status !== 'active') { + const tokenStr = sessionState.tokenCount + ? ` | ${formatTokens(sessionState.tokenCount)} tokens` + : ''; + lines.push(`${indent} [${sessionState.status.toUpperCase()}] ${elapsed}${tokenStr}`); + } + + return lines.join('\n'); +} + +/** + * Format elapsed milliseconds as human-readable string. + * @param {number} ms + * @returns {string} + */ +function formatElapsed(ms) { + if (ms < 0) ms = 0; + const s = Math.floor(ms / 1000); + const m = Math.floor(s / 60); + const h = Math.floor(m / 60); + if (h > 0) return `${h}h${m % 60}m`; + if (m > 0) return `${m}m${s % 60}s`; + return `${s}s`; +} + +/** + * Format token count as compact string (e.g. 12400 -> "12.4k"). + * @param {number} count + * @returns {string} + */ +function formatTokens(count) { + if (count >= 1000000) return `${(count / 1000000).toFixed(1)}M`; + if (count >= 1000) return `${(count / 1000).toFixed(1)}k`; + return String(count); +} + +/** + * Get status icon prefix. + * @param {string} status + * @returns {string} + */ +function statusIcon(status) { + switch (status) { + case 'active': + return '[ACTIVE]'; + case 'done': + return '[DONE]'; + case 'error': + return '[ERROR]'; + case 'interrupted': + return '[INTERRUPTED]'; + default: + return '[UNKNOWN]'; + } +} + +/** + * Truncate a line to MAX_LINE_CHARS. + * @param {string} line + * @returns {string} + */ +function truncateLine(line) { + if (line.length <= MAX_LINE_CHARS) return line; + return line.slice(0, MAX_LINE_CHARS - 3) + '...'; +} + +/** + * Extract agent ID from session key. + * Session key format: "agent:main:mattermost:channel:abc123:thread:xyz" + * @param {string} sessionKey + * @returns {string} + */ +function extractAgentId(sessionKey) { + if (!sessionKey) return 'unknown'; + const parts = sessionKey.split(':'); + // "agent:main:..." -> "main" + if (parts[0] === 'agent' && parts[1]) return parts[1]; + return sessionKey.split(':')[0] || 'unknown'; +} + +module.exports = { format, formatElapsed, formatTokens, statusIcon, truncateLine, extractAgentId }; diff --git a/src/status-watcher.js b/src/status-watcher.js new file mode 100644 index 0000000..bfa64c4 --- /dev/null +++ b/src/status-watcher.js @@ -0,0 +1,394 @@ +'use strict'; + +/** + * status-watcher.js — Core JSONL watcher. + * + * - fs.watch on TRANSCRIPT_DIR (recursive) + * - On file change: read new bytes, parse JSONL, update SessionState + * - Map parsed events to status lines + * - Detect file truncation (compaction) -> reset offset + * - Debounce updates via status-box.js throttle + * - Idle detection: pendingToolCalls==0 AND no new lines for IDLE_TIMEOUT_S + * - Emits: 'session-update' (sessionKey, sessionState) + * 'session-idle' (sessionKey) + */ + +const fs = require('fs'); +const path = require('path'); +const { EventEmitter } = require('events'); +const { resolve: resolveLabel } = require('./tool-labels'); + +class StatusWatcher extends EventEmitter { + /** + * @param {object} opts + * @param {string} opts.transcriptDir - Base transcript directory + * @param {number} [opts.idleTimeoutS] - Idle timeout in seconds + * @param {object} [opts.logger] - pino logger + */ + constructor(opts) { + super(); + this.transcriptDir = opts.transcriptDir; + this.idleTimeoutS = opts.idleTimeoutS || 60; + this.logger = opts.logger || null; + + // Map + this.sessions = new Map(); + + // Map + this.fileToSession = new Map(); + + // fs.Watcher instance + this._watcher = null; + this._running = false; + } + + /** + * Register a session to watch. + * @param {string} sessionKey + * @param {string} transcriptFile - Absolute path to {uuid}.jsonl + * @param {object} [initialState] - Pre-populated state (from offset recovery) + */ + addSession(sessionKey, transcriptFile, initialState = {}) { + if (this.sessions.has(sessionKey)) return; + + const state = { + sessionKey, + transcriptFile, + status: 'active', + startTime: initialState.startTime || Date.now(), + lines: initialState.lines || [], + pendingToolCalls: 0, + lastOffset: initialState.lastOffset || 0, + lastActivityAt: Date.now(), + agentId: initialState.agentId || extractAgentId(sessionKey), + depth: initialState.depth || 0, + tokenCount: 0, + children: [], + idleTimer: null, + }; + + this.sessions.set(sessionKey, state); + this.fileToSession.set(transcriptFile, sessionKey); + + if (this.logger) { + this.logger.debug({ sessionKey, transcriptFile }, 'Session added to watcher'); + } + + // Immediately read any existing content + this._readFile(sessionKey, state); + } + + /** + * Remove a session from watching. + * @param {string} sessionKey + */ + removeSession(sessionKey) { + const state = this.sessions.get(sessionKey); + if (!state) return; + + if (state.idleTimer) clearTimeout(state.idleTimer); + this.fileToSession.delete(state.transcriptFile); + this.sessions.delete(sessionKey); + + if (this.logger) { + this.logger.debug({ sessionKey }, 'Session removed from watcher'); + } + } + + /** + * Get current session state (for offset persistence). + * @param {string} sessionKey + * @returns {object|null} + */ + getSessionState(sessionKey) { + return this.sessions.get(sessionKey) || null; + } + + /** + * Start the file system watcher. + */ + start() { + if (this._running) return; + this._running = true; + + try { + this._watcher = fs.watch(this.transcriptDir, { recursive: true }, (eventType, filename) => { + if (!filename) return; + const fullPath = path.resolve(this.transcriptDir, filename); + this._onFileChange(fullPath); + }); + + this._watcher.on('error', (err) => { + if (this.logger) this.logger.error({ err }, 'fs.watch error'); + this.emit('error', err); + }); + + if (this.logger) { + this.logger.info({ dir: this.transcriptDir }, 'StatusWatcher started (fs.watch)'); + } + } catch (err) { + if (this.logger) { + this.logger.error({ err }, 'Failed to start fs.watch — transcriptDir may not exist'); + } + this._running = false; + throw err; + } + } + + /** + * Stop the file system watcher. + */ + stop() { + this._running = false; + if (this._watcher) { + this._watcher.close(); + this._watcher = null; + } + // Clear all idle timers + for (const [, state] of this.sessions) { + if (state.idleTimer) { + clearTimeout(state.idleTimer); + state.idleTimer = null; + } + } + if (this.logger) this.logger.info('StatusWatcher stopped'); + } + + /** + * Handle a file change event. + * @private + */ + _onFileChange(fullPath) { + // Only process .jsonl files + if (!fullPath.endsWith('.jsonl')) return; + + const sessionKey = this.fileToSession.get(fullPath); + if (!sessionKey) return; + + const state = this.sessions.get(sessionKey); + if (!state) return; + + this._readFile(sessionKey, state); + } + + /** + * Read new bytes from a transcript file. + * @private + */ + _readFile(sessionKey, state) { + let fd; + try { + fd = fs.openSync(state.transcriptFile, 'r'); + const stat = fs.fstatSync(fd); + const fileSize = stat.size; + + // Detect file truncation (compaction) + if (fileSize < state.lastOffset) { + if (this.logger) { + this.logger.warn( + { sessionKey, fileSize, lastOffset: state.lastOffset }, + 'Transcript truncated (compaction detected) — resetting offset', + ); + } + state.lastOffset = 0; + state.lines = ['[session compacted - continuing]']; + state.pendingToolCalls = 0; + } + + if (fileSize <= state.lastOffset) { + fs.closeSync(fd); + return; + } + + // Read new bytes + const bytesToRead = fileSize - state.lastOffset; + const buffer = Buffer.allocUnsafe(bytesToRead); + const bytesRead = fs.readSync(fd, buffer, 0, bytesToRead, state.lastOffset); + fs.closeSync(fd); + + state.lastOffset += bytesRead; + + // Parse JSONL lines + const chunk = buffer.toString('utf8', 0, bytesRead); + const lines = chunk.split('\n').filter((l) => l.trim()); + + for (const line of lines) { + this._parseLine(sessionKey, state, line); + } + + // Update activity timestamp + state.lastActivityAt = Date.now(); + + // Schedule idle check + this._scheduleIdleCheck(sessionKey, state); + + // Emit update event + this.emit('session-update', sessionKey, this._sanitizeState(state)); + } catch (err) { + if (fd !== undefined) { + try { + fs.closeSync(fd); + } catch (_e) { + /* ignore close error */ + } + } + if (this.logger) { + this.logger.error({ sessionKey, err }, 'Error reading transcript file'); + } + } + } + + /** + * Parse a single JSONL line and update session state. + * @private + */ + _parseLine(sessionKey, state, line) { + let record; + try { + record = JSON.parse(line); + } catch (_e) { + // Skip malformed lines + return; + } + + const { type } = record; + + switch (type) { + case 'tool_call': { + state.pendingToolCalls++; + const toolName = record.name || record.tool || 'unknown'; + const label = resolveLabel(toolName); + const statusLine = ` ${toolName}: ${label}`; + state.lines.push(statusLine); + break; + } + + case 'tool_result': { + if (state.pendingToolCalls > 0) state.pendingToolCalls--; + const toolName = record.name || record.tool || 'unknown'; + // Update the last tool_call line for this tool to show [OK] or [ERR] + const marker = record.error ? '[ERR]' : '[OK]'; + const idx = findLastIndex(state.lines, (l) => l.includes(` ${toolName}:`)); + if (idx >= 0) { + // Replace placeholder with result + state.lines[idx] = state.lines[idx].replace(/( \[OK\]| \[ERR\])?$/, ` ${marker}`); + } + break; + } + + case 'assistant': { + // Assistant text chunk + const text = (record.text || record.content || '').trim(); + if (text) { + const truncated = text.length > 80 ? text.slice(0, 77) + '...' : text; + state.lines.push(truncated); + } + break; + } + + case 'usage': { + // Token usage update + if (record.total_tokens) state.tokenCount = record.total_tokens; + else if (record.input_tokens || record.output_tokens) { + state.tokenCount = (record.input_tokens || 0) + (record.output_tokens || 0); + } + break; + } + + case 'session_start': { + state.startTime = record.timestamp ? new Date(record.timestamp).getTime() : state.startTime; + break; + } + + default: + // Ignore unknown record types + break; + } + } + + /** + * Schedule an idle check for a session. + * @private + */ + _scheduleIdleCheck(sessionKey, state) { + if (state.idleTimer) { + clearTimeout(state.idleTimer); + } + + state.idleTimer = setTimeout(() => { + this._checkIdle(sessionKey); + }, this.idleTimeoutS * 1000); + } + + /** + * Check if a session is idle. + * @private + */ + _checkIdle(sessionKey) { + const state = this.sessions.get(sessionKey); + if (!state) return; + + const elapsed = Date.now() - state.lastActivityAt; + const idleMs = this.idleTimeoutS * 1000; + + if (elapsed >= idleMs && state.pendingToolCalls === 0) { + if (this.logger) { + this.logger.info({ sessionKey, elapsedS: Math.floor(elapsed / 1000) }, 'Session idle'); + } + state.status = 'done'; + state.idleTimer = null; + this.emit('session-idle', sessionKey, this._sanitizeState(state)); + } else { + // Reschedule + this._scheduleIdleCheck(sessionKey, state); + } + } + + /** + * Return a safe copy of session state (without circular refs, timers). + * @private + */ + _sanitizeState(state) { + return { + sessionKey: state.sessionKey, + transcriptFile: state.transcriptFile, + status: state.status, + startTime: state.startTime, + lines: [...state.lines], + pendingToolCalls: state.pendingToolCalls, + lastOffset: state.lastOffset, + lastActivityAt: state.lastActivityAt, + agentId: state.agentId, + depth: state.depth, + tokenCount: state.tokenCount, + children: state.children, + }; + } +} + +/** + * Extract agent ID from session key. + * @param {string} sessionKey + * @returns {string} + */ +function extractAgentId(sessionKey) { + if (!sessionKey) return 'unknown'; + const parts = sessionKey.split(':'); + if (parts[0] === 'agent' && parts[1]) return parts[1]; + return parts[0] || 'unknown'; +} + +/** + * Find the last index in an array satisfying a predicate. + * @param {Array} arr + * @param {Function} predicate + * @returns {number} + */ +function findLastIndex(arr, predicate) { + for (let i = arr.length - 1; i >= 0; i--) { + if (predicate(arr[i])) return i; // eslint-disable-line security/detect-object-injection + } + return -1; +} + +module.exports = { StatusWatcher }; diff --git a/src/tool-labels.js b/src/tool-labels.js new file mode 100644 index 0000000..927f453 --- /dev/null +++ b/src/tool-labels.js @@ -0,0 +1,109 @@ +'use strict'; + +/** + * tool-labels.js — Pattern-matching tool name -> label resolver. + * + * Resolution order: + * 1. Exact match (e.g. "exec" -> "Running command...") + * 2. Prefix match (e.g. "camofox_*" -> "Using browser...") + * 3. Regex match (e.g. /^claude_/ -> "Running Claude Code...") + * 4. Default label ("Working...") + */ + +const path = require('path'); +const fs = require('fs'); + +let _labels = null; +let _externalFile = null; + +/** + * Load tool labels from JSON file(s). + * Merges external override on top of built-in defaults. + * @param {string|null} externalFile - Path to external JSON override + */ +function loadLabels(externalFile = null) { + _externalFile = externalFile; + + // Load built-in defaults + const builtinPath = path.join(__dirname, 'tool-labels.json'); + let builtin = {}; + try { + // eslint-disable-next-line security/detect-non-literal-fs-filename + builtin = JSON.parse(fs.readFileSync(builtinPath, 'utf8')); + } catch (_e) { + /* use empty defaults if file missing */ + } + + let external = {}; + if (externalFile) { + try { + // eslint-disable-next-line security/detect-non-literal-fs-filename + external = JSON.parse(fs.readFileSync(externalFile, 'utf8')); + } catch (_e) { + /* external file missing or invalid — use built-in only */ + } + } + + // Merge: external overrides built-in + _labels = { + exact: Object.assign({}, builtin.exact || {}, external.exact || {}), + prefix: Object.assign({}, builtin.prefix || {}, external.prefix || {}), + regex: [...(builtin.regex || []), ...(external.regex || [])], + default: external.default !== undefined ? external.default : builtin.default || 'Working...', + }; + + return _labels; +} + +/** + * Resolve a tool name to a human-readable label. + * @param {string} toolName + * @returns {string} + */ +function resolve(toolName) { + if (!_labels) loadLabels(_externalFile); + + const labels = _labels; + + // 1. Exact match + if (Object.prototype.hasOwnProperty.call(labels.exact, toolName)) { + return labels.exact[toolName]; // eslint-disable-line security/detect-object-injection + } + + // 2. Prefix match + for (const [prefix, label] of Object.entries(labels.prefix)) { + if (toolName.startsWith(prefix)) { + return label; + } + } + + // 3. Regex match (patterns stored as strings like "/^claude_/i") + for (const entry of labels.regex || []) { + const pattern = typeof entry === 'string' ? entry : entry.pattern; + const label = typeof entry === 'string' ? labels.default : entry.label; + if (pattern) { + try { + const match = pattern.match(/^\/(.+)\/([gimuy]*)$/); + if (match) { + const re = new RegExp(match[1], match[2]); // eslint-disable-line security/detect-non-literal-regexp + if (re.test(toolName)) return label; + } + } catch (_e) { + /* invalid regex — skip */ + } + } + } + + // 4. Default + return labels.default; +} + +/** + * Reset labels (for tests). + */ +function resetLabels() { + _labels = null; + _externalFile = null; +} + +module.exports = { loadLabels, resolve, resetLabels }; diff --git a/src/tool-labels.json b/src/tool-labels.json new file mode 100644 index 0000000..d1a88db --- /dev/null +++ b/src/tool-labels.json @@ -0,0 +1,41 @@ +{ + "_comment": "Tool name to human-readable label mapping. Supports exact match, prefix match (end with *), and regex (start with /).", + "exact": { + "Read": "Reading file...", + "Write": "Writing file...", + "Edit": "Editing file...", + "exec": "Running command...", + "process": "Managing process...", + "web_search": "Searching the web...", + "web_fetch": "Fetching URL...", + "browser": "Controlling browser...", + "canvas": "Drawing canvas...", + "nodes": "Querying nodes...", + "message": "Sending message...", + "tts": "Generating speech...", + "subagents": "Managing sub-agents...", + "image": "Analyzing image...", + "camofox_create_tab": "Opening browser tab...", + "camofox_close_tab": "Closing browser tab...", + "camofox_navigate": "Navigating browser...", + "camofox_click": "Clicking element...", + "camofox_type": "Typing in browser...", + "camofox_scroll": "Scrolling page...", + "camofox_screenshot": "Taking screenshot...", + "camofox_snapshot": "Capturing page snapshot...", + "camofox_list_tabs": "Listing browser tabs...", + "camofox_import_cookies": "Importing cookies...", + "claude_code_start": "Starting Claude Code task...", + "claude_code_status": "Checking Claude Code status...", + "claude_code_output": "Reading Claude Code output...", + "claude_code_cancel": "Cancelling Claude Code task...", + "claude_code_cleanup": "Cleaning up Claude Code sessions...", + "claude_code_sessions": "Listing Claude Code sessions..." + }, + "prefix": { + "camofox_": "Using browser...", + "claude_code_": "Running Claude Code...", + "nodes_": "Querying nodes..." + }, + "default": "Working..." +} diff --git a/test/unit/circuit-breaker.test.js b/test/unit/circuit-breaker.test.js new file mode 100644 index 0000000..65ea334 --- /dev/null +++ b/test/unit/circuit-breaker.test.js @@ -0,0 +1,171 @@ +'use strict'; + +/** + * Unit tests for circuit-breaker.js + */ + +const { describe, it, beforeEach } = require('node:test'); +const assert = require('node:assert/strict'); + +const { CircuitBreaker, CircuitOpenError, STATE } = require('../../src/circuit-breaker'); + +function makeBreaker(opts = {}) { + return new CircuitBreaker({ threshold: 3, cooldownMs: 100, ...opts }); +} + +describe('CircuitBreaker', () => { + let breaker; + + beforeEach(() => { + breaker = makeBreaker(); + }); + + it('starts in CLOSED state', () => { + assert.equal(breaker.getState(), STATE.CLOSED); + assert.equal(breaker.failures, 0); + }); + + it('executes successfully in CLOSED state', async () => { + const result = await breaker.execute(async () => 42); + assert.equal(result, 42); + assert.equal(breaker.getState(), STATE.CLOSED); + assert.equal(breaker.failures, 0); + }); + + it('tracks failures below threshold', async () => { + const failFn = async () => { + throw new Error('fail'); + }; + await assert.rejects(() => breaker.execute(failFn)); + await assert.rejects(() => breaker.execute(failFn)); + assert.equal(breaker.getState(), STATE.CLOSED); + assert.equal(breaker.failures, 2); + }); + + it('transitions to OPEN after threshold failures', async () => { + const failFn = async () => { + throw new Error('fail'); + }; + for (let i = 0; i < 3; i++) { + await assert.rejects(() => breaker.execute(failFn)); + } + assert.equal(breaker.getState(), STATE.OPEN); + }); + + it('rejects calls immediately when OPEN', async () => { + const failFn = async () => { + throw new Error('fail'); + }; + for (let i = 0; i < 3; i++) { + await assert.rejects(() => breaker.execute(failFn)); + } + assert.equal(breaker.getState(), STATE.OPEN); + + await assert.rejects(() => breaker.execute(async () => 'should not run'), CircuitOpenError); + }); + + it('transitions to HALF_OPEN after cooldown', async () => { + const failFn = async () => { + throw new Error('fail'); + }; + for (let i = 0; i < 3; i++) { + await assert.rejects(() => breaker.execute(failFn)); + } + assert.equal(breaker.getState(), STATE.OPEN); + + // Wait for cooldown + await sleep(150); + + // Next call transitions to HALF_OPEN and executes + const result = await breaker.execute(async () => 'probe'); + assert.equal(result, 'probe'); + assert.equal(breaker.getState(), STATE.CLOSED); + }); + + it('transitions HALF_OPEN -> OPEN if probe fails', async () => { + const failFn = async () => { + throw new Error('fail'); + }; + for (let i = 0; i < 3; i++) { + await assert.rejects(() => breaker.execute(failFn)); + } + assert.equal(breaker.getState(), STATE.OPEN); + + await sleep(150); + + // Probe fails + await assert.rejects(() => breaker.execute(failFn)); + assert.equal(breaker.getState(), STATE.OPEN); + }); + + it('resets on success after HALF_OPEN', async () => { + const failFn = async () => { + throw new Error('fail'); + }; + for (let i = 0; i < 3; i++) { + await assert.rejects(() => breaker.execute(failFn)); + } + await sleep(150); + await breaker.execute(async () => 'ok'); + assert.equal(breaker.getState(), STATE.CLOSED); + assert.equal(breaker.failures, 0); + }); + + it('calls onStateChange callback on transitions', async () => { + const changes = []; + breaker = makeBreaker({ + onStateChange: (newState, oldState) => changes.push({ newState, oldState }), + }); + + const failFn = async () => { + throw new Error('fail'); + }; + for (let i = 0; i < 3; i++) { + await assert.rejects(() => breaker.execute(failFn)); + } + + assert.equal(changes.length, 1); + assert.equal(changes[0].newState, STATE.OPEN); + assert.equal(changes[0].oldState, STATE.CLOSED); + }); + + it('reset() returns to CLOSED', async () => { + const failFn = async () => { + throw new Error('fail'); + }; + for (let i = 0; i < 3; i++) { + await assert.rejects(() => breaker.execute(failFn)); + } + assert.equal(breaker.getState(), STATE.OPEN); + + breaker.reset(); + assert.equal(breaker.getState(), STATE.CLOSED); + assert.equal(breaker.failures, 0); + }); + + it('getMetrics() returns correct data', () => { + const metrics = breaker.getMetrics(); + assert.equal(metrics.state, STATE.CLOSED); + assert.equal(metrics.failures, 0); + assert.equal(metrics.threshold, 3); + assert.equal(metrics.openedAt, null); + assert.equal(metrics.lastError, null); + }); + + it('getMetrics() reflects open state', async () => { + const failFn = async () => { + throw new Error('test error'); + }; + for (let i = 0; i < 3; i++) { + await assert.rejects(() => breaker.execute(failFn)); + } + const metrics = breaker.getMetrics(); + assert.equal(metrics.state, STATE.OPEN); + assert.ok(metrics.openedAt > 0); + assert.equal(metrics.lastError, 'test error'); + }); +}); + +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/test/unit/config.test.js b/test/unit/config.test.js new file mode 100644 index 0000000..b77cb5b --- /dev/null +++ b/test/unit/config.test.js @@ -0,0 +1,135 @@ +'use strict'; + +/** + * Unit tests for config.js + */ + +const { describe, it, beforeEach, afterEach } = require('node:test'); +const assert = require('node:assert/strict'); + +const { buildConfig, resetConfig } = require('../../src/config'); + +describe('config.js', () => { + const originalEnv = {}; + + beforeEach(() => { + // Save and clear relevant env vars + const keys = [ + 'MM_BOT_TOKEN', + 'MM_BASE_URL', + 'MM_MAX_SOCKETS', + 'TRANSCRIPT_DIR', + 'THROTTLE_MS', + 'IDLE_TIMEOUT_S', + 'SESSION_POLL_MS', + 'MAX_ACTIVE_SESSIONS', + 'MAX_MESSAGE_CHARS', + 'MAX_STATUS_LINES', + 'MAX_RETRIES', + 'CIRCUIT_BREAKER_THRESHOLD', + 'CIRCUIT_BREAKER_COOLDOWN_S', + 'HEALTH_PORT', + 'LOG_LEVEL', + 'PID_FILE', + 'OFFSET_FILE', + 'TOOL_LABELS_FILE', + 'DEFAULT_CHANNEL', + 'ENABLE_FS_WATCH', + 'MM_PORT', + ]; + for (const k of keys) { + originalEnv[k] = process.env[k]; + delete process.env[k]; + } + resetConfig(); + }); + + afterEach(() => { + // Restore env + for (const [k, v] of Object.entries(originalEnv)) { + if (v === undefined) delete process.env[k]; + else process.env[k] = v; + } + resetConfig(); + }); + + it('throws if MM_BOT_TOKEN is missing', () => { + assert.throws(() => buildConfig(), /MM_BOT_TOKEN/); + }); + + it('builds config with only required vars', () => { + process.env.MM_BOT_TOKEN = 'test-token'; + const cfg = buildConfig(); + assert.equal(cfg.mm.token, 'test-token'); + assert.equal(cfg.mm.baseUrl, 'https://slack.solio.tech'); + assert.equal(cfg.throttleMs, 500); + assert.equal(cfg.idleTimeoutS, 60); + assert.equal(cfg.maxActiveSessions, 20); + assert.equal(cfg.healthPort, 9090); + assert.equal(cfg.logLevel, 'info'); + }); + + it('reads all env vars correctly', () => { + process.env.MM_BOT_TOKEN = 'mytoken'; + process.env.MM_BASE_URL = 'https://mm.example.com'; + process.env.MM_MAX_SOCKETS = '8'; + process.env.THROTTLE_MS = '250'; + process.env.IDLE_TIMEOUT_S = '120'; + process.env.MAX_ACTIVE_SESSIONS = '10'; + process.env.MAX_MESSAGE_CHARS = '5000'; + process.env.LOG_LEVEL = 'debug'; + process.env.HEALTH_PORT = '8080'; + process.env.ENABLE_FS_WATCH = 'false'; + + const cfg = buildConfig(); + assert.equal(cfg.mm.token, 'mytoken'); + assert.equal(cfg.mm.baseUrl, 'https://mm.example.com'); + assert.equal(cfg.mm.maxSockets, 8); + assert.equal(cfg.throttleMs, 250); + assert.equal(cfg.idleTimeoutS, 120); + assert.equal(cfg.maxActiveSessions, 10); + assert.equal(cfg.maxMessageChars, 5000); + assert.equal(cfg.logLevel, 'debug'); + assert.equal(cfg.healthPort, 8080); + assert.equal(cfg.enableFsWatch, false); + }); + + it('throws on invalid MM_BASE_URL', () => { + process.env.MM_BOT_TOKEN = 'token'; + process.env.MM_BASE_URL = 'not-a-url'; + assert.throws(() => buildConfig(), /MM_BASE_URL/); + }); + + it('throws on non-integer THROTTLE_MS', () => { + process.env.MM_BOT_TOKEN = 'token'; + process.env.THROTTLE_MS = 'abc'; + assert.throws(() => buildConfig(), /THROTTLE_MS/); + }); + + it('ENABLE_FS_WATCH accepts "1", "true", "yes"', () => { + process.env.MM_BOT_TOKEN = 'token'; + + process.env.ENABLE_FS_WATCH = '1'; + assert.equal(buildConfig().enableFsWatch, true); + resetConfig(); + + process.env.ENABLE_FS_WATCH = 'true'; + assert.equal(buildConfig().enableFsWatch, true); + resetConfig(); + + process.env.ENABLE_FS_WATCH = 'yes'; + assert.equal(buildConfig().enableFsWatch, true); + resetConfig(); + + process.env.ENABLE_FS_WATCH = '0'; + assert.equal(buildConfig().enableFsWatch, false); + resetConfig(); + }); + + it('nullish defaults for optional string fields', () => { + process.env.MM_BOT_TOKEN = 'token'; + const cfg = buildConfig(); + assert.equal(cfg.toolLabelsFile, null); + assert.equal(cfg.defaultChannel, null); + }); +}); diff --git a/test/unit/logger.test.js b/test/unit/logger.test.js new file mode 100644 index 0000000..cfb8e25 --- /dev/null +++ b/test/unit/logger.test.js @@ -0,0 +1,57 @@ +'use strict'; + +/** + * Unit tests for logger.js + */ + +const { describe, it, beforeEach, afterEach } = require('node:test'); +const assert = require('node:assert/strict'); + +const { getLogger, sessionLogger, resetLogger } = require('../../src/logger'); + +describe('logger.js', () => { + beforeEach(() => { + resetLogger(); + }); + + afterEach(() => { + resetLogger(); + }); + + it('getLogger() returns a pino logger', () => { + const logger = getLogger(); + assert.ok(logger); + assert.equal(typeof logger.info, 'function'); + assert.equal(typeof logger.warn, 'function'); + assert.equal(typeof logger.error, 'function'); + assert.equal(typeof logger.debug, 'function'); + }); + + it('getLogger() returns the same instance each time (singleton)', () => { + const a = getLogger(); + const b = getLogger(); + assert.equal(a, b); + }); + + it('respects LOG_LEVEL env var', () => { + const original = process.env.LOG_LEVEL; + process.env.LOG_LEVEL = 'warn'; + const logger = getLogger(); + assert.equal(logger.level, 'warn'); + process.env.LOG_LEVEL = original; + resetLogger(); + }); + + it('sessionLogger() returns a child logger', () => { + const child = sessionLogger('agent:main:test'); + assert.ok(child); + assert.equal(typeof child.info, 'function'); + }); + + it('resetLogger() clears the singleton', () => { + const a = getLogger(); + resetLogger(); + const b = getLogger(); + assert.notEqual(a, b); + }); +}); diff --git a/test/unit/status-formatter.test.js b/test/unit/status-formatter.test.js new file mode 100644 index 0000000..1298d68 --- /dev/null +++ b/test/unit/status-formatter.test.js @@ -0,0 +1,207 @@ +'use strict'; + +/** + * Unit tests for status-formatter.js + */ + +const { describe, it } = require('node:test'); +const assert = require('node:assert/strict'); + +const { + format, + formatElapsed, + formatTokens, + statusIcon, + truncateLine, + extractAgentId, +} = require('../../src/status-formatter'); + +const NOW = Date.now(); + +function makeState(overrides = {}) { + return { + sessionKey: 'agent:main:mattermost:channel:abc:thread:xyz', + status: 'active', + startTime: NOW - 38000, // 38s ago + lines: [], + children: [], + agentId: 'main', + depth: 0, + tokenCount: 0, + ...overrides, + }; +} + +describe('status-formatter.js', () => { + describe('format()', () => { + it('formats active session with header', () => { + const state = makeState(); + const result = format(state); + assert.ok(result.includes('[ACTIVE]')); + assert.ok(result.includes('main')); + assert.ok(result.match(/\d+s/)); + }); + + it('formats done session with footer', () => { + const state = makeState({ status: 'done' }); + const result = format(state); + assert.ok(result.includes('[DONE]')); + }); + + it('formats error session', () => { + const state = makeState({ status: 'error' }); + const result = format(state); + assert.ok(result.includes('[ERROR]')); + }); + + it('formats interrupted session', () => { + const state = makeState({ status: 'interrupted' }); + const result = format(state); + assert.ok(result.includes('[INTERRUPTED]')); + }); + + it('includes status lines', () => { + const state = makeState({ + lines: ['Reading files...', ' exec: ls [OK]', 'Writing results...'], + }); + const result = format(state); + assert.ok(result.includes('Reading files...')); + assert.ok(result.includes('exec: ls [OK]')); + assert.ok(result.includes('Writing results...')); + }); + + it('limits status lines to maxLines', () => { + const lines = Array.from({ length: 30 }, (_, i) => `Line ${i + 1}`); + const state = makeState({ lines }); + const result = format(state, { maxLines: 5 }); + // Only last 5 lines should appear + assert.ok(result.includes('Line 26')); + assert.ok(result.includes('Line 30')); + assert.ok(!result.includes('Line 1')); + }); + + it('includes token count in done footer', () => { + const state = makeState({ status: 'done', tokenCount: 12400 }); + const result = format(state); + assert.ok(result.includes('12.4k')); + }); + + it('no token count in footer when zero', () => { + const state = makeState({ status: 'done', tokenCount: 0 }); + const result = format(state); + // Should not include "tokens" for zero count + assert.ok(!result.includes('tokens')); + }); + + it('renders nested child sessions', () => { + const child = makeState({ + sessionKey: 'agent:main:subagent:uuid-1', + agentId: 'proj035-planner', + depth: 1, + status: 'done', + lines: ['Reading protocol...'], + }); + const parent = makeState({ + lines: ['Starting plan...'], + children: [child], + }); + const result = format(parent); + assert.ok(result.includes('proj035-planner')); + assert.ok(result.includes('Reading protocol...')); + // Child should be indented + const childLine = result.split('\n').find((l) => l.includes('proj035-planner')); + assert.ok(childLine && childLine.startsWith(' ')); + }); + + it('active session has no done footer', () => { + const state = makeState({ status: 'active' }); + const result = format(state); + const lines = result.split('\n'); + // No line should contain [DONE], [ERROR], [INTERRUPTED] + assert.ok(!lines.some((l) => /\[(DONE|ERROR|INTERRUPTED)\]/.test(l))); + }); + }); + + describe('formatElapsed()', () => { + it('formats seconds', () => { + assert.equal(formatElapsed(0), '0s'); + assert.equal(formatElapsed(1000), '1s'); + assert.equal(formatElapsed(59000), '59s'); + }); + + it('formats minutes', () => { + assert.equal(formatElapsed(60000), '1m0s'); + assert.equal(formatElapsed(90000), '1m30s'); + assert.equal(formatElapsed(3599000), '59m59s'); + }); + + it('formats hours', () => { + assert.equal(formatElapsed(3600000), '1h0m'); + assert.equal(formatElapsed(7260000), '2h1m'); + }); + + it('handles negative values', () => { + assert.equal(formatElapsed(-1000), '0s'); + }); + }); + + describe('formatTokens()', () => { + it('formats small counts', () => { + assert.equal(formatTokens(0), '0'); + assert.equal(formatTokens(999), '999'); + }); + + it('formats thousands', () => { + assert.equal(formatTokens(1000), '1.0k'); + assert.equal(formatTokens(12400), '12.4k'); + assert.equal(formatTokens(999900), '999.9k'); + }); + + it('formats millions', () => { + assert.equal(formatTokens(1000000), '1.0M'); + assert.equal(formatTokens(2500000), '2.5M'); + }); + }); + + describe('statusIcon()', () => { + it('returns correct icons', () => { + assert.equal(statusIcon('active'), '[ACTIVE]'); + assert.equal(statusIcon('done'), '[DONE]'); + assert.equal(statusIcon('error'), '[ERROR]'); + assert.equal(statusIcon('interrupted'), '[INTERRUPTED]'); + assert.equal(statusIcon('unknown'), '[UNKNOWN]'); + assert.equal(statusIcon(''), '[UNKNOWN]'); + }); + }); + + describe('truncateLine()', () => { + it('does not truncate short lines', () => { + const line = 'Short line'; + assert.equal(truncateLine(line), line); + }); + + it('truncates long lines', () => { + const line = 'x'.repeat(200); + const result = truncateLine(line); + assert.ok(result.length <= 120); + assert.ok(result.endsWith('...')); + }); + }); + + describe('extractAgentId()', () => { + it('extracts agent ID from session key', () => { + assert.equal(extractAgentId('agent:main:mattermost:channel:abc'), 'main'); + assert.equal(extractAgentId('agent:coder-agent:session:123'), 'coder-agent'); + }); + + it('handles non-standard keys', () => { + assert.equal(extractAgentId('main'), 'main'); + assert.equal(extractAgentId(''), 'unknown'); + }); + + it('handles null/undefined', () => { + assert.equal(extractAgentId(null), 'unknown'); + assert.equal(extractAgentId(undefined), 'unknown'); + }); + }); +}); diff --git a/test/unit/tool-labels.test.js b/test/unit/tool-labels.test.js new file mode 100644 index 0000000..340f50a --- /dev/null +++ b/test/unit/tool-labels.test.js @@ -0,0 +1,185 @@ +'use strict'; + +/** + * Unit tests for tool-labels.js + */ + +const { describe, it, beforeEach, afterEach } = require('node:test'); +const assert = require('node:assert/strict'); +const path = require('path'); +const fs = require('fs'); +const os = require('os'); + +const { loadLabels, resolve, resetLabels } = require('../../src/tool-labels'); + +describe('tool-labels.js', () => { + beforeEach(() => { + resetLabels(); + }); + + afterEach(() => { + resetLabels(); + }); + + describe('exact match', () => { + it('resolves known tools by exact name', () => { + loadLabels(null); + assert.equal(resolve('exec'), 'Running command...'); + assert.equal(resolve('Read'), 'Reading file...'); + assert.equal(resolve('Write'), 'Writing file...'); + assert.equal(resolve('Edit'), 'Editing file...'); + assert.equal(resolve('web_search'), 'Searching the web...'); + assert.equal(resolve('web_fetch'), 'Fetching URL...'); + assert.equal(resolve('message'), 'Sending message...'); + assert.equal(resolve('tts'), 'Generating speech...'); + assert.equal(resolve('subagents'), 'Managing sub-agents...'); + assert.equal(resolve('image'), 'Analyzing image...'); + assert.equal(resolve('process'), 'Managing process...'); + assert.equal(resolve('browser'), 'Controlling browser...'); + }); + }); + + describe('prefix match', () => { + it('resolves camofox_ tools via prefix', () => { + loadLabels(null); + assert.equal(resolve('camofox_create_tab'), 'Opening browser tab...'); // exact takes priority + assert.equal(resolve('camofox_some_new_tool'), 'Using browser...'); + }); + + it('resolves claude_code_ tools via prefix', () => { + loadLabels(null); + assert.equal(resolve('claude_code_start'), 'Starting Claude Code task...'); // exact takes priority + assert.equal(resolve('claude_code_something_new'), 'Running Claude Code...'); + }); + }); + + describe('default label', () => { + it('returns default for unknown tools', () => { + loadLabels(null); + assert.equal(resolve('some_unknown_tool'), 'Working...'); + assert.equal(resolve(''), 'Working...'); + assert.equal(resolve('xyz'), 'Working...'); + }); + }); + + describe('external override', () => { + let tmpFile; + + beforeEach(() => { + tmpFile = path.join(os.tmpdir(), `tool-labels-test-${Date.now()}.json`); + }); + + afterEach(() => { + try { + fs.unlinkSync(tmpFile); + } catch (_e) { + /* ignore */ + } + }); + + it('external exact overrides built-in', () => { + fs.writeFileSync( + tmpFile, + JSON.stringify({ + exact: { exec: 'Custom exec label...' }, + prefix: {}, + }), + ); + loadLabels(tmpFile); + assert.equal(resolve('exec'), 'Custom exec label...'); + // Non-overridden built-in still works + assert.equal(resolve('Read'), 'Reading file...'); + }); + + it('external prefix adds new prefix', () => { + fs.writeFileSync( + tmpFile, + JSON.stringify({ + exact: {}, + prefix: { my_tool_: 'My custom tool...' }, + }), + ); + loadLabels(tmpFile); + assert.equal(resolve('my_tool_do_something'), 'My custom tool...'); + }); + + it('external default overrides built-in default', () => { + fs.writeFileSync( + tmpFile, + JSON.stringify({ + exact: {}, + prefix: {}, + default: 'Custom default...', + }), + ); + loadLabels(tmpFile); + assert.equal(resolve('completely_unknown'), 'Custom default...'); + }); + + it('handles missing external file gracefully', () => { + loadLabels('/nonexistent/path/tool-labels.json'); + // Should fall back to built-in + assert.equal(resolve('exec'), 'Running command...'); + }); + + it('handles malformed external JSON gracefully', () => { + fs.writeFileSync(tmpFile, 'not valid json {{{'); + loadLabels(tmpFile); + // Should fall back to built-in + assert.equal(resolve('exec'), 'Running command...'); + }); + }); + + describe('regex match', () => { + let tmpFile; + + beforeEach(() => { + tmpFile = path.join(os.tmpdir(), `tool-labels-regex-${Date.now()}.json`); + }); + + afterEach(() => { + try { + fs.unlinkSync(tmpFile); + } catch (_e) { + /* ignore */ + } + }); + + it('resolves via regex pattern', () => { + fs.writeFileSync( + tmpFile, + JSON.stringify({ + exact: {}, + prefix: {}, + regex: [{ pattern: '/^my_api_/', label: 'Calling API...' }], + }), + ); + loadLabels(tmpFile); + assert.equal(resolve('my_api_create'), 'Calling API...'); + assert.equal(resolve('my_api_update'), 'Calling API...'); + assert.equal(resolve('other_tool'), 'Working...'); + }); + + it('handles invalid regex gracefully', () => { + fs.writeFileSync( + tmpFile, + JSON.stringify({ + exact: {}, + prefix: {}, + regex: [{ pattern: '/[invalid(/', label: 'oops' }], + }), + ); + loadLabels(tmpFile); + // Invalid regex skipped — returns default + assert.equal(resolve('anything'), 'Working...'); + }); + }); + + describe('auto-load', () => { + it('auto-loads built-in labels on first resolve call', () => { + // resetLabels was called in beforeEach — no explicit loadLabels call + const label = resolve('exec'); + assert.equal(label, 'Running command...'); + }); + }); +});