Compare commits

..

10 Commits

11 changed files with 437 additions and 118 deletions

1
.gitignore vendored
View File

@ -3,3 +3,4 @@
.env .env
articles.db* articles.db*
gomeshalerter gomeshalerter
*.bak

View File

@ -1,10 +1,13 @@
default: clean run default: clean run
clean: clean:
rm -f gomeshalerter rm -f gomeshalerter *.json
run: gomeshalerter run: gomeshalerter
exec ./gomeshalerter exec ./gomeshalerter
gomeshalerter: *.go gomeshalerter: *.go
go build -o gomeshalerter . go build -o gomeshalerter .
lint:
golangci-lint run

View File

@ -10,15 +10,19 @@ import (
"time" "time"
) )
// broadcaster runs on startup and every hour to select and broadcast the most important article // broadcaster runs on startup and frequently checks if a broadcast is due
func broadcaster(shutdown chan struct{}, dryRun bool) { func broadcaster(shutdown chan struct{}, dryRun bool) {
logInfo("broadcaster", fmt.Sprintf("Starting broadcaster (waiting %d seconds before first broadcast)", int(STARTUP_DELAY.Seconds())), map[string]interface{}{ logInfo("broadcaster", "Starting broadcaster", map[string]interface{}{
"interval": BROADCAST_INTERVAL.String(), "startupDelay": int(STARTUP_DELAY.Seconds()),
"dryRun": dryRun, "checkInterval": BROADCAST_CHECK_INTERVAL.String(),
"broadcastWindow": BROADCAST_INTERVAL.String(),
"dryRun": dryRun,
}) })
// Sleep on startup // Sleep on startup
logInfo("broadcaster", fmt.Sprintf("Sleeping for %d seconds before first broadcast", int(STARTUP_DELAY.Seconds())), nil) logInfo("broadcaster", "Sleeping before first broadcast check", map[string]interface{}{
"seconds": int(STARTUP_DELAY.Seconds()),
})
select { select {
case <-time.After(STARTUP_DELAY): case <-time.After(STARTUP_DELAY):
// Continue after sleep // Continue after sleep
@ -27,17 +31,32 @@ func broadcaster(shutdown chan struct{}, dryRun bool) {
return return
} }
// Run immediately after initial sleep // Track when the last broadcast happened
checkAndBroadcast(dryRun) var lastBroadcastTime time.Time
// Then run on interval // Run checks frequently
ticker := time.NewTicker(BROADCAST_INTERVAL) ticker := time.NewTicker(BROADCAST_CHECK_INTERVAL)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
checkAndBroadcast(dryRun) now := time.Now()
timeSinceLastBroadcast := now.Sub(lastBroadcastTime)
// If it's been at least BROADCAST_INTERVAL since last broadcast
// or if we haven't broadcast yet (lastBroadcastTime is zero)
if lastBroadcastTime.IsZero() || timeSinceLastBroadcast >= BROADCAST_INTERVAL {
logInfo("broadcaster", "Broadcast window reached, checking conditions", map[string]interface{}{
"timeSinceLastBroadcast": timeSinceLastBroadcast.String(),
"requiredInterval": BROADCAST_INTERVAL.String(),
})
// Only update lastBroadcastTime if we actually broadcast something
if didBroadcast := checkAndBroadcast(dryRun); didBroadcast {
lastBroadcastTime = now
}
}
case <-shutdown: case <-shutdown:
logInfo("broadcaster", "Shutting down broadcaster", nil) logInfo("broadcaster", "Shutting down broadcaster", nil)
return return
@ -46,7 +65,8 @@ func broadcaster(shutdown chan struct{}, dryRun bool) {
} }
// checkAndBroadcast checks if there are any unsummarized articles before broadcasting // checkAndBroadcast checks if there are any unsummarized articles before broadcasting
func checkAndBroadcast(dryRun bool) { // Returns true if a broadcast was made
func checkAndBroadcast(dryRun bool) bool {
// Check if there are any unsummarized articles // Check if there are any unsummarized articles
articles := loadArticles() articles := loadArticles()
unsummarizedCount := 0 unsummarizedCount := 0
@ -62,15 +82,16 @@ func checkAndBroadcast(dryRun bool) {
"unsummarizedCount": unsummarizedCount, "unsummarizedCount": unsummarizedCount,
"totalArticles": len(articles), "totalArticles": len(articles),
}) })
return return false
} }
// No unsummarized articles, proceed with broadcast // No unsummarized articles, proceed with broadcast
broadcastWithRedundancyCheck(dryRun) return broadcastWithRedundancyCheck(dryRun)
} }
// broadcastWithRedundancyCheck attempts to find a non-redundant article to broadcast // broadcastWithRedundancyCheck attempts to find a non-redundant article to broadcast
func broadcastWithRedundancyCheck(dryRun bool) { // Returns true if a broadcast was made
func broadcastWithRedundancyCheck(dryRun bool) bool {
articles := loadArticles() articles := loadArticles()
now := time.Now() now := time.Now()
cutoff := now.Add(-ARTICLE_FRESHNESS_WINDOW) // Time window for considering articles fresh cutoff := now.Add(-ARTICLE_FRESHNESS_WINDOW) // Time window for considering articles fresh
@ -87,7 +108,7 @@ func broadcastWithRedundancyCheck(dryRun bool) {
logInfo("broadcaster", "No fresh articles found for broadcasting", map[string]interface{}{ logInfo("broadcaster", "No fresh articles found for broadcasting", map[string]interface{}{
"totalArticles": len(articles), "totalArticles": len(articles),
}) })
return return false
} }
// Sort by importance // Sort by importance
@ -111,7 +132,7 @@ func broadcastWithRedundancyCheck(dryRun bool) {
}) })
// Continue with this candidate despite the error // Continue with this candidate despite the error
broadcastArticle(candidate, dryRun) broadcastArticle(candidate, dryRun)
return return true
} }
if isRedundant { if isRedundant {
@ -138,13 +159,14 @@ func broadcastWithRedundancyCheck(dryRun bool) {
"candidateNumber": i + 1, "candidateNumber": i + 1,
}) })
broadcastArticle(candidate, dryRun) broadcastArticle(candidate, dryRun)
return return true
} }
// If we got here, all candidates were redundant // If we got here, all candidates were redundant
logInfo("broadcaster", "All candidates were deemed redundant, no broadcast", map[string]interface{}{ logInfo("broadcaster", "All candidates were deemed redundant, no broadcast", map[string]interface{}{
"candidatesChecked": len(candidates), "candidatesChecked": len(candidates),
}) })
return false
} }
// broadcastArticle broadcasts the chosen article // broadcastArticle broadcasts the chosen article
@ -169,7 +191,8 @@ func broadcastArticle(chosen Article, dryRun bool) {
maxSummaryLen := MAX_MESSAGE_LENGTH - len(ts) - len(sourceAbbr) - len(": [] [AI/LLM] ") - 3 // 3 for "..." maxSummaryLen := MAX_MESSAGE_LENGTH - len(ts) - len(sourceAbbr) - len(": [] [AI/LLM] ") - 3 // 3 for "..."
truncatedSummary := strings.TrimSpace(chosen.Summary)[:maxSummaryLen] + "..." truncatedSummary := strings.TrimSpace(chosen.Summary)[:maxSummaryLen] + "..."
msg = fmt.Sprintf("%s: [%s] [AI/LLM] %s", ts, sourceAbbr, truncatedSummary) msg = fmt.Sprintf("%s: [%s] [AI/LLM] %s", ts, sourceAbbr, truncatedSummary)
logInfo("broadcaster", fmt.Sprintf("Message truncated to fit %d character limit", MAX_MESSAGE_LENGTH), map[string]interface{}{ logInfo("broadcaster", "Message truncated to fit character limit", map[string]interface{}{
"limit": MAX_MESSAGE_LENGTH,
"originalLength": len(chosen.Summary), "originalLength": len(chosen.Summary),
"truncatedLength": maxSummaryLen, "truncatedLength": maxSummaryLen,
}) })
@ -189,7 +212,9 @@ func broadcastArticle(chosen Article, dryRun bool) {
}) })
// Wait before broadcasting to allow time to see the message // Wait before broadcasting to allow time to see the message
logInfo("broadcaster", fmt.Sprintf("Waiting %d seconds before broadcasting...", int(BROADCAST_PREPARATION_DELAY.Seconds())), nil) logInfo("broadcaster", "Waiting before broadcasting", map[string]interface{}{
"seconds": int(BROADCAST_PREPARATION_DELAY.Seconds()),
})
time.Sleep(BROADCAST_PREPARATION_DELAY) time.Sleep(BROADCAST_PREPARATION_DELAY)
// Update broadcast time and save to database // Update broadcast time and save to database

