package main import ( "encoding/json" "fmt" "io" "net/http" "strings" "time" "github.com/mattermost/mattermost/server/public/model" "github.com/mattermost/mattermost/server/public/plugin" ) // ServeHTTP handles HTTP requests to the plugin. func (p *Plugin) ServeHTTP(c *plugin.Context, w http.ResponseWriter, r *http.Request) { path := r.URL.Path // Auth middleware: two auth paths. // 1. Shared secret (Bearer token) — used by the daemon for write operations. // 2. Mattermost session (Mattermost-User-Id header) — used by browser requests. // The Mattermost server automatically injects this header for authenticated // requests routed through the plugin HTTP handler. // // Read-only endpoints (GET /sessions, GET /health) accept either auth method. // Write endpoints (POST, PUT, DELETE) require the shared secret. isReadOnly := r.Method == http.MethodGet && (path == "/api/v1/sessions" || path == "/api/v1/health") config := p.getConfiguration() hasSharedSecret := false if config.SharedSecret != "" { auth := r.Header.Get("Authorization") expected := "Bearer " + config.SharedSecret hasSharedSecret = (auth == expected) } // Check Mattermost session auth (browser requests). // The MM server injects Mattermost-User-Id for authenticated users. mmUserID := r.Header.Get("Mattermost-User-Id") hasMattermostSession := mmUserID != "" if isReadOnly { // Read-only: accept either shared secret OR Mattermost session if !hasSharedSecret && !hasMattermostSession { http.Error(w, `{"error": "unauthorized: valid Mattermost session or shared secret required"}`, http.StatusUnauthorized) return } } else { // Write operations: require shared secret (daemon auth) if !hasSharedSecret { http.Error(w, `{"error": "unauthorized"}`, http.StatusUnauthorized) return } } switch { case path == "/api/v1/health" && r.Method == http.MethodGet: p.handleHealth(w, r) case path == "/api/v1/sessions" && r.Method == http.MethodGet: p.handleListSessions(w, r) case path == "/api/v1/sessions" && r.Method == http.MethodPost: p.handleCreateSession(w, r) case strings.HasPrefix(path, "/api/v1/sessions/") && r.Method == http.MethodPut: sessionKey := strings.TrimPrefix(path, "/api/v1/sessions/") p.handleUpdateSession(w, r, sessionKey) case strings.HasPrefix(path, "/api/v1/sessions/") && r.Method == http.MethodDelete: sessionKey := strings.TrimPrefix(path, "/api/v1/sessions/") p.handleDeleteSession(w, r, sessionKey) default: http.NotFound(w, r) } } // handleHealth returns plugin health status. func (p *Plugin) handleHealth(w http.ResponseWriter, r *http.Request) { sessions, err := p.store.ListAllSessions() count := 0 if err == nil { for _, s := range sessions { if s.Status == "active" { count++ } } } writeJSON(w, http.StatusOK, map[string]interface{}{ "status": "healthy", "active_sessions": count, "plugin_id": "com.openclaw.livestatus", }) } // handleListSessions returns all sessions (active and non-active). func (p *Plugin) handleListSessions(w http.ResponseWriter, r *http.Request) { sessions, err := p.store.ListAllSessions() if err != nil { writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) return } writeJSON(w, http.StatusOK, sessions) } // CreateSessionRequest is the request body for creating a new session. type CreateSessionRequest struct { SessionKey string `json:"session_key"` ChannelID string `json:"channel_id"` RootID string `json:"root_id,omitempty"` AgentID string `json:"agent_id"` BotUserID string `json:"bot_user_id,omitempty"` } // handleCreateSession creates a new custom_livestatus post and starts tracking. func (p *Plugin) handleCreateSession(w http.ResponseWriter, r *http.Request) { var req CreateSessionRequest if err := readJSON(r, &req); err != nil { writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()}) return } if req.SessionKey == "" || req.ChannelID == "" { writeJSON(w, http.StatusBadRequest, map[string]string{"error": "session_key and channel_id required"}) return } // Validate KV key length — Mattermost enforces a 50-char limit. // Encoded key = kvPrefix (11 chars) + url.PathEscape(sessionKey). // Exceeding the limit causes KVSet to silently succeed but never store data. if len(encodeKey(req.SessionKey)) > 50 { writeJSON(w, http.StatusBadRequest, map[string]string{ "error": fmt.Sprintf("session_key too long: encoded key length %d exceeds 50-char KV limit", len(encodeKey(req.SessionKey))), }) return } // Check max active sessions config := p.getConfiguration() allSessions, _ := p.store.ListAllSessions() activeCount := 0 for _, s := range allSessions { if s.Status == "active" { activeCount++ } } if activeCount >= config.MaxActiveSessions { writeJSON(w, http.StatusTooManyRequests, map[string]string{"error": "max active sessions reached"}) return } // Create the custom post — UserId is required // Use the bot user ID passed in the request, or fall back to plugin bot userID := req.BotUserID if userID == "" { // Try to get plugin's own bot userID = p.getBotUserID() } if userID == "" { writeJSON(w, http.StatusBadRequest, map[string]string{"error": "bot_user_id required (no plugin bot available)"}) return } post := &model.Post{ UserId: userID, ChannelId: req.ChannelID, RootId: req.RootID, Type: "custom_livestatus", Message: "Agent session active", } post.AddProp("session_key", req.SessionKey) post.AddProp("agent_id", req.AgentID) post.AddProp("status", "active") createdPost, appErr := p.API.CreatePost(post) if appErr != nil { writeJSON(w, http.StatusInternalServerError, map[string]string{"error": appErr.Error()}) return } if createdPost == nil { writeJSON(w, http.StatusInternalServerError, map[string]string{"error": "CreatePost returned nil without error"}) return } // Store session data now := time.Now().UnixMilli() sessionData := SessionData{ SessionKey: req.SessionKey, PostID: createdPost.Id, ChannelID: req.ChannelID, RootID: req.RootID, AgentID: req.AgentID, Status: "active", Lines: []string{}, StartTimeMs: now, LastUpdateMs: now, } if err := p.store.SaveSession(req.SessionKey, sessionData); err != nil { p.API.LogWarn("Failed to save session", "error", err.Error()) } // Broadcast initial state p.broadcastUpdate(req.ChannelID, sessionData) writeJSON(w, http.StatusCreated, map[string]string{ "post_id": createdPost.Id, "session_key": req.SessionKey, }) } // UpdateSessionRequest is the request body for updating a session. type UpdateSessionRequest struct { Status string `json:"status"` Lines []string `json:"lines"` ElapsedMs int64 `json:"elapsed_ms"` TokenCount int `json:"token_count"` Children []SessionData `json:"children,omitempty"` StartTimeMs int64 `json:"start_time_ms"` } // handleUpdateSession updates session data and broadcasts via WebSocket. // Critically: does NOT call any Mattermost post API — no "(edited)" label. func (p *Plugin) handleUpdateSession(w http.ResponseWriter, r *http.Request, sessionKey string) { var req UpdateSessionRequest if err := readJSON(r, &req); err != nil { writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()}) return } // Get existing session existing, err := p.store.GetSession(sessionKey) if err != nil || existing == nil { writeJSON(w, http.StatusNotFound, map[string]string{"error": "session not found"}) return } // Update fields existing.Status = req.Status existing.Lines = req.Lines existing.ElapsedMs = req.ElapsedMs existing.TokenCount = req.TokenCount existing.Children = req.Children existing.LastUpdateMs = time.Now().UnixMilli() if req.StartTimeMs > 0 { existing.StartTimeMs = req.StartTimeMs } // Save to KV store if err := p.store.SaveSession(sessionKey, *existing); err != nil { writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) return } // Broadcast update via WebSocket (for webapp — instant, no API call) p.broadcastUpdate(existing.ChannelID, *existing) // Mobile fallback: update the post message field with formatted markdown. // Mobile app doesn't render custom post types, so it shows the message field. // The webapp plugin overrides rendering entirely, so "(edited)" is invisible on web. go p.updatePostMessageForMobile(*existing) writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) } // handleDeleteSession marks a session as complete. func (p *Plugin) handleDeleteSession(w http.ResponseWriter, r *http.Request, sessionKey string) { existing, err := p.store.GetSession(sessionKey) if err != nil || existing == nil { writeJSON(w, http.StatusNotFound, map[string]string{"error": "session not found"}) return } // Mark as done existing.Status = "done" // Update the post props to reflect completion (one final API call) post, appErr := p.API.GetPost(existing.PostID) if appErr == nil && post != nil { post.AddProp("status", "done") post.AddProp("final_lines", existing.Lines) post.AddProp("elapsed_ms", existing.ElapsedMs) post.AddProp("token_count", existing.TokenCount) _, _ = p.API.UpdatePost(post) } // Broadcast final state p.broadcastUpdate(existing.ChannelID, *existing) // Clean up KV store _ = p.store.DeleteSession(sessionKey) writeJSON(w, http.StatusOK, map[string]string{"status": "done"}) } // updatePostMessageForMobile updates the post's Message field with formatted markdown. // This provides a mobile fallback — mobile apps don't render custom post type components // but DO display the Message field. On webapp, the plugin's React component overrides // rendering entirely, so the "(edited)" indicator is invisible. func (p *Plugin) updatePostMessageForMobile(data SessionData) { post, appErr := p.API.GetPost(data.PostID) if appErr != nil || post == nil { return } newMessage := formatStatusMarkdown(data) if post.Message == newMessage { return // Skip API call if content hasn't changed } post.Message = newMessage _, updateErr := p.API.UpdatePost(post) if updateErr != nil { p.API.LogDebug("Failed to update post message for mobile", "postId", data.PostID, "error", updateErr.Error()) } } // formatStatusMarkdown generates a markdown blockquote status view for mobile clients. func formatStatusMarkdown(data SessionData) string { // Status icon var statusIcon string switch data.Status { case "active": statusIcon = "[ACTIVE]" case "done": statusIcon = "[DONE]" case "error": statusIcon = "[ERROR]" case "interrupted": statusIcon = "[INTERRUPTED]" default: statusIcon = "[UNKNOWN]" } // Elapsed time elapsed := formatElapsedMs(data.ElapsedMs) // Build lines result := fmt.Sprintf("> **%s** `%s` | %s\n", statusIcon, data.AgentID, elapsed) // Show last N status lines (keep it compact for mobile) maxLines := 15 lines := data.Lines if len(lines) > maxLines { lines = lines[len(lines)-maxLines:] } for _, line := range lines { if len(line) > 120 { line = line[:117] + "..." } result += "> " + line + "\n" } // Token count for completed sessions if data.Status != "active" && data.TokenCount > 0 { result += fmt.Sprintf("> **[%s]** %s | %s tokens\n", strings.ToUpper(data.Status), elapsed, formatTokenCount(data.TokenCount)) } return result } // formatElapsedMs formats milliseconds as human-readable duration. func formatElapsedMs(ms int64) string { if ms < 0 { ms = 0 } s := ms / 1000 m := s / 60 h := m / 60 if h > 0 { return fmt.Sprintf("%dh%dm", h, m%60) } if m > 0 { return fmt.Sprintf("%dm%ds", m, s%60) } return fmt.Sprintf("%ds", s) } // formatTokenCount formats a token count compactly. func formatTokenCount(count int) string { if count >= 1000000 { return fmt.Sprintf("%.1fM", float64(count)/1000000) } if count >= 1000 { return fmt.Sprintf("%.1fk", float64(count)/1000) } return fmt.Sprintf("%d", count) } // Helper functions func readJSON(r *http.Request, v interface{}) error { body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) // 1MB limit if err != nil { return fmt.Errorf("read body: %w", err) } defer r.Body.Close() return json.Unmarshal(body, v) } func writeJSON(w http.ResponseWriter, status int, v interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) json.NewEncoder(w).Encode(v) }