refactor: 30-day defaults, time-based message expiry
All checks were successful
check / check (push) Successful in 5s
All checks were successful
check / check (push) Successful in 5s
- Change QUEUE_MAX_AGE default from 48h to 30 days (2592000s) - Replace count-based MAX_HISTORY with time-based MESSAGE_MAX_AGE (default 30 days) — messages older than the configured age are pruned instead of keeping a fixed count per target - Update README to reflect new defaults and time-based expiry
This commit is contained in:
@@ -1118,84 +1118,23 @@ func (database *Database) PruneOldQueueEntries(
|
||||
return deleted, nil
|
||||
}
|
||||
|
||||
// RotateChannelMessages enforces MAX_HISTORY per channel
|
||||
// by deleting the oldest messages beyond the limit for
|
||||
// each msg_to target. Returns the total number of rows
|
||||
// removed.
|
||||
func (database *Database) RotateChannelMessages(
|
||||
// PruneOldMessages deletes messages older than cutoff and
|
||||
// returns the number of rows removed.
|
||||
func (database *Database) PruneOldMessages(
|
||||
ctx context.Context,
|
||||
maxHistory int,
|
||||
cutoff time.Time,
|
||||
) (int64, error) {
|
||||
if maxHistory <= 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// Find distinct targets that have messages.
|
||||
rows, err := database.conn.QueryContext(ctx,
|
||||
`SELECT msg_to, COUNT(*) AS cnt
|
||||
FROM messages
|
||||
WHERE msg_to != ''
|
||||
GROUP BY msg_to
|
||||
HAVING cnt > ?`,
|
||||
maxHistory,
|
||||
res, err := database.conn.ExecContext(ctx,
|
||||
"DELETE FROM messages WHERE created_at < ?",
|
||||
cutoff,
|
||||
)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf(
|
||||
"list targets for rotation: %w", err,
|
||||
"prune old messages: %w", err,
|
||||
)
|
||||
}
|
||||
|
||||
defer func() { _ = rows.Close() }()
|
||||
deleted, _ := res.RowsAffected()
|
||||
|
||||
type targetCount struct {
|
||||
target string
|
||||
count int64
|
||||
}
|
||||
|
||||
var targets []targetCount
|
||||
|
||||
for rows.Next() {
|
||||
var entry targetCount
|
||||
|
||||
err = rows.Scan(&entry.target, &entry.count)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf(
|
||||
"scan target count: %w", err,
|
||||
)
|
||||
}
|
||||
|
||||
targets = append(targets, entry)
|
||||
}
|
||||
|
||||
err = rows.Err()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("rows error: %w", err)
|
||||
}
|
||||
|
||||
var totalDeleted int64
|
||||
|
||||
for _, entry := range targets {
|
||||
res, delErr := database.conn.ExecContext(ctx,
|
||||
`DELETE FROM messages
|
||||
WHERE msg_to = ?
|
||||
AND id NOT IN (
|
||||
SELECT id FROM messages
|
||||
WHERE msg_to = ?
|
||||
ORDER BY id DESC
|
||||
LIMIT ?
|
||||
)`,
|
||||
entry.target, entry.target, maxHistory,
|
||||
)
|
||||
if delErr != nil {
|
||||
return totalDeleted, fmt.Errorf(
|
||||
"rotate messages for %s: %w",
|
||||
entry.target, delErr,
|
||||
)
|
||||
}
|
||||
|
||||
deleted, _ := res.RowsAffected()
|
||||
totalDeleted += deleted
|
||||
}
|
||||
|
||||
return totalDeleted, nil
|
||||
return deleted, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user