75
device.go Normal file
View File

@ -0,0 +1,75 @@
package main
import (
"os"
"os/exec"
"time"
)
// deviceManager handles periodic maintenance of the Meshtastic device
func deviceManager(shutdown chan struct{}, dryRun bool) {
logInfo("device", "Starting device manager", map[string]interface{}{
"rebootInterval": DEVICE_REBOOT_INTERVAL.String(),
"dryRun": dryRun,
})
// Wait some time before first reboot to allow system startup
initialDelay := 5 * time.Minute
logInfo("device", "Waiting before first device reboot", map[string]interface{}{
"initialDelay": initialDelay.String(),
})
select {
case <-time.After(initialDelay):
// Continue after delay
case <-shutdown:
logInfo("device", "Shutdown signal received during initial delay", nil)
return
}
// Reboot immediately after startup delay
rebootDevice(dryRun)
// Then run on interval
ticker := time.NewTicker(DEVICE_REBOOT_INTERVAL)
defer ticker.Stop()
for {
select {
case <-ticker.C:
rebootDevice(dryRun)
case <-shutdown:
logInfo("device", "Shutting down device manager", nil)
return
}
}
}
// rebootDevice executes the meshtastic --reboot command
func rebootDevice(dryRun bool) {
cmdStr := "meshtastic --reboot"
logInfo("device", "Rebooting Meshtastic device", map[string]interface{}{
"command": cmdStr,
"dryRun": dryRun,
"time": time.Now().Format("15:04:05 MST"),
})
if dryRun {
logInfo("device", "DRY RUN - would execute device reboot", nil)
return
}
// Execute the reboot command
cmd := exec.Command("meshtastic", "--reboot")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
logInfo("device", "Device reboot failed", map[string]interface{}{
"error": err.Error(),
})
} else {
logInfo("device", "Device reboot successful", nil)
}
}

1
go.mod
View File

@ -15,6 +15,7 @@ require (
github.com/mmcdole/goxpp v1.1.1-0.20240225020742-a0c311522b23 // indirect github.com/mmcdole/goxpp v1.1.1-0.20240225020742-a0c311522b23 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/oklog/ulid/v2 v2.1.1 // indirect
golang.org/x/net v0.4.0 // indirect golang.org/x/net v0.4.0 // indirect
golang.org/x/text v0.5.0 // indirect golang.org/x/text v0.5.0 // indirect
) )

3
go.sum
View File

@ -21,6 +21,9 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/oklog/ulid/v2 v2.1.1 h1:suPZ4ARWLOJLegGFiZZ1dFAkqzhMjL3J1TzI+5wHz8s=
github.com/oklog/ulid/v2 v2.1.1/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

20
llm.go
View File

@ -25,7 +25,9 @@ type SummaryResult struct {
// articleSummarizer checks for articles without summaries every 10 seconds and processes them in batches // articleSummarizer checks for articles without summaries every 10 seconds and processes them in batches
func articleSummarizer(shutdown chan struct{}, ollamaURL, ollamaModel string) { func articleSummarizer(shutdown chan struct{}, ollamaURL, ollamaModel string) {
fmt.Fprintf(os.Stderr, "[summarizer] Starting article summarizer (interval: %s)\n", SUMMARIZE_INTERVAL) logInfo("summarizer", "Starting article summarizer", map[string]interface{}{
"interval": SUMMARIZE_INTERVAL.String(),
})
ticker := time.NewTicker(SUMMARIZE_INTERVAL) ticker := time.NewTicker(SUMMARIZE_INTERVAL)
defer ticker.Stop() defer ticker.Stop()
@ -35,7 +37,7 @@ func articleSummarizer(shutdown chan struct{}, ollamaURL, ollamaModel string) {
case <-ticker.C: case <-ticker.C:
summarizeArticles(ollamaURL, ollamaModel) summarizeArticles(ollamaURL, ollamaModel)
case <-shutdown: case <-shutdown:
fmt.Fprintf(os.Stderr, "[summarizer] Shutting down article summarizer\n") logInfo("summarizer", "Shutting down article summarizer", nil)
return return
} }
} }
@ -84,9 +86,11 @@ func summarizeArticles(ollamaURL, ollamaModel string) {
batchInfo = append(batchInfo, a.ID[:8]) batchInfo = append(batchInfo, a.ID[:8])
} }
logInfo("summarizer", fmt.Sprintf("Processing batch %d to %d", i+1, end), map[string]interface{}{ logInfo("summarizer", "Processing batch", map[string]interface{}{
"batchSize": len(batch), "batchSize": len(batch),
"batchIds": strings.Join(batchInfo, ","), "batchIds": strings.Join(batchInfo, ","),
"startItem": i + 1,
"endItem": end,
}) })
startTime := time.Now() startTime := time.Now()
@ -106,6 +110,16 @@ func summarizeArticles(ollamaURL, ollamaModel string) {
for id, result := range summaries { for id, result := range summaries {
for _, article := range batch { for _, article := range batch {
if article.ID == id { if article.ID == id {
// Log the summary details for each article
logInfo("summary_result", "LLM generated summary", map[string]interface{}{
"id": article.ID,
"title": article.Title,
"summary": result.Summary,
"importance": result.Importance,
"source": article.Source,
"length": len(result.Summary),
})
article.Summary = result.Summary article.Summary = result.Summary
article.Importance = result.Importance article.Importance = result.Importance
if err := updateArticle(article); err != nil { if err := updateArticle(article); err != nil {

40
main.go
View File

@ -3,8 +3,7 @@ package main
import ( import (
"database/sql" "database/sql"
"flag" "flag"
"fmt" "log/slog"
"log"
"os" "os"
"os/signal" "os/signal"
"sync" "sync"
@ -25,30 +24,37 @@ var (
) )
func main() { func main() {
fmt.Fprintf(os.Stderr, "[%s] starting gomeshalerter\n", runStart.Format("15:04:05"))
setupLogging() setupLogging()
defer flushLog() defer flushLog()
logInfo("main", "Starting gomeshalerter", map[string]interface{}{
"timestamp": runStart.Format("15:04:05"),
})
var err error var err error
db, err = sql.Open("sqlite3", dbPath+"?_journal=WAL") // Use WAL mode for better concurrency db, err = sql.Open("sqlite3", dbPath+"?_journal=WAL") // Use WAL mode for better concurrency
if err != nil { if err != nil {
log.Fatalf("Failed to open database: %v", err) slog.Error("Failed to open database", "error", err)
os.Exit(1)
} }
// Define a cleanup function to properly close resources // Define a cleanup function to properly close resources
cleanup := func() { cleanup := func() {
fmt.Fprintf(os.Stderr, "[shutdown] Closing database...\n") logInfo("shutdown", "Closing database", nil)
if err := db.Close(); err != nil { if err := db.Close(); err != nil {
fmt.Fprintf(os.Stderr, "[shutdown] Error closing database: %v\n", err) logInfo("shutdown", "Error closing database", map[string]interface{}{
"error": err.Error(),
})
} }
flushLog() flushLog()
fmt.Fprintf(os.Stderr, "[shutdown] Cleanup complete\n") logInfo("shutdown", "Cleanup complete", nil)
} }
// Ensure cleanup runs on normal exit // Ensure cleanup runs on normal exit
defer cleanup() defer cleanup()
if err := setupDatabase(); err != nil { if err := setupDatabase(); err != nil {
log.Fatalf("Failed to setup database: %v", err) slog.Error("Failed to setup database", "error", err)
os.Exit(1)
} }
ollamaModel := "qwen3:32b" ollamaModel := "qwen3:32b"
@ -57,7 +63,10 @@ func main() {
ollamaURL = "http://localhost:11434" // Default Ollama server URL ollamaURL = "http://localhost:11434" // Default Ollama server URL
} }
fmt.Fprintf(os.Stderr, "[ollama] Using model: %s at %s\n", ollamaModel, ollamaURL) logInfo("ollama", "Using model", map[string]interface{}{
"model": ollamaModel,
"url": ollamaURL,
})
// Replace --broadcast flag with --dry-run flag (default is to broadcast) // Replace --broadcast flag with --dry-run flag (default is to broadcast)
dryRun := flag.Bool("dry-run", false, "don't actually send to meshtastic, just print what would be sent") dryRun := flag.Bool("dry-run", false, "don't actually send to meshtastic, just print what would be sent")
@ -75,9 +84,9 @@ func main() {
go func() { go func() {
<-sigChan <-sigChan
fmt.Fprintf(os.Stderr, "[shutdown] Received signal, performing cleanup before exit...\n") logInfo("shutdown", "Received signal, performing cleanup before exit", nil)
cleanup() cleanup()
fmt.Fprintf(os.Stderr, "[shutdown] Exiting...\n") logInfo("shutdown", "Exiting", nil)
os.Exit(0) // Exit after cleanup os.Exit(0) // Exit after cleanup
}() }()
@ -116,7 +125,14 @@ func main() {
logCleanupWorker(shutdown) logCleanupWorker(shutdown)
}() }()
// Start device manager goroutine for periodic device maintenance
wg.Add(1)
go func() {
defer wg.Done()
deviceManager(shutdown, *dryRun)
}()
// Wait for all goroutines to finish // Wait for all goroutines to finish
wg.Wait() wg.Wait()
fmt.Fprintf(os.Stderr, "[shutdown] All goroutines stopped, exiting...\n") logInfo("shutdown", "All goroutines stopped, exiting", nil)
} }

View File

@ -51,16 +51,19 @@ Never rate over 90 unless it is a massive event such as: war outbreak,
revolution, death of a head of state, large-scale natural disaster, mass revolution, death of a head of state, large-scale natural disaster, mass
casualty terrorism, etc. casualty terrorism, etc.
IMPORTANT: Rank any headlines primarily promoting commercial products or Rank any headlines primarily promoting commercial products or
services as 1 (lowest importance). services as 1 (lowest importance).
Rank any article with a headline that poses a question without providing an Rank any article with a headline that poses a question without providing an
answer (as an attempt to lure a reader into clicking a link) as 1 (lowest answer (as an attempt to lure a reader into clicking a link) as 1 (lowest
importance). importance).
IMPORTANT: Boost the importance score by 10-20 points for breaking news that is less Boost the importance score by 10 points for breaking news that is less than 60
than 60 minutes old based on its original publication date (which is provided for each article). minutes old based on its original publication date (which is provided for each
This helps ensure timely distribution of very recent news. article), but only for events that need to be reported in minutes, such as
emeregencies or other critical breaking news.
Do not editorialize or otherwise label the summary.
For each article, return a JSON object with "id", "summary", and "importance" For each article, return a JSON object with "id", "summary", and "importance"
fields. Return your response as a JSON array of objects like: [{"id": fields. Return your response as a JSON array of objects like: [{"id":
@ -70,13 +73,15 @@ Here are the articles:
` `
SYSTEM_PROMPT = "You are a news analyst." SYSTEM_PROMPT = "You are a news analyst."
BATCH_SIZE = 10 BATCH_SIZE = 5
MAX_INDIVIDUAL_PROCESSING = 50 MAX_INDIVIDUAL_PROCESSING = 50
// Timing constants // Timing constants
RSS_CHECK_INTERVAL = 15 * time.Minute RSS_CHECK_INTERVAL = 15 * time.Minute
SUMMARIZE_INTERVAL = 10 * time.Second SUMMARIZE_INTERVAL = 10 * time.Second
BROADCAST_INTERVAL = 1 * time.Hour BROADCAST_INTERVAL = 1 * time.Hour
BROADCAST_CHECK_INTERVAL = 10 * time.Second // Interval to check if broadcasting is needed
DEVICE_REBOOT_INTERVAL = 6 * time.Hour // Interval to reboot Meshtastic device
STARTUP_DELAY = 60 * time.Second // Delay before first broadcast STARTUP_DELAY = 60 * time.Second // Delay before first broadcast
BROADCAST_PREPARATION_DELAY = 30 * time.Second // Delay before executing broadcast command BROADCAST_PREPARATION_DELAY = 30 * time.Second // Delay before executing broadcast command
ARTICLE_FRESHNESS_WINDOW = 24 * time.Hour // Time window for considering articles fresh ARTICLE_FRESHNESS_WINDOW = 24 * time.Hour // Time window for considering articles fresh

27
rss.go
View File

@ -1,9 +1,7 @@
package main package main
import ( import (
"fmt"
"net/http" "net/http"
"os"
"sync" "sync"
"time" "time"
@ -53,7 +51,9 @@ var feeds = map[string]string{
// rssFeedChecker checks RSS feeds every 15 minutes and adds new articles to the database // rssFeedChecker checks RSS feeds every 15 minutes and adds new articles to the database
func rssFeedChecker(shutdown chan struct{}, ollamaURL, ollamaModel string) { func rssFeedChecker(shutdown chan struct{}, ollamaURL, ollamaModel string) {
fmt.Fprintf(os.Stderr, "[rss] Starting RSS feed checker (interval: %s)\n", RSS_CHECK_INTERVAL) logInfo("rss", "Starting RSS feed checker", map[string]interface{}{
"interval": RSS_CHECK_INTERVAL.String(),
})
// Run immediately on startup // Run immediately on startup
checkRSSFeeds() checkRSSFeeds()
@ -67,7 +67,7 @@ func rssFeedChecker(shutdown chan struct{}, ollamaURL, ollamaModel string) {
case <-ticker.C: case <-ticker.C:
checkRSSFeeds() checkRSSFeeds()
case <-shutdown: case <-shutdown:
fmt.Fprintf(os.Stderr, "[rss] Shutting down RSS feed checker\n") logInfo("rss", "Shutting down RSS feed checker", nil)
return return
} }
} }
@ -100,8 +100,9 @@ func checkRSSFeeds() {
}) })
} }
newCount++ newCount++
logInfo("new", fmt.Sprintf("Found new article: %s", a.Title), map[string]interface{}{ logInfo("new", "Found new article", map[string]interface{}{
"id": a.ID, "id": a.ID,
"title": a.Title,
"source": a.Source, "source": a.Source,
"published": a.Published.Format(time.RFC3339), "published": a.Published.Format(time.RFC3339),
}) })
@ -160,13 +161,23 @@ func fetchAllFeedsParallel(now time.Time) []Article {
logEvent("rss_fetch_result", details) logEvent("rss_fetch_result", details)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "[rss] FAIL %-15s (%s) [%.2fs] ERR: %v\n", source, url, duration.Seconds(), err) logInfo("rss", "Feed fetch failed", map[string]interface{}{
"source": source,
"url": url,
"duration": duration.Seconds(),
"error": err.Error(),
})
results <- fetchResult{Source: source, URL: url, Err: err, Duration: duration, HTTPStatus: httpStatus} results <- fetchResult{Source: source, URL: url, Err: err, Duration: duration, HTTPStatus: httpStatus}
return return
} }
fmt.Fprintf(os.Stderr, "[rss] OK %-15s (%s) [%.2fs] HTTP %d, items: %d\n", logInfo("rss", "Feed fetch succeeded", map[string]interface{}{
source, url, duration.Seconds(), httpStatus, len(feed.Items)) "source": source,
"url": url,
"duration": duration.Seconds(),
"status": httpStatus,
"items": len(feed.Items),
})
results <- fetchResult{ results <- fetchResult{
Source: source, Source: source,

View File

@ -1,18 +1,213 @@
package main package main
import ( import (
"context"
"crypto/sha256" "crypto/sha256"
"database/sql" "database/sql"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"log/slog"
"os" "os"
"sort" "sort"
"strings"
"time" "time"
"github.com/oklog/ulid/v2" "github.com/oklog/ulid/v2"
) )
// ANSI color codes
const (
colorReset = "\033[0m"
colorRed = "\033[31m"
colorGreen = "\033[32m"
colorYellow = "\033[33m"
colorBlue = "\033[34m"
colorPurple = "\033[35m"
colorCyan = "\033[36m"
colorGray = "\033[37m"
colorWhite = "\033[97m"
bold = "\033[1m"
)
// ColorizedHandler is a custom slog.Handler that outputs colorized logs
type ColorizedHandler struct {
w io.Writer
level slog.Level
timeKey string
msgKey string
}
// NewColorizedHandler creates a new ColorizedHandler
func NewColorizedHandler(w io.Writer, opts *slog.HandlerOptions) *ColorizedHandler {
if opts == nil {
opts = &slog.HandlerOptions{}
}
return &ColorizedHandler{
w: w,
level: opts.Level.Level(),
timeKey: "time",
msgKey: slog.MessageKey,
}
}
// Enabled implements slog.Handler
func (h *ColorizedHandler) Enabled(ctx context.Context, level slog.Level) bool {
return level >= h.level
}
// Handle implements slog.Handler
func (h *ColorizedHandler) Handle(ctx context.Context, r slog.Record) error {
// Skip logs below our level
if !h.Enabled(ctx, r.Level) {
return nil
}
// Format time with milliseconds
timeStr := r.Time.Format("15:04:05.000")
// Get component from attributes
var component string
var message string
// Store other attributes for printing later
attributes := make(map[string]interface{})
r.Attrs(func(a slog.Attr) bool {
if a.Key == "component" {
component = a.Value.String()
return true
}
if a.Key == h.msgKey {
message = a.Value.String()
return true
}
// Skip internal or empty values
if a.Key == h.timeKey || a.Key == "level" || a.Value.String() == "" {
return true
}
attributes[a.Key] = a.Value.Any()
return true
})
// Format level with color
var levelColor string
var levelText string
switch r.Level {
case slog.LevelDebug:
levelColor = colorGray
levelText = "DBG"
case slog.LevelInfo:
levelColor = colorGreen
levelText = "INF"
case slog.LevelWarn:
levelColor = colorYellow
levelText = "WRN"
case slog.LevelError:
levelColor = colorRed
levelText = "ERR"
default:
levelColor = colorReset
levelText = "???"
}
// Build the log line
var sb strings.Builder
// Timestamp with gray color
sb.WriteString(colorGray)
sb.WriteString("[")
sb.WriteString(timeStr)
sb.WriteString("]")
sb.WriteString(colorReset)
sb.WriteString(" ")
// Level with appropriate color
sb.WriteString(levelColor)
sb.WriteString(levelText)
sb.WriteString(colorReset)
sb.WriteString(" ")
// Component in blue
if component != "" {
sb.WriteString(colorBlue)
sb.WriteString("[")
sb.WriteString(component)
sb.WriteString("]")
sb.WriteString(colorReset)
sb.WriteString(" ")
}
// Message in white+bold
if message != "" {
sb.WriteString(bold)
sb.WriteString(colorWhite)
sb.WriteString(message)
sb.WriteString(colorReset)
sb.WriteString(" ")
}
// Sort keys for consistent output
keys := make([]string, 0, len(attributes))
for k := range attributes {
keys = append(keys, k)
}
sort.Strings(keys)
// Add attributes as key=value pairs with colors
for _, k := range keys {
v := attributes[k]
sb.WriteString(colorCyan) // Key in cyan
sb.WriteString(k)
sb.WriteString(colorReset)
sb.WriteString("=")
// Value color depends on type
switch v := v.(type) {
case string:
sb.WriteString(colorYellow) // Strings in yellow
sb.WriteString(v)
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64:
sb.WriteString(colorPurple) // Numbers in purple
sb.WriteString(fmt.Sprintf("%v", v))
case bool:
if v {
sb.WriteString(colorGreen) // true in green
} else {
sb.WriteString(colorRed) // false in red
}
sb.WriteString(fmt.Sprintf("%v", v))
case error:
sb.WriteString(colorRed) // Errors in red
sb.WriteString(v.Error())
default:
sb.WriteString(colorReset) // Other types with no color
sb.WriteString(fmt.Sprintf("%v", v))
}
sb.WriteString(colorReset)
sb.WriteString(" ")
}
sb.WriteString("\n")
// Write to output
_, err := io.WriteString(h.w, sb.String())
return err
}
// WithAttrs implements slog.Handler
func (h *ColorizedHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
// This is a simplified implementation that doesn't actually store the attrs
// In a real implementation, you would create a new handler with these attrs
return h
}
// WithGroup implements slog.Handler
func (h *ColorizedHandler) WithGroup(name string) slog.Handler {
// This is a simplified implementation that doesn't handle groups
return h
}
func setupDatabase() error { func setupDatabase() error {
_, err := db.Exec(` _, err := db.Exec(`
CREATE TABLE IF NOT EXISTS articles ( CREATE TABLE IF NOT EXISTS articles (
@ -217,8 +412,9 @@ func loadArticles() map[string]Article {
return articles return articles
} }
// getBroadcastHistory gets the most recent broadcast articles // getBroadcastArticles is a common function for retrieving broadcast articles
func getBroadcastHistory(limit int) ([]Article, error) { // with consistent filtering criteria
func getBroadcastArticles(limit int) ([]Article, error) {
rows, err := db.Query(` rows, err := db.Query(`
SELECT link, title, description, published, originalDate, source, firstseen, seen, summary, importance, id, broadcastTime SELECT link, title, description, published, originalDate, source, firstseen, seen, summary, importance, id, broadcastTime
FROM articles FROM articles
@ -271,6 +467,11 @@ func getBroadcastHistory(limit int) ([]Article, error) {
return articles, nil return articles, nil
} }
// getBroadcastHistory gets the most recent broadcast articles
func getBroadcastHistory(limit int) ([]Article, error) {
return getBroadcastArticles(limit)
}
// getNextUpArticles gets the top 25 articles eligible for broadcast sorted by importance // getNextUpArticles gets the top 25 articles eligible for broadcast sorted by importance
func getNextUpArticles() ([]Article, error) { func getNextUpArticles() ([]Article, error) {
now := time.Now() now := time.Now()
@ -328,66 +529,32 @@ func getNextUpArticles() ([]Article, error) {
// getRecentBroadcasts retrieves the n most recently broadcast articles // getRecentBroadcasts retrieves the n most recently broadcast articles
func getRecentBroadcasts(n int) []Article { func getRecentBroadcasts(n int) []Article {
rows, err := db.Query(` articles, err := getBroadcastArticles(n)
SELECT link, title, description, published, originalDate, source, firstseen, seen, summary, importance, id, broadcastTime
FROM articles
WHERE broadcastTime IS NOT NULL AND broadcastTime > 1
ORDER BY broadcastTime DESC
LIMIT ?
`, n)
if err != nil { if err != nil {
logInfo("db", "Error retrieving recent broadcasts", map[string]interface{}{ logInfo("db", "Error retrieving recent broadcasts", map[string]interface{}{
"error": err.Error(), "error": err.Error(),
}) })
return []Article{} return []Article{}
} }
defer rows.Close() return articles
var broadcasts []Article
for rows.Next() {
var a Article
var seen sql.NullTime
var broadcastTime sql.NullTime
var originalDate sql.NullTime
err := rows.Scan(
&a.Link, &a.Title, &a.Description, &a.Published, &originalDate, &a.Source,
&a.FirstSeen, &seen, &a.Summary, &a.Importance, &a.ID, &broadcastTime,
)
if err != nil {
logInfo("db", "Error scanning broadcast article", map[string]interface{}{
"error": err.Error(),
})
continue
}
if seen.Valid {
a.Seen = seen.Time
}
if broadcastTime.Valid {
a.BroadcastTime = broadcastTime.Time
}
if originalDate.Valid {
a.OriginalDate = originalDate.Time
}
broadcasts = append(broadcasts, a)
}
return broadcasts
} }
func setupLogging() { func setupLogging() {
var err error var err error
logFile, err = os.Create(logPath) logFile, err = os.Create(logPath)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "could not create log file: %v\n", err) slog.Error("Could not create log file", "error", err)
os.Exit(1) os.Exit(1)
} }
// Set up structured logger with custom colorized handler for console
// and JSON handler for file logging
consoleHandler := NewColorizedHandler(os.Stderr, &slog.HandlerOptions{
Level: slog.LevelDebug,
})
// Use the custom handler
slog.SetDefault(slog.New(consoleHandler))
} }
func flushLog() { func flushLog() {
@ -429,10 +596,16 @@ func logEvent(event string, details map[string]interface{}) {
// Add to the permanent log data (for file-based logging) // Add to the permanent log data (for file-based logging)
logData = append(logData, entry) logData = append(logData, entry)
// Only try to store in database if the database is initialized
if db == nil {
slog.Debug("Skipping database log storage - database not yet initialized")
return
}
// Store log in database // Store log in database
logBytes, err := json.Marshal(entry) logBytes, err := json.Marshal(entry)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "Error marshaling log entry: %v\n", err) slog.Error("Error marshaling log entry", "error", err)
return return
} }
@ -444,7 +617,7 @@ func logEvent(event string, details map[string]interface{}) {
_, err = db.Exec("INSERT INTO logs (id, timestamp, log) VALUES (?, ?, ?)", _, err = db.Exec("INSERT INTO logs (id, timestamp, log) VALUES (?, ?, ?)",
id, timestamp, string(logBytes)) id, timestamp, string(logBytes))
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "Error storing log in database: %v\n", err) slog.Error("Error storing log in database", "error", err)
} }
} }
@ -490,7 +663,11 @@ func cleanupOldLogs() error {
rowsDeleted, _ := result.RowsAffected() rowsDeleted, _ := result.RowsAffected()
if rowsDeleted > 0 { if rowsDeleted > 0 {
fmt.Fprintf(os.Stderr, "[logs] Deleted %d log entries older than one month\n", rowsDeleted) logInfo("logs", "Deleted old log entries", map[string]interface{}{
"count": rowsDeleted,
"olderThan": "1 month",
"cutoffDate": cutoff.Format(time.RFC3339),
})
} }
return nil return nil
@ -530,7 +707,7 @@ func logCleanupWorker(shutdown chan struct{}) {
} }
// logInfo logs a structured message to both console and log file // logInfo logs a structured message to both console and log file
func logInfo(component, message string, data map[string]interface{}) { func logInfo(component string, message string, data map[string]interface{}) {
// Create a copy of the data map to avoid modifying the original // Create a copy of the data map to avoid modifying the original
logData := make(map[string]interface{}) logData := make(map[string]interface{})
for k, v := range data { for k, v := range data {
@ -541,25 +718,13 @@ func logInfo(component, message string, data map[string]interface{}) {
logData["component"] = component logData["component"] = component
logData["message"] = message logData["message"] = message
// Format console output: timestamp component: message key1=val1 key2=val2 // Use slog for structured logging to console
timestamp := time.Now().Format("15:04:05.000") attrs := []any{}
console := fmt.Sprintf("[%s] [%s] %s", timestamp, component, message) for k, v := range logData {
attrs = append(attrs, k, v)
// Add key-value pairs to console output
keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
} }
sort.Strings(keys) // Sort keys for consistent output slog.Info(message, attrs...)
for _, k := range keys { // Log to structured log file and database
v := data[k]
console += fmt.Sprintf(" %s=%v", k, v)
}
// Print to console
fmt.Fprintln(os.Stderr, console)
// Log to structured log file
logEvent("info", logData) logEvent("info", logData)
} }