Files
sol f545cb00be fix: orphan session cleanup + instant reactivation via ghost watch
plugin/server/store.go:
- CleanStaleSessions now handles last_update_ms=0 (pre-cleanup era orphans)
- Zero-timestamp sessions: mark active ones interrupted, delete non-active ones
- Previously these were silently skipped with 'continue', accumulating forever

src/status-watcher.js:
- removeSession() keeps fileToSession mapping as ghost entry ('\x00ghost:key')
- When ghost file changes, emits 'session-reactivate' immediately instead of
  waiting up to 2s for the session-monitor poll cycle
- Ghost removed after first trigger to avoid repeated events

src/session-monitor.js:
- Added pollNow() for immediate poll without waiting for interval tick
- Reactivation check now uses sessions.json updatedAt vs completedAt timestamp
  (pure infrastructure: two on-disk timestamps, no AI involvement)

src/watcher-manager.js:
- Wires session-reactivate event: clearCompleted() + pollNow() for instant re-detection
- New sessions now show up within ~100ms of first file change instead of 2s

Net result: the status box appears reliably on every turn and clears 3s after the reply,
and orphan sessions no longer accumulate in the KV store.
2026-03-09 18:31:41 +00:00

179 lines
4.8 KiB
Go

package main
import (
"encoding/json"
"fmt"
"net/url"
"strings"
"time"
"github.com/mattermost/mattermost/server/public/plugin"
)
// kvPrefix is prepended to every session key by encodeKey and is what
// ListAllSessions uses to tell session entries apart from other KV keys.
const kvPrefix = "ls_session_"
// SessionData represents a tracked agent session.
//
// Values are JSON-encoded by SaveSession and decoded by GetSession and
// ListAllSessions, so the json tags below define the stored format.
type SessionData struct {
// SessionKey is passed back to SaveSession/DeleteSession by
// CleanStaleSessions, so it is expected to equal the key the session was
// stored under.
SessionKey string `json:"session_key"`
PostID string `json:"post_id"`
ChannelID string `json:"channel_id"`
RootID string `json:"root_id,omitempty"`
AgentID string `json:"agent_id"`
// Status: within this file only the value "active" is tested for
// (ListActiveSessions, CleanStaleSessions) and "interrupted" is written.
Status string `json:"status"` // active, done, error, interrupted
Lines []string `json:"lines"`
ElapsedMs int64 `json:"elapsed_ms"`
TokenCount int `json:"token_count"`
Children []SessionData `json:"children,omitempty"`
// StartTimeMs is the fallback timestamp CleanStaleSessions uses when
// LastUpdateMs is zero.
StartTimeMs int64 `json:"start_time_ms"`
// LastUpdateMs is compared against time.Now().UnixMilli() by
// CleanStaleSessions, i.e. it is treated as Unix time in milliseconds.
LastUpdateMs int64 `json:"last_update_ms"`
}
// Store wraps Mattermost KV store operations for session persistence.
// Session entries live under keys produced by encodeKey.
type Store struct {
// api is the plugin API whose KVSet/KVGet/KVDelete/KVList calls back the store.
api plugin.API
}
// NewStore returns a Store that performs its KV operations through api.
func NewStore(api plugin.API) *Store {
	store := new(Store)
	store.api = api
	return store
}
// encodeKey builds the KV store key for a session: kvPrefix followed by the
// path-escaped form of sessionKey.
func encodeKey(sessionKey string) string {
	escaped := url.PathEscape(sessionKey)
	return kvPrefix + escaped
}
// SaveSession JSON-encodes data and writes it to the KV store under
// encodeKey(sessionKey), overwriting any existing entry.
//
// It returns an error if encoding fails or if the KV API rejects the write.
// Both errors are wrapped with %w so callers can inspect the underlying cause
// with errors.Is / errors.As.
func (s *Store) SaveSession(sessionKey string, data SessionData) error {
	b, err := json.Marshal(data)
	if err != nil {
		return fmt.Errorf("marshal session: %w", err)
	}
	if appErr := s.api.KVSet(encodeKey(sessionKey), b); appErr != nil {
		// appErr is a non-nil concrete pointer here, so wrapping it is safe.
		return fmt.Errorf("kv set: %w", appErr)
	}
	return nil
}
// GetSession loads and decodes the session stored under encodeKey(sessionKey).
//
// It returns (nil, nil) when the KV API returns no value for the key, so
// callers must check the pointer as well as the error. KV and JSON errors are
// wrapped with %w so the underlying cause stays inspectable.
func (s *Store) GetSession(sessionKey string) (*SessionData, error) {
	b, appErr := s.api.KVGet(encodeKey(sessionKey))
	if appErr != nil {
		return nil, fmt.Errorf("kv get: %w", appErr)
	}
	if b == nil {
		// Absent key: not an error.
		return nil, nil
	}

	var data SessionData
	if err := json.Unmarshal(b, &data); err != nil {
		return nil, fmt.Errorf("unmarshal session: %w", err)
	}
	return &data, nil
}
// DeleteSession removes the entry stored under encodeKey(sessionKey).
//
// A KV API failure is returned wrapped with %w so callers can inspect the
// underlying error.
func (s *Store) DeleteSession(sessionKey string) error {
	if appErr := s.api.KVDelete(encodeKey(sessionKey)); appErr != nil {
		return fmt.Errorf("kv delete: %w", appErr)
	}
	return nil
}
// ListAllSessions returns every session in the KV store, regardless of status.
//
// It pages through KVList 100 keys at a time until an empty page is returned,
// and fetches each key that starts with kvPrefix individually. Listing is
// best-effort per entry: keys that cannot be read, have no value, or do not
// decode as SessionData are skipped silently. Only a KVList failure aborts
// the call; that error is wrapped with %w.
func (s *Store) ListAllSessions() ([]SessionData, error) {
	const perPage = 100

	var sessions []SessionData
	for page := 0; ; page++ {
		keys, appErr := s.api.KVList(page, perPage)
		if appErr != nil {
			return nil, fmt.Errorf("kv list: %w", appErr)
		}
		if len(keys) == 0 {
			// An empty page marks the end of the key space.
			break
		}

		for _, key := range keys {
			if !strings.HasPrefix(key, kvPrefix) {
				continue // not a session entry
			}
			b, getErr := s.api.KVGet(key)
			if getErr != nil || b == nil {
				continue // unreadable or missing value
			}
			var data SessionData
			if err := json.Unmarshal(b, &data); err != nil {
				continue // corrupt entry
			}
			sessions = append(sessions, data)
		}
	}
	return sessions, nil
}
// ListActiveSessions returns the subset of ListAllSessions whose Status is
// "active", in the same order. An error from ListAllSessions is passed
// through unchanged.
func (s *Store) ListActiveSessions() ([]SessionData, error) {
	sessions, err := s.ListAllSessions()
	if err != nil {
		return nil, err
	}

	var active []SessionData
	for i := range sessions {
		if sessions[i].Status != "active" {
			continue
		}
		active = append(active, sessions[i])
	}
	return active, nil
}
// CleanStaleSessions marks stale active sessions as interrupted and deletes
// expired non-active sessions.
//
// staleThresholdMs: active sessions with no update for longer than this are
// marked interrupted (and stamped with the current time).
// expireThresholdMs: non-active sessions older than this are deleted from KV.
//
// A session's age is measured from LastUpdateMs, falling back to StartTimeMs.
// Sessions with neither timestamp (pre-cleanup era orphans) are treated as
// past both thresholds: active ones are marked interrupted so they get a real
// timestamp and are picked up by a later cleanup cycle, the rest are deleted.
//
// cleaned and expired count only the operations that actually succeeded. The
// sweep is best-effort: a failed save or delete does not stop it, but if any
// occurred, err is non-nil (wrapping the first failure) and the counts
// describe the work that did complete. If listing the sessions fails, it
// returns 0, 0 and that error.
func (s *Store) CleanStaleSessions(staleThresholdMs, expireThresholdMs int64) (cleaned int, expired int, err error) {
	now := time.Now().UnixMilli()
	all, err := s.ListAllSessions()
	if err != nil {
		return 0, 0, err
	}

	var (
		failures int
		firstErr error
	)
	for _, session := range all {
		lastUpdate := session.LastUpdateMs
		if lastUpdate == 0 {
			lastUpdate = session.StartTimeMs
		}
		// No timestamps at all: the age is unknowable, so treat it as exceeded.
		orphan := lastUpdate == 0
		age := now - lastUpdate
		active := session.Status == "active"

		var opErr error
		switch {
		case active && (orphan || age > staleThresholdMs):
			session.Status = "interrupted"
			session.LastUpdateMs = now
			if opErr = s.SaveSession(session.SessionKey, session); opErr == nil {
				cleaned++
			}
		case !active && (orphan || age > expireThresholdMs):
			if opErr = s.DeleteSession(session.SessionKey); opErr == nil {
				expired++
			}
		}
		if opErr != nil {
			failures++
			if firstErr == nil {
				firstErr = opErr
			}
		}
	}

	if firstErr != nil {
		return cleaned, expired, fmt.Errorf("clean stale sessions: %d store update(s) failed, first: %w", failures, firstErr)
	}
	return cleaned, expired, nil
}