From 26b26553547ce498c97c5cddef45143e533fe74c Mon Sep 17 00:00:00 2001 From: sneak Date: Thu, 22 May 2025 06:34:31 -0700 Subject: [PATCH] Refactor: Split monolithic codebase into modular components and improve functionality --- broadcaster.go | 230 ++++++++ llm.go | 547 ++++++++++++++++++ main.go | 1296 +----------------------------------------- models.go | 97 ++++ rss.go | 223 ++++++++ storage.go | 486 ++++++++++++++++ templates/index.html | 290 ++++++++++ webserver.go | 135 +++++ 8 files changed, 2025 insertions(+), 1279 deletions(-) create mode 100644 broadcaster.go create mode 100644 llm.go create mode 100644 models.go create mode 100644 rss.go create mode 100644 storage.go create mode 100644 templates/index.html create mode 100644 webserver.go diff --git a/broadcaster.go b/broadcaster.go new file mode 100644 index 0000000..e77c2af --- /dev/null +++ b/broadcaster.go @@ -0,0 +1,230 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + "os/exec" + "sort" + "strings" + "time" +) + +// broadcaster runs on startup and every hour to select and broadcast the most important article +func broadcaster(shutdown chan struct{}, dryRun bool) { + logInfo("broadcaster", fmt.Sprintf("Starting broadcaster (waiting %d seconds before first broadcast)", int(STARTUP_DELAY.Seconds())), map[string]interface{}{ + "interval": BROADCAST_INTERVAL.String(), + "dryRun": dryRun, + }) + + // Sleep on startup + logInfo("broadcaster", fmt.Sprintf("Sleeping for %d seconds before first broadcast", int(STARTUP_DELAY.Seconds())), nil) + select { + case <-time.After(STARTUP_DELAY): + // Continue after sleep + case <-shutdown: + logInfo("broadcaster", "Shutdown signal received during startup sleep", nil) + return + } + + // Run immediately after initial sleep + checkAndBroadcast(dryRun) + + // Then run on interval + ticker := time.NewTicker(BROADCAST_INTERVAL) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + checkAndBroadcast(dryRun) + case <-shutdown: + logInfo("broadcaster", "Shutting down broadcaster", nil) + return + } + } +} + +// checkAndBroadcast checks if there are any unsummarized articles before broadcasting +func checkAndBroadcast(dryRun bool) { + // Check if there are any unsummarized articles + articles := loadArticles() + unsummarizedCount := 0 + + for _, article := range articles { + if article.Summary == "" || article.Importance == 0 { + unsummarizedCount++ + } + } + + if unsummarizedCount > 0 { + logInfo("broadcaster", "Postponing broadcast - waiting for articles to be summarized", map[string]interface{}{ + "unsummarizedCount": unsummarizedCount, + "totalArticles": len(articles), + }) + return + } + + // No unsummarized articles, proceed with broadcast + broadcastWithRedundancyCheck(dryRun) +} + +// broadcastWithRedundancyCheck attempts to find a non-redundant article to broadcast +func broadcastWithRedundancyCheck(dryRun bool) { + articles := loadArticles() + now := time.Now() + cutoff := now.Add(-ARTICLE_FRESHNESS_WINDOW) // Time window for considering articles fresh + + var candidates []Article + for _, a := range articles { + // Only include articles that haven't been broadcast, have summaries, and are fresh + if a.Summary != "" && a.BroadcastTime.IsZero() && a.FirstSeen.After(cutoff) { + candidates = append(candidates, a) + } + } + + if len(candidates) == 0 { + logInfo("broadcaster", "No fresh articles found for broadcasting", map[string]interface{}{ + "totalArticles": len(articles), + }) + return + } + + // Sort by importance + sort.Slice(candidates, func(i, j int) bool { + 
return candidates[i].Importance > candidates[j].Importance
+	})
+
+	// Get recent broadcasts
+	recentBroadcasts := getRecentBroadcasts(12)
+
+	// Try candidates until we find a non-redundant one
+	for i := 0; i < len(candidates); i++ {
+		candidate := candidates[i]
+
+		// Check if this candidate would be redundant
+		isRedundant, reason, err := checkRedundancy(candidate, recentBroadcasts)
+		if err != nil {
+			logInfo("redundancy", "Error checking redundancy, proceeding anyway", map[string]interface{}{
+				"error": err.Error(),
+				"id":    candidate.ID,
+			})
+			// Continue with this candidate despite the error
+			broadcastArticle(candidate, dryRun)
+			return
+		}
+
+		if isRedundant {
+			logInfo("redundancy", "Article deemed redundant, marking as 'already broadcast'", map[string]interface{}{
+				"id":      candidate.ID,
+				"summary": candidate.Summary,
+				"reason":  reason,
+			})
+
+			// Mark as broadcast with special timestamp to prevent future selection
+			candidate.BroadcastTime = time.Unix(1, 0) // epoch + 1 second
+			updateArticle(candidate)
+			continue // Try next candidate
+		}
+
+		// Not redundant, proceed with this candidate
+		logInfo("redundancy", "Article passed redundancy check", map[string]interface{}{
+			"id":              candidate.ID,
+			"candidateNumber": i + 1,
+		})
+		broadcastArticle(candidate, dryRun)
+		return
+	}
+
+	// If we got here, all candidates were redundant
+	logInfo("broadcaster", "All candidates were deemed redundant, no broadcast", map[string]interface{}{
+		"candidatesChecked": len(candidates),
+	})
+}
+
+// broadcastArticle broadcasts the chosen article
+func broadcastArticle(chosen Article, dryRun bool) {
+	// Get the abbreviated source name
+	sourceAbbr := sourceAbbreviations[chosen.Source]
+	if sourceAbbr == "" {
+		// Default to first 3 characters if no abbreviation defined
+		if len(chosen.Source) >= 3 {
+			sourceAbbr = chosen.Source[:3]
+		} else {
+			sourceAbbr = chosen.Source
+		}
+	}
+
+	ts := time.Now().Format("15:04 MST")
+	msg := fmt.Sprintf("%s: [%s] [AI/LLM] %s", ts, sourceAbbr, strings.TrimSpace(chosen.Summary))
+
+	// Ensure the message does not exceed MAX_MESSAGE_LENGTH
+	if len(msg) > MAX_MESSAGE_LENGTH {
+		// Calculate the max summary length, accounting for the timestamp, source prefix, AI/LLM tag, and ellipsis
+		maxSummaryLen := MAX_MESSAGE_LENGTH - len(ts) - len(sourceAbbr) - len(": [] [AI/LLM] ") - 3 // 3 for "..."
+		truncatedSummary := strings.TrimSpace(chosen.Summary)[:maxSummaryLen] + "..."
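+		// Note: the slice above is byte-based; Go string indexing does not
+		// respect UTF-8 rune boundaries, so a summary containing multi-byte
+		// characters could be cut mid-rune. If summaries can contain such
+		// characters (an assumption), a rune-aware cut via []rune conversion
+		// before slicing would be safer.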
+ msg = fmt.Sprintf("%s: [%s] [AI/LLM] %s", ts, sourceAbbr, truncatedSummary) + logInfo("broadcaster", fmt.Sprintf("Message truncated to fit %d character limit", MAX_MESSAGE_LENGTH), map[string]interface{}{ + "originalLength": len(chosen.Summary), + "truncatedLength": maxSummaryLen, + }) + } + + // Print info about what will be broadcast + cmdStr := fmt.Sprintf("meshtastic --sendtext \"%s\"", msg) + logInfo("broadcaster", "Preparing to broadcast", map[string]interface{}{ + "message": msg, + "length": len(msg), + "command": cmdStr, + "dryRun": dryRun, + "id": chosen.ID, + "importance": chosen.Importance, + "source": chosen.Source, + "sourceAbbr": sourceAbbr, + }) + + // Wait before broadcasting to allow time to see the message + logInfo("broadcaster", fmt.Sprintf("Waiting %d seconds before broadcasting...", int(BROADCAST_PREPARATION_DELAY.Seconds())), nil) + time.Sleep(BROADCAST_PREPARATION_DELAY) + + // Update broadcast time and save to database + chosen.BroadcastTime = time.Now() + updateArticle(chosen) + + logInfo("broadcaster", "Set broadcast time for article", map[string]interface{}{ + "id": chosen.ID, + "summary": chosen.Summary, + "importance": chosen.Importance, + "broadcastTime": chosen.BroadcastTime.Format(time.RFC3339), + }) + + output, _ := json.MarshalIndent(chosen, "", " ") + fmt.Println(string(output)) + + if !dryRun { + logInfo("broadcaster", "Broadcasting message", map[string]interface{}{ + "message": msg, + "length": len(msg), + }) + + cmd := exec.Command("meshtastic", "--sendtext", msg) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if err := cmd.Run(); err != nil { + logInfo("broadcaster", "Broadcast failed", map[string]interface{}{ + "error": err.Error(), + "id": chosen.ID, + }) + } else { + logInfo("broadcaster", "Broadcast success", map[string]interface{}{ + "id": chosen.ID, + }) + } + } else { + // In dry-run mode, just print the command + logInfo("broadcaster", "DRY RUN - would run command", map[string]interface{}{ + "command": cmdStr, + "id": chosen.ID, + }) + } +} diff --git a/llm.go b/llm.go new file mode 100644 index 0000000..ed0bc8e --- /dev/null +++ b/llm.go @@ -0,0 +1,547 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "strings" + "time" +) + +// Define a struct for redundancy check response +type RedundancyCheckResponse struct { + IsRedundant bool `json:"is_redundant"` + Reason string `json:"reason"` +} + +type SummaryResult struct { + ID string `json:"id"` + Summary string `json:"summary"` + Importance int `json:"importance"` +} + +// articleSummarizer checks for articles without summaries every 10 seconds and processes them in batches +func articleSummarizer(shutdown chan struct{}, ollamaURL, ollamaModel string) { + fmt.Fprintf(os.Stderr, "[summarizer] Starting article summarizer (interval: %s)\n", SUMMARIZE_INTERVAL) + + ticker := time.NewTicker(SUMMARIZE_INTERVAL) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + summarizeArticles(ollamaURL, ollamaModel) + case <-shutdown: + fmt.Fprintf(os.Stderr, "[summarizer] Shutting down article summarizer\n") + return + } + } +} + +// summarizeArticles processes articles without summaries in batches +func summarizeArticles(ollamaURL, ollamaModel string) { + articles := loadArticles() + + // Count articles that already have summaries + summarizedCount := 0 + for _, article := range articles { + if article.Summary != "" && article.Importance != 0 { + summarizedCount++ + } + } + + // Collect articles that need summarization + var articlesToSummarize 
[]Article + for _, article := range articles { + if article.Summary == "" || article.Importance == 0 { + articlesToSummarize = append(articlesToSummarize, article) + } + } + + if len(articlesToSummarize) == 0 { + return // No articles to summarize + } + + logInfo("summarizer", "Processing articles", map[string]interface{}{ + "alreadySummarized": summarizedCount, + "toSummarize": len(articlesToSummarize), + "totalArticles": len(articles), + }) + + // Process in batches + for i := 0; i < len(articlesToSummarize); i += BATCH_SIZE { + end := i + BATCH_SIZE + if end > len(articlesToSummarize) { + end = len(articlesToSummarize) + } + + batch := articlesToSummarize[i:end] + batchInfo := make([]string, 0, len(batch)) + for _, a := range batch { + batchInfo = append(batchInfo, a.ID[:8]) + } + + logInfo("summarizer", fmt.Sprintf("Processing batch %d to %d", i+1, end), map[string]interface{}{ + "batchSize": len(batch), + "batchIds": strings.Join(batchInfo, ","), + }) + + startTime := time.Now() + summaries, err := processArticleBatch(ollamaURL, ollamaModel, batch) + apiDuration := time.Since(startTime) + + if err != nil { + logInfo("summarizer", "Batch processing error", map[string]interface{}{ + "error": err.Error(), + "duration": apiDuration.String(), + }) + continue + } + + // Update articles with summaries + updatedCount := 0 + for id, result := range summaries { + for _, article := range batch { + if article.ID == id { + article.Summary = result.Summary + article.Importance = result.Importance + updateArticle(article) + updatedCount++ + break + } + } + } + + logInfo("summarizer", "Batch processing complete", map[string]interface{}{ + "batchSize": len(batch), + "updatedCount": updatedCount, + "duration": apiDuration.String(), + "durationMs": apiDuration.Milliseconds(), + "msPerArticle": apiDuration.Milliseconds() / int64(len(batch)), + }) + + // Calculate and log ETA for remaining articles + remainingCount := len(articlesToSummarize) - (i + len(batch)) + if remainingCount > 0 { + // Calculate milliseconds per article from this batch + msPerArticle := apiDuration.Milliseconds() / int64(len(batch)) + + // Calculate remaining time + remainingTimeMs := msPerArticle * int64(remainingCount) + remainingTime := time.Duration(remainingTimeMs) * time.Millisecond + + // Calculate how many batches remain + remainingBatches := (remainingCount + BATCH_SIZE - 1) / BATCH_SIZE // Ceiling division + + // Estimated completion time + eta := time.Now().Add(remainingTime) + + logInfo("summarizer", "Estimated time to completion", map[string]interface{}{ + "remainingArticles": remainingCount, + "remainingBatches": remainingBatches, + "msPerArticle": msPerArticle, + "estimatedTimeLeft": remainingTime.String(), + "eta": eta.Format("15:04:05 MST"), + }) + } + } +} + +func processArticleBatch(ollamaURL, model string, articles []Article) (map[string]SummaryResult, error) { + // Prepare batch prompt + var batchPrompt strings.Builder + batchPrompt.WriteString(ARTICLES_PROMPT) + + for i, article := range articles { + batchPrompt.WriteString(fmt.Sprintf("Article %d (ID: %s):\n", i+1, article.ID)) + batchPrompt.WriteString(fmt.Sprintf("Title: %s\n", article.Title)) + batchPrompt.WriteString(fmt.Sprintf("Content: %s\n", article.Description)) + + // Add original publication date information to help with breaking news ranking + batchPrompt.WriteString(fmt.Sprintf("Original Publication Date: %s\n", article.OriginalDate.Format(time.RFC3339))) + + // Calculate minutes since publication for clearer guidance + minutesOld := 
time.Since(article.OriginalDate).Minutes() + batchPrompt.WriteString(fmt.Sprintf("Minutes Since Publication: %.0f\n\n", minutesOld)) + } + + payload := map[string]interface{}{ + "model": model, + "messages": []map[string]string{ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": batchPrompt.String()}, + }, + "stream": false, + } + + // Log request details + requestDetails := map[string]interface{}{ + "model": model, + "batchSize": len(articles), + "apiEndpoint": ollamaURL + "/api/chat", + } + + for i, article := range articles { + requestDetails[fmt.Sprintf("article%d", i+1)] = article.ID + } + + logInfo("api", "Sending request to Ollama", requestDetails) + startTime := time.Now() + + // Pretty print request JSON for logging + payloadBytes, err := json.MarshalIndent(payload, "", " ") + if err != nil { + logInfo("api", "JSON encoding error", map[string]interface{}{ + "error": err.Error(), + }) + return nil, err + } + + req, err := http.NewRequest("POST", ollamaURL+"/api/chat", bytes.NewReader(payloadBytes)) + if err != nil { + logInfo("api", "Request creation error", map[string]interface{}{ + "error": err.Error(), + }) + return nil, err + } + req.Header.Set("Content-Type", "application/json") + + // Longer timeout for larger batches + timeout := 30 * time.Second + if len(articles) > 1 { + timeout = 120 * time.Second + } + client := &http.Client{Timeout: timeout} + + resp, err := client.Do(req) + if err != nil { + logInfo("api", "API call failed", map[string]interface{}{ + "error": err.Error(), + "duration": time.Since(startTime).String(), + }) + return nil, err + } + defer resp.Body.Close() + + bodyBytes, _ := io.ReadAll(resp.Body) + apiDuration := time.Since(startTime) + + // Pretty print response JSON for debugging + var prettyJSON bytes.Buffer + err = json.Indent(&prettyJSON, bodyBytes, "", " ") + if err != nil { + // If we can't pretty print, just show as-is + logInfo("api", "Pretty print error", map[string]interface{}{ + "error": err.Error(), + }) + } + + var res struct { + Message struct { + Content string `json:"content"` + } `json:"message"` + } + if err := json.Unmarshal(bodyBytes, &res); err != nil { + logInfo("api", "JSON unmarshal error", map[string]interface{}{ + "error": err.Error(), + "duration": apiDuration.String(), + }) + return nil, err + } + + if res.Message.Content == "" { + logInfo("api", "Empty response content", map[string]interface{}{ + "duration": apiDuration.String(), + }) + return nil, fmt.Errorf("no content in response") + } + + // Parse the JSON response - handle both single object and array responses + var summaries []SummaryResult + + // Try to parse as array first + if err := json.Unmarshal([]byte(res.Message.Content), &summaries); err != nil { + // Try to extract JSON from the content in case the model wrapped it in markdown or text + contentStr := res.Message.Content + jsonStart := strings.Index(contentStr, "[") + jsonEnd := strings.LastIndex(contentStr, "]") + + if jsonStart >= 0 && jsonEnd > jsonStart { + // Try to parse as JSON array + jsonStr := contentStr[jsonStart : jsonEnd+1] + if err := json.Unmarshal([]byte(jsonStr), &summaries); err != nil { + // Try parsing as a single object for single article case + if len(articles) == 1 { + // Look for { } instead + jsonStart = strings.Index(contentStr, "{") + jsonEnd = strings.LastIndex(contentStr, "}") + + if jsonStart >= 0 && jsonEnd > jsonStart { + jsonStr := contentStr[jsonStart : jsonEnd+1] + var singleResult struct { + Summary string `json:"summary"` + Importance int 
`json:"importance"` + } + + if err := json.Unmarshal([]byte(jsonStr), &singleResult); err == nil { + // Create a SummaryResult with the article's ID + summaries = []SummaryResult{{ + ID: articles[0].ID, + Summary: singleResult.Summary, + Importance: singleResult.Importance, + }} + } else { + logInfo("api", "JSON parse error for single object", map[string]interface{}{ + "error": err.Error(), + "duration": apiDuration.String(), + }) + return nil, err + } + } else { + logInfo("api", "Could not find JSON object in response", map[string]interface{}{ + "duration": apiDuration.String(), + }) + return nil, fmt.Errorf("could not find JSON object in response") + } + } else { + logInfo("api", "JSON parse error for array", map[string]interface{}{ + "error": err.Error(), + "duration": apiDuration.String(), + }) + return nil, err + } + } + } else { + logInfo("api", "Could not find JSON array in response", map[string]interface{}{ + "duration": apiDuration.String(), + }) + return nil, fmt.Errorf("could not find JSON array in response") + } + } + + // Convert to map for easier lookup + resultMap := make(map[string]SummaryResult) + for _, summary := range summaries { + resultMap[summary.ID] = summary + } + + logInfo("api", "API call successful", map[string]interface{}{ + "duration": apiDuration.String(), + "durationMs": apiDuration.Milliseconds(), + "resultsCount": len(resultMap), + "msPerArticle": apiDuration.Milliseconds() / int64(len(articles)), + "statusCode": resp.StatusCode, + }) + + return resultMap, nil +} + +// checkRedundancy asks the LLM if the candidate message is redundant compared to recent broadcasts +func checkRedundancy(candidate Article, recentBroadcasts []Article) (bool, string, error) { + if len(recentBroadcasts) == 0 { + return false, "No previous broadcasts to compare", nil + } + + // Get the abbreviated source name + sourceAbbr := sourceAbbreviations[candidate.Source] + if sourceAbbr == "" { + if len(candidate.Source) >= 3 { + sourceAbbr = candidate.Source[:3] + } else { + sourceAbbr = candidate.Source + } + } + + // Format the candidate message as it would be broadcast + ts := time.Now().Format("15:04 MST") + candidateMsg := fmt.Sprintf("%s: [%s] [AI/LLM] %s", ts, sourceAbbr, strings.TrimSpace(candidate.Summary)) + + // Format the recent broadcasts + var recentMessages []string + for i, article := range recentBroadcasts { + sourceAbbr := sourceAbbreviations[article.Source] + if sourceAbbr == "" { + if len(article.Source) >= 3 { + sourceAbbr = article.Source[:3] + } else { + sourceAbbr = article.Source + } + } + + broadcastTime := article.BroadcastTime.Format("15:04 MST") + msg := fmt.Sprintf("%s: [%s] [AI/LLM] %s", broadcastTime, sourceAbbr, strings.TrimSpace(article.Summary)) + recentMessages = append(recentMessages, fmt.Sprintf("%d. %s", i+1, msg)) + } + + // Create the LLM prompt + prompt := fmt.Sprintf(`Determine if the following message would be redundant to broadcast given the previous broadcasts. + +CANDIDATE MESSAGE TO BROADCAST: +%s + +PREVIOUS BROADCASTS (most recent first): +%s + +Is the candidate message redundant with any of the previously broadcast messages? A message is considered redundant if: +1. It covers substantially the same event or news story as a previous message +2. It doesn't add significant new information beyond what was already broadcast +3. 
It's about the same persons/entities in the same context as a previous message + +You MUST respond ONLY with a valid JSON object having the following structure, and nothing else: +{"is_redundant": true/false, "reason": "explanation of your decision"} + +DO NOT include any other text, explanation, or formatting in your response, ONLY the JSON.`, + candidateMsg, + strings.Join(recentMessages, "\n"), + ) + + // Get the URL for the Ollama API + ollamaURL := os.Getenv("OLLAMA_URL") + if ollamaURL == "" { + ollamaURL = "http://localhost:11434" // Default Ollama server URL + } + + // Use qwen3:32b model for consistency + ollamaModel := "qwen3:32b" + + payload := map[string]interface{}{ + "model": ollamaModel, + "messages": []map[string]string{ + {"role": "system", "content": "You are a news redundancy analyzer. Your ONLY job is to determine if a news message is redundant compared to previously broadcast messages. You MUST respond ONLY with a valid JSON object in the format {\"is_redundant\": boolean, \"reason\": \"string\"} and nothing else."}, + {"role": "user", "content": prompt}, + }, + "stream": false, + } + + logInfo("redundancy", "Checking if article is redundant with previous broadcasts", map[string]interface{}{ + "candidateID": candidate.ID, + "recentBroadcasts": len(recentBroadcasts), + }) + + // Convert payload to JSON + payloadBytes, err := json.Marshal(payload) + if err != nil { + return false, "", fmt.Errorf("error marshaling request: %v", err) + } + + // Create the request + req, err := http.NewRequest("POST", ollamaURL+"/api/chat", bytes.NewReader(payloadBytes)) + if err != nil { + return false, "", fmt.Errorf("error creating request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + + // Send the request with timeout and retry mechanism + client := &http.Client{Timeout: 30 * time.Second} + + // Try up to 3 times with backoff + var resp *http.Response + var respErr error + + for attempt := 1; attempt <= 3; attempt++ { + resp, respErr = client.Do(req) + if respErr == nil { + break + } + + logInfo("redundancy", "Request attempt failed", map[string]interface{}{ + "attempt": attempt, + "error": respErr.Error(), + }) + + if attempt < 3 { + // Wait with exponential backoff before retry + backoffTime := time.Duration(attempt) * 2 * time.Second + time.Sleep(backoffTime) + } + } + + // If all attempts failed, proceed with broadcasting anyway + if respErr != nil { + logInfo("redundancy", "All request attempts failed", map[string]interface{}{ + "error": respErr.Error(), + }) + return false, "Failed to check redundancy: " + respErr.Error(), nil + } + defer resp.Body.Close() + + // Read the response + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + logInfo("redundancy", "Error reading response", map[string]interface{}{ + "error": err.Error(), + }) + return false, "Failed to read redundancy check response", nil + } + + // Parse the response + var result struct { + Message struct { + Content string `json:"content"` + } `json:"message"` + } + if err := json.Unmarshal(bodyBytes, &result); err != nil { + logInfo("redundancy", "JSON unmarshal error", map[string]interface{}{ + "error": err.Error(), + "body": string(bodyBytes), + }) + return false, "Failed to parse redundancy check response", nil + } + + // Extract the JSON part from the response + content := result.Message.Content + content = strings.TrimSpace(content) + + // Handle case where the response might be wrapped in markdown code blocks + if strings.HasPrefix(content, "```json") { + content = 
strings.TrimPrefix(content, "```json") + content = strings.TrimPrefix(content, "```") + if idx := strings.Index(content, "```"); idx > 0 { + content = content[:idx] + } + } else if strings.HasPrefix(content, "```") { + content = strings.TrimPrefix(content, "```") + if idx := strings.Index(content, "```"); idx > 0 { + content = content[:idx] + } + } + + content = strings.TrimSpace(content) + + // Check for JSON syntax - find first { and last } + firstBrace := strings.Index(content, "{") + lastBrace := strings.LastIndex(content, "}") + + if firstBrace >= 0 && lastBrace > firstBrace { + // Extract what appears to be valid JSON + content = content[firstBrace : lastBrace+1] + } else { + logInfo("redundancy", "Response doesn't contain valid JSON structure", map[string]interface{}{ + "content": content, + }) + // Default to non-redundant with explanation + return false, "Could not parse LLM response as JSON", nil + } + + // Parse the redundancy check result + var redundancyResult RedundancyCheckResponse + if err := json.Unmarshal([]byte(content), &redundancyResult); err != nil { + logInfo("redundancy", "Failed to unmarshal JSON response", map[string]interface{}{ + "error": err.Error(), + "content": content, + }) + // Default to non-redundant with explanation + return false, "Error parsing redundancy check result", nil + } + + logInfo("redundancy", "Redundancy check result", map[string]interface{}{ + "isRedundant": redundancyResult.IsRedundant, + "reason": redundancyResult.Reason, + "candidateID": candidate.ID, + }) + + return redundancyResult.IsRedundant, redundancyResult.Reason, nil +} diff --git a/main.go b/main.go index 157e9df..d2e1ebd 100644 --- a/main.go +++ b/main.go @@ -1,153 +1,30 @@ package main import ( - "bytes" - "crypto/sha256" "database/sql" - "encoding/hex" - "encoding/json" "flag" "fmt" - "io" "log" - "net/http" "os" - "os/exec" "os/signal" - "sort" - "strings" "sync" "syscall" "time" _ "github.com/joho/godotenv/autoload" _ "github.com/mattn/go-sqlite3" - "github.com/mmcdole/gofeed" ) -type Article struct { - Title string `json:"title"` - Description string `json:"description"` - Link string `json:"link"` - Published time.Time `json:"published"` - Source string `json:"source"` - FirstSeen time.Time `json:"firstseen"` - Seen time.Time `json:"seen"` - Summary string `json:"summary"` - Importance int `json:"importance"` - ID string `json:"id"` - BroadcastTime time.Time `json:"broadcastTime,omitempty"` -} - -type LogEntry struct { - Timestamp time.Time `json:"timestamp"` - Event string `json:"event"` - Details map[string]interface{} `json:"details"` -} - var ( - runStart = time.Now() - logPath = runStart.Format("2006-01-02.15:04:05") + ".gomeshalerter.json" - logFile *os.File - logData []LogEntry - db *sql.DB - logMutex sync.Mutex // Mutex for thread-safe logging + runStart = time.Now() + logPath = runStart.Format("2006-01-02.15:04:05") + ".gomeshalerter.json" + logFile *os.File + logData []LogEntry + logBuffer *LogRingBuffer + db *sql.DB + logMutex sync.Mutex // Mutex for thread-safe logging ) -// Map of source names to their abbreviations -var sourceAbbreviations = map[string]string{ - "BBC": "BBC", - "CNN": "CNN", - "NYTimes": "NYT", - "Guardian": "Grd", - "Al Jazeera": "AlJ", - "Fox News": "Fox", - "NBC": "NBC", - "ABC": "ABC", - "CBS": "CBS", - "Sky News": "Sky", - "Time": "Time", - "NPR": "NPR", - "Deutsche Welle": "DW", - "Associated Press": "AP", - "Euronews": "EurN", - "France 24": "F24", - "The Independent": "Ind", - "CNBC": "CNBC", -} - -// Find the maximum abbreviation 
length -func getMaxAbbreviationLength() int { - maxLen := 0 - for _, abbr := range sourceAbbreviations { - if len(abbr) > maxLen { - maxLen = len(abbr) - } - } - return maxLen -} - -const ( - dbPath = "articles.db" - - // LLM prompts - ARTICLES_PROMPT = `Summarize each of these news items in under 165 - characters, optimizing for information density (common news headline - abbreviations OK) and rate their importance from 1 to 100. - -100 means most important; 1 means least important. - -Never rate over 90 unless it is a massive event such as: war outbreak, -revolution, death of a head of state, large-scale natural disaster, mass -casualty terrorism, etc. - -IMPORTANT: Rank any headlines primarily promoting commercial products or services as 1 (lowest importance). - -For each article, return a JSON object with "id", "summary", and "importance" -fields. Return your response as a JSON array of objects like: [{"id": -"article_id", "summary": "...", "importance": 42}, ...] - -Here are the articles: -` - - SYSTEM_PROMPT = "You are a news analyst." - BATCH_SIZE = 10 - MAX_INDIVIDUAL_PROCESSING = 50 - - // Timing constants - RSS_CHECK_INTERVAL = 15 * time.Minute - SUMMARIZE_INTERVAL = 10 * time.Second - BROADCAST_INTERVAL = 1 * time.Hour -) - -var feeds = map[string]string{ - "BBC": "https://feeds.bbci.co.uk/news/world/rss.xml", - "CNN": "http://rss.cnn.com/rss/edition.rss", - "NYTimes": "https://rss.nytimes.com/services/xml/rss/nyt/World.xml", - "Guardian": "https://www.theguardian.com/world/rss", - "Al Jazeera": "https://www.aljazeera.com/xml/rss/all.xml", - "Fox News": "http://feeds.foxnews.com/foxnews/latest", - "NBC": "http://feeds.nbcnews.com/nbcnews/public/news", - "ABC": "https://abcnews.go.com/abcnews/topstories", - "CBS": "https://www.cbsnews.com/latest/rss/world", - "Sky News": "https://feeds.skynews.com/feeds/rss/world.xml", - "Time": "https://time.com/feed/", - "NPR": "https://feeds.npr.org/1001/rss.xml", - "Deutsche Welle": "https://rss.dw.com/rdf/rss-en-world", - // New replacement feeds - "Associated Press": "https://apnews.com/hub/world-news/feed", - "Euronews": "https://www.euronews.com/rss/world-news.xml", - "France 24": "https://www.france24.com/en/rss", - "The Independent": "https://www.independent.co.uk/news/world/rss", - "CNBC": "https://www.cnbc.com/id/100727362/device/rss/rss.xml", -} - -// Define a struct for redundancy check response -type RedundancyCheckResponse struct { - IsRedundant bool `json:"is_redundant"` - Reason string `json:"reason"` -} - func main() { fmt.Fprintf(os.Stderr, "[%s] starting gomeshalerter\n", runStart.Format("15:04:05")) setupLogging() @@ -159,6 +36,9 @@ func main() { log.Fatalf("Failed to open database: %v", err) } + // Initialize log buffer + logBuffer = NewLogRingBuffer(MAX_LOG_ENTRIES) + // Define a cleanup function to properly close resources cleanup := func() { fmt.Fprintf(os.Stderr, "[shutdown] Closing database...\n") @@ -226,1156 +106,14 @@ func main() { broadcaster(shutdown, *dryRun) }() + // Start web server goroutine + wg.Add(1) + go func() { + defer wg.Done() + webServer(shutdown) + }() + // Wait for all goroutines to finish wg.Wait() fmt.Fprintf(os.Stderr, "[shutdown] All goroutines stopped, exiting...\n") } - -// rssFeedChecker checks RSS feeds every 15 minutes and adds new articles to the database -func rssFeedChecker(shutdown chan struct{}, ollamaURL, ollamaModel string) { - fmt.Fprintf(os.Stderr, "[rss] Starting RSS feed checker (interval: %s)\n", RSS_CHECK_INTERVAL) - - // Run immediately on startup - checkRSSFeeds() - - // Then 
run on interval - ticker := time.NewTicker(RSS_CHECK_INTERVAL) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - checkRSSFeeds() - case <-shutdown: - fmt.Fprintf(os.Stderr, "[rss] Shutting down RSS feed checker\n") - return - } - } -} - -// checkRSSFeeds fetches all RSS feeds and adds new articles to the database -func checkRSSFeeds() { - articles := loadArticles() - oldCount := len(articles) - - logInfo("rss", "Checking RSS feeds", map[string]interface{}{ - "time": time.Now().Format("15:04:05"), - "articlesBeforeFetch": oldCount, - }) - - now := time.Now() - newArticles := fetchAllFeedsParallel(now) - newCount := 0 - - for _, a := range newArticles { - if _, ok := articles[a.Link]; !ok { - if a.ID == "" { - a.ID = generateID(a.Link) - } - articles[a.Link] = a - saveArticle(a) - newCount++ - logInfo("new", fmt.Sprintf("Found new article: %s", a.Title), map[string]interface{}{ - "id": a.ID, - "source": a.Source, - "published": a.Published.Format(time.RFC3339), - }) - } - } - - logInfo("rss", "Completed RSS check", map[string]interface{}{ - "articlesBeforeFetch": oldCount, - "articlesAfterFetch": oldCount + newCount, - "newArticles": newCount, - }) -} - -// articleSummarizer checks for articles without summaries every 10 seconds and processes them in batches -func articleSummarizer(shutdown chan struct{}, ollamaURL, ollamaModel string) { - fmt.Fprintf(os.Stderr, "[summarizer] Starting article summarizer (interval: %s)\n", SUMMARIZE_INTERVAL) - - ticker := time.NewTicker(SUMMARIZE_INTERVAL) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - summarizeArticles(ollamaURL, ollamaModel) - case <-shutdown: - fmt.Fprintf(os.Stderr, "[summarizer] Shutting down article summarizer\n") - return - } - } -} - -// summarizeArticles processes articles without summaries in batches -func summarizeArticles(ollamaURL, ollamaModel string) { - articles := loadArticles() - - // Count articles that already have summaries - summarizedCount := 0 - for _, article := range articles { - if article.Summary != "" && article.Importance != 0 { - summarizedCount++ - } - } - - // Collect articles that need summarization - var articlesToSummarize []Article - for _, article := range articles { - if article.Summary == "" || article.Importance == 0 { - articlesToSummarize = append(articlesToSummarize, article) - } - } - - if len(articlesToSummarize) == 0 { - return // No articles to summarize - } - - logInfo("summarizer", "Processing articles", map[string]interface{}{ - "alreadySummarized": summarizedCount, - "toSummarize": len(articlesToSummarize), - "totalArticles": len(articles), - }) - - // Process in batches - for i := 0; i < len(articlesToSummarize); i += BATCH_SIZE { - end := i + BATCH_SIZE - if end > len(articlesToSummarize) { - end = len(articlesToSummarize) - } - - batch := articlesToSummarize[i:end] - batchInfo := make([]string, 0, len(batch)) - for _, a := range batch { - batchInfo = append(batchInfo, a.ID[:8]) - } - - logInfo("summarizer", fmt.Sprintf("Processing batch %d to %d", i+1, end), map[string]interface{}{ - "batchSize": len(batch), - "batchIds": strings.Join(batchInfo, ","), - }) - - startTime := time.Now() - summaries, err := processArticleBatch(ollamaURL, ollamaModel, batch) - apiDuration := time.Since(startTime) - - if err != nil { - logInfo("summarizer", "Batch processing error", map[string]interface{}{ - "error": err.Error(), - "duration": apiDuration.String(), - }) - continue - } - - // Update articles with summaries - updatedCount := 0 - for id, result := range summaries { - 
for _, article := range batch { - if article.ID == id { - article.Summary = result.Summary - article.Importance = result.Importance - updateArticle(article) - updatedCount++ - break - } - } - } - - logInfo("summarizer", "Batch processing complete", map[string]interface{}{ - "batchSize": len(batch), - "updatedCount": updatedCount, - "duration": apiDuration.String(), - "durationMs": apiDuration.Milliseconds(), - "msPerArticle": apiDuration.Milliseconds() / int64(len(batch)), - }) - } -} - -// broadcaster runs on startup and every hour to select and broadcast the most important article -func broadcaster(shutdown chan struct{}, dryRun bool) { - logInfo("broadcaster", "Starting broadcaster (waiting 30 seconds before first broadcast)", map[string]interface{}{ - "interval": BROADCAST_INTERVAL.String(), - "dryRun": dryRun, - }) - - // Sleep for 30 seconds on startup - logInfo("broadcaster", "Sleeping for 30 seconds before first broadcast", nil) - select { - case <-time.After(30 * time.Second): - // Continue after sleep - case <-shutdown: - logInfo("broadcaster", "Shutdown signal received during startup sleep", nil) - return - } - - // Run immediately after initial sleep - checkAndBroadcast(dryRun) - - // Then run on interval - ticker := time.NewTicker(BROADCAST_INTERVAL) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - checkAndBroadcast(dryRun) - case <-shutdown: - logInfo("broadcaster", "Shutting down broadcaster", nil) - return - } - } -} - -// checkAndBroadcast checks if there are any unsummarized articles before broadcasting -func checkAndBroadcast(dryRun bool) { - // Check if there are any unsummarized articles - articles := loadArticles() - unsummarizedCount := 0 - - for _, article := range articles { - if article.Summary == "" || article.Importance == 0 { - unsummarizedCount++ - } - } - - if unsummarizedCount > 0 { - logInfo("broadcaster", "Postponing broadcast - waiting for articles to be summarized", map[string]interface{}{ - "unsummarizedCount": unsummarizedCount, - "totalArticles": len(articles), - }) - return - } - - // No unsummarized articles, proceed with broadcast - broadcastWithRedundancyCheck(dryRun) -} - -// broadcastWithRedundancyCheck attempts to find a non-redundant article to broadcast -func broadcastWithRedundancyCheck(dryRun bool) { - articles := loadArticles() - now := time.Now() - cutoff := now.Add(-24 * time.Hour) // 24-hour window for articles - - var candidates []Article - for _, a := range articles { - // Only include articles that haven't been broadcast, have summaries, and are fresh - if a.Summary != "" && a.BroadcastTime.IsZero() && a.FirstSeen.After(cutoff) { - candidates = append(candidates, a) - } - } - - if len(candidates) == 0 { - logInfo("broadcaster", "No fresh articles found for broadcasting", map[string]interface{}{ - "totalArticles": len(articles), - }) - return - } - - // Sort by importance - sort.Slice(candidates, func(i, j int) bool { - return candidates[i].Importance > candidates[j].Importance - }) - - // Get recent broadcasts - recentBroadcasts := getRecentBroadcasts(12) - - // Try candidates until we find a non-redundant one - for i := 0; i < len(candidates); i++ { - candidate := candidates[i] - - // Check if this candidate would be redundant - isRedundant, reason, err := checkRedundancy(candidate, recentBroadcasts) - if err != nil { - logInfo("redundancy", "Error checking redundancy, proceeding anyway", map[string]interface{}{ - "error": err.Error(), - "id": candidate.ID, - }) - // Continue with this candidate despite the error - 
broadcastArticle(candidate, dryRun) - return - } - - if isRedundant { - logInfo("redundancy", "Article deemed redundant, marking as 'already broadcast'", map[string]interface{}{ - "id": candidate.ID, - "summary": candidate.Summary, - "reason": reason, - }) - - // Mark as broadcast with special timestamp to prevent future selection - candidate.BroadcastTime = time.Unix(1, 0) // epoch + 1 second - updateArticle(candidate) - continue // Try next candidate - } - - // Not redundant, proceed with this candidate - logInfo("redundancy", "Article passed redundancy check", map[string]interface{}{ - "id": candidate.ID, - "candidateNumber": i + 1, - }) - broadcastArticle(candidate, dryRun) - return - } - - // If we got here, all candidates were redundant - logInfo("broadcaster", "All candidates were deemed redundant, no broadcast", map[string]interface{}{ - "candidatesChecked": len(candidates), - }) -} - -// getRecentBroadcasts retrieves the n most recently broadcast articles -func getRecentBroadcasts(n int) []Article { - rows, err := db.Query(` - SELECT link, title, description, published, source, firstseen, seen, summary, importance, id, broadcastTime - FROM articles - WHERE broadcastTime IS NOT NULL AND broadcastTime > 1 - ORDER BY broadcastTime DESC - LIMIT ? - `, n) - - if err != nil { - logInfo("db", "Error retrieving recent broadcasts", map[string]interface{}{ - "error": err.Error(), - }) - return []Article{} - } - defer rows.Close() - - var broadcasts []Article - for rows.Next() { - var a Article - var seen sql.NullTime - var broadcastTime sql.NullTime - - err := rows.Scan( - &a.Link, &a.Title, &a.Description, &a.Published, &a.Source, - &a.FirstSeen, &seen, &a.Summary, &a.Importance, &a.ID, &broadcastTime, - ) - - if err != nil { - logInfo("db", "Error scanning broadcast article", map[string]interface{}{ - "error": err.Error(), - }) - continue - } - - if seen.Valid { - a.Seen = seen.Time - } - - if broadcastTime.Valid { - a.BroadcastTime = broadcastTime.Time - } - - broadcasts = append(broadcasts, a) - } - - return broadcasts -} - -// checkRedundancy asks the LLM if the candidate message is redundant compared to recent broadcasts -func checkRedundancy(candidate Article, recentBroadcasts []Article) (bool, string, error) { - if len(recentBroadcasts) == 0 { - return false, "No previous broadcasts to compare", nil - } - - // Get the abbreviated source name - sourceAbbr := sourceAbbreviations[candidate.Source] - if sourceAbbr == "" { - if len(candidate.Source) >= 3 { - sourceAbbr = candidate.Source[:3] - } else { - sourceAbbr = candidate.Source - } - } - - // Format the candidate message as it would be broadcast - ts := time.Now().Format("15:04 MST") - candidateMsg := fmt.Sprintf("%s: [%s] [AI/LLM] %s", ts, sourceAbbr, strings.TrimSpace(candidate.Summary)) - - // Format the recent broadcasts - var recentMessages []string - for i, article := range recentBroadcasts { - sourceAbbr := sourceAbbreviations[article.Source] - if sourceAbbr == "" { - if len(article.Source) >= 3 { - sourceAbbr = article.Source[:3] - } else { - sourceAbbr = article.Source - } - } - - broadcastTime := article.BroadcastTime.Format("15:04 MST") - msg := fmt.Sprintf("%s: [%s] [AI/LLM] %s", broadcastTime, sourceAbbr, strings.TrimSpace(article.Summary)) - recentMessages = append(recentMessages, fmt.Sprintf("%d. %s", i+1, msg)) - } - - // Create the LLM prompt - prompt := fmt.Sprintf(`Determine if the following message would be redundant to broadcast given the previous broadcasts. 
- -CANDIDATE MESSAGE TO BROADCAST: -%s - -PREVIOUS BROADCASTS (most recent first): -%s - -Is the candidate message redundant with any of the previously broadcast messages? A message is considered redundant if: -1. It covers substantially the same event or news story as a previous message -2. It doesn't add significant new information beyond what was already broadcast -3. It's about the same persons/entities in the same context as a previous message - -You MUST respond ONLY with a valid JSON object having the following structure, and nothing else: -{"is_redundant": true/false, "reason": "explanation of your decision"} - -DO NOT include any other text, explanation, or formatting in your response, ONLY the JSON.`, - candidateMsg, - strings.Join(recentMessages, "\n"), - ) - - // Get the URL for the Ollama API - ollamaURL := os.Getenv("OLLAMA_URL") - if ollamaURL == "" { - ollamaURL = "http://localhost:11434" // Default Ollama server URL - } - - // Use qwen3:32b model for consistency - ollamaModel := "qwen3:32b" - - payload := map[string]interface{}{ - "model": ollamaModel, - "messages": []map[string]string{ - {"role": "system", "content": "You are a news redundancy analyzer. Your ONLY job is to determine if a news message is redundant compared to previously broadcast messages. You MUST respond ONLY with a valid JSON object in the format {\"is_redundant\": boolean, \"reason\": \"string\"} and nothing else."}, - {"role": "user", "content": prompt}, - }, - "stream": false, - } - - logInfo("redundancy", "Checking if article is redundant with previous broadcasts", map[string]interface{}{ - "candidateID": candidate.ID, - "recentBroadcasts": len(recentBroadcasts), - }) - - // Convert payload to JSON - payloadBytes, err := json.Marshal(payload) - if err != nil { - return false, "", fmt.Errorf("error marshaling request: %v", err) - } - - // Create the request - req, err := http.NewRequest("POST", ollamaURL+"/api/chat", bytes.NewReader(payloadBytes)) - if err != nil { - return false, "", fmt.Errorf("error creating request: %v", err) - } - req.Header.Set("Content-Type", "application/json") - - // Send the request with timeout and retry mechanism - client := &http.Client{Timeout: 30 * time.Second} - - // Try up to 3 times with backoff - var resp *http.Response - var respErr error - - for attempt := 1; attempt <= 3; attempt++ { - resp, respErr = client.Do(req) - if respErr == nil { - break - } - - logInfo("redundancy", "Request attempt failed", map[string]interface{}{ - "attempt": attempt, - "error": respErr.Error(), - }) - - if attempt < 3 { - // Wait with exponential backoff before retry - backoffTime := time.Duration(attempt) * 2 * time.Second - time.Sleep(backoffTime) - } - } - - // If all attempts failed, proceed with broadcasting anyway - if respErr != nil { - logInfo("redundancy", "All request attempts failed", map[string]interface{}{ - "error": respErr.Error(), - }) - return false, "Failed to check redundancy: " + respErr.Error(), nil - } - defer resp.Body.Close() - - // Read the response - bodyBytes, err := io.ReadAll(resp.Body) - if err != nil { - logInfo("redundancy", "Error reading response", map[string]interface{}{ - "error": err.Error(), - }) - return false, "Failed to read redundancy check response", nil - } - - // Parse the response - var result struct { - Message struct { - Content string `json:"content"` - } `json:"message"` - } - if err := json.Unmarshal(bodyBytes, &result); err != nil { - logInfo("redundancy", "Error parsing response JSON", map[string]interface{}{ - "error": 
err.Error(), - "body": string(bodyBytes), - }) - return false, "Failed to parse redundancy check response", nil - } - - // Extract the JSON part from the response - content := result.Message.Content - content = strings.TrimSpace(content) - - // Handle case where the response might be wrapped in markdown code blocks - if strings.HasPrefix(content, "```json") { - content = strings.TrimPrefix(content, "```json") - content = strings.TrimPrefix(content, "```") - if idx := strings.Index(content, "```"); idx > 0 { - content = content[:idx] - } - } else if strings.HasPrefix(content, "```") { - content = strings.TrimPrefix(content, "```") - if idx := strings.Index(content, "```"); idx > 0 { - content = content[:idx] - } - } - - content = strings.TrimSpace(content) - - // Check for JSON syntax - find first { and last } - firstBrace := strings.Index(content, "{") - lastBrace := strings.LastIndex(content, "}") - - if firstBrace >= 0 && lastBrace > firstBrace { - // Extract what appears to be valid JSON - content = content[firstBrace : lastBrace+1] - } else { - logInfo("redundancy", "Response doesn't contain valid JSON structure", map[string]interface{}{ - "content": content, - }) - // Default to non-redundant with explanation - return false, "Could not parse LLM response as JSON", nil - } - - // Parse the redundancy check result - var redundancyResult RedundancyCheckResponse - if err := json.Unmarshal([]byte(content), &redundancyResult); err != nil { - logInfo("redundancy", "Failed to unmarshal JSON response", map[string]interface{}{ - "error": err.Error(), - "content": content, - }) - // Default to non-redundant with explanation - return false, "Error parsing redundancy check result", nil - } - - logInfo("redundancy", "Redundancy check result", map[string]interface{}{ - "isRedundant": redundancyResult.IsRedundant, - "reason": redundancyResult.Reason, - "candidateID": candidate.ID, - }) - - return redundancyResult.IsRedundant, redundancyResult.Reason, nil -} - -// broadcastArticle broadcasts the chosen article -func broadcastArticle(chosen Article, dryRun bool) { - // Get the abbreviated source name - sourceAbbr := sourceAbbreviations[chosen.Source] - if sourceAbbr == "" { - // Default to first 3 characters if no abbreviation defined - if len(chosen.Source) >= 3 { - sourceAbbr = chosen.Source[:3] - } else { - sourceAbbr = chosen.Source - } - } - - ts := time.Now().Format("15:04 MST") - msg := fmt.Sprintf("%s: [%s] [AI/LLM] %s", ts, sourceAbbr, strings.TrimSpace(chosen.Summary)) - - // Ensure message is under 200 characters - if len(msg) > 200 { - // Calculate max summary length including timestamp, source prefix, AI/LLM prefix, and ellipsis - maxSummaryLen := 200 - len(ts) - len(sourceAbbr) - len(": [] [AI/LLM] ") - 3 // 3 for "..." - truncatedSummary := strings.TrimSpace(chosen.Summary)[:maxSummaryLen] + "..." 
- msg = fmt.Sprintf("%s: [%s] [AI/LLM] %s", ts, sourceAbbr, truncatedSummary) - logInfo("broadcaster", "Message truncated to fit 200 character limit", map[string]interface{}{ - "originalLength": len(chosen.Summary), - "truncatedLength": maxSummaryLen, - }) - } - - // Print info about what will be broadcast - cmdStr := fmt.Sprintf("meshtastic --sendtext \"%s\"", msg) - logInfo("broadcaster", "Preparing to broadcast", map[string]interface{}{ - "message": msg, - "length": len(msg), - "command": cmdStr, - "dryRun": dryRun, - "id": chosen.ID, - "importance": chosen.Importance, - "source": chosen.Source, - "sourceAbbr": sourceAbbr, - }) - - // Wait 30 seconds before broadcasting to allow time to see the message - logInfo("broadcaster", "Waiting 30 seconds before broadcasting...", nil) - time.Sleep(30 * time.Second) - - // Update broadcast time and save to database - chosen.BroadcastTime = time.Now() - updateArticle(chosen) - - logInfo("broadcaster", "Set broadcast time for article", map[string]interface{}{ - "id": chosen.ID, - "summary": chosen.Summary, - "importance": chosen.Importance, - "broadcastTime": chosen.BroadcastTime.Format(time.RFC3339), - }) - - output, _ := json.MarshalIndent(chosen, "", " ") - fmt.Println(string(output)) - - if !dryRun { - logInfo("broadcaster", "Broadcasting message", map[string]interface{}{ - "message": msg, - "length": len(msg), - }) - - cmd := exec.Command("meshtastic", "--sendtext", msg) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - if err := cmd.Run(); err != nil { - logInfo("broadcaster", "Broadcast failed", map[string]interface{}{ - "error": err.Error(), - "id": chosen.ID, - }) - } else { - logInfo("broadcaster", "Broadcast success", map[string]interface{}{ - "id": chosen.ID, - }) - } - } else { - // In dry-run mode, just print the command - logInfo("broadcaster", "DRY RUN - would run command", map[string]interface{}{ - "command": cmdStr, - "id": chosen.ID, - }) - } -} - -func setupLogging() { - var err error - logFile, err = os.Create(logPath) - if err != nil { - log.Fatalf("could not create log file: %v", err) - } -} - -func flushLog() { - logMutex.Lock() - defer logMutex.Unlock() - - if logFile == nil { - return - } - - enc := json.NewEncoder(logFile) - enc.SetIndent("", " ") - _ = enc.Encode(logData) - logFile.Close() -} - -func logEvent(event string, details map[string]interface{}) { - logMutex.Lock() - defer logMutex.Unlock() - - details["timestamp"] = time.Now() - details["event"] = event - - logData = append(logData, LogEntry{ - Timestamp: time.Now(), - Event: event, - Details: details, - }) -} - -func setupDatabase() error { - _, err := db.Exec(` - CREATE TABLE IF NOT EXISTS articles ( - link TEXT PRIMARY KEY, - title TEXT NOT NULL, - description TEXT, - published TIMESTAMP NOT NULL, - source TEXT NOT NULL, - firstseen TIMESTAMP NOT NULL, - seen TIMESTAMP, - summary TEXT, - importance INTEGER, - id TEXT, - broadcastTime TIMESTAMP - ) - `) - if err != nil { - return err - } - - // Check if columns exist - rows, err := db.Query(`PRAGMA table_info(articles)`) - if err != nil { - return err - } - defer rows.Close() - - hasIDColumn := false - hasBroadcastTimeColumn := false - for rows.Next() { - var cid, notnull, pk int - var name, type_name string - var dflt_value interface{} - if err := rows.Scan(&cid, &name, &type_name, ¬null, &dflt_value, &pk); err != nil { - return err - } - if name == "id" { - hasIDColumn = true - } - if name == "broadcastTime" { - hasBroadcastTimeColumn = true - } - } - - // Add missing columns if needed - if !hasIDColumn { 
- _, err = db.Exec(`ALTER TABLE articles ADD COLUMN id TEXT`) - if err != nil { - return err - } - } - - if !hasBroadcastTimeColumn { - _, err = db.Exec(`ALTER TABLE articles ADD COLUMN broadcastTime TIMESTAMP`) - if err != nil { - return err - } - } - - return nil -} - -// Generate a deterministic ID from a URL -func generateID(url string) string { - hash := sha256.Sum256([]byte(url)) - return hex.EncodeToString(hash[:])[:26] // Return first 26 chars of the hash -} - -func saveArticle(article Article) error { - _, err := db.Exec(` - INSERT OR IGNORE INTO articles - (link, title, description, published, source, firstseen, seen, summary, importance, id, broadcastTime) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `, - article.Link, article.Title, article.Description, article.Published, - article.Source, article.FirstSeen, article.Seen, article.Summary, article.Importance, article.ID, - article.BroadcastTime) - - if err != nil { - logEvent("db_insert_error", map[string]interface{}{ - "article": article.Link, - "id": article.ID, - "error": err.Error(), - }) - } - return err -} - -func updateArticle(article Article) error { - _, err := db.Exec(` - UPDATE articles SET - title = ?, - description = ?, - published = ?, - source = ?, - firstseen = ?, - seen = ?, - summary = ?, - importance = ?, - id = ?, - broadcastTime = ? - WHERE link = ? - `, - article.Title, article.Description, article.Published, article.Source, - article.FirstSeen, article.Seen, article.Summary, article.Importance, article.ID, - article.BroadcastTime, article.Link) - - if err != nil { - logEvent("db_update_error", map[string]interface{}{ - "article": article.Link, - "id": article.ID, - "error": err.Error(), - }) - } - return err -} - -func loadArticles() map[string]Article { - articles := make(map[string]Article) - - rows, err := db.Query(` - SELECT link, title, description, published, source, firstseen, seen, summary, importance, id, broadcastTime - FROM articles - `) - if err != nil { - logEvent("db_query_error", map[string]interface{}{ - "error": err.Error(), - }) - return articles - } - defer rows.Close() - - for rows.Next() { - var a Article - var seen sql.NullTime - var broadcastTime sql.NullTime - - err := rows.Scan( - &a.Link, &a.Title, &a.Description, &a.Published, &a.Source, - &a.FirstSeen, &seen, &a.Summary, &a.Importance, &a.ID, &broadcastTime, - ) - - if err != nil { - logEvent("db_scan_error", map[string]interface{}{ - "error": err.Error(), - }) - continue - } - - if seen.Valid { - a.Seen = seen.Time - } - - if broadcastTime.Valid { - a.BroadcastTime = broadcastTime.Time - } - - articles[a.Link] = a - } - - return articles -} - -func fetchAllFeedsParallel(now time.Time) []Article { - type fetchResult struct { - Source string - URL string - Feed *gofeed.Feed - Err error - HTTPStatus int - Duration time.Duration - } - - var wg sync.WaitGroup - results := make(chan fetchResult, len(feeds)) - - for source, url := range feeds { - wg.Add(1) - go func(source, url string) { - defer wg.Done() - - start := time.Now() - fp := gofeed.NewParser() - client := &http.Client{Timeout: 2 * time.Second} - var httpStatus int - - resp, err := client.Get(url) - var feed *gofeed.Feed - if err == nil { - httpStatus = resp.StatusCode - defer resp.Body.Close() - feed, err = fp.Parse(resp.Body) - } - - duration := time.Since(start) - - details := map[string]interface{}{ - "source": source, - "url": url, - "status": httpStatus, - "duration": duration.Seconds(), - } - if err != nil { - details["error"] = err.Error() - } - 
logEvent("rss_fetch_result", details) - - if err != nil { - fmt.Fprintf(os.Stderr, "[rss] FAIL %-15s (%s) [%.2fs] ERR: %v\n", source, url, duration.Seconds(), err) - results <- fetchResult{Source: source, URL: url, Err: err, Duration: duration, HTTPStatus: httpStatus} - return - } - - fmt.Fprintf(os.Stderr, "[rss] OK %-15s (%s) [%.2fs] HTTP %d, items: %d\n", - source, url, duration.Seconds(), httpStatus, len(feed.Items)) - - results <- fetchResult{ - Source: source, - URL: url, - Feed: feed, - Duration: duration, - HTTPStatus: httpStatus, - } - }(source, url) - } - - wg.Wait() - close(results) - - var all []Article - for result := range results { - if result.Err != nil || result.Feed == nil { - continue - } - for _, item := range result.Feed.Items { - published := now - if item.PublishedParsed != nil { - published = *item.PublishedParsed - } - all = append(all, Article{ - Title: item.Title, - Description: item.Description, - Link: item.Link, - Published: published, - Source: result.Source, - FirstSeen: now, - ID: generateID(item.Link), - }) - } - } - return all -} - -type SummaryResult struct { - ID string `json:"id"` - Summary string `json:"summary"` - Importance int `json:"importance"` -} - -func processArticleBatch(ollamaURL, model string, articles []Article) (map[string]SummaryResult, error) { - // Prepare batch prompt - var batchPrompt strings.Builder - batchPrompt.WriteString(ARTICLES_PROMPT) - - for i, article := range articles { - batchPrompt.WriteString(fmt.Sprintf("Article %d (ID: %s):\n", i+1, article.ID)) - batchPrompt.WriteString(fmt.Sprintf("Title: %s\n", article.Title)) - batchPrompt.WriteString(fmt.Sprintf("Content: %s\n\n", article.Description)) - } - - payload := map[string]interface{}{ - "model": model, - "messages": []map[string]string{ - {"role": "system", "content": SYSTEM_PROMPT}, - {"role": "user", "content": batchPrompt.String()}, - }, - "stream": false, - } - - // Log request details - requestDetails := map[string]interface{}{ - "model": model, - "batchSize": len(articles), - "apiEndpoint": ollamaURL + "/api/chat", - } - - for i, article := range articles { - requestDetails[fmt.Sprintf("article%d", i+1)] = article.ID - } - - logInfo("api", "Sending request to Ollama", requestDetails) - startTime := time.Now() - - // Pretty print request JSON for logging - payloadBytes, err := json.MarshalIndent(payload, "", " ") - if err != nil { - logInfo("api", "JSON encoding error", map[string]interface{}{ - "error": err.Error(), - }) - return nil, err - } - - req, err := http.NewRequest("POST", ollamaURL+"/api/chat", bytes.NewReader(payloadBytes)) - if err != nil { - logInfo("api", "Request creation error", map[string]interface{}{ - "error": err.Error(), - }) - return nil, err - } - req.Header.Set("Content-Type", "application/json") - - // Longer timeout for larger batches - timeout := 30 * time.Second - if len(articles) > 1 { - timeout = 120 * time.Second - } - client := &http.Client{Timeout: timeout} - - resp, err := client.Do(req) - if err != nil { - logInfo("api", "API call failed", map[string]interface{}{ - "error": err.Error(), - "duration": time.Since(startTime).String(), - }) - return nil, err - } - defer resp.Body.Close() - - bodyBytes, _ := io.ReadAll(resp.Body) - apiDuration := time.Since(startTime) - - // Pretty print response JSON for debugging - var prettyJSON bytes.Buffer - err = json.Indent(&prettyJSON, bodyBytes, "", " ") - if err != nil { - // If we can't pretty print, just show as-is - logInfo("api", "Pretty print error", map[string]interface{}{ - "error": 
err.Error(), - }) - } - - var res struct { - Message struct { - Content string `json:"content"` - } `json:"message"` - } - if err := json.Unmarshal(bodyBytes, &res); err != nil { - logInfo("api", "JSON unmarshal error", map[string]interface{}{ - "error": err.Error(), - "duration": apiDuration.String(), - }) - return nil, err - } - - if res.Message.Content == "" { - logInfo("api", "Empty response content", map[string]interface{}{ - "duration": apiDuration.String(), - }) - return nil, fmt.Errorf("no content in response") - } - - // Parse the JSON response - handle both single object and array responses - var summaries []SummaryResult - - // Try to parse as array first - if err := json.Unmarshal([]byte(res.Message.Content), &summaries); err != nil { - // Try to extract JSON from the content in case the model wrapped it in markdown or text - contentStr := res.Message.Content - jsonStart := strings.Index(contentStr, "[") - jsonEnd := strings.LastIndex(contentStr, "]") - - if jsonStart >= 0 && jsonEnd > jsonStart { - // Try to parse as JSON array - jsonStr := contentStr[jsonStart : jsonEnd+1] - if err := json.Unmarshal([]byte(jsonStr), &summaries); err != nil { - // Try parsing as a single object for single article case - if len(articles) == 1 { - // Look for { } instead - jsonStart = strings.Index(contentStr, "{") - jsonEnd = strings.LastIndex(contentStr, "}") - - if jsonStart >= 0 && jsonEnd > jsonStart { - jsonStr := contentStr[jsonStart : jsonEnd+1] - var singleResult struct { - Summary string `json:"summary"` - Importance int `json:"importance"` - } - - if err := json.Unmarshal([]byte(jsonStr), &singleResult); err == nil { - // Create a SummaryResult with the article's ID - summaries = []SummaryResult{{ - ID: articles[0].ID, - Summary: singleResult.Summary, - Importance: singleResult.Importance, - }} - } else { - logInfo("api", "JSON parse error for single object", map[string]interface{}{ - "error": err.Error(), - "duration": apiDuration.String(), - }) - return nil, err - } - } else { - logInfo("api", "Could not find JSON object in response", map[string]interface{}{ - "duration": apiDuration.String(), - }) - return nil, fmt.Errorf("could not find JSON object in response") - } - } else { - logInfo("api", "JSON parse error for array", map[string]interface{}{ - "error": err.Error(), - "duration": apiDuration.String(), - }) - return nil, err - } - } - } else { - logInfo("api", "Could not find JSON array in response", map[string]interface{}{ - "duration": apiDuration.String(), - }) - return nil, fmt.Errorf("could not find JSON array in response") - } - } - - // Convert to map for easier lookup - resultMap := make(map[string]SummaryResult) - for _, summary := range summaries { - resultMap[summary.ID] = summary - } - - logInfo("api", "API call successful", map[string]interface{}{ - "duration": apiDuration.String(), - "durationMs": apiDuration.Milliseconds(), - "resultsCount": len(resultMap), - "msPerArticle": apiDuration.Milliseconds() / int64(len(articles)), - "statusCode": resp.StatusCode, - }) - - return resultMap, nil -} - -// logInfo logs a structured message to both console and log file -func logInfo(component, message string, data map[string]interface{}) { - // Create a copy of the data map to avoid modifying the original - logData := make(map[string]interface{}) - for k, v := range data { - logData[k] = v - } - - // Add component and message to the log data - logData["component"] = component - logData["message"] = message - - // Format console output: timestamp component: message key1=val1 
key2=val2 - timestamp := time.Now().Format("15:04:05.000") - console := fmt.Sprintf("[%s] [%s] %s", timestamp, component, message) - - // Add key-value pairs to console output - keys := make([]string, 0, len(data)) - for k := range data { - keys = append(keys, k) - } - sort.Strings(keys) // Sort keys for consistent output - - for _, k := range keys { - v := data[k] - console += fmt.Sprintf(" %s=%v", k, v) - } - - // Print to console - fmt.Fprintln(os.Stderr, console) - - // Log to structured log file - logEvent("info", logData) -} diff --git a/models.go b/models.go new file mode 100644 index 0000000..1dccf46 --- /dev/null +++ b/models.go @@ -0,0 +1,97 @@ +package main + +import ( + "sync" + "time" +) + +type Article struct { + Title string `json:"title"` + Description string `json:"description"` + Link string `json:"link"` + Published time.Time `json:"published"` // When we first saw the article + OriginalDate time.Time `json:"originalDate"` // Original publication date from the feed + Source string `json:"source"` + FirstSeen time.Time `json:"firstseen"` + Seen time.Time `json:"seen"` + Summary string `json:"summary"` + Importance int `json:"importance"` + ID string `json:"id"` + BroadcastTime time.Time `json:"broadcastTime,omitempty"` +} + +type LogEntry struct { + Timestamp time.Time `json:"timestamp"` + Event string `json:"event"` + Details map[string]interface{} `json:"details"` +} + +// LogRingBuffer holds the most recent log entries in a circular buffer +type LogRingBuffer struct { + entries []LogEntry + size int + position int + count int + mutex sync.Mutex +} + +// Data structure for web UI +type DashboardData struct { + LastUpdated string + TotalArticles int + TotalBroadcast int + NewInLastHour int + UnsummarizedCount int + NextUp []Article + History []Article + RecentLogs []LogEntry +} + +const ( + dbPath = "articles.db" + + // LLM prompts + ARTICLES_PROMPT = `Summarize each of these news items in under 165 + characters, optimizing for information density (common news headline + abbreviations OK) and rate their importance from 1 to 100. + +100 means most important; 1 means least important. + +Never rate over 90 unless it is a massive event such as: war outbreak, +revolution, death of a head of state, large-scale natural disaster, mass +casualty terrorism, etc. + +IMPORTANT: Rank any headlines primarily promoting commercial products or +services as 1 (lowest importance). + +Rank any article with a headline that poses a question without providing an +answer (as an attempt to lure a reader into clicking a link) as 1 (lowest +importance). + +IMPORTANT: Boost the importance score by 10-20 points for breaking news that is less +than 60 minutes old based on its original publication date (which is provided for each article). +This helps ensure timely distribution of very recent news. + +For each article, return a JSON object with "id", "summary", and "importance" +fields. Return your response as a JSON array of objects like: [{"id": +"article_id", "summary": "...", "importance": 42}, ...] + +Here are the articles: +` + + SYSTEM_PROMPT = "You are a news analyst." 
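+	// LLM batch-processing limits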
+ BATCH_SIZE = 10 + MAX_INDIVIDUAL_PROCESSING = 50 + + // Timing constants + RSS_CHECK_INTERVAL = 15 * time.Minute + SUMMARIZE_INTERVAL = 10 * time.Second + BROADCAST_INTERVAL = 1 * time.Hour + STARTUP_DELAY = 60 * time.Second // Delay before first broadcast + BROADCAST_PREPARATION_DELAY = 30 * time.Second // Delay before executing broadcast command + ARTICLE_FRESHNESS_WINDOW = 24 * time.Hour // Time window for considering articles fresh + + // Other constants + MAX_MESSAGE_LENGTH = 200 // Maximum length of broadcast messages in characters + MAX_LOG_ENTRIES = 1000 // Maximum number of log entries to keep in memory +) diff --git a/rss.go b/rss.go new file mode 100644 index 0000000..9056a62 --- /dev/null +++ b/rss.go @@ -0,0 +1,223 @@ +package main + +import ( + "fmt" + "net/http" + "os" + "sync" + "time" + + "github.com/mmcdole/gofeed" +) + +// Map of source names to their abbreviations +var sourceAbbreviations = map[string]string{ + "BBC": "BBC", + "CNN": "CNN", + "NYTimes": "NYT", + "Guardian": "Grd", + "Al Jazeera": "AlJ", + "NBC": "NBC", + "ABC": "ABC", + "CBS": "CBS", + "Sky News": "Sky", + "Time": "Time", + "NPR": "NPR", + "Deutsche Welle": "DW", + "France 24": "F24", + "The Independent": "Ind", + "Washington Post": "WaPo", + "WSJ": "WSJ", +} + +var feeds = map[string]string{ + "BBC": "https://feeds.bbci.co.uk/news/world/rss.xml", + "CNN": "http://rss.cnn.com/rss/edition.rss", + "NYTimes": "https://rss.nytimes.com/services/xml/rss/nyt/World.xml", + "Guardian": "https://www.theguardian.com/world/rss", + "Al Jazeera": "https://www.aljazeera.com/xml/rss/all.xml", + "NBC": "http://feeds.nbcnews.com/nbcnews/public/news", + "ABC": "https://abcnews.go.com/abcnews/topstories", + "CBS": "https://www.cbsnews.com/latest/rss/world", + "Sky News": "https://feeds.skynews.com/feeds/rss/world.xml", + "Time": "https://time.com/feed/", + "NPR": "https://feeds.npr.org/1001/rss.xml", + "Deutsche Welle": "https://rss.dw.com/rdf/rss-en-world", + "France 24": "https://www.france24.com/en/rss", + "The Independent": "https://www.independent.co.uk/news/world/rss", + "Washington Post": "https://feeds.washingtonpost.com/rss/world", + "WSJ": "https://feeds.a.dj.com/rss/RSSWorldNews.xml", +} + +// Find the maximum abbreviation length +func getMaxAbbreviationLength() int { + maxLen := 0 + for _, abbr := range sourceAbbreviations { + if len(abbr) > maxLen { + maxLen = len(abbr) + } + } + return maxLen +} + +// rssFeedChecker checks RSS feeds every 15 minutes and adds new articles to the database +func rssFeedChecker(shutdown chan struct{}, ollamaURL, ollamaModel string) { + fmt.Fprintf(os.Stderr, "[rss] Starting RSS feed checker (interval: %s)\n", RSS_CHECK_INTERVAL) + + // Run immediately on startup + checkRSSFeeds() + + // Then run on interval + ticker := time.NewTicker(RSS_CHECK_INTERVAL) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + checkRSSFeeds() + case <-shutdown: + fmt.Fprintf(os.Stderr, "[rss] Shutting down RSS feed checker\n") + return + } + } +} + +// checkRSSFeeds fetches all RSS feeds and adds new articles to the database +func checkRSSFeeds() { + articles := loadArticles() + oldCount := len(articles) + + logInfo("rss", "Checking RSS feeds", map[string]interface{}{ + "time": time.Now().Format("15:04:05"), + "articlesBeforeFetch": oldCount, + }) + + now := time.Now() + newArticles := fetchAllFeedsParallel(now) + newCount := 0 + + for _, a := range newArticles { + if _, ok := articles[a.Link]; !ok { + if a.ID == "" { + a.ID = generateID(a.Link) + } + articles[a.Link] = a + saveArticle(a) 
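+				// Count the new article and log its discovery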
+ newCount++ + logInfo("new", fmt.Sprintf("Found new article: %s", a.Title), map[string]interface{}{ + "id": a.ID, + "source": a.Source, + "published": a.Published.Format(time.RFC3339), + }) + } + } + + logInfo("rss", "Completed RSS check", map[string]interface{}{ + "articlesBeforeFetch": oldCount, + "articlesAfterFetch": oldCount + newCount, + "newArticles": newCount, + }) +} + +func fetchAllFeedsParallel(now time.Time) []Article { + type fetchResult struct { + Source string + URL string + Feed *gofeed.Feed + Err error + HTTPStatus int + Duration time.Duration + } + + var wg sync.WaitGroup + results := make(chan fetchResult, len(feeds)) + + for source, url := range feeds { + wg.Add(1) + go func(source, url string) { + defer wg.Done() + + start := time.Now() + fp := gofeed.NewParser() + client := &http.Client{Timeout: 20 * time.Second} + var httpStatus int + + resp, err := client.Get(url) + var feed *gofeed.Feed + if err == nil { + httpStatus = resp.StatusCode + defer resp.Body.Close() + feed, err = fp.Parse(resp.Body) + } + + duration := time.Since(start) + + details := map[string]interface{}{ + "source": source, + "url": url, + "status": httpStatus, + "duration": duration.Seconds(), + } + if err != nil { + details["error"] = err.Error() + } + logEvent("rss_fetch_result", details) + + if err != nil { + fmt.Fprintf(os.Stderr, "[rss] FAIL %-15s (%s) [%.2fs] ERR: %v\n", source, url, duration.Seconds(), err) + results <- fetchResult{Source: source, URL: url, Err: err, Duration: duration, HTTPStatus: httpStatus} + return + } + + fmt.Fprintf(os.Stderr, "[rss] OK %-15s (%s) [%.2fs] HTTP %d, items: %d\n", + source, url, duration.Seconds(), httpStatus, len(feed.Items)) + + results <- fetchResult{ + Source: source, + URL: url, + Feed: feed, + Duration: duration, + HTTPStatus: httpStatus, + } + }(source, url) + } + + wg.Wait() + close(results) + + var all []Article + for result := range results { + if result.Err != nil || result.Feed == nil { + continue + } + for _, item := range result.Feed.Items { + published := now + // Set published to the current time if not available from feed + if item.PublishedParsed != nil { + published = *item.PublishedParsed + } + + // Set originalDate to the feed's publication date if available + originalDate := published + + // Skip articles older than 7 days based on the feed's publication date + cutoffDate := now.AddDate(0, 0, -7) // 7 days ago + if published.Before(cutoffDate) { + // Skip this article as it's older than 7 days + continue + } + + all = append(all, Article{ + Title: item.Title, + Description: item.Description, + Link: item.Link, + Published: now, // When we first saw the article + OriginalDate: originalDate, // Original publication date from the feed + Source: result.Source, + FirstSeen: now, + ID: generateID(item.Link), + }) + } + } + return all +} diff --git a/storage.go b/storage.go new file mode 100644 index 0000000..b20786b --- /dev/null +++ b/storage.go @@ -0,0 +1,486 @@ +package main + +import ( + "crypto/sha256" + "database/sql" + "encoding/hex" + "encoding/json" + "fmt" + "os" + "sort" + "time" +) + +// NewLogRingBuffer creates a new ring buffer with the specified capacity +func NewLogRingBuffer(capacity int) *LogRingBuffer { + return &LogRingBuffer{ + entries: make([]LogEntry, capacity), + size: capacity, + position: 0, + count: 0, + } +} + +// Add adds a log entry to the ring buffer +func (rb *LogRingBuffer) Add(entry LogEntry) { + rb.mutex.Lock() + defer rb.mutex.Unlock() + + rb.entries[rb.position] = entry + rb.position = (rb.position + 1) 
% rb.size
+	if rb.count < rb.size {
+		rb.count++
+	}
+}
+
+// GetAll returns all entries in the buffer from newest to oldest
+func (rb *LogRingBuffer) GetAll() []LogEntry {
+	rb.mutex.Lock()
+	defer rb.mutex.Unlock()
+
+	result := make([]LogEntry, rb.count)
+
+	if rb.count == 0 {
+		return result
+	}
+
+	// Copy entries in reverse chronological order (newest first)
+	pos := rb.position - 1
+	if pos < 0 {
+		pos = rb.size - 1
+	}
+
+	for i := 0; i < rb.count; i++ {
+		result[i] = rb.entries[pos]
+		pos--
+		if pos < 0 {
+			pos = rb.size - 1
+		}
+	}
+
+	return result
+}
+
+func setupDatabase() error {
+	_, err := db.Exec(`
+		CREATE TABLE IF NOT EXISTS articles (
+			link TEXT PRIMARY KEY,
+			title TEXT NOT NULL,
+			description TEXT,
+			published TIMESTAMP NOT NULL,
+			originalDate TIMESTAMP,
+			source TEXT NOT NULL,
+			firstseen TIMESTAMP NOT NULL,
+			seen TIMESTAMP,
+			summary TEXT,
+			importance INTEGER,
+			id TEXT,
+			broadcastTime TIMESTAMP
+		)
+	`)
+	if err != nil {
+		return err
+	}
+
+	// Check if columns exist
+	rows, err := db.Query(`PRAGMA table_info(articles)`)
+	if err != nil {
+		return err
+	}
+	defer rows.Close()
+
+	hasIDColumn := false
+	hasBroadcastTimeColumn := false
+	hasOriginalDateColumn := false
+
+	for rows.Next() {
+		var cid, notnull, pk int
+		var name, type_name string
+		var dflt_value interface{}
+		if err := rows.Scan(&cid, &name, &type_name, &notnull, &dflt_value, &pk); err != nil {
+			return err
+		}
+		if name == "id" {
+			hasIDColumn = true
+		}
+		if name == "broadcastTime" {
+			hasBroadcastTimeColumn = true
+		}
+		if name == "originalDate" {
+			hasOriginalDateColumn = true
+		}
+	}
+
+	// Add missing columns if needed
+	if !hasIDColumn {
+		_, err = db.Exec(`ALTER TABLE articles ADD COLUMN id TEXT`)
+		if err != nil {
+			return err
+		}
+	}
+
+	if !hasBroadcastTimeColumn {
+		_, err = db.Exec(`ALTER TABLE articles ADD COLUMN broadcastTime TIMESTAMP`)
+		if err != nil {
+			return err
+		}
+	}
+
+	if !hasOriginalDateColumn {
+		_, err = db.Exec(`ALTER TABLE articles ADD COLUMN originalDate TIMESTAMP`)
+		if err != nil {
+			return err
+		}
+
+		logInfo("db", "Added originalDate column to articles table", nil)
+	}
+
+	return nil
+}
+
+// Generate a deterministic ID from a URL
+func generateID(url string) string {
+	hash := sha256.Sum256([]byte(url))
+	return hex.EncodeToString(hash[:])[:26] // Return first 26 chars of the hash
+}
+
+func saveArticle(article Article) error {
+	_, err := db.Exec(`
+		INSERT OR IGNORE INTO articles
+		(link, title, description, published, originalDate, source, firstseen, seen, summary, importance, id, broadcastTime)
+		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+	`,
+		article.Link, article.Title, article.Description, article.Published, article.OriginalDate,
+		article.Source, article.FirstSeen, article.Seen, article.Summary, article.Importance, article.ID,
+		article.BroadcastTime)
+
+	if err != nil {
+		logEvent("db_insert_error", map[string]interface{}{
+			"article": article.Link,
+			"id":      article.ID,
+			"error":   err.Error(),
+		})
+	}
+	return err
+}
+
+func updateArticle(article Article) error {
+	_, err := db.Exec(`
+		UPDATE articles SET
+			title = ?,
+			description = ?,
+			published = ?,
+			originalDate = ?,
+			source = ?,
+			firstseen = ?,
+			seen = ?,
+			summary = ?,
+			importance = ?,
+			id = ?,
+			broadcastTime = ?
+		WHERE link = ?
+ `, + article.Title, article.Description, article.Published, article.OriginalDate, article.Source, + article.FirstSeen, article.Seen, article.Summary, article.Importance, article.ID, + article.BroadcastTime, article.Link) + + if err != nil { + logEvent("db_update_error", map[string]interface{}{ + "article": article.Link, + "id": article.ID, + "error": err.Error(), + }) + } + return err +} + +func loadArticles() map[string]Article { + articles := make(map[string]Article) + + rows, err := db.Query(` + SELECT link, title, description, published, originalDate, source, firstseen, seen, summary, importance, id, broadcastTime + FROM articles + `) + if err != nil { + logEvent("db_query_error", map[string]interface{}{ + "error": err.Error(), + }) + return articles + } + defer rows.Close() + + for rows.Next() { + var a Article + var seen sql.NullTime + var broadcastTime sql.NullTime + var originalDate sql.NullTime + + err := rows.Scan( + &a.Link, &a.Title, &a.Description, &a.Published, &originalDate, &a.Source, + &a.FirstSeen, &seen, &a.Summary, &a.Importance, &a.ID, &broadcastTime, + ) + + if err != nil { + logEvent("db_scan_error", map[string]interface{}{ + "error": err.Error(), + }) + continue + } + + if seen.Valid { + a.Seen = seen.Time + } + + if broadcastTime.Valid { + a.BroadcastTime = broadcastTime.Time + } + + if originalDate.Valid { + a.OriginalDate = originalDate.Time + } + + articles[a.Link] = a + } + + return articles +} + +// getBroadcastHistory gets the most recent broadcast articles +func getBroadcastHistory(limit int) ([]Article, error) { + rows, err := db.Query(` + SELECT link, title, description, published, originalDate, source, firstseen, seen, summary, importance, id, broadcastTime + FROM articles + WHERE broadcastTime IS NOT NULL + AND broadcastTime > 1 + AND broadcastTime != 0 + AND datetime(broadcastTime) != '1970-01-01 00:00:00' + AND datetime(broadcastTime) != '0001-01-01 00:00:00' + AND strftime('%Y', broadcastTime) > '2000' -- Ensure year is at least 2000 + ORDER BY broadcastTime DESC + LIMIT ? + `, limit) + + if err != nil { + return nil, err + } + defer rows.Close() + + var articles []Article + for rows.Next() { + var a Article + var seen sql.NullTime + var broadcastTime sql.NullTime + var originalDate sql.NullTime + + err := rows.Scan( + &a.Link, &a.Title, &a.Description, &a.Published, &originalDate, &a.Source, + &a.FirstSeen, &seen, &a.Summary, &a.Importance, &a.ID, &broadcastTime, + ) + + if err != nil { + continue + } + + if seen.Valid { + a.Seen = seen.Time + } + + if broadcastTime.Valid { + a.BroadcastTime = broadcastTime.Time + } + + if originalDate.Valid { + a.OriginalDate = originalDate.Time + } + + articles = append(articles, a) + } + + return articles, nil +} + +// getNextUpArticles gets the top 25 articles eligible for broadcast sorted by importance +func getNextUpArticles() ([]Article, error) { + now := time.Now() + cutoff := now.Add(-ARTICLE_FRESHNESS_WINDOW) // Time window for considering articles fresh + + rows, err := db.Query(` + SELECT link, title, description, published, originalDate, source, firstseen, seen, summary, importance, id, broadcastTime + FROM articles + WHERE broadcastTime IS NULL + AND summary IS NOT NULL + AND importance > 0 + AND firstseen > ? 
+ ORDER BY importance DESC + LIMIT 25 + `, cutoff) + + if err != nil { + return nil, err + } + defer rows.Close() + + var articles []Article + for rows.Next() { + var a Article + var seen sql.NullTime + var broadcastTime sql.NullTime + var originalDate sql.NullTime + + err := rows.Scan( + &a.Link, &a.Title, &a.Description, &a.Published, &originalDate, &a.Source, + &a.FirstSeen, &seen, &a.Summary, &a.Importance, &a.ID, &broadcastTime, + ) + + if err != nil { + continue + } + + if seen.Valid { + a.Seen = seen.Time + } + + if broadcastTime.Valid { + a.BroadcastTime = broadcastTime.Time + } + + if originalDate.Valid { + a.OriginalDate = originalDate.Time + } + + articles = append(articles, a) + } + + return articles, nil +} + +// getRecentBroadcasts retrieves the n most recently broadcast articles +func getRecentBroadcasts(n int) []Article { + rows, err := db.Query(` + SELECT link, title, description, published, originalDate, source, firstseen, seen, summary, importance, id, broadcastTime + FROM articles + WHERE broadcastTime IS NOT NULL AND broadcastTime > 1 + ORDER BY broadcastTime DESC + LIMIT ? + `, n) + + if err != nil { + logInfo("db", "Error retrieving recent broadcasts", map[string]interface{}{ + "error": err.Error(), + }) + return []Article{} + } + defer rows.Close() + + var broadcasts []Article + for rows.Next() { + var a Article + var seen sql.NullTime + var broadcastTime sql.NullTime + var originalDate sql.NullTime + + err := rows.Scan( + &a.Link, &a.Title, &a.Description, &a.Published, &originalDate, &a.Source, + &a.FirstSeen, &seen, &a.Summary, &a.Importance, &a.ID, &broadcastTime, + ) + + if err != nil { + logInfo("db", "Error scanning broadcast article", map[string]interface{}{ + "error": err.Error(), + }) + continue + } + + if seen.Valid { + a.Seen = seen.Time + } + + if broadcastTime.Valid { + a.BroadcastTime = broadcastTime.Time + } + + if originalDate.Valid { + a.OriginalDate = originalDate.Time + } + + broadcasts = append(broadcasts, a) + } + + return broadcasts +} + +func setupLogging() { + var err error + logFile, err = os.Create(logPath) + if err != nil { + fmt.Fprintf(os.Stderr, "could not create log file: %v\n", err) + os.Exit(1) + } +} + +func flushLog() { + logMutex.Lock() + defer logMutex.Unlock() + + if logFile == nil { + return + } + + enc := json.NewEncoder(logFile) + enc.SetIndent("", " ") + _ = enc.Encode(logData) + logFile.Close() +} + +func logEvent(event string, details map[string]interface{}) { + logMutex.Lock() + defer logMutex.Unlock() + + details["timestamp"] = time.Now() + details["event"] = event + + entry := LogEntry{ + Timestamp: time.Now(), + Event: event, + Details: details, + } + + // Add to both the permanent log data and the ring buffer + logData = append(logData, entry) + logBuffer.Add(entry) +} + +// logInfo logs a structured message to both console and log file +func logInfo(component, message string, data map[string]interface{}) { + // Create a copy of the data map to avoid modifying the original + logData := make(map[string]interface{}) + for k, v := range data { + logData[k] = v + } + + // Add component and message to the log data + logData["component"] = component + logData["message"] = message + + // Format console output: timestamp component: message key1=val1 key2=val2 + timestamp := time.Now().Format("15:04:05.000") + console := fmt.Sprintf("[%s] [%s] %s", timestamp, component, message) + + // Add key-value pairs to console output + keys := make([]string, 0, len(data)) + for k := range data { + keys = append(keys, k) + } + 
sort.Strings(keys) // Sort keys for consistent output
+
+	for _, k := range keys {
+		v := data[k]
+		console += fmt.Sprintf(" %s=%v", k, v)
+	}
+
+	// Print to console
+	fmt.Fprintln(os.Stderr, console)
+
+	// Log to structured log file
+	logEvent("info", logData)
+}
diff --git a/templates/index.html b/templates/index.html
new file mode 100644
index 0000000..560b8c1
--- /dev/null
+++ b/templates/index.html
@@ -0,0 +1,290 @@
+<!DOCTYPE html>
+<html>
+<head>
+    <meta charset="utf-8">
+    <title>GomeshAlerter Dashboard</title>
+</head>
+<body>
+    <h1>GomeshAlerter Dashboard</h1>
+
+    <ul>
+        <li>Last Updated: {{.LastUpdated}}</li>
+        <li>Total Articles: {{.TotalArticles}}</li>
+        <li>Total Broadcast: {{.TotalBroadcast}}</li>
+        <li>New in Last Hour: {{.NewInLastHour}}</li>
+        <li>Awaiting Summary: {{.UnsummarizedCount}}</li>
+    </ul>
+
+    <h2>Next Up for Broadcast (Top 25)</h2>
+    <table>
+        <thead>
+            <tr>
+                <th>ID</th>
+                <th>Importance</th>
+                <th>Source</th>
+                <th>Original Title</th>
+                <th>Summary</th>
+                <th>First Seen</th>
+            </tr>
+        </thead>
+        <tbody>
+            {{range .NextUp}}
+            <tr>
+                <td>{{.ID}}</td>
+                <td>{{.Importance}}</td>
+                <td>{{.Source}}</td>
+                <td>{{.Title}}</td>
+                <td>{{.Summary}}</td>
+                <td>{{.FirstSeen.Format "2006-01-02 15:04:05 MST"}}</td>
+            </tr>
+            {{else}}
+            <tr>
+                <td colspan="6">No articles ready for broadcast</td>
+            </tr>
+            {{end}}
+        </tbody>
+    </table>
+
+    <h2>Broadcast History (Last 100)</h2>
+    <table>
+        <thead>
+            <tr>
+                <th>ID</th>
+                <th>Broadcast Time</th>
+                <th>Importance</th>
+                <th>Source</th>
+                <th>Original Title</th>
+                <th>Summary</th>
+            </tr>
+        </thead>
+        <tbody>
+            {{range .History}}
+            <tr>
+                <td>{{.ID}}</td>
+                <td>{{.BroadcastTime.Format "2006-01-02 15:04:05 MST"}}</td>
+                <td>{{.Importance}}</td>
+                <td>{{.Source}}</td>
+                <td>{{.Title}}</td>
+                <td>{{.Summary}}</td>
+            </tr>
+            {{else}}
+            <tr>
+                <td colspan="6">No broadcast history available</td>
+            </tr>
+            {{end}}
+        </tbody>
+    </table>
+
+    <h2>Recent Logs (Last 1000)</h2>
+    <table>
+        <thead>
+            <tr>
+                <th>Timestamp</th>
+                <th>Event</th>
+                <th>Details</th>
+            </tr>
+        </thead>
+        <tbody>
+            {{range .RecentLogs}}
+            <tr>
+                <td>{{.Timestamp.Format "2006-01-02 15:04:05 MST"}}</td>
+                <td>{{.Event}}</td>
+                <td>
+                    {{range $key, $value := .Details}}
+                    {{if ne $key "timestamp"}}
+                    {{if ne $key "event"}}
+                    <span>{{$key}}={{$value}}</span>
+                    {{end}}
+                    {{end}}
+                    {{end}}
+                </td>
+            </tr>
+            {{else}}
+            <tr>
+                <td colspan="3">No log entries available</td>
+            </tr>
+            {{end}}
+        </tbody>
+    </table>
+</body>
+</html>
+ + \ No newline at end of file diff --git a/webserver.go b/webserver.go new file mode 100644 index 0000000..a09afb8 --- /dev/null +++ b/webserver.go @@ -0,0 +1,135 @@ +package main + +import ( + "context" + "html/template" + "net/http" + "strings" + "time" +) + +// webServer runs the web interface on port 8080 +func webServer(shutdown chan struct{}) { + // Load templates + tmpl, err := template.ParseFiles("templates/index.html") + if err != nil { + logInfo("web", "Error loading templates", map[string]interface{}{ + "error": err.Error(), + }) + return + } + + // Define HTTP handlers + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + data, err := getDashboardData() + if err != nil { + http.Error(w, "Error fetching data: "+err.Error(), http.StatusInternalServerError) + return + } + + err = tmpl.Execute(w, data) + if err != nil { + // Check if it's already too late to write headers + if !isResponseHeaderWritten(err) { + http.Error(w, "Error rendering template: "+err.Error(), http.StatusInternalServerError) + } else { + // Log the error but don't try to write headers again + logInfo("web", "Template execution error after headers sent", map[string]interface{}{ + "error": err.Error(), + }) + } + return + } + }) + + // Start the server + server := &http.Server{ + Addr: ":8080", + Handler: nil, // Use default mux + } + + // Create a goroutine for the server + go func() { + logInfo("web", "Starting web server", map[string]interface{}{ + "port": 8080, + }) + + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logInfo("web", "Web server error", map[string]interface{}{ + "error": err.Error(), + }) + } + }() + + // Wait for shutdown signal + <-shutdown + + // Create a deadline for server shutdown + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Attempt graceful shutdown + if err := server.Shutdown(ctx); err != nil { + logInfo("web", "Error during server shutdown", map[string]interface{}{ + "error": err.Error(), + }) + } + + logInfo("web", "Web server stopped", nil) +} + +// getDashboardData fetches the data needed for the dashboard +func getDashboardData() (DashboardData, error) { + articles := loadArticles() + now := time.Now() + hourAgo := now.Add(-60 * time.Minute) + + // Prepare the data structure + data := DashboardData{ + LastUpdated: now.Format("Jan 02, 2006 15:04:05 MST"), + TotalArticles: len(articles), + } + + // Count broadcast articles, recent articles, and unsummarized articles + for _, a := range articles { + if !a.BroadcastTime.IsZero() && a.BroadcastTime.Unix() > 1 { + data.TotalBroadcast++ + } + + if a.FirstSeen.After(hourAgo) { + data.NewInLastHour++ + } + + if a.Summary == "" || a.Importance == 0 { + data.UnsummarizedCount++ + } + } + + // Get broadcast history (last 100) + history, err := getBroadcastHistory(100) + if err != nil { + return data, err + } + data.History = history + + // Get next up articles (importance sorted, less than 24 hours old) + nextUp, err := getNextUpArticles() + if err != nil { + return data, err + } + data.NextUp = nextUp + + // Get recent logs + data.RecentLogs = logBuffer.GetAll() + + return data, nil +} + +// isResponseHeaderWritten checks if an error indicates headers were already written +func isResponseHeaderWritten(err error) bool { + // Check common patterns that indicate the response was already partially written + errStr := err.Error() + return strings.Contains(errStr, "write: broken pipe") || + strings.Contains(errStr, "write: connection reset by 
peer") || + strings.Contains(errStr, "http: superfluous response.WriteHeader") +}