// gomeshalerter/main.go
package main

import (
"bytes"
"crypto/sha256"
"database/sql"
"encoding/hex"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"os/signal"
"sort"
"strings"
"sync"
"syscall"
"time"
_ "github.com/joho/godotenv/autoload"
_ "github.com/mattn/go-sqlite3"
"github.com/mmcdole/gofeed"
)
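// Article is a single news item as it moves through fetch, summarization, and broadcast.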
type Article struct {
Title string `json:"title"`
Description string `json:"description"`
Link string `json:"link"`
Published time.Time `json:"published"`
Source string `json:"source"`
FirstSeen time.Time `json:"firstseen"`
Seen time.Time `json:"seen"`
Summary string `json:"summary"`
Importance int `json:"importance"`
ID string `json:"id"`
BroadcastTime time.Time `json:"broadcastTime,omitempty"`
}
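// LogEntry is one structured record in the per-run JSON log file.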
type LogEntry struct {
Timestamp time.Time `json:"timestamp"`
Event string `json:"event"`
Details map[string]interface{} `json:"details"`
}
var (
runStart = time.Now()
logPath = runStart.Format("2006-01-02.15:04:05") + ".gomeshalerter.json"
logFile *os.File
logData []LogEntry
db *sql.DB
logMutex sync.Mutex // Mutex for thread-safe logging
)
// Map of source names to their abbreviations
var sourceAbbreviations = map[string]string{
"BBC": "BBC",
"CNN": "CNN",
"NYTimes": "NYT",
"Guardian": "Grd",
"Al Jazeera": "AlJ",
"Fox News": "Fox",
"NBC": "NBC",
"ABC": "ABC",
"CBS": "CBS",
"Sky News": "Sky",
"Time": "Time",
"NPR": "NPR",
"Deutsche Welle": "DW",
"Associated Press": "AP",
"Euronews": "EurN",
"France 24": "F24",
"The Independent": "Ind",
"CNBC": "CNBC",
}
// Find the maximum abbreviation length
func getMaxAbbreviationLength() int {
maxLen := 0
for _, abbr := range sourceAbbreviations {
if len(abbr) > maxLen {
maxLen = len(abbr)
}
}
return maxLen
}
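
// abbreviateSource returns the configured abbreviation for a source name,
// falling back to the first three characters (e.g. "Reuters" -> "Reu").
// It consolidates the lookup-with-fallback logic used when formatting
// broadcast messages.
func abbreviateSource(source string) string {
if abbr := sourceAbbreviations[source]; abbr != "" {
return abbr
}
if len(source) >= 3 {
return source[:3]
}
return source
}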
const (
dbPath = "articles.db"
// LLM prompts
ARTICLES_PROMPT = `Summarize each of these news items in under 165
characters, optimizing for information density (common news headline
abbreviations OK) and rate their importance from 1 to 100.
100 means most important; 1 means least important.
Never rate over 90 unless it is a massive event such as: war outbreak,
revolution, death of a head of state, large-scale natural disaster, mass
casualty terrorism, etc.
IMPORTANT: Rank any headlines primarily promoting commercial products or services as 1 (lowest importance).
For each article, return a JSON object with "id", "summary", and "importance"
fields. Return your response as a JSON array of objects like: [{"id":
"article_id", "summary": "...", "importance": 42}, ...]
Here are the articles:
`
SYSTEM_PROMPT = "You are a news analyst."
BATCH_SIZE = 10
MAX_INDIVIDUAL_PROCESSING = 50
// Timing constants
RSS_CHECK_INTERVAL = 15 * time.Minute
SUMMARIZE_INTERVAL = 10 * time.Second
BROADCAST_INTERVAL = 1 * time.Hour
)
var feeds = map[string]string{
"BBC": "https://feeds.bbci.co.uk/news/world/rss.xml",
"CNN": "http://rss.cnn.com/rss/edition.rss",
"NYTimes": "https://rss.nytimes.com/services/xml/rss/nyt/World.xml",
"Guardian": "https://www.theguardian.com/world/rss",
"Al Jazeera": "https://www.aljazeera.com/xml/rss/all.xml",
"Fox News": "http://feeds.foxnews.com/foxnews/latest",
"NBC": "http://feeds.nbcnews.com/nbcnews/public/news",
"ABC": "https://abcnews.go.com/abcnews/topstories",
"CBS": "https://www.cbsnews.com/latest/rss/world",
"Sky News": "https://feeds.skynews.com/feeds/rss/world.xml",
"Time": "https://time.com/feed/",
"NPR": "https://feeds.npr.org/1001/rss.xml",
"Deutsche Welle": "https://rss.dw.com/rdf/rss-en-world",
// New replacement feeds
"Associated Press": "https://apnews.com/hub/world-news/feed",
"Euronews": "https://www.euronews.com/rss/world-news.xml",
"France 24": "https://www.france24.com/en/rss",
"The Independent": "https://www.independent.co.uk/news/world/rss",
"CNBC": "https://www.cnbc.com/id/100727362/device/rss/rss.xml",
}
// RedundancyCheckResponse is the JSON object the LLM must return from a redundancy check
type RedundancyCheckResponse struct {
IsRedundant bool `json:"is_redundant"`
Reason string `json:"reason"`
}
func main() {
fmt.Fprintf(os.Stderr, "[%s] starting gomeshalerter\n", runStart.Format("15:04:05"))
setupLogging()
defer flushLog()
var err error
db, err = sql.Open("sqlite3", dbPath+"?_journal=WAL") // Use WAL mode for better concurrency
if err != nil {
log.Fatalf("Failed to open database: %v", err)
}
// Define a cleanup function to properly close resources
cleanup := func() {
fmt.Fprintf(os.Stderr, "[shutdown] Closing database...\n")
if err := db.Close(); err != nil {
fmt.Fprintf(os.Stderr, "[shutdown] Error closing database: %v\n", err)
}
flushLog()
fmt.Fprintf(os.Stderr, "[shutdown] Cleanup complete\n")
}
// Ensure cleanup runs on normal exit
defer cleanup()
if err := setupDatabase(); err != nil {
log.Fatalf("Failed to setup database: %v", err)
}
ollamaModel := "qwen3:32b"
ollamaURL := os.Getenv("OLLAMA_URL")
if ollamaURL == "" {
ollamaURL = "http://localhost:11434" // Default Ollama server URL
}
fmt.Fprintf(os.Stderr, "[ollama] Using model: %s at %s\n", ollamaModel, ollamaURL)
// By default the program broadcasts; --dry-run only prints what would be sent
dryRun := flag.Bool("dry-run", false, "don't actually send to meshtastic, just print what would be sent")
flag.Parse()
// Create a WaitGroup to manage goroutines
var wg sync.WaitGroup
// Create a channel to signal shutdown
shutdown := make(chan struct{})
// Set up signal handling for immediate exit with cleanup
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
fmt.Fprintf(os.Stderr, "[shutdown] Received signal, performing cleanup before exit...\n")
cleanup()
fmt.Fprintf(os.Stderr, "[shutdown] Exiting...\n")
os.Exit(0) // Exit after cleanup
}()
// Start RSS feed checker goroutine
wg.Add(1)
go func() {
defer wg.Done()
rssFeedChecker(shutdown, ollamaURL, ollamaModel)
}()
// Start article summarizer goroutine
wg.Add(1)
go func() {
defer wg.Done()
articleSummarizer(shutdown, ollamaURL, ollamaModel)
}()
// Start broadcaster goroutine
wg.Add(1)
go func() {
defer wg.Done()
broadcaster(shutdown, *dryRun)
}()
// Wait for all goroutines to finish
wg.Wait()
fmt.Fprintf(os.Stderr, "[shutdown] All goroutines stopped, exiting...\n")
}
// rssFeedChecker checks RSS feeds every 15 minutes and adds new articles to the database
func rssFeedChecker(shutdown chan struct{}, ollamaURL, ollamaModel string) {
fmt.Fprintf(os.Stderr, "[rss] Starting RSS feed checker (interval: %s)\n", RSS_CHECK_INTERVAL)
// Run immediately on startup
checkRSSFeeds()
// Then run on interval
ticker := time.NewTicker(RSS_CHECK_INTERVAL)
defer ticker.Stop()
for {
select {
case <-ticker.C:
checkRSSFeeds()
case <-shutdown:
fmt.Fprintf(os.Stderr, "[rss] Shutting down RSS feed checker\n")
return
}
}
}
// checkRSSFeeds fetches all RSS feeds and adds new articles to the database
func checkRSSFeeds() {
articles := loadArticles()
oldCount := len(articles)
logInfo("rss", "Checking RSS feeds", map[string]interface{}{
"time": time.Now().Format("15:04:05"),
"articlesBeforeFetch": oldCount,
})
now := time.Now()
newArticles := fetchAllFeedsParallel(now)
newCount := 0
for _, a := range newArticles {
if _, ok := articles[a.Link]; !ok {
if a.ID == "" {
a.ID = generateID(a.Link)
}
articles[a.Link] = a
saveArticle(a)
newCount++
logInfo("new", fmt.Sprintf("Found new article: %s", a.Title), map[string]interface{}{
"id": a.ID,
"source": a.Source,
"published": a.Published.Format(time.RFC3339),
})
}
}
logInfo("rss", "Completed RSS check", map[string]interface{}{
"articlesBeforeFetch": oldCount,
"articlesAfterFetch": oldCount + newCount,
"newArticles": newCount,
})
}
// articleSummarizer checks for articles without summaries every 10 seconds and processes them in batches
func articleSummarizer(shutdown chan struct{}, ollamaURL, ollamaModel string) {
fmt.Fprintf(os.Stderr, "[summarizer] Starting article summarizer (interval: %s)\n", SUMMARIZE_INTERVAL)
ticker := time.NewTicker(SUMMARIZE_INTERVAL)
defer ticker.Stop()
for {
select {
case <-ticker.C:
summarizeArticles(ollamaURL, ollamaModel)
case <-shutdown:
fmt.Fprintf(os.Stderr, "[summarizer] Shutting down article summarizer\n")
return
}
}
}
// summarizeArticles processes articles without summaries in batches
func summarizeArticles(ollamaURL, ollamaModel string) {
articles := loadArticles()
// Count articles that already have summaries
summarizedCount := 0
for _, article := range articles {
if article.Summary != "" && article.Importance != 0 {
summarizedCount++
}
}
// Collect articles that need summarization
var articlesToSummarize []Article
for _, article := range articles {
if article.Summary == "" || article.Importance == 0 {
articlesToSummarize = append(articlesToSummarize, article)
}
}
if len(articlesToSummarize) == 0 {
return // No articles to summarize
}
logInfo("summarizer", "Processing articles", map[string]interface{}{
"alreadySummarized": summarizedCount,
"toSummarize": len(articlesToSummarize),
"totalArticles": len(articles),
})
// Process in batches
for i := 0; i < len(articlesToSummarize); i += BATCH_SIZE {
end := i + BATCH_SIZE
if end > len(articlesToSummarize) {
end = len(articlesToSummarize)
}
batch := articlesToSummarize[i:end]
batchInfo := make([]string, 0, len(batch))
for _, a := range batch {
batchInfo = append(batchInfo, a.ID[:8])
}
logInfo("summarizer", fmt.Sprintf("Processing batch %d to %d", i+1, end), map[string]interface{}{
"batchSize": len(batch),
"batchIds": strings.Join(batchInfo, ","),
})
startTime := time.Now()
summaries, err := processArticleBatch(ollamaURL, ollamaModel, batch)
apiDuration := time.Since(startTime)
if err != nil {
logInfo("summarizer", "Batch processing error", map[string]interface{}{
"error": err.Error(),
"duration": apiDuration.String(),
})
continue
}
// Update articles with summaries
updatedCount := 0
for id, result := range summaries {
for _, article := range batch {
if article.ID == id {
article.Summary = result.Summary
article.Importance = result.Importance
updateArticle(article)
updatedCount++
break
}
}
}
logInfo("summarizer", "Batch processing complete", map[string]interface{}{
"batchSize": len(batch),
"updatedCount": updatedCount,
"duration": apiDuration.String(),
"durationMs": apiDuration.Milliseconds(),
"msPerArticle": apiDuration.Milliseconds() / int64(len(batch)),
})
}
}
// broadcaster waits 30 seconds after startup, then selects and broadcasts the most important article, repeating every hour
func broadcaster(shutdown chan struct{}, dryRun bool) {
logInfo("broadcaster", "Starting broadcaster (waiting 30 seconds before first broadcast)", map[string]interface{}{
"interval": BROADCAST_INTERVAL.String(),
"dryRun": dryRun,
})
// Sleep for 30 seconds on startup
logInfo("broadcaster", "Sleeping for 30 seconds before first broadcast", nil)
select {
case <-time.After(30 * time.Second):
// Continue after sleep
case <-shutdown:
logInfo("broadcaster", "Shutdown signal received during startup sleep", nil)
return
}
// Run immediately after initial sleep
checkAndBroadcast(dryRun)
// Then run on interval
ticker := time.NewTicker(BROADCAST_INTERVAL)
defer ticker.Stop()
for {
select {
case <-ticker.C:
checkAndBroadcast(dryRun)
case <-shutdown:
logInfo("broadcaster", "Shutting down broadcaster", nil)
return
}
}
}
// checkAndBroadcast checks if there are any unsummarized articles before broadcasting
func checkAndBroadcast(dryRun bool) {
// Check if there are any unsummarized articles
articles := loadArticles()
unsummarizedCount := 0
for _, article := range articles {
if article.Summary == "" || article.Importance == 0 {
unsummarizedCount++
}
}
if unsummarizedCount > 0 {
logInfo("broadcaster", "Postponing broadcast - waiting for articles to be summarized", map[string]interface{}{
"unsummarizedCount": unsummarizedCount,
"totalArticles": len(articles),
})
return
}
// No unsummarized articles, proceed with broadcast
broadcastWithRedundancyCheck(dryRun)
}
// broadcastWithRedundancyCheck attempts to find a non-redundant article to broadcast
func broadcastWithRedundancyCheck(dryRun bool) {
articles := loadArticles()
now := time.Now()
cutoff := now.Add(-24 * time.Hour) // 24-hour window for articles
var candidates []Article
for _, a := range articles {
// Only include articles that haven't been broadcast, have summaries, and are fresh
if a.Summary != "" && a.BroadcastTime.IsZero() && a.FirstSeen.After(cutoff) {
candidates = append(candidates, a)
}
}
if len(candidates) == 0 {
logInfo("broadcaster", "No fresh articles found for broadcasting", map[string]interface{}{
"totalArticles": len(articles),
})
return
}
// Sort by importance
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].Importance > candidates[j].Importance
})
// Get recent broadcasts
recentBroadcasts := getRecentBroadcasts(12)
// Try candidates until we find a non-redundant one
for i := 0; i < len(candidates); i++ {
candidate := candidates[i]
// Check if this candidate would be redundant
isRedundant, reason, err := checkRedundancy(candidate, recentBroadcasts)
if err != nil {
logInfo("redundancy", "Error checking redundancy, proceeding anyway", map[string]interface{}{
"error": err.Error(),
"id": candidate.ID,
})
// Continue with this candidate despite the error
broadcastArticle(candidate, dryRun)
return
}
if isRedundant {
logInfo("redundancy", "Article deemed redundant, marking as 'already broadcast'", map[string]interface{}{
"id": candidate.ID,
"summary": candidate.Summary,
"reason": reason,
})
// Mark as broadcast with special timestamp to prevent future selection
candidate.BroadcastTime = time.Unix(1, 0) // epoch + 1 second
updateArticle(candidate)
continue // Try next candidate
}
// Not redundant, proceed with this candidate
logInfo("redundancy", "Article passed redundancy check", map[string]interface{}{
"id": candidate.ID,
"candidateNumber": i + 1,
})
broadcastArticle(candidate, dryRun)
return
}
// If we got here, all candidates were redundant
logInfo("broadcaster", "All candidates were deemed redundant, no broadcast", map[string]interface{}{
"candidatesChecked": len(candidates),
})
}
// getRecentBroadcasts retrieves the n most recently broadcast articles
func getRecentBroadcasts(n int) []Article {
rows, err := db.Query(`
SELECT link, title, description, published, source, firstseen, seen, summary, importance, id, broadcastTime
FROM articles
WHERE broadcastTime IS NOT NULL AND broadcastTime > 1
ORDER BY broadcastTime DESC
LIMIT ?
`, n)
if err != nil {
logInfo("db", "Error retrieving recent broadcasts", map[string]interface{}{
"error": err.Error(),
})
return []Article{}
}
defer rows.Close()
var broadcasts []Article
for rows.Next() {
var a Article
var seen sql.NullTime
var broadcastTime sql.NullTime
err := rows.Scan(
&a.Link, &a.Title, &a.Description, &a.Published, &a.Source,
&a.FirstSeen, &seen, &a.Summary, &a.Importance, &a.ID, &broadcastTime,
)
if err != nil {
logInfo("db", "Error scanning broadcast article", map[string]interface{}{
"error": err.Error(),
})
continue
}
if seen.Valid {
a.Seen = seen.Time
}
if broadcastTime.Valid {
a.BroadcastTime = broadcastTime.Time
}
broadcasts = append(broadcasts, a)
}
return broadcasts
}
// checkRedundancy asks the LLM if the candidate message is redundant compared to recent broadcasts
func checkRedundancy(candidate Article, recentBroadcasts []Article) (bool, string, error) {
if len(recentBroadcasts) == 0 {
return false, "No previous broadcasts to compare", nil
}
// Get the abbreviated source name
sourceAbbr := abbreviateSource(candidate.Source)
// Format the candidate message as it would be broadcast
ts := time.Now().Format("15:04 MST")
candidateMsg := fmt.Sprintf("%s: [%s] [AI/LLM] %s", ts, sourceAbbr, strings.TrimSpace(candidate.Summary))
// Format the recent broadcasts
var recentMessages []string
for i, article := range recentBroadcasts {
sourceAbbr := abbreviateSource(article.Source)
broadcastTime := article.BroadcastTime.Format("15:04 MST")
msg := fmt.Sprintf("%s: [%s] [AI/LLM] %s", broadcastTime, sourceAbbr, strings.TrimSpace(article.Summary))
recentMessages = append(recentMessages, fmt.Sprintf("%d. %s", i+1, msg))
}
// Create the LLM prompt
prompt := fmt.Sprintf(`Determine if the following message would be redundant to broadcast given the previous broadcasts.
CANDIDATE MESSAGE TO BROADCAST:
%s
PREVIOUS BROADCASTS (most recent first):
%s
Is the candidate message redundant with any of the previously broadcast messages? A message is considered redundant if:
1. It covers substantially the same event or news story as a previous message
2. It doesn't add significant new information beyond what was already broadcast
3. It's about the same persons/entities in the same context as a previous message
You MUST respond ONLY with a valid JSON object having the following structure, and nothing else:
{"is_redundant": true/false, "reason": "explanation of your decision"}
DO NOT include any other text, explanation, or formatting in your response, ONLY the JSON.`,
candidateMsg,
strings.Join(recentMessages, "\n"),
)
// Get the URL for the Ollama API
ollamaURL := os.Getenv("OLLAMA_URL")
if ollamaURL == "" {
ollamaURL = "http://localhost:11434" // Default Ollama server URL
}
// Use qwen3:32b model for consistency
ollamaModel := "qwen3:32b"
payload := map[string]interface{}{
"model": ollamaModel,
"messages": []map[string]string{
{"role": "system", "content": "You are a news redundancy analyzer. Your ONLY job is to determine if a news message is redundant compared to previously broadcast messages. You MUST respond ONLY with a valid JSON object in the format {\"is_redundant\": boolean, \"reason\": \"string\"} and nothing else."},
{"role": "user", "content": prompt},
},
"stream": false,
}
logInfo("redundancy", "Checking if article is redundant with previous broadcasts", map[string]interface{}{
"candidateID": candidate.ID,
"recentBroadcasts": len(recentBroadcasts),
})
// Convert payload to JSON
payloadBytes, err := json.Marshal(payload)
if err != nil {
return false, "", fmt.Errorf("error marshaling request: %v", err)
}
// Send the request with a timeout and a simple retry mechanism. The request
// is rebuilt on every attempt because an http.Request body cannot be safely
// reused once it has been consumed.
client := &http.Client{Timeout: 30 * time.Second}
// Try up to 3 times with linear backoff (2s, then 4s)
var resp *http.Response
var respErr error
for attempt := 1; attempt <= 3; attempt++ {
req, err := http.NewRequest("POST", ollamaURL+"/api/chat", bytes.NewReader(payloadBytes))
if err != nil {
return false, "", fmt.Errorf("error creating request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
resp, respErr = client.Do(req)
if respErr == nil {
break
}
logInfo("redundancy", "Request attempt failed", map[string]interface{}{
"attempt": attempt,
"error": respErr.Error(),
})
if attempt < 3 {
backoffTime := time.Duration(attempt) * 2 * time.Second
time.Sleep(backoffTime)
}
}
// If all attempts failed, proceed with broadcasting anyway
if respErr != nil {
logInfo("redundancy", "All request attempts failed", map[string]interface{}{
"error": respErr.Error(),
})
return false, "Failed to check redundancy: " + respErr.Error(), nil
}
defer resp.Body.Close()
// Read the response
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
logInfo("redundancy", "Error reading response", map[string]interface{}{
"error": err.Error(),
})
return false, "Failed to read redundancy check response", nil
}
// Parse the response
var result struct {
Message struct {
Content string `json:"content"`
} `json:"message"`
}
if err := json.Unmarshal(bodyBytes, &result); err != nil {
logInfo("redundancy", "Error parsing response JSON", map[string]interface{}{
"error": err.Error(),
"body": string(bodyBytes),
})
return false, "Failed to parse redundancy check response", nil
}
// Extract the JSON part from the response
content := result.Message.Content
content = strings.TrimSpace(content)
// Handle case where the response might be wrapped in markdown code blocks
if strings.HasPrefix(content, "```json") {
content = strings.TrimPrefix(content, "```json")
content = strings.TrimPrefix(content, "```")
if idx := strings.Index(content, "```"); idx > 0 {
content = content[:idx]
}
} else if strings.HasPrefix(content, "```") {
content = strings.TrimPrefix(content, "```")
if idx := strings.Index(content, "```"); idx > 0 {
content = content[:idx]
}
}
content = strings.TrimSpace(content)
// Check for JSON syntax - find first { and last }
firstBrace := strings.Index(content, "{")
lastBrace := strings.LastIndex(content, "}")
if firstBrace >= 0 && lastBrace > firstBrace {
// Extract what appears to be valid JSON
content = content[firstBrace : lastBrace+1]
} else {
logInfo("redundancy", "Response doesn't contain valid JSON structure", map[string]interface{}{
"content": content,
})
// Default to non-redundant with explanation
return false, "Could not parse LLM response as JSON", nil
}
// Parse the redundancy check result
var redundancyResult RedundancyCheckResponse
if err := json.Unmarshal([]byte(content), &redundancyResult); err != nil {
logInfo("redundancy", "Failed to unmarshal JSON response", map[string]interface{}{
"error": err.Error(),
"content": content,
})
// Default to non-redundant with explanation
return false, "Error parsing redundancy check result", nil
}
logInfo("redundancy", "Redundancy check result", map[string]interface{}{
"isRedundant": redundancyResult.IsRedundant,
"reason": redundancyResult.Reason,
"candidateID": candidate.ID,
})
return redundancyResult.IsRedundant, redundancyResult.Reason, nil
}
// broadcastArticle broadcasts the chosen article
func broadcastArticle(chosen Article, dryRun bool) {
// Get the abbreviated source name
sourceAbbr := abbreviateSource(chosen.Source)
ts := time.Now().Format("15:04 MST")
msg := fmt.Sprintf("%s: [%s] [AI/LLM] %s", ts, sourceAbbr, strings.TrimSpace(chosen.Summary))
// Ensure message is under 200 characters
if len(msg) > 200 {
// Calculate max summary length, accounting for timestamp, source prefix, AI/LLM tag, and ellipsis
maxSummaryLen := 200 - len(ts) - len(sourceAbbr) - len(": [] [AI/LLM] ") - 3 // 3 for "..."
// Note: slicing is byte-based, so a multi-byte rune at the boundary may be split
truncatedSummary := strings.TrimSpace(chosen.Summary)[:maxSummaryLen] + "..."
msg = fmt.Sprintf("%s: [%s] [AI/LLM] %s", ts, sourceAbbr, truncatedSummary)
logInfo("broadcaster", "Message truncated to fit 200 character limit", map[string]interface{}{
"originalLength": len(chosen.Summary),
"truncatedLength": maxSummaryLen,
})
}
// Print info about what will be broadcast
cmdStr := fmt.Sprintf("meshtastic --sendtext \"%s\"", msg)
logInfo("broadcaster", "Preparing to broadcast", map[string]interface{}{
"message": msg,
"length": len(msg),
"command": cmdStr,
"dryRun": dryRun,
"id": chosen.ID,
"importance": chosen.Importance,
"source": chosen.Source,
"sourceAbbr": sourceAbbr,
})
// Wait 30 seconds before broadcasting to allow time to see the message
logInfo("broadcaster", "Waiting 30 seconds before broadcasting...", nil)
time.Sleep(30 * time.Second)
// Update broadcast time and save to database (note: this runs in dry-run mode too, so the article is excluded from future selection either way)
chosen.BroadcastTime = time.Now()
updateArticle(chosen)
logInfo("broadcaster", "Set broadcast time for article", map[string]interface{}{
"id": chosen.ID,
"summary": chosen.Summary,
"importance": chosen.Importance,
"broadcastTime": chosen.BroadcastTime.Format(time.RFC3339),
})
output, _ := json.MarshalIndent(chosen, "", " ")
fmt.Println(string(output))
if !dryRun {
logInfo("broadcaster", "Broadcasting message", map[string]interface{}{
"message": msg,
"length": len(msg),
})
cmd := exec.Command("meshtastic", "--sendtext", msg)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
logInfo("broadcaster", "Broadcast failed", map[string]interface{}{
"error": err.Error(),
"id": chosen.ID,
})
} else {
logInfo("broadcaster", "Broadcast success", map[string]interface{}{
"id": chosen.ID,
})
}
} else {
// In dry-run mode, just print the command
logInfo("broadcaster", "DRY RUN - would run command", map[string]interface{}{
"command": cmdStr,
"id": chosen.ID,
})
}
}
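// setupLogging creates the timestamped JSON log file for this run.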
func setupLogging() {
var err error
logFile, err = os.Create(logPath)
if err != nil {
log.Fatalf("could not create log file: %v", err)
}
}
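// flushLog writes all buffered log entries to the log file and closes it.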
func flushLog() {
logMutex.Lock()
defer logMutex.Unlock()
if logFile == nil {
return
}
enc := json.NewEncoder(logFile)
enc.SetIndent("", " ")
_ = enc.Encode(logData)
logFile.Close()
logFile = nil // guard against a double flush: cleanup and the deferred flushLog can both run
}
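// logEvent appends a structured entry to the in-memory log buffer; entries are persisted by flushLog.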
func logEvent(event string, details map[string]interface{}) {
logMutex.Lock()
defer logMutex.Unlock()
// Timestamp and event name are carried on the LogEntry fields, so the
// caller's details map is stored as-is rather than mutated
logData = append(logData, LogEntry{
Timestamp: time.Now(),
Event: event,
Details: details,
})
}
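// setupDatabase creates the articles table if needed and migrates older
// databases by adding the id and broadcastTime columns.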
func setupDatabase() error {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS articles (
link TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT,
published TIMESTAMP NOT NULL,
source TEXT NOT NULL,
firstseen TIMESTAMP NOT NULL,
seen TIMESTAMP,
summary TEXT,
importance INTEGER,
id TEXT,
broadcastTime TIMESTAMP
)
`)
if err != nil {
return err
}
// Check if columns exist
rows, err := db.Query(`PRAGMA table_info(articles)`)
if err != nil {
return err
}
defer rows.Close()
hasIDColumn := false
hasBroadcastTimeColumn := false
for rows.Next() {
var cid, notnull, pk int
var name, type_name string
var dflt_value interface{}
if err := rows.Scan(&cid, &name, &type_name, &notnull, &dflt_value, &pk); err != nil {
return err
}
if name == "id" {
hasIDColumn = true
}
if name == "broadcastTime" {
hasBroadcastTimeColumn = true
}
}
// Add missing columns if needed
if !hasIDColumn {
_, err = db.Exec(`ALTER TABLE articles ADD COLUMN id TEXT`)
if err != nil {
return err
}
}
if !hasBroadcastTimeColumn {
_, err = db.Exec(`ALTER TABLE articles ADD COLUMN broadcastTime TIMESTAMP`)
if err != nil {
return err
}
}
return nil
}
// Generate a deterministic ID from a URL
func generateID(url string) string {
hash := sha256.Sum256([]byte(url))
return hex.EncodeToString(hash[:])[:26] // Return first 26 chars of the hash
}
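// saveArticle inserts an article into the database, ignoring duplicates by link.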
func saveArticle(article Article) error {
_, err := db.Exec(`
INSERT OR IGNORE INTO articles
(link, title, description, published, source, firstseen, seen, summary, importance, id, broadcastTime)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`,
article.Link, article.Title, article.Description, article.Published,
article.Source, article.FirstSeen, article.Seen, article.Summary, article.Importance, article.ID,
article.BroadcastTime)
if err != nil {
logEvent("db_insert_error", map[string]interface{}{
"article": article.Link,
"id": article.ID,
"error": err.Error(),
})
}
return err
}
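// updateArticle rewrites all mutable fields of the article identified by its link.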
func updateArticle(article Article) error {
_, err := db.Exec(`
UPDATE articles SET
title = ?,
description = ?,
published = ?,
source = ?,
firstseen = ?,
seen = ?,
summary = ?,
importance = ?,
id = ?,
broadcastTime = ?
WHERE link = ?
`,
article.Title, article.Description, article.Published, article.Source,
article.FirstSeen, article.Seen, article.Summary, article.Importance, article.ID,
article.BroadcastTime, article.Link)
if err != nil {
logEvent("db_update_error", map[string]interface{}{
"article": article.Link,
"id": article.ID,
"error": err.Error(),
})
}
return err
}
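// loadArticles reads every article from the database into a map keyed by link.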
func loadArticles() map[string]Article {
articles := make(map[string]Article)
rows, err := db.Query(`
SELECT link, title, description, published, source, firstseen, seen, summary, importance, id, broadcastTime
FROM articles
`)
if err != nil {
logEvent("db_query_error", map[string]interface{}{
"error": err.Error(),
})
return articles
}
defer rows.Close()
for rows.Next() {
var a Article
var seen sql.NullTime
var broadcastTime sql.NullTime
err := rows.Scan(
&a.Link, &a.Title, &a.Description, &a.Published, &a.Source,
&a.FirstSeen, &seen, &a.Summary, &a.Importance, &a.ID, &broadcastTime,
)
if err != nil {
logEvent("db_scan_error", map[string]interface{}{
"error": err.Error(),
})
continue
}
if seen.Valid {
a.Seen = seen.Time
}
if broadcastTime.Valid {
a.BroadcastTime = broadcastTime.Time
}
articles[a.Link] = a
}
return articles
}
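// fetchAllFeedsParallel fetches all configured feeds concurrently, with a
// per-feed HTTP timeout, and returns the combined list of articles.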
func fetchAllFeedsParallel(now time.Time) []Article {
type fetchResult struct {
Source string
URL string
Feed *gofeed.Feed
Err error
HTTPStatus int
Duration time.Duration
}
var wg sync.WaitGroup
results := make(chan fetchResult, len(feeds))
for source, url := range feeds {
wg.Add(1)
go func(source, url string) {
defer wg.Done()
start := time.Now()
fp := gofeed.NewParser()
client := &http.Client{Timeout: 2 * time.Second}
var httpStatus int
resp, err := client.Get(url)
var feed *gofeed.Feed
if err == nil {
httpStatus = resp.StatusCode
defer resp.Body.Close()
feed, err = fp.Parse(resp.Body)
}
duration := time.Since(start)
details := map[string]interface{}{
"source": source,
"url": url,
"status": httpStatus,
"duration": duration.Seconds(),
}
if err != nil {
details["error"] = err.Error()
}
logEvent("rss_fetch_result", details)
if err != nil {
fmt.Fprintf(os.Stderr, "[rss] FAIL %-15s (%s) [%.2fs] ERR: %v\n", source, url, duration.Seconds(), err)
results <- fetchResult{Source: source, URL: url, Err: err, Duration: duration, HTTPStatus: httpStatus}
return
}
fmt.Fprintf(os.Stderr, "[rss] OK %-15s (%s) [%.2fs] HTTP %d, items: %d\n",
source, url, duration.Seconds(), httpStatus, len(feed.Items))
results <- fetchResult{
Source: source,
URL: url,
Feed: feed,
Duration: duration,
HTTPStatus: httpStatus,
}
}(source, url)
}
wg.Wait()
close(results)
var all []Article
for result := range results {
if result.Err != nil || result.Feed == nil {
continue
}
for _, item := range result.Feed.Items {
published := now
if item.PublishedParsed != nil {
published = *item.PublishedParsed
}
all = append(all, Article{
Title: item.Title,
Description: item.Description,
Link: item.Link,
Published: published,
Source: result.Source,
FirstSeen: now,
ID: generateID(item.Link),
})
}
}
return all
}
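// SummaryResult holds the LLM-generated summary and importance rating for one article.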
type SummaryResult struct {
ID string `json:"id"`
Summary string `json:"summary"`
Importance int `json:"importance"`
}
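// processArticleBatch sends a batch of articles to the Ollama chat API and
// parses the response into summaries keyed by article ID.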
func processArticleBatch(ollamaURL, model string, articles []Article) (map[string]SummaryResult, error) {
// Prepare batch prompt
var batchPrompt strings.Builder
batchPrompt.WriteString(ARTICLES_PROMPT)
for i, article := range articles {
batchPrompt.WriteString(fmt.Sprintf("Article %d (ID: %s):\n", i+1, article.ID))
batchPrompt.WriteString(fmt.Sprintf("Title: %s\n", article.Title))
batchPrompt.WriteString(fmt.Sprintf("Content: %s\n\n", article.Description))
}
payload := map[string]interface{}{
"model": model,
"messages": []map[string]string{
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": batchPrompt.String()},
},
"stream": false,
}
// Log request details
requestDetails := map[string]interface{}{
"model": model,
"batchSize": len(articles),
"apiEndpoint": ollamaURL + "/api/chat",
}
for i, article := range articles {
requestDetails[fmt.Sprintf("article%d", i+1)] = article.ID
}
logInfo("api", "Sending request to Ollama", requestDetails)
startTime := time.Now()
// Pretty print request JSON for logging
payloadBytes, err := json.MarshalIndent(payload, "", " ")
if err != nil {
logInfo("api", "JSON encoding error", map[string]interface{}{
"error": err.Error(),
})
return nil, err
}
req, err := http.NewRequest("POST", ollamaURL+"/api/chat", bytes.NewReader(payloadBytes))
if err != nil {
logInfo("api", "Request creation error", map[string]interface{}{
"error": err.Error(),
})
return nil, err
}
req.Header.Set("Content-Type", "application/json")
// Longer timeout for larger batches
timeout := 30 * time.Second
if len(articles) > 1 {
timeout = 120 * time.Second
}
client := &http.Client{Timeout: timeout}
resp, err := client.Do(req)
if err != nil {
logInfo("api", "API call failed", map[string]interface{}{
"error": err.Error(),
"duration": time.Since(startTime).String(),
})
return nil, err
}
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
apiDuration := time.Since(startTime)
if err != nil {
logInfo("api", "Error reading response body", map[string]interface{}{
"error": err.Error(),
"duration": apiDuration.String(),
})
return nil, err
}
var res struct {
Message struct {
Content string `json:"content"`
} `json:"message"`
}
if err := json.Unmarshal(bodyBytes, &res); err != nil {
logInfo("api", "JSON unmarshal error", map[string]interface{}{
"error": err.Error(),
"duration": apiDuration.String(),
})
return nil, err
}
if res.Message.Content == "" {
logInfo("api", "Empty response content", map[string]interface{}{
"duration": apiDuration.String(),
})
return nil, fmt.Errorf("no content in response")
}
// Parse the JSON response - handle both single object and array responses
var summaries []SummaryResult
// Try to parse as array first
if err := json.Unmarshal([]byte(res.Message.Content), &summaries); err != nil {
// Try to extract JSON from the content in case the model wrapped it in markdown or text
contentStr := res.Message.Content
jsonStart := strings.Index(contentStr, "[")
jsonEnd := strings.LastIndex(contentStr, "]")
if jsonStart >= 0 && jsonEnd > jsonStart {
// Try to parse as JSON array
jsonStr := contentStr[jsonStart : jsonEnd+1]
if err := json.Unmarshal([]byte(jsonStr), &summaries); err != nil {
// Try parsing as a single object for single article case
if len(articles) == 1 {
// Look for { } instead
jsonStart = strings.Index(contentStr, "{")
jsonEnd = strings.LastIndex(contentStr, "}")
if jsonStart >= 0 && jsonEnd > jsonStart {
jsonStr := contentStr[jsonStart : jsonEnd+1]
var singleResult struct {
Summary string `json:"summary"`
Importance int `json:"importance"`
}
if err := json.Unmarshal([]byte(jsonStr), &singleResult); err == nil {
// Create a SummaryResult with the article's ID
summaries = []SummaryResult{{
ID: articles[0].ID,
Summary: singleResult.Summary,
Importance: singleResult.Importance,
}}
} else {
logInfo("api", "JSON parse error for single object", map[string]interface{}{
"error": err.Error(),
"duration": apiDuration.String(),
})
return nil, err
}
} else {
logInfo("api", "Could not find JSON object in response", map[string]interface{}{
"duration": apiDuration.String(),
})
return nil, fmt.Errorf("could not find JSON object in response")
}
} else {
logInfo("api", "JSON parse error for array", map[string]interface{}{
"error": err.Error(),
"duration": apiDuration.String(),
})
return nil, err
}
}
} else {
logInfo("api", "Could not find JSON array in response", map[string]interface{}{
"duration": apiDuration.String(),
})
return nil, fmt.Errorf("could not find JSON array in response")
}
}
// Convert to map for easier lookup
resultMap := make(map[string]SummaryResult)
for _, summary := range summaries {
resultMap[summary.ID] = summary
}
logInfo("api", "API call successful", map[string]interface{}{
"duration": apiDuration.String(),
"durationMs": apiDuration.Milliseconds(),
"resultsCount": len(resultMap),
"msPerArticle": apiDuration.Milliseconds() / int64(len(articles)),
"statusCode": resp.StatusCode,
})
return resultMap, nil
}
// logInfo logs a structured message to both console and the log file
func logInfo(component, message string, data map[string]interface{}) {
// Copy the data map so the caller's map is not modified (the local name
// also avoids shadowing the global logData slice)
fields := make(map[string]interface{})
for k, v := range data {
fields[k] = v
}
// Add component and message to the log fields
fields["component"] = component
fields["message"] = message
// Format console output: [timestamp] [component] message key1=val1 key2=val2
timestamp := time.Now().Format("15:04:05.000")
console := fmt.Sprintf("[%s] [%s] %s", timestamp, component, message)
// Add key-value pairs to console output, sorted for consistent ordering
keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
console += fmt.Sprintf(" %s=%v", k, data[k])
}
// Print to console
fmt.Fprintln(os.Stderr, console)
// Log to structured log file
logEvent("info", fields)
}