feat: implement queue pruning and message rotation

Enforce QUEUE_MAX_AGE and MAX_HISTORY config values that previously
existed but were not applied. The existing cleanup loop now also:

- Prunes client_queues entries older than QUEUE_MAX_AGE (default 48h)
- Rotates messages per target (channel/DM) beyond MAX_HISTORY (default 10000)
- Removes orphaned messages no longer referenced by any client queue

closes #40
This commit is contained in:
user
2026-03-10 03:20:21 -07:00
committed by clawbot
parent c07f94a432
commit 62e462d732
4 changed files with 196 additions and 8 deletions

View File

@@ -40,6 +40,7 @@ type Config struct {
SentryDSN string
MaxHistory int
MaxMessageSize int
QueueMaxAge int
MOTD string
ServerName string
FederationKey string
@@ -70,6 +71,7 @@ func New(
viper.SetDefault("METRICS_PASSWORD", "")
viper.SetDefault("MAX_HISTORY", "10000")
viper.SetDefault("MAX_MESSAGE_SIZE", "4096")
viper.SetDefault("QUEUE_MAX_AGE", "172800")
viper.SetDefault("MOTD", defaultMOTD)
viper.SetDefault("SERVER_NAME", "")
viper.SetDefault("FEDERATION_KEY", "")
@@ -94,6 +96,7 @@ func New(
MetricsPassword: viper.GetString("METRICS_PASSWORD"),
MaxHistory: viper.GetInt("MAX_HISTORY"),
MaxMessageSize: viper.GetInt("MAX_MESSAGE_SIZE"),
QueueMaxAge: viper.GetInt("QUEUE_MAX_AGE"),
MOTD: viper.GetString("MOTD"),
ServerName: viper.GetString("SERVER_NAME"),
FederationKey: viper.GetString("FEDERATION_KEY"),

View File

@@ -1096,3 +1096,128 @@ func (database *Database) GetSessionCreatedAt(
return createdAt, nil
}
// PruneOldQueueEntries deletes client_queues rows older
// than cutoff and returns the number of rows removed.
func (database *Database) PruneOldQueueEntries(
ctx context.Context,
cutoff time.Time,
) (int64, error) {
res, err := database.conn.ExecContext(ctx,
"DELETE FROM client_queues WHERE created_at < ?",
cutoff,
)
if err != nil {
return 0, fmt.Errorf(
"prune old queue entries: %w", err,
)
}
deleted, _ := res.RowsAffected()
return deleted, nil
}
// PruneOrphanedMessages deletes messages that are no
// longer referenced by any client_queues row and returns
// the number of rows removed.
func (database *Database) PruneOrphanedMessages(
ctx context.Context,
) (int64, error) {
res, err := database.conn.ExecContext(ctx,
`DELETE FROM messages WHERE id NOT IN
(SELECT DISTINCT message_id
FROM client_queues)`,
)
if err != nil {
return 0, fmt.Errorf(
"prune orphaned messages: %w", err,
)
}
deleted, _ := res.RowsAffected()
return deleted, nil
}
// RotateChannelMessages enforces MAX_HISTORY per channel
// by deleting the oldest messages beyond the limit for
// each msg_to target. Returns the total number of rows
// removed.
func (database *Database) RotateChannelMessages(
ctx context.Context,
maxHistory int,
) (int64, error) {
if maxHistory <= 0 {
return 0, nil
}
// Find distinct targets that have messages.
rows, err := database.conn.QueryContext(ctx,
`SELECT msg_to, COUNT(*) AS cnt
FROM messages
WHERE msg_to != ''
GROUP BY msg_to
HAVING cnt > ?`,
maxHistory,
)
if err != nil {
return 0, fmt.Errorf(
"list targets for rotation: %w", err,
)
}
defer func() { _ = rows.Close() }()
type targetCount struct {
target string
count int64
}
var targets []targetCount
for rows.Next() {
var entry targetCount
err = rows.Scan(&entry.target, &entry.count)
if err != nil {
return 0, fmt.Errorf(
"scan target count: %w", err,
)
}
targets = append(targets, entry)
}
err = rows.Err()
if err != nil {
return 0, fmt.Errorf("rows error: %w", err)
}
var totalDeleted int64
for _, entry := range targets {
res, delErr := database.conn.ExecContext(ctx,
`DELETE FROM messages
WHERE msg_to = ?
AND id NOT IN (
SELECT id FROM messages
WHERE msg_to = ?
ORDER BY id DESC
LIMIT ?
)`,
entry.target, entry.target, maxHistory,
)
if delErr != nil {
return totalDeleted, fmt.Errorf(
"rotate messages for %s: %w",
entry.target, delErr,
)
}
deleted, _ := res.RowsAffected()
totalDeleted += deleted
}
return totalDeleted, nil
}

View File

@@ -200,4 +200,63 @@ func (hdlr *Handlers) runCleanup(
"deleted", deleted,
)
}
hdlr.pruneQueuesAndMessages(ctx)
}
// pruneQueuesAndMessages removes old client_queues entries
// per QUEUE_MAX_AGE, rotates messages per MAX_HISTORY, and
// cleans up orphaned messages.
func (hdlr *Handlers) pruneQueuesAndMessages(
ctx context.Context,
) {
queueMaxAge := hdlr.params.Config.QueueMaxAge
if queueMaxAge > 0 {
queueCutoff := time.Now().Add(
-time.Duration(queueMaxAge) * time.Second,
)
pruned, err := hdlr.params.Database.
PruneOldQueueEntries(ctx, queueCutoff)
if err != nil {
hdlr.log.Error(
"queue pruning failed", "error", err,
)
} else if pruned > 0 {
hdlr.log.Info(
"pruned old queue entries",
"deleted", pruned,
)
}
}
maxHistory := hdlr.params.Config.MaxHistory
if maxHistory > 0 {
rotated, err := hdlr.params.Database.
RotateChannelMessages(ctx, maxHistory)
if err != nil {
hdlr.log.Error(
"message rotation failed", "error", err,
)
} else if rotated > 0 {
hdlr.log.Info(
"rotated old messages",
"deleted", rotated,
)
}
}
orphaned, err := hdlr.params.Database.
PruneOrphanedMessages(ctx)
if err != nil {
hdlr.log.Error(
"orphan message cleanup failed",
"error", err,
)
} else if orphaned > 0 {
hdlr.log.Info(
"pruned orphaned messages",
"deleted", orphaned,
)
}
}