// gomeshalerter/storage.go
package main
import (
"context"
"crypto/sha256"
"database/sql"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"sort"
"strings"
"time"
"github.com/oklog/ulid/v2"
)
// ANSI color codes
const (
colorReset = "\033[0m"
colorRed = "\033[31m"
colorGreen = "\033[32m"
colorYellow = "\033[33m"
colorBlue = "\033[34m"
colorPurple = "\033[35m"
colorCyan = "\033[36m"
colorGray = "\033[37m"
colorWhite = "\033[97m"
bold = "\033[1m"
)
// ColorizedHandler is a custom slog.Handler that outputs colorized logs
// ColorizedHandler is a custom slog.Handler that outputs colorized logs
// as compact single lines of the form:
//
//	[15:04:05.000] LVL [component] message key=value ...
type ColorizedHandler struct {
	w       io.Writer  // destination for rendered log lines (e.g. os.Stderr)
	level   slog.Level // minimum level; records below this are dropped
	timeKey string     // attr key treated as internal time data and skipped
	msgKey  string     // attr key for the message (slog.MessageKey)
}
// NewColorizedHandler creates a new ColorizedHandler
func NewColorizedHandler(w io.Writer, opts *slog.HandlerOptions) *ColorizedHandler {
if opts == nil {
opts = &slog.HandlerOptions{}
}
return &ColorizedHandler{
w: w,
level: opts.Level.Level(),
timeKey: "time",
msgKey: slog.MessageKey,
}
}
// Enabled implements slog.Handler
func (h *ColorizedHandler) Enabled(ctx context.Context, level slog.Level) bool {
return level >= h.level
}
// Handle implements slog.Handler
func (h *ColorizedHandler) Handle(ctx context.Context, r slog.Record) error {
// Skip logs below our level
if !h.Enabled(ctx, r.Level) {
return nil
}
// Format time with milliseconds
timeStr := r.Time.Format("15:04:05.000")
// Get component from attributes
var component string
var message string
// Store other attributes for printing later
attributes := make(map[string]interface{})
r.Attrs(func(a slog.Attr) bool {
if a.Key == "component" {
component = a.Value.String()
return true
}
if a.Key == h.msgKey {
message = a.Value.String()
return true
}
// Skip internal or empty values
if a.Key == h.timeKey || a.Key == "level" || a.Value.String() == "" {
return true
}
attributes[a.Key] = a.Value.Any()
return true
})
// Format level with color
var levelColor string
var levelText string
switch r.Level {
case slog.LevelDebug:
levelColor = colorGray
levelText = "DBG"
case slog.LevelInfo:
levelColor = colorGreen
levelText = "INF"
case slog.LevelWarn:
levelColor = colorYellow
levelText = "WRN"
case slog.LevelError:
levelColor = colorRed
levelText = "ERR"
default:
levelColor = colorReset
levelText = "???"
}
// Build the log line
var sb strings.Builder
// Timestamp with gray color
sb.WriteString(colorGray)
sb.WriteString("[")
sb.WriteString(timeStr)
sb.WriteString("]")
sb.WriteString(colorReset)
sb.WriteString(" ")
// Level with appropriate color
sb.WriteString(levelColor)
sb.WriteString(levelText)
sb.WriteString(colorReset)
sb.WriteString(" ")
// Component in blue
if component != "" {
sb.WriteString(colorBlue)
sb.WriteString("[")
sb.WriteString(component)
sb.WriteString("]")
sb.WriteString(colorReset)
sb.WriteString(" ")
}
// Message in white+bold
if message != "" {
sb.WriteString(bold)
sb.WriteString(colorWhite)
sb.WriteString(message)
sb.WriteString(colorReset)
sb.WriteString(" ")
}
// Sort keys for consistent output
keys := make([]string, 0, len(attributes))
for k := range attributes {
keys = append(keys, k)
}
sort.Strings(keys)
// Add attributes as key=value pairs with colors
for _, k := range keys {
v := attributes[k]
sb.WriteString(colorCyan) // Key in cyan
sb.WriteString(k)
sb.WriteString(colorReset)
sb.WriteString("=")
// Value color depends on type
switch v := v.(type) {
case string:
sb.WriteString(colorYellow) // Strings in yellow
sb.WriteString(v)
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64:
sb.WriteString(colorPurple) // Numbers in purple
sb.WriteString(fmt.Sprintf("%v", v))
case bool:
if v {
sb.WriteString(colorGreen) // true in green
} else {
sb.WriteString(colorRed) // false in red
}
sb.WriteString(fmt.Sprintf("%v", v))
case error:
sb.WriteString(colorRed) // Errors in red
sb.WriteString(v.Error())
default:
sb.WriteString(colorReset) // Other types with no color
sb.WriteString(fmt.Sprintf("%v", v))
}
sb.WriteString(colorReset)
sb.WriteString(" ")
}
sb.WriteString("\n")
// Write to output
_, err := io.WriteString(h.w, sb.String())
return err
}
// WithAttrs implements slog.Handler
func (h *ColorizedHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
// This is a simplified implementation that doesn't actually store the attrs
// In a real implementation, you would create a new handler with these attrs
return h
}
// WithGroup implements slog.Handler
func (h *ColorizedHandler) WithGroup(name string) slog.Handler {
// This is a simplified implementation that doesn't handle groups
return h
}
// setupDatabase creates the articles and logs tables (plus the logs
// timestamp index) if they do not exist, then migrates older databases
// by adding any articles columns introduced since the table was first
// created. Uses the package-level db handle.
func setupDatabase() error {
	_, err := db.Exec(`
	CREATE TABLE IF NOT EXISTS articles (
		link TEXT PRIMARY KEY,
		title TEXT NOT NULL,
		description TEXT,
		published TIMESTAMP NOT NULL,
		originalDate TIMESTAMP,
		source TEXT NOT NULL,
		firstseen TIMESTAMP NOT NULL,
		seen TIMESTAMP,
		summary TEXT,
		importance INTEGER,
		id TEXT,
		broadcastTime TIMESTAMP
	)
	`)
	if err != nil {
		return err
	}

	// Create logs table for structured log entries.
	_, err = db.Exec(`
	CREATE TABLE IF NOT EXISTS logs (
		id TEXT PRIMARY KEY,
		timestamp TIMESTAMP NOT NULL,
		log JSON NOT NULL
	)
	`)
	if err != nil {
		return err
	}

	// Create index on timestamp for efficient querying and deletion.
	_, err = db.Exec(`
	CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs (timestamp)
	`)
	if err != nil {
		return err
	}

	// Inspect the live schema to find which columns already exist.
	rows, err := db.Query(`PRAGMA table_info(articles)`)
	if err != nil {
		return err
	}
	hasIDColumn := false
	hasBroadcastTimeColumn := false
	hasOriginalDateColumn := false
	for rows.Next() {
		var cid, notnull, pk int
		var name, typeName string
		var dfltValue interface{}
		if err := rows.Scan(&cid, &name, &typeName, &notnull, &dfltValue, &pk); err != nil {
			rows.Close()
			return err
		}
		switch name {
		case "id":
			hasIDColumn = true
		case "broadcastTime":
			hasBroadcastTimeColumn = true
		case "originalDate":
			hasOriginalDateColumn = true
		}
	}
	// Surface iteration errors that rows.Next() terminated on. (bug fix:
	// previously rows.Err() was never checked)
	if err := rows.Err(); err != nil {
		rows.Close()
		return err
	}
	// Close the cursor before issuing ALTER TABLE: SQLite can treat the
	// table as locked while a read cursor on it is still open. (bug fix:
	// previously a defer kept the cursor open across the ALTERs below)
	if err := rows.Close(); err != nil {
		return err
	}

	// Add missing columns if needed.
	if !hasIDColumn {
		if _, err := db.Exec(`ALTER TABLE articles ADD COLUMN id TEXT`); err != nil {
			return err
		}
	}
	if !hasBroadcastTimeColumn {
		if _, err := db.Exec(`ALTER TABLE articles ADD COLUMN broadcastTime TIMESTAMP`); err != nil {
			return err
		}
	}
	if !hasOriginalDateColumn {
		if _, err := db.Exec(`ALTER TABLE articles ADD COLUMN originalDate TIMESTAMP`); err != nil {
			return err
		}
		logInfo("db", "Added originalDate column to articles table", nil)
	}
	return nil
}
// Generate a deterministic ID from a URL
func generateID(url string) string {
hash := sha256.Sum256([]byte(url))
return hex.EncodeToString(hash[:])[:26] // Return first 26 chars of the hash
}
// saveArticle inserts an article row, silently skipping links that are
// already present (INSERT OR IGNORE). Failures are recorded via
// logEvent and also returned to the caller.
func saveArticle(article Article) error {
	const stmt = `
	INSERT OR IGNORE INTO articles
	(link, title, description, published, originalDate, source, firstseen, seen, summary, importance, id, broadcastTime)
	VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
	`
	_, err := db.Exec(stmt,
		article.Link, article.Title, article.Description, article.Published, article.OriginalDate,
		article.Source, article.FirstSeen, article.Seen, article.Summary, article.Importance, article.ID,
		article.BroadcastTime)
	if err == nil {
		return nil
	}
	logEvent("db_insert_error", map[string]interface{}{
		"article": article.Link,
		"id":      article.ID,
		"error":   err.Error(),
	})
	return err
}
// updateArticle rewrites every mutable column of the articles row
// identified by the article's link. Failures are recorded via logEvent
// and also returned to the caller.
func updateArticle(article Article) error {
	const stmt = `
	UPDATE articles SET
	title = ?,
	description = ?,
	published = ?,
	originalDate = ?,
	source = ?,
	firstseen = ?,
	seen = ?,
	summary = ?,
	importance = ?,
	id = ?,
	broadcastTime = ?
	WHERE link = ?
	`
	_, err := db.Exec(stmt,
		article.Title, article.Description, article.Published, article.OriginalDate, article.Source,
		article.FirstSeen, article.Seen, article.Summary, article.Importance, article.ID,
		article.BroadcastTime, article.Link)
	if err == nil {
		return nil
	}
	logEvent("db_update_error", map[string]interface{}{
		"article": article.Link,
		"id":      article.ID,
		"error":   err.Error(),
	})
	return err
}
// loadArticles reads every row of the articles table into a map keyed
// by article link. Query, scan, and iteration failures are logged and
// the (possibly partial) map is returned rather than an error.
func loadArticles() map[string]Article {
	articles := make(map[string]Article)
	rows, err := db.Query(`
	SELECT link, title, description, published, originalDate, source, firstseen, seen, summary, importance, id, broadcastTime
	FROM articles
	`)
	if err != nil {
		logEvent("db_query_error", map[string]interface{}{
			"error": err.Error(),
		})
		return articles
	}
	defer rows.Close()
	for rows.Next() {
		var a Article
		// Nullable columns go through sql.NullTime so a NULL leaves the
		// corresponding Article field at its zero time.
		var seen, broadcastTime, originalDate sql.NullTime
		err := rows.Scan(
			&a.Link, &a.Title, &a.Description, &a.Published, &originalDate, &a.Source,
			&a.FirstSeen, &seen, &a.Summary, &a.Importance, &a.ID, &broadcastTime,
		)
		if err != nil {
			logEvent("db_scan_error", map[string]interface{}{
				"error": err.Error(),
			})
			continue
		}
		if seen.Valid {
			a.Seen = seen.Time
		}
		if broadcastTime.Valid {
			a.BroadcastTime = broadcastTime.Time
		}
		if originalDate.Valid {
			a.OriginalDate = originalDate.Time
		}
		articles[a.Link] = a
	}
	// Report iteration errors previously swallowed. (bug fix: rows.Err()
	// was never checked after the loop)
	if err := rows.Err(); err != nil {
		logEvent("db_query_error", map[string]interface{}{
			"error": err.Error(),
		})
	}
	return articles
}
// getBroadcastArticles is the common query for retrieving up to limit
// already-broadcast articles, newest broadcast first. The WHERE clause
// filters out rows whose broadcastTime is a sentinel/zero value (unix
// epoch, Go zero time, or any pre-2000 garbage date).
func getBroadcastArticles(limit int) ([]Article, error) {
	rows, err := db.Query(`
	SELECT link, title, description, published, originalDate, source, firstseen, seen, summary, importance, id, broadcastTime
	FROM articles
	WHERE broadcastTime IS NOT NULL
	AND broadcastTime > 1
	AND broadcastTime != 0
	AND datetime(broadcastTime) != '1970-01-01 00:00:00'
	AND datetime(broadcastTime) != '0001-01-01 00:00:00'
	AND strftime('%Y', broadcastTime) > '2000' -- Ensure year is at least 2000
	ORDER BY broadcastTime DESC
	LIMIT ?
	`, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var articles []Article
	for rows.Next() {
		var a Article
		// NULL-able columns scan through sql.NullTime.
		var seen, broadcastTime, originalDate sql.NullTime
		err := rows.Scan(
			&a.Link, &a.Title, &a.Description, &a.Published, &originalDate, &a.Source,
			&a.FirstSeen, &seen, &a.Summary, &a.Importance, &a.ID, &broadcastTime,
		)
		if err != nil {
			// Log instead of silently dropping the row. (consistency with
			// loadArticles, which reports scan failures)
			logEvent("db_scan_error", map[string]interface{}{
				"error": err.Error(),
			})
			continue
		}
		if seen.Valid {
			a.Seen = seen.Time
		}
		if broadcastTime.Valid {
			a.BroadcastTime = broadcastTime.Time
		}
		if originalDate.Valid {
			a.OriginalDate = originalDate.Time
		}
		articles = append(articles, a)
	}
	// Propagate iteration errors. (bug fix: rows.Err() was unchecked)
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return articles, nil
}
// getBroadcastHistory returns the limit most recently broadcast
// articles; it is a thin alias for getBroadcastArticles.
func getBroadcastHistory(limit int) ([]Article, error) {
	return getBroadcastArticles(limit)
}
// getNextUpArticles returns the top 25 not-yet-broadcast articles that
// have a summary, a positive importance score, and were first seen
// within ARTICLE_FRESHNESS_WINDOW, ordered by importance descending.
func getNextUpArticles() ([]Article, error) {
	// Only articles first seen after this cutoff are considered fresh.
	cutoff := time.Now().Add(-ARTICLE_FRESHNESS_WINDOW)
	rows, err := db.Query(`
	SELECT link, title, description, published, originalDate, source, firstseen, seen, summary, importance, id, broadcastTime
	FROM articles
	WHERE broadcastTime IS NULL
	AND summary IS NOT NULL
	AND importance > 0
	AND firstseen > ?
	ORDER BY importance DESC
	LIMIT 25
	`, cutoff)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var articles []Article
	for rows.Next() {
		var a Article
		// NULL-able columns scan through sql.NullTime.
		var seen, broadcastTime, originalDate sql.NullTime
		err := rows.Scan(
			&a.Link, &a.Title, &a.Description, &a.Published, &originalDate, &a.Source,
			&a.FirstSeen, &seen, &a.Summary, &a.Importance, &a.ID, &broadcastTime,
		)
		if err != nil {
			// Log instead of silently dropping the row. (consistency with
			// loadArticles, which reports scan failures)
			logEvent("db_scan_error", map[string]interface{}{
				"error": err.Error(),
			})
			continue
		}
		if seen.Valid {
			a.Seen = seen.Time
		}
		if broadcastTime.Valid {
			a.BroadcastTime = broadcastTime.Time
		}
		if originalDate.Valid {
			a.OriginalDate = originalDate.Time
		}
		articles = append(articles, a)
	}
	// Propagate iteration errors. (bug fix: rows.Err() was unchecked)
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return articles, nil
}
// getRecentBroadcasts retrieves the n most recently broadcast articles,
// degrading to an empty slice (with a logged error) on query failure so
// callers never see an error or a nil slice.
func getRecentBroadcasts(n int) []Article {
	articles, err := getBroadcastArticles(n)
	if err == nil {
		return articles
	}
	logInfo("db", "Error retrieving recent broadcasts", map[string]interface{}{
		"error": err.Error(),
	})
	return []Article{}
}
// setupLogging creates the structured log file at logPath and installs
// the colorized console handler as slog's default logger. The process
// exits if the log file cannot be created.
func setupLogging() {
	var err error
	logFile, err = os.Create(logPath)
	if err != nil {
		slog.Error("Could not create log file", "error", err)
		os.Exit(1)
	}
	// Console output goes through the colorized handler at debug level;
	// the log file is written as JSON elsewhere (flushLog), not via slog.
	handler := NewColorizedHandler(os.Stderr, &slog.HandlerOptions{
		Level: slog.LevelDebug,
	})
	slog.SetDefault(slog.New(handler))
}
// flushLog writes the accumulated logData entries to the log file as
// indented JSON and closes the file. Safe to call more than once: the
// handle is cleared after closing, so repeat calls are no-ops. (bug
// fix: previously a second call would encode into a closed file and
// close it again; Encode/Close errors were also silently discarded)
func flushLog() {
	logMutex.Lock()
	defer logMutex.Unlock()
	if logFile == nil {
		return
	}
	enc := json.NewEncoder(logFile)
	enc.SetIndent("", " ")
	if err := enc.Encode(logData); err != nil {
		slog.Error("Error writing log file", "error", err)
	}
	if err := logFile.Close(); err != nil {
		slog.Error("Error closing log file", "error", err)
	}
	logFile = nil // prevent double-close and writes to a closed file
}
// logEvent records a structured event in the in-memory log buffer and,
// once the database is initialized, in the logs table as a JSON blob
// keyed by a time-ordered ULID. The details map is annotated with
// default "timestamp" and "event" keys when the caller did not set them.
func logEvent(event string, details map[string]interface{}) {
	logMutex.Lock()
	defer logMutex.Unlock()
	// Guard against a nil map from callers: writing to a nil map panics.
	// (robustness)
	if details == nil {
		details = make(map[string]interface{})
	}
	// Take one timestamp for the details map, the entry, and the ULID so
	// the three can never disagree. (previously time.Now() was called
	// twice, letting details["timestamp"] drift from entry.Timestamp)
	timestamp := time.Now()
	if _, exists := details["timestamp"]; !exists {
		details["timestamp"] = timestamp
	}
	if _, exists := details["event"]; !exists {
		details["event"] = event
	}
	entry := LogEntry{
		Timestamp: timestamp,
		Event:     event,
		Details:   details,
	}
	// Add to the permanent log data (for file-based logging via flushLog).
	logData = append(logData, entry)
	// Database storage is best-effort and only once db is initialized.
	if db == nil {
		slog.Debug("Skipping database log storage - database not yet initialized")
		return
	}
	logBytes, err := json.Marshal(entry)
	if err != nil {
		slog.Error("Error marshaling log entry", "error", err)
		return
	}
	// ULIDs sort lexicographically by creation time, giving time-ordered
	// primary keys.
	entropy := ulid.DefaultEntropy()
	id := ulid.MustNew(ulid.Timestamp(timestamp), entropy).String()
	_, err = db.Exec("INSERT INTO logs (id, timestamp, log) VALUES (?, ?, ?)",
		id, timestamp, string(logBytes))
	if err != nil {
		slog.Error("Error storing log in database", "error", err)
	}
}
// getRecentLogs retrieves up to limit of the newest log entries from
// the logs table, decoding each stored JSON blob into a LogEntry.
func getRecentLogs(limit int) ([]LogEntry, error) {
	rows, err := db.Query(`
	SELECT log FROM logs
	ORDER BY timestamp DESC
	LIMIT ?
	`, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var logs []LogEntry
	for rows.Next() {
		var logJSON string
		if err := rows.Scan(&logJSON); err != nil {
			return nil, err
		}
		var entry LogEntry
		if err := json.Unmarshal([]byte(logJSON), &entry); err != nil {
			return nil, err
		}
		logs = append(logs, entry)
	}
	// Propagate iteration errors. (bug fix: rows.Err() was unchecked)
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return logs, nil
}
// cleanupOldLogs removes log rows whose timestamp is more than one
// month old and reports how many were deleted.
func cleanupOldLogs() error {
	cutoff := time.Now().AddDate(0, -1, 0) // one month ago
	result, err := db.Exec("DELETE FROM logs WHERE timestamp < ?", cutoff)
	if err != nil {
		return err
	}
	// RowsAffected error is ignored: the count is informational only.
	if n, _ := result.RowsAffected(); n > 0 {
		logInfo("logs", "Deleted old log entries", map[string]interface{}{
			"count":      n,
			"olderThan":  "1 month",
			"cutoffDate": cutoff.Format(time.RFC3339),
		})
	}
	return nil
}
// logCleanupWorker prunes old log rows once immediately at startup and
// then every 15 minutes, until the shutdown channel signals it to stop.
func logCleanupWorker(shutdown chan struct{}) {
	logInfo("logs", "Starting log cleanup worker", map[string]interface{}{
		"interval":  "15 minutes",
		"retention": "1 month",
	})
	// First pass right away so any backlog is cleared without waiting a
	// full interval.
	if err := cleanupOldLogs(); err != nil {
		logInfo("logs", "Error cleaning up old logs", map[string]interface{}{
			"error": err.Error(),
		})
	}
	ticker := time.NewTicker(15 * time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-shutdown:
			logInfo("logs", "Shutting down log cleanup worker", nil)
			return
		case <-ticker.C:
			if err := cleanupOldLogs(); err != nil {
				logInfo("logs", "Error cleaning up old logs", map[string]interface{}{
					"error": err.Error(),
				})
			}
		}
	}
}
// logInfo logs a structured message at info level to both the console
// (via slog) and the structured log file/database (via logEvent). The
// caller's data map is copied, never mutated.
func logInfo(component string, message string, data map[string]interface{}) {
	// Copy the caller's map (pre-sized for the two extra keys). The local
	// is named fields rather than logData to stop shadowing the
	// package-level logData slice used by logEvent/flushLog. (bug-prone
	// shadowing removed)
	fields := make(map[string]interface{}, len(data)+2)
	for k, v := range data {
		fields[k] = v
	}
	// Attach component and message so they reach the structured log too.
	fields["component"] = component
	fields["message"] = message
	// Mirror the fields onto the console logger as alternating key/value
	// arguments.
	attrs := make([]any, 0, 2*len(fields))
	for k, v := range fields {
		attrs = append(attrs, k, v)
	}
	slog.Info(message, attrs...)
	// Log to structured log file and database.
	logEvent("info", fields)
}