plugin/server/store.go:
- CleanStaleSessions now handles last_update_ms=0 (pre-cleanup era orphans)
- Zero-timestamp sessions: mark active ones interrupted, delete non-active ones
- Previously these were silently skipped with 'continue', accumulating forever
src/status-watcher.js:
- removeSession() keeps fileToSession mapping as ghost entry ('\x00ghost:key')
- When ghost file changes, emits 'session-reactivate' immediately instead of
waiting up to 2s for the session-monitor poll cycle
- Ghost removed after first trigger to avoid repeated events
src/session-monitor.js:
- Added pollNow() for immediate poll without waiting for interval tick
- Reactivation check now uses sessions.json updatedAt vs completedAt timestamp
(pure infrastructure: two on-disk timestamps, no AI involvement)
src/watcher-manager.js:
- Wires session-reactivate event: clearCompleted() + pollNow() for instant re-detection
- New sessions now show up within ~100ms of first file change instead of 2s
Net result: status box appears reliably on every turn, clears 3s after reply,
zero orphan sessions accumulating in the KV store.
179 lines
4.8 KiB
Go
179 lines
4.8 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/mattermost/mattermost/server/public/plugin"
|
|
)
|
|
|
|
// kvPrefix namespaces every session key written to the plugin KV store.
// encodeKey prepends it to each key, and ListAllSessions uses it to pick
// session entries out of the full key listing.
const kvPrefix = "ls_session_"
|
|
|
|
// SessionData is the JSON record persisted for one tracked agent session.
// The JSON tags define the stored format; RootID and Children are left out
// of the encoded form when empty.
type SessionData struct {
	// Identity of the session and the IDs it is associated with.
	SessionKey string `json:"session_key"`
	PostID     string `json:"post_id"`
	ChannelID  string `json:"channel_id"`
	RootID     string `json:"root_id,omitempty"`
	AgentID    string `json:"agent_id"`

	// Status is one of: active, done, error, interrupted.
	Status string `json:"status"`

	// Progress reported for the session. Children holds nested session
	// records of the same shape.
	Lines      []string      `json:"lines"`
	ElapsedMs  int64         `json:"elapsed_ms"`
	TokenCount int           `json:"token_count"`
	Children   []SessionData `json:"children,omitempty"`

	// Unix-millisecond timestamps. CleanStaleSessions ages a session by
	// LastUpdateMs and falls back to StartTimeMs when that is zero.
	StartTimeMs  int64 `json:"start_time_ms"`
	LastUpdateMs int64 `json:"last_update_ms"`
}
|
|
|
|
// Store wraps Mattermost KV store operations for session persistence.
|
|
type Store struct {
|
|
api plugin.API
|
|
}
|
|
|
|
// NewStore creates a new Store instance.
|
|
func NewStore(api plugin.API) *Store {
|
|
return &Store{api: api}
|
|
}
|
|
|
|
// encodeKey URL-encodes a session key for safe KV storage.
|
|
func encodeKey(sessionKey string) string {
|
|
return kvPrefix + url.PathEscape(sessionKey)
|
|
}
|
|
|
|
// SaveSession stores a session in the KV store.
|
|
func (s *Store) SaveSession(sessionKey string, data SessionData) error {
|
|
b, err := json.Marshal(data)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal session: %w", err)
|
|
}
|
|
appErr := s.api.KVSet(encodeKey(sessionKey), b)
|
|
if appErr != nil {
|
|
return fmt.Errorf("kv set: %s", appErr.Error())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetSession retrieves a session from the KV store.
|
|
func (s *Store) GetSession(sessionKey string) (*SessionData, error) {
|
|
b, appErr := s.api.KVGet(encodeKey(sessionKey))
|
|
if appErr != nil {
|
|
return nil, fmt.Errorf("kv get: %s", appErr.Error())
|
|
}
|
|
if b == nil {
|
|
return nil, nil
|
|
}
|
|
var data SessionData
|
|
if err := json.Unmarshal(b, &data); err != nil {
|
|
return nil, fmt.Errorf("unmarshal session: %w", err)
|
|
}
|
|
return &data, nil
|
|
}
|
|
|
|
// DeleteSession removes a session from the KV store.
|
|
func (s *Store) DeleteSession(sessionKey string) error {
|
|
appErr := s.api.KVDelete(encodeKey(sessionKey))
|
|
if appErr != nil {
|
|
return fmt.Errorf("kv delete: %s", appErr.Error())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ListAllSessions returns all sessions from the KV store (active and non-active).
|
|
func (s *Store) ListAllSessions() ([]SessionData, error) {
|
|
var sessions []SessionData
|
|
page := 0
|
|
perPage := 100
|
|
|
|
for {
|
|
keys, appErr := s.api.KVList(page, perPage)
|
|
if appErr != nil {
|
|
return nil, fmt.Errorf("kv list: %s", appErr.Error())
|
|
}
|
|
if len(keys) == 0 {
|
|
break
|
|
}
|
|
|
|
for _, key := range keys {
|
|
if !strings.HasPrefix(key, kvPrefix) {
|
|
continue
|
|
}
|
|
b, getErr := s.api.KVGet(key)
|
|
if getErr != nil || b == nil {
|
|
continue
|
|
}
|
|
var data SessionData
|
|
if err := json.Unmarshal(b, &data); err != nil {
|
|
continue
|
|
}
|
|
sessions = append(sessions, data)
|
|
}
|
|
page++
|
|
}
|
|
|
|
return sessions, nil
|
|
}
|
|
|
|
// ListActiveSessions returns only active sessions from the KV store.
|
|
func (s *Store) ListActiveSessions() ([]SessionData, error) {
|
|
all, err := s.ListAllSessions()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var active []SessionData
|
|
for _, sess := range all {
|
|
if sess.Status == "active" {
|
|
active = append(active, sess)
|
|
}
|
|
}
|
|
return active, nil
|
|
}
|
|
|
|
// CleanStaleSessions marks stale active sessions as interrupted and deletes expired completed sessions.
|
|
// staleThresholdMs: active sessions with no update for this long are marked interrupted.
|
|
// expireThresholdMs: non-active sessions older than this are deleted from KV.
|
|
func (s *Store) CleanStaleSessions(staleThresholdMs, expireThresholdMs int64) (cleaned int, expired int, err error) {
|
|
now := time.Now().UnixMilli()
|
|
all, err := s.ListAllSessions()
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
for _, session := range all {
|
|
lastUpdate := session.LastUpdateMs
|
|
if lastUpdate == 0 {
|
|
lastUpdate = session.StartTimeMs
|
|
}
|
|
if lastUpdate == 0 {
|
|
// No timestamps at all (pre-cleanup era orphan) — treat as expired immediately.
|
|
// Delete non-active sessions; mark active ones as interrupted so they get
|
|
// picked up on the next cleanup cycle with a real timestamp.
|
|
if session.Status == "active" {
|
|
session.Status = "interrupted"
|
|
session.LastUpdateMs = now
|
|
_ = s.SaveSession(session.SessionKey, session)
|
|
cleaned++
|
|
} else {
|
|
_ = s.DeleteSession(session.SessionKey)
|
|
expired++
|
|
}
|
|
continue
|
|
}
|
|
age := now - lastUpdate
|
|
|
|
if session.Status == "active" && age > staleThresholdMs {
|
|
// Mark stale sessions as interrupted
|
|
session.Status = "interrupted"
|
|
session.LastUpdateMs = now
|
|
_ = s.SaveSession(session.SessionKey, session)
|
|
cleaned++
|
|
} else if session.Status != "active" && age > expireThresholdMs {
|
|
// Delete expired completed/interrupted sessions
|
|
_ = s.DeleteSession(session.SessionKey)
|
|
expired++
|
|
}
|
|
}
|
|
return cleaned, expired, nil
|
|
}
|