feat: implement queue pruning and message rotation
Enforce QUEUE_MAX_AGE and MAX_HISTORY config values that previously existed but were not applied. The existing cleanup loop now also: - Prunes client_queues entries older than QUEUE_MAX_AGE (default 48h) - Rotates messages per target (channel/DM) beyond MAX_HISTORY (default 10000) - Removes orphaned messages no longer referenced by any client queue closes #40
This commit is contained in:
17
README.md
17
README.md
@@ -1788,10 +1788,11 @@ skew issues) and simpler than UUIDs (integer comparison vs. string comparison).
|
|||||||
|
|
||||||
### Data Lifecycle
|
### Data Lifecycle
|
||||||
|
|
||||||
- **Messages**: Stored indefinitely in the current implementation. Rotation
|
- **Messages**: Rotated per `MAX_HISTORY` — oldest messages beyond the limit
|
||||||
per `MAX_HISTORY` is planned.
|
are pruned periodically per target (channel or DM). Orphaned messages (no
|
||||||
- **Queue entries**: Stored until pruned. Pruning by `QUEUE_MAX_AGE` is
|
longer referenced by any client queue) are also removed.
|
||||||
planned.
|
- **Queue entries**: Pruned automatically when older than `QUEUE_MAX_AGE`
|
||||||
|
(default 48h).
|
||||||
- **Channels**: Deleted when the last member leaves (ephemeral).
|
- **Channels**: Deleted when the last member leaves (ephemeral).
|
||||||
- **Users/sessions**: Deleted on `QUIT` or `POST /api/v1/logout`. Idle
|
- **Users/sessions**: Deleted on `QUIT` or `POST /api/v1/logout`. Idle
|
||||||
sessions are automatically expired after `SESSION_IDLE_TIMEOUT` (default
|
sessions are automatically expired after `SESSION_IDLE_TIMEOUT` (default
|
||||||
@@ -1812,9 +1813,9 @@ directory is also loaded automatically via
|
|||||||
| `PORT` | int | `8080` | HTTP listen port |
|
| `PORT` | int | `8080` | HTTP listen port |
|
||||||
| `DBURL` | string | `file:///var/lib/neoirc/state.db?_journal_mode=WAL` | SQLite connection string. For file-based: `file:///path/to/db.db?_journal_mode=WAL`. For in-memory (testing): `file::memory:?cache=shared`. |
|
| `DBURL` | string | `file:///var/lib/neoirc/state.db?_journal_mode=WAL` | SQLite connection string. For file-based: `file:///path/to/db.db?_journal_mode=WAL`. For in-memory (testing): `file::memory:?cache=shared`. |
|
||||||
| `DEBUG` | bool | `false` | Enable debug logging (verbose request/response logging) |
|
| `DEBUG` | bool | `false` | Enable debug logging (verbose request/response logging) |
|
||||||
| `MAX_HISTORY` | int | `10000` | Maximum messages retained per channel before rotation (planned) |
|
| `MAX_HISTORY` | int | `10000` | Maximum messages retained per target (channel or DM) before rotation |
|
||||||
| `SESSION_IDLE_TIMEOUT` | string | `24h` | Session idle timeout as a Go duration string (e.g. `24h`, `30m`). Sessions with no activity for this long are expired and the nick is released. |
|
| `SESSION_IDLE_TIMEOUT` | string | `24h` | Session idle timeout as a Go duration string (e.g. `24h`, `30m`). Sessions with no activity for this long are expired and the nick is released. |
|
||||||
| `QUEUE_MAX_AGE` | int | `172800` | Maximum age of client queue entries in seconds (48h). Entries older than this are pruned (planned). |
|
| `QUEUE_MAX_AGE` | int | `172800` | Maximum age of client queue entries in seconds (48h). Entries older than this are pruned. |
|
||||||
| `MAX_MESSAGE_SIZE` | int | `4096` | Maximum message body size in bytes (planned enforcement) |
|
| `MAX_MESSAGE_SIZE` | int | `4096` | Maximum message body size in bytes (planned enforcement) |
|
||||||
| `LONG_POLL_TIMEOUT`| int | `15` | Default long-poll timeout in seconds (client can override via query param, server caps at 30) |
|
| `LONG_POLL_TIMEOUT`| int | `15` | Default long-poll timeout in seconds (client can override via query param, server caps at 30) |
|
||||||
| `MOTD` | string | `""` | Message of the day, shown to clients via `GET /api/v1/server` |
|
| `MOTD` | string | `""` | Message of the day, shown to clients via `GET /api/v1/server` |
|
||||||
@@ -2228,8 +2229,8 @@ GET /api/v1/challenge
|
|||||||
### Post-MVP (Planned)
|
### Post-MVP (Planned)
|
||||||
|
|
||||||
- [ ] **Hashcash proof-of-work** for session creation (abuse prevention)
|
- [ ] **Hashcash proof-of-work** for session creation (abuse prevention)
|
||||||
- [ ] **Queue pruning** — delete old queue entries per `QUEUE_MAX_AGE`
|
- [x] **Queue pruning** — delete old queue entries per `QUEUE_MAX_AGE`
|
||||||
- [ ] **Message rotation** — enforce `MAX_HISTORY` per channel
|
- [x] **Message rotation** — enforce `MAX_HISTORY` per target
|
||||||
- [ ] **Channel modes** — enforce `+i`, `+m`, `+s`, `+t`, `+n`
|
- [ ] **Channel modes** — enforce `+i`, `+m`, `+s`, `+t`, `+n`
|
||||||
- [ ] **User channel modes** — `+o` (operator), `+v` (voice)
|
- [ ] **User channel modes** — `+o` (operator), `+v` (voice)
|
||||||
- [x] **MODE command** — query channel and user modes (set not yet implemented)
|
- [x] **MODE command** — query channel and user modes (set not yet implemented)
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ type Config struct {
|
|||||||
SentryDSN string
|
SentryDSN string
|
||||||
MaxHistory int
|
MaxHistory int
|
||||||
MaxMessageSize int
|
MaxMessageSize int
|
||||||
|
QueueMaxAge int
|
||||||
MOTD string
|
MOTD string
|
||||||
ServerName string
|
ServerName string
|
||||||
FederationKey string
|
FederationKey string
|
||||||
@@ -70,6 +71,7 @@ func New(
|
|||||||
viper.SetDefault("METRICS_PASSWORD", "")
|
viper.SetDefault("METRICS_PASSWORD", "")
|
||||||
viper.SetDefault("MAX_HISTORY", "10000")
|
viper.SetDefault("MAX_HISTORY", "10000")
|
||||||
viper.SetDefault("MAX_MESSAGE_SIZE", "4096")
|
viper.SetDefault("MAX_MESSAGE_SIZE", "4096")
|
||||||
|
viper.SetDefault("QUEUE_MAX_AGE", "172800")
|
||||||
viper.SetDefault("MOTD", defaultMOTD)
|
viper.SetDefault("MOTD", defaultMOTD)
|
||||||
viper.SetDefault("SERVER_NAME", "")
|
viper.SetDefault("SERVER_NAME", "")
|
||||||
viper.SetDefault("FEDERATION_KEY", "")
|
viper.SetDefault("FEDERATION_KEY", "")
|
||||||
@@ -94,6 +96,7 @@ func New(
|
|||||||
MetricsPassword: viper.GetString("METRICS_PASSWORD"),
|
MetricsPassword: viper.GetString("METRICS_PASSWORD"),
|
||||||
MaxHistory: viper.GetInt("MAX_HISTORY"),
|
MaxHistory: viper.GetInt("MAX_HISTORY"),
|
||||||
MaxMessageSize: viper.GetInt("MAX_MESSAGE_SIZE"),
|
MaxMessageSize: viper.GetInt("MAX_MESSAGE_SIZE"),
|
||||||
|
QueueMaxAge: viper.GetInt("QUEUE_MAX_AGE"),
|
||||||
MOTD: viper.GetString("MOTD"),
|
MOTD: viper.GetString("MOTD"),
|
||||||
ServerName: viper.GetString("SERVER_NAME"),
|
ServerName: viper.GetString("SERVER_NAME"),
|
||||||
FederationKey: viper.GetString("FEDERATION_KEY"),
|
FederationKey: viper.GetString("FEDERATION_KEY"),
|
||||||
|
|||||||
@@ -1096,3 +1096,128 @@ func (database *Database) GetSessionCreatedAt(
|
|||||||
|
|
||||||
return createdAt, nil
|
return createdAt, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PruneOldQueueEntries deletes client_queues rows older
|
||||||
|
// than cutoff and returns the number of rows removed.
|
||||||
|
func (database *Database) PruneOldQueueEntries(
|
||||||
|
ctx context.Context,
|
||||||
|
cutoff time.Time,
|
||||||
|
) (int64, error) {
|
||||||
|
res, err := database.conn.ExecContext(ctx,
|
||||||
|
"DELETE FROM client_queues WHERE created_at < ?",
|
||||||
|
cutoff,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf(
|
||||||
|
"prune old queue entries: %w", err,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
deleted, _ := res.RowsAffected()
|
||||||
|
|
||||||
|
return deleted, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// PruneOrphanedMessages deletes messages that are no
|
||||||
|
// longer referenced by any client_queues row and returns
|
||||||
|
// the number of rows removed.
|
||||||
|
func (database *Database) PruneOrphanedMessages(
|
||||||
|
ctx context.Context,
|
||||||
|
) (int64, error) {
|
||||||
|
res, err := database.conn.ExecContext(ctx,
|
||||||
|
`DELETE FROM messages WHERE id NOT IN
|
||||||
|
(SELECT DISTINCT message_id
|
||||||
|
FROM client_queues)`,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf(
|
||||||
|
"prune orphaned messages: %w", err,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
deleted, _ := res.RowsAffected()
|
||||||
|
|
||||||
|
return deleted, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RotateChannelMessages enforces MAX_HISTORY per channel
|
||||||
|
// by deleting the oldest messages beyond the limit for
|
||||||
|
// each msg_to target. Returns the total number of rows
|
||||||
|
// removed.
|
||||||
|
func (database *Database) RotateChannelMessages(
|
||||||
|
ctx context.Context,
|
||||||
|
maxHistory int,
|
||||||
|
) (int64, error) {
|
||||||
|
if maxHistory <= 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find distinct targets that have messages.
|
||||||
|
rows, err := database.conn.QueryContext(ctx,
|
||||||
|
`SELECT msg_to, COUNT(*) AS cnt
|
||||||
|
FROM messages
|
||||||
|
WHERE msg_to != ''
|
||||||
|
GROUP BY msg_to
|
||||||
|
HAVING cnt > ?`,
|
||||||
|
maxHistory,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf(
|
||||||
|
"list targets for rotation: %w", err,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() { _ = rows.Close() }()
|
||||||
|
|
||||||
|
type targetCount struct {
|
||||||
|
target string
|
||||||
|
count int64
|
||||||
|
}
|
||||||
|
|
||||||
|
var targets []targetCount
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
var entry targetCount
|
||||||
|
|
||||||
|
err = rows.Scan(&entry.target, &entry.count)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf(
|
||||||
|
"scan target count: %w", err,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
targets = append(targets, entry)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = rows.Err()
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("rows error: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var totalDeleted int64
|
||||||
|
|
||||||
|
for _, entry := range targets {
|
||||||
|
res, delErr := database.conn.ExecContext(ctx,
|
||||||
|
`DELETE FROM messages
|
||||||
|
WHERE msg_to = ?
|
||||||
|
AND id NOT IN (
|
||||||
|
SELECT id FROM messages
|
||||||
|
WHERE msg_to = ?
|
||||||
|
ORDER BY id DESC
|
||||||
|
LIMIT ?
|
||||||
|
)`,
|
||||||
|
entry.target, entry.target, maxHistory,
|
||||||
|
)
|
||||||
|
if delErr != nil {
|
||||||
|
return totalDeleted, fmt.Errorf(
|
||||||
|
"rotate messages for %s: %w",
|
||||||
|
entry.target, delErr,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
deleted, _ := res.RowsAffected()
|
||||||
|
totalDeleted += deleted
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalDeleted, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -200,4 +200,63 @@ func (hdlr *Handlers) runCleanup(
|
|||||||
"deleted", deleted,
|
"deleted", deleted,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
hdlr.pruneQueuesAndMessages(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// pruneQueuesAndMessages removes old client_queues entries
|
||||||
|
// per QUEUE_MAX_AGE, rotates messages per MAX_HISTORY, and
|
||||||
|
// cleans up orphaned messages.
|
||||||
|
func (hdlr *Handlers) pruneQueuesAndMessages(
|
||||||
|
ctx context.Context,
|
||||||
|
) {
|
||||||
|
queueMaxAge := hdlr.params.Config.QueueMaxAge
|
||||||
|
if queueMaxAge > 0 {
|
||||||
|
queueCutoff := time.Now().Add(
|
||||||
|
-time.Duration(queueMaxAge) * time.Second,
|
||||||
|
)
|
||||||
|
|
||||||
|
pruned, err := hdlr.params.Database.
|
||||||
|
PruneOldQueueEntries(ctx, queueCutoff)
|
||||||
|
if err != nil {
|
||||||
|
hdlr.log.Error(
|
||||||
|
"queue pruning failed", "error", err,
|
||||||
|
)
|
||||||
|
} else if pruned > 0 {
|
||||||
|
hdlr.log.Info(
|
||||||
|
"pruned old queue entries",
|
||||||
|
"deleted", pruned,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
maxHistory := hdlr.params.Config.MaxHistory
|
||||||
|
if maxHistory > 0 {
|
||||||
|
rotated, err := hdlr.params.Database.
|
||||||
|
RotateChannelMessages(ctx, maxHistory)
|
||||||
|
if err != nil {
|
||||||
|
hdlr.log.Error(
|
||||||
|
"message rotation failed", "error", err,
|
||||||
|
)
|
||||||
|
} else if rotated > 0 {
|
||||||
|
hdlr.log.Info(
|
||||||
|
"rotated old messages",
|
||||||
|
"deleted", rotated,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
orphaned, err := hdlr.params.Database.
|
||||||
|
PruneOrphanedMessages(ctx)
|
||||||
|
if err != nil {
|
||||||
|
hdlr.log.Error(
|
||||||
|
"orphan message cleanup failed",
|
||||||
|
"error", err,
|
||||||
|
)
|
||||||
|
} else if orphaned > 0 {
|
||||||
|
hdlr.log.Info(
|
||||||
|
"pruned orphaned messages",
|
||||||
|
"deleted", orphaned,
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user