From 6f6ea33eaad3e0147ac04962a6ccaaf5a1c5c0e9 Mon Sep 17 00:00:00 2001 From: user Date: Tue, 10 Mar 2026 03:20:21 -0700 Subject: [PATCH] feat: implement queue pruning and message rotation Enforce QUEUE_MAX_AGE and MAX_HISTORY config values that previously existed but were not applied. The existing cleanup loop now also: - Prunes client_queues entries older than QUEUE_MAX_AGE (default 48h) - Rotates messages per target (channel/DM) beyond MAX_HISTORY (default 10000) - Removes orphaned messages no longer referenced by any client queue closes #40 --- README.md | 17 ++--- internal/config/config.go | 3 + internal/db/queries.go | 125 ++++++++++++++++++++++++++++++++++ internal/handlers/handlers.go | 59 ++++++++++++++++ 4 files changed, 196 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 146dad6..31dfdaa 100644 --- a/README.md +++ b/README.md @@ -1784,10 +1784,11 @@ skew issues) and simpler than UUIDs (integer comparison vs. string comparison). ### Data Lifecycle -- **Messages**: Stored indefinitely in the current implementation. Rotation - per `MAX_HISTORY` is planned. -- **Queue entries**: Stored until pruned. Pruning by `QUEUE_MAX_AGE` is - planned. +- **Messages**: Rotated per `MAX_HISTORY` — oldest messages beyond the limit + are pruned periodically per target (channel or DM). Orphaned messages (no + longer referenced by any client queue) are also removed. +- **Queue entries**: Pruned automatically when older than `QUEUE_MAX_AGE` + (default 48h). - **Channels**: Deleted when the last member leaves (ephemeral). - **Users/sessions**: Deleted on `QUIT` or `POST /api/v1/logout`. Idle sessions are automatically expired after `SESSION_IDLE_TIMEOUT` (default @@ -1808,9 +1809,9 @@ directory is also loaded automatically via | `PORT` | int | `8080` | HTTP listen port | | `DBURL` | string | `file:///var/lib/neoirc/state.db?_journal_mode=WAL` | SQLite connection string. For file-based: `file:///path/to/db.db?_journal_mode=WAL`. For in-memory (testing): `file::memory:?cache=shared`. | | `DEBUG` | bool | `false` | Enable debug logging (verbose request/response logging) | -| `MAX_HISTORY` | int | `10000` | Maximum messages retained per channel before rotation (planned) | +| `MAX_HISTORY` | int | `10000` | Maximum messages retained per target (channel or DM) before rotation | | `SESSION_IDLE_TIMEOUT` | string | `24h` | Session idle timeout as a Go duration string (e.g. `24h`, `30m`). Sessions with no activity for this long are expired and the nick is released. | -| `QUEUE_MAX_AGE` | int | `172800` | Maximum age of client queue entries in seconds (48h). Entries older than this are pruned (planned). | +| `QUEUE_MAX_AGE` | int | `172800` | Maximum age of client queue entries in seconds (48h). Entries older than this are pruned. | | `MAX_MESSAGE_SIZE` | int | `4096` | Maximum message body size in bytes (planned enforcement) | | `LONG_POLL_TIMEOUT`| int | `15` | Default long-poll timeout in seconds (client can override via query param, server caps at 30) | | `MOTD` | string | `""` | Message of the day, shown to clients via `GET /api/v1/server` | @@ -2224,8 +2225,8 @@ GET /api/v1/challenge ### Post-MVP (Planned) - [ ] **Hashcash proof-of-work** for session creation (abuse prevention) -- [ ] **Queue pruning** — delete old queue entries per `QUEUE_MAX_AGE` -- [ ] **Message rotation** — enforce `MAX_HISTORY` per channel +- [x] **Queue pruning** — delete old queue entries per `QUEUE_MAX_AGE` +- [x] **Message rotation** — enforce `MAX_HISTORY` per target - [ ] **Channel modes** — enforce `+i`, `+m`, `+s`, `+t`, `+n` - [ ] **User channel modes** — `+o` (operator), `+v` (voice) - [x] **MODE command** — query channel and user modes (set not yet implemented) diff --git a/internal/config/config.go b/internal/config/config.go index 468ca75..393e98e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -40,6 +40,7 @@ type Config struct { SentryDSN string MaxHistory int MaxMessageSize int + QueueMaxAge int MOTD string ServerName string FederationKey string @@ -70,6 +71,7 @@ func New( viper.SetDefault("METRICS_PASSWORD", "") viper.SetDefault("MAX_HISTORY", "10000") viper.SetDefault("MAX_MESSAGE_SIZE", "4096") + viper.SetDefault("QUEUE_MAX_AGE", "172800") viper.SetDefault("MOTD", defaultMOTD) viper.SetDefault("SERVER_NAME", "") viper.SetDefault("FEDERATION_KEY", "") @@ -94,6 +96,7 @@ func New( MetricsPassword: viper.GetString("METRICS_PASSWORD"), MaxHistory: viper.GetInt("MAX_HISTORY"), MaxMessageSize: viper.GetInt("MAX_MESSAGE_SIZE"), + QueueMaxAge: viper.GetInt("QUEUE_MAX_AGE"), MOTD: viper.GetString("MOTD"), ServerName: viper.GetString("SERVER_NAME"), FederationKey: viper.GetString("FEDERATION_KEY"), diff --git a/internal/db/queries.go b/internal/db/queries.go index 6ffff23..a4a4c26 100644 --- a/internal/db/queries.go +++ b/internal/db/queries.go @@ -1096,3 +1096,128 @@ func (database *Database) GetSessionCreatedAt( return createdAt, nil } + +// PruneOldQueueEntries deletes client_queues rows older +// than cutoff and returns the number of rows removed. +func (database *Database) PruneOldQueueEntries( + ctx context.Context, + cutoff time.Time, +) (int64, error) { + res, err := database.conn.ExecContext(ctx, + "DELETE FROM client_queues WHERE created_at < ?", + cutoff, + ) + if err != nil { + return 0, fmt.Errorf( + "prune old queue entries: %w", err, + ) + } + + deleted, _ := res.RowsAffected() + + return deleted, nil +} + +// PruneOrphanedMessages deletes messages that are no +// longer referenced by any client_queues row and returns +// the number of rows removed. +func (database *Database) PruneOrphanedMessages( + ctx context.Context, +) (int64, error) { + res, err := database.conn.ExecContext(ctx, + `DELETE FROM messages WHERE id NOT IN + (SELECT DISTINCT message_id + FROM client_queues)`, + ) + if err != nil { + return 0, fmt.Errorf( + "prune orphaned messages: %w", err, + ) + } + + deleted, _ := res.RowsAffected() + + return deleted, nil +} + +// RotateChannelMessages enforces MAX_HISTORY per channel +// by deleting the oldest messages beyond the limit for +// each msg_to target. Returns the total number of rows +// removed. +func (database *Database) RotateChannelMessages( + ctx context.Context, + maxHistory int, +) (int64, error) { + if maxHistory <= 0 { + return 0, nil + } + + // Find distinct targets that have messages. + rows, err := database.conn.QueryContext(ctx, + `SELECT msg_to, COUNT(*) AS cnt + FROM messages + WHERE msg_to != '' + GROUP BY msg_to + HAVING cnt > ?`, + maxHistory, + ) + if err != nil { + return 0, fmt.Errorf( + "list targets for rotation: %w", err, + ) + } + + defer func() { _ = rows.Close() }() + + type targetCount struct { + target string + count int64 + } + + var targets []targetCount + + for rows.Next() { + var entry targetCount + + err = rows.Scan(&entry.target, &entry.count) + if err != nil { + return 0, fmt.Errorf( + "scan target count: %w", err, + ) + } + + targets = append(targets, entry) + } + + err = rows.Err() + if err != nil { + return 0, fmt.Errorf("rows error: %w", err) + } + + var totalDeleted int64 + + for _, entry := range targets { + res, delErr := database.conn.ExecContext(ctx, + `DELETE FROM messages + WHERE msg_to = ? + AND id NOT IN ( + SELECT id FROM messages + WHERE msg_to = ? + ORDER BY id DESC + LIMIT ? + )`, + entry.target, entry.target, maxHistory, + ) + if delErr != nil { + return totalDeleted, fmt.Errorf( + "rotate messages for %s: %w", + entry.target, delErr, + ) + } + + deleted, _ := res.RowsAffected() + totalDeleted += deleted + } + + return totalDeleted, nil +} diff --git a/internal/handlers/handlers.go b/internal/handlers/handlers.go index 72ef994..4876284 100644 --- a/internal/handlers/handlers.go +++ b/internal/handlers/handlers.go @@ -200,4 +200,63 @@ func (hdlr *Handlers) runCleanup( "deleted", deleted, ) } + + hdlr.pruneQueuesAndMessages(ctx) +} + +// pruneQueuesAndMessages removes old client_queues entries +// per QUEUE_MAX_AGE, rotates messages per MAX_HISTORY, and +// cleans up orphaned messages. +func (hdlr *Handlers) pruneQueuesAndMessages( + ctx context.Context, +) { + queueMaxAge := hdlr.params.Config.QueueMaxAge + if queueMaxAge > 0 { + queueCutoff := time.Now().Add( + -time.Duration(queueMaxAge) * time.Second, + ) + + pruned, err := hdlr.params.Database. + PruneOldQueueEntries(ctx, queueCutoff) + if err != nil { + hdlr.log.Error( + "queue pruning failed", "error", err, + ) + } else if pruned > 0 { + hdlr.log.Info( + "pruned old queue entries", + "deleted", pruned, + ) + } + } + + maxHistory := hdlr.params.Config.MaxHistory + if maxHistory > 0 { + rotated, err := hdlr.params.Database. + RotateChannelMessages(ctx, maxHistory) + if err != nil { + hdlr.log.Error( + "message rotation failed", "error", err, + ) + } else if rotated > 0 { + hdlr.log.Info( + "rotated old messages", + "deleted", rotated, + ) + } + } + + orphaned, err := hdlr.params.Database. + PruneOrphanedMessages(ctx) + if err != nil { + hdlr.log.Error( + "orphan message cleanup failed", + "error", err, + ) + } else if orphaned > 0 { + hdlr.log.Info( + "pruned orphaned messages", + "deleted", orphaned, + ) + } }