Phase 1: Fix RHS panel to fetch existing sessions on mount - Add initial API fetch in useAllStatusUpdates() hook - Allow GET /sessions endpoint without shared secret auth - RHS panel now shows sessions after page refresh Phase 2: Floating widget component (registerRootComponent) - New floating_widget.tsx with auto-show/hide behavior - Draggable, collapsible to pulsing dot with session count - Shows last 5 lines of most recent active session - Position persisted to localStorage - CSS styles using Mattermost theme variables Phase 3: Session cleanup and KV optimization - Add LastUpdateMs field to SessionData for staleness tracking - Set LastUpdateMs on session create and update - Add periodic cleanup goroutine (every 5 min) - Stale active sessions (>30 min no update) marked interrupted - Expired non-active sessions (>1 hr) deleted from KV - Add ListAllSessions and keep ListActiveSessions as helper - Add debug logging to daemon file polling Closes #5
371 lines
11 KiB
Go
371 lines
11 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/mattermost/mattermost/server/public/model"
|
|
"github.com/mattermost/mattermost/server/public/plugin"
|
|
)
|
|
|
|
// ServeHTTP handles HTTP requests to the plugin.
|
|
func (p *Plugin) ServeHTTP(c *plugin.Context, w http.ResponseWriter, r *http.Request) {
|
|
path := r.URL.Path
|
|
|
|
// Auth middleware: validate shared secret for write operations.
|
|
// Read-only endpoints (GET /sessions, GET /health) are accessible to any
|
|
// authenticated Mattermost user — no shared secret required.
|
|
isReadOnly := r.Method == http.MethodGet && (path == "/api/v1/sessions" || path == "/api/v1/health")
|
|
if !isReadOnly {
|
|
config := p.getConfiguration()
|
|
if config.SharedSecret != "" {
|
|
auth := r.Header.Get("Authorization")
|
|
expected := "Bearer " + config.SharedSecret
|
|
if auth != expected {
|
|
http.Error(w, `{"error": "unauthorized"}`, http.StatusUnauthorized)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
switch {
|
|
case path == "/api/v1/health" && r.Method == http.MethodGet:
|
|
p.handleHealth(w, r)
|
|
case path == "/api/v1/sessions" && r.Method == http.MethodGet:
|
|
p.handleListSessions(w, r)
|
|
case path == "/api/v1/sessions" && r.Method == http.MethodPost:
|
|
p.handleCreateSession(w, r)
|
|
case strings.HasPrefix(path, "/api/v1/sessions/") && r.Method == http.MethodPut:
|
|
sessionKey := strings.TrimPrefix(path, "/api/v1/sessions/")
|
|
p.handleUpdateSession(w, r, sessionKey)
|
|
case strings.HasPrefix(path, "/api/v1/sessions/") && r.Method == http.MethodDelete:
|
|
sessionKey := strings.TrimPrefix(path, "/api/v1/sessions/")
|
|
p.handleDeleteSession(w, r, sessionKey)
|
|
default:
|
|
http.NotFound(w, r)
|
|
}
|
|
}
|
|
|
|
// handleHealth returns plugin health status.
|
|
func (p *Plugin) handleHealth(w http.ResponseWriter, r *http.Request) {
|
|
sessions, err := p.store.ListAllSessions()
|
|
count := 0
|
|
if err == nil {
|
|
for _, s := range sessions {
|
|
if s.Status == "active" {
|
|
count++
|
|
}
|
|
}
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, map[string]interface{}{
|
|
"status": "healthy",
|
|
"active_sessions": count,
|
|
"plugin_id": "com.openclaw.livestatus",
|
|
})
|
|
}
|
|
|
|
// handleListSessions returns all sessions (active and non-active).
|
|
func (p *Plugin) handleListSessions(w http.ResponseWriter, r *http.Request) {
|
|
sessions, err := p.store.ListAllSessions()
|
|
if err != nil {
|
|
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, sessions)
|
|
}
|
|
|
|
// CreateSessionRequest is the JSON payload accepted when a new session is
// created.
type CreateSessionRequest struct {
	// SessionKey identifies the session. Required.
	SessionKey string `json:"session_key"`

	// ChannelID is the channel the status post is created in. Required.
	ChannelID string `json:"channel_id"`

	// RootID, when set, is used as the RootId of the created post.
	RootID string `json:"root_id,omitempty"`

	// AgentID is recorded on the post props and in the stored session.
	AgentID string `json:"agent_id"`

	// BotUserID is the user the post is created as. When empty the handler
	// falls back to the plugin's own bot.
	BotUserID string `json:"bot_user_id,omitempty"`
}
|
|
|
|
// handleCreateSession creates a new custom_livestatus post and starts tracking.
|
|
func (p *Plugin) handleCreateSession(w http.ResponseWriter, r *http.Request) {
|
|
var req CreateSessionRequest
|
|
if err := readJSON(r, &req); err != nil {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
if req.SessionKey == "" || req.ChannelID == "" {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "session_key and channel_id required"})
|
|
return
|
|
}
|
|
|
|
// Check max active sessions
|
|
config := p.getConfiguration()
|
|
allSessions, _ := p.store.ListAllSessions()
|
|
activeCount := 0
|
|
for _, s := range allSessions {
|
|
if s.Status == "active" {
|
|
activeCount++
|
|
}
|
|
}
|
|
if activeCount >= config.MaxActiveSessions {
|
|
writeJSON(w, http.StatusTooManyRequests, map[string]string{"error": "max active sessions reached"})
|
|
return
|
|
}
|
|
|
|
// Create the custom post — UserId is required
|
|
// Use the bot user ID passed in the request, or fall back to plugin bot
|
|
userID := req.BotUserID
|
|
if userID == "" {
|
|
// Try to get plugin's own bot
|
|
userID = p.getBotUserID()
|
|
}
|
|
if userID == "" {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "bot_user_id required (no plugin bot available)"})
|
|
return
|
|
}
|
|
|
|
post := &model.Post{
|
|
UserId: userID,
|
|
ChannelId: req.ChannelID,
|
|
RootId: req.RootID,
|
|
Type: "custom_livestatus",
|
|
Message: "Agent session active",
|
|
}
|
|
post.AddProp("session_key", req.SessionKey)
|
|
post.AddProp("agent_id", req.AgentID)
|
|
post.AddProp("status", "active")
|
|
|
|
createdPost, appErr := p.API.CreatePost(post)
|
|
if appErr != nil {
|
|
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": appErr.Error()})
|
|
return
|
|
}
|
|
if createdPost == nil {
|
|
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "CreatePost returned nil without error"})
|
|
return
|
|
}
|
|
|
|
// Store session data
|
|
now := time.Now().UnixMilli()
|
|
sessionData := SessionData{
|
|
SessionKey: req.SessionKey,
|
|
PostID: createdPost.Id,
|
|
ChannelID: req.ChannelID,
|
|
RootID: req.RootID,
|
|
AgentID: req.AgentID,
|
|
Status: "active",
|
|
Lines: []string{},
|
|
StartTimeMs: now,
|
|
LastUpdateMs: now,
|
|
}
|
|
if err := p.store.SaveSession(req.SessionKey, sessionData); err != nil {
|
|
p.API.LogWarn("Failed to save session", "error", err.Error())
|
|
}
|
|
|
|
// Broadcast initial state
|
|
p.broadcastUpdate(req.ChannelID, sessionData)
|
|
|
|
writeJSON(w, http.StatusCreated, map[string]string{
|
|
"post_id": createdPost.Id,
|
|
"session_key": req.SessionKey,
|
|
})
|
|
}
|
|
|
|
// UpdateSessionRequest is the request body for updating a session.
|
|
type UpdateSessionRequest struct {
|
|
Status string `json:"status"`
|
|
Lines []string `json:"lines"`
|
|
ElapsedMs int64 `json:"elapsed_ms"`
|
|
TokenCount int `json:"token_count"`
|
|
Children []SessionData `json:"children,omitempty"`
|
|
StartTimeMs int64 `json:"start_time_ms"`
|
|
}
|
|
|
|
// handleUpdateSession updates session data and broadcasts via WebSocket.
|
|
// Critically: does NOT call any Mattermost post API — no "(edited)" label.
|
|
func (p *Plugin) handleUpdateSession(w http.ResponseWriter, r *http.Request, sessionKey string) {
|
|
var req UpdateSessionRequest
|
|
if err := readJSON(r, &req); err != nil {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
// Get existing session
|
|
existing, err := p.store.GetSession(sessionKey)
|
|
if err != nil || existing == nil {
|
|
writeJSON(w, http.StatusNotFound, map[string]string{"error": "session not found"})
|
|
return
|
|
}
|
|
|
|
// Update fields
|
|
existing.Status = req.Status
|
|
existing.Lines = req.Lines
|
|
existing.ElapsedMs = req.ElapsedMs
|
|
existing.TokenCount = req.TokenCount
|
|
existing.Children = req.Children
|
|
existing.LastUpdateMs = time.Now().UnixMilli()
|
|
if req.StartTimeMs > 0 {
|
|
existing.StartTimeMs = req.StartTimeMs
|
|
}
|
|
|
|
// Save to KV store
|
|
if err := p.store.SaveSession(sessionKey, *existing); err != nil {
|
|
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
|
|
// Broadcast update via WebSocket (for webapp — instant, no API call)
|
|
p.broadcastUpdate(existing.ChannelID, *existing)
|
|
|
|
// Mobile fallback: update the post message field with formatted markdown.
|
|
// Mobile app doesn't render custom post types, so it shows the message field.
|
|
// The webapp plugin overrides rendering entirely, so "(edited)" is invisible on web.
|
|
go p.updatePostMessageForMobile(*existing)
|
|
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
|
}
|
|
|
|
// handleDeleteSession marks a session as complete.
|
|
func (p *Plugin) handleDeleteSession(w http.ResponseWriter, r *http.Request, sessionKey string) {
|
|
existing, err := p.store.GetSession(sessionKey)
|
|
if err != nil || existing == nil {
|
|
writeJSON(w, http.StatusNotFound, map[string]string{"error": "session not found"})
|
|
return
|
|
}
|
|
|
|
// Mark as done
|
|
existing.Status = "done"
|
|
|
|
// Update the post props to reflect completion (one final API call)
|
|
post, appErr := p.API.GetPost(existing.PostID)
|
|
if appErr == nil && post != nil {
|
|
post.AddProp("status", "done")
|
|
post.AddProp("final_lines", existing.Lines)
|
|
post.AddProp("elapsed_ms", existing.ElapsedMs)
|
|
post.AddProp("token_count", existing.TokenCount)
|
|
_, _ = p.API.UpdatePost(post)
|
|
}
|
|
|
|
// Broadcast final state
|
|
p.broadcastUpdate(existing.ChannelID, *existing)
|
|
|
|
// Clean up KV store
|
|
_ = p.store.DeleteSession(sessionKey)
|
|
|
|
writeJSON(w, http.StatusOK, map[string]string{"status": "done"})
|
|
}
|
|
|
|
// updatePostMessageForMobile updates the post's Message field with formatted markdown.
|
|
// This provides a mobile fallback — mobile apps don't render custom post type components
|
|
// but DO display the Message field. On webapp, the plugin's React component overrides
|
|
// rendering entirely, so the "(edited)" indicator is invisible.
|
|
func (p *Plugin) updatePostMessageForMobile(data SessionData) {
|
|
post, appErr := p.API.GetPost(data.PostID)
|
|
if appErr != nil || post == nil {
|
|
return
|
|
}
|
|
|
|
newMessage := formatStatusMarkdown(data)
|
|
if post.Message == newMessage {
|
|
return // Skip API call if content hasn't changed
|
|
}
|
|
|
|
post.Message = newMessage
|
|
_, updateErr := p.API.UpdatePost(post)
|
|
if updateErr != nil {
|
|
p.API.LogDebug("Failed to update post message for mobile", "postId", data.PostID, "error", updateErr.Error())
|
|
}
|
|
}
|
|
|
|
// formatStatusMarkdown generates a markdown blockquote status view for mobile clients.
|
|
func formatStatusMarkdown(data SessionData) string {
|
|
// Status icon
|
|
var statusIcon string
|
|
switch data.Status {
|
|
case "active":
|
|
statusIcon = "[ACTIVE]"
|
|
case "done":
|
|
statusIcon = "[DONE]"
|
|
case "error":
|
|
statusIcon = "[ERROR]"
|
|
case "interrupted":
|
|
statusIcon = "[INTERRUPTED]"
|
|
default:
|
|
statusIcon = "[UNKNOWN]"
|
|
}
|
|
|
|
// Elapsed time
|
|
elapsed := formatElapsedMs(data.ElapsedMs)
|
|
|
|
// Build lines
|
|
result := fmt.Sprintf("> **%s** `%s` | %s\n", statusIcon, data.AgentID, elapsed)
|
|
|
|
// Show last N status lines (keep it compact for mobile)
|
|
maxLines := 15
|
|
lines := data.Lines
|
|
if len(lines) > maxLines {
|
|
lines = lines[len(lines)-maxLines:]
|
|
}
|
|
for _, line := range lines {
|
|
if len(line) > 120 {
|
|
line = line[:117] + "..."
|
|
}
|
|
result += "> " + line + "\n"
|
|
}
|
|
|
|
// Token count for completed sessions
|
|
if data.Status != "active" && data.TokenCount > 0 {
|
|
result += fmt.Sprintf("> **[%s]** %s | %s tokens\n", strings.ToUpper(data.Status), elapsed, formatTokenCount(data.TokenCount))
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// formatElapsedMs renders a millisecond duration compactly: "<h>h<m>m" from
// one hour up, "<m>m<s>s" from one minute up, otherwise "<s>s". Negative input
// is treated as zero and sub-second remainders are discarded.
func formatElapsedMs(ms int64) string {
	if ms < 0 {
		ms = 0
	}

	totalSeconds := ms / 1000
	totalMinutes := totalSeconds / 60

	switch hours := totalMinutes / 60; {
	case hours > 0:
		return fmt.Sprintf("%dh%dm", hours, totalMinutes%60)
	case totalMinutes > 0:
		return fmt.Sprintf("%dm%ds", totalMinutes, totalSeconds%60)
	default:
		return fmt.Sprintf("%ds", totalSeconds)
	}
}
|
|
|
|
// formatTokenCount renders a token count compactly: values below 1000 are
// printed as-is, thousands as "<x.y>k" and millions as "<x.y>M", each with one
// decimal place.
//
// Counts just under one million that would round up to "1000.0k" are promoted
// to "1.0M" so the thousands form never shows four integer digits.
func formatTokenCount(count int) string {
	switch {
	case count >= 1000000:
		return fmt.Sprintf("%.1fM", float64(count)/1000000)
	case count >= 1000:
		scaled := fmt.Sprintf("%.1f", float64(count)/1000)
		if scaled == "1000.0" {
			// Rounding carried into the next unit (e.g. 999999).
			return "1.0M"
		}
		return scaled + "k"
	default:
		return fmt.Sprintf("%d", count)
	}
}
|
|
|
|
// Helper functions
|
|
|
|
// readJSON decodes the request body as JSON into v.
//
// At most 1 MiB of body is accepted; a larger body is rejected with an
// explicit error instead of being truncated and surfacing as a misleading JSON
// syntax error. The body is closed on every path, including a failed read.
func readJSON(r *http.Request, v interface{}) error {
	const maxBodyBytes = 1 << 20 // 1 MiB

	defer r.Body.Close()

	// Read one byte past the limit so an oversized body can be told apart from
	// one that is exactly at the limit.
	body, err := io.ReadAll(io.LimitReader(r.Body, maxBodyBytes+1))
	if err != nil {
		return fmt.Errorf("read body: %w", err)
	}
	if len(body) > maxBodyBytes {
		return fmt.Errorf("request body exceeds %d bytes", maxBodyBytes)
	}

	return json.Unmarshal(body, v)
}
|
|
|
|
// writeJSON sends v as a JSON response with the given status code.
//
// The value is marshalled before any header is written, so an encoding
// failure produces a 500 with a JSON error body instead of the requested
// status followed by an empty or partial body. On success the bytes written
// are the same as json.Encoder.Encode would produce (JSON plus a newline).
func writeJSON(w http.ResponseWriter, status int, v interface{}) {
	payload, err := json.Marshal(v)
	if err != nil {
		status = http.StatusInternalServerError
		payload = []byte(`{"error":"failed to encode response"}`)
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	// A write error means the client has gone away; nothing useful can be done.
	_, _ = w.Write(append(payload, '\n'))
}
|