package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"strings"
	"time"
)

// Define a struct for redundancy check response
type RedundancyCheckResponse struct {
	IsRedundant bool   `json:"is_redundant"`
	Reason      string `json:"reason"`
}

// SummaryResult is one entry of the LLM's batched summarization response.
type SummaryResult struct {
	ID         string `json:"id"`
	Summary    string `json:"summary"`
	Importance int    `json:"importance"`
}

// articleSummarizer checks for articles without summaries every SUMMARIZE_INTERVAL
// and processes them in batches
func articleSummarizer(shutdown chan struct{}, ollamaURL, ollamaModel string) {
	logInfo("summarizer", "Starting article summarizer", map[string]interface{}{
		"interval": SUMMARIZE_INTERVAL.String(),
	})

	ticker := time.NewTicker(SUMMARIZE_INTERVAL)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			summarizeArticles(ollamaURL, ollamaModel)
		case <-shutdown:
			logInfo("summarizer", "Shutting down article summarizer", nil)
			return
		}
	}
}

// summarizeArticles processes articles without summaries in batches
func summarizeArticles(ollamaURL, ollamaModel string) {
	articles := loadArticles()

	// Count articles that already have summaries
	summarizedCount := 0
	for _, article := range articles {
		if article.Summary != "" && article.Importance != 0 {
			summarizedCount++
		}
	}

	// Collect articles that need summarization
	var articlesToSummarize []Article
	for _, article := range articles {
		if article.Summary == "" || article.Importance == 0 {
			articlesToSummarize = append(articlesToSummarize, article)
		}
	}

	if len(articlesToSummarize) == 0 {
		return // No articles to summarize
	}

	logInfo("summarizer", "Processing articles", map[string]interface{}{
		"alreadySummarized": summarizedCount,
		"toSummarize":       len(articlesToSummarize),
		"totalArticles":     len(articles),
	})

	// Process in batches
	for i := 0; i < len(articlesToSummarize); i += BATCH_SIZE {
		end := i + BATCH_SIZE
		if end > len(articlesToSummarize) {
			end = len(articlesToSummarize)
		}
		batch := articlesToSummarize[i:end]

		batchInfo := make([]string, 0, len(batch))
		for _, a := range batch {
			batchInfo = append(batchInfo, a.ID[:8])
		}
		logInfo("summarizer", "Processing batch", map[string]interface{}{
			"batchSize": len(batch),
			"batchIds":  strings.Join(batchInfo, ","),
			"startItem": i + 1,
			"endItem":   end,
		})

		startTime := time.Now()
		summaries, err := processArticleBatch(ollamaURL, ollamaModel, batch)
		apiDuration := time.Since(startTime)

		if err != nil {
			logInfo("summarizer", "Batch processing error", map[string]interface{}{
				"error":    err.Error(),
				"duration": apiDuration.String(),
			})
			continue
		}

		// Update articles with summaries
		updatedCount := 0
		for id, result := range summaries {
			for _, article := range batch {
				if article.ID == id {
					// Log the summary details for each article
					logInfo("summary_result", "LLM generated summary", map[string]interface{}{
						"id":         article.ID,
						"title":      article.Title,
						"summary":    result.Summary,
						"importance": result.Importance,
						"source":     article.Source,
						"length":     len(result.Summary),
					})

					article.Summary = result.Summary
					article.Importance = result.Importance
					if err := updateArticle(article); err != nil {
						logInfo("summarizer", "Error updating article with summary", map[string]interface{}{
							"id":    article.ID,
							"error": err.Error(),
						})
						continue
					}
					updatedCount++
					break
				}
			}
		}

		logInfo("summarizer", "Batch processing complete", map[string]interface{}{
			"batchSize":    len(batch),
			"updatedCount": updatedCount,
			"duration":     apiDuration.String(),
			"durationMs":   apiDuration.Milliseconds(),
			"msPerArticle": apiDuration.Milliseconds() / int64(len(batch)),
		})

		// Calculate and log ETA for remaining articles
		remainingCount := len(articlesToSummarize) - (i + len(batch))
		if remainingCount > 0 {
			// Calculate milliseconds per article from this batch
			msPerArticle := apiDuration.Milliseconds() / int64(len(batch))

			// Calculate remaining time
			remainingTimeMs := msPerArticle * int64(remainingCount)
			remainingTime := time.Duration(remainingTimeMs) * time.Millisecond

			// Calculate how many batches remain
			remainingBatches := (remainingCount + BATCH_SIZE - 1) / BATCH_SIZE // Ceiling division

			// Estimated completion time
			eta := time.Now().Add(remainingTime)

			logInfo("summarizer", "Estimated time to completion", map[string]interface{}{
				"remainingArticles": remainingCount,
				"remainingBatches":  remainingBatches,
				"msPerArticle":      msPerArticle,
				"estimatedTimeLeft": remainingTime.String(),
				"eta":               eta.Format("15:04:05 MST"),
			})
		}
	}
}

// processArticleBatch sends one batch of articles to the Ollama chat API and
// returns a map of article ID to its generated SummaryResult.
func processArticleBatch(ollamaURL, model string, articles []Article) (map[string]SummaryResult, error) {
	// Prepare batch prompt
	var batchPrompt strings.Builder
	batchPrompt.WriteString(ARTICLES_PROMPT)

	for i, article := range articles {
		batchPrompt.WriteString(fmt.Sprintf("Article %d (ID: %s):\n", i+1, article.ID))
		batchPrompt.WriteString(fmt.Sprintf("Title: %s\n", article.Title))
		batchPrompt.WriteString(fmt.Sprintf("Content: %s\n", article.Description))
		// Add original publication date information to help with breaking news ranking
		batchPrompt.WriteString(fmt.Sprintf("Original Publication Date: %s\n", article.OriginalDate.Format(time.RFC3339)))
		// Calculate minutes since publication for clearer guidance
		minutesOld := time.Since(article.OriginalDate).Minutes()
		batchPrompt.WriteString(fmt.Sprintf("Minutes Since Publication: %.0f\n\n", minutesOld))
	}

	payload := map[string]interface{}{
		"model": model,
		"messages": []map[string]string{
			{"role": "system", "content": SYSTEM_PROMPT},
			{"role": "user", "content": batchPrompt.String()},
		},
		"stream": false,
	}

	// Log request details
	requestDetails := map[string]interface{}{
		"model":       model,
		"batchSize":   len(articles),
		"apiEndpoint": ollamaURL + "/api/chat",
	}
	for i, article := range articles {
		requestDetails[fmt.Sprintf("article%d", i+1)] = article.ID
	}
	logInfo("api", "Sending request to Ollama", requestDetails)

	startTime := time.Now()

	// Log the complete batch prompt for debugging
	logInfo("summarize_prompt", "Complete prompt sent to LLM for summarization", map[string]interface{}{
		"batchSize": len(articles),
		"prompt":    batchPrompt.String(),
	})

	// Pretty print request JSON for logging
	payloadBytes, err := json.MarshalIndent(payload, "", " ")
	if err != nil {
		logInfo("api", "JSON encoding error", map[string]interface{}{
			"error": err.Error(),
		})
		return nil, err
	}

	// Log the request payload
	logInfo("summarize_request", "Summarization API request", map[string]interface{}{
		"model":       model,
		"batchSize":   len(articles),
		"apiEndpoint": ollamaURL + "/api/chat",
		"payload":     string(payloadBytes),
		"articleIDs":  extractArticleIDs(articles),
	})

	req, err := http.NewRequest("POST", ollamaURL+"/api/chat", bytes.NewReader(payloadBytes))
	if err != nil {
		logInfo("api", "Request creation error", map[string]interface{}{
			"error": err.Error(),
		})
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")

	// Longer timeout for larger batches
	timeout := 30 * time.Second
	if len(articles) > 1 {
		timeout = 120 * time.Second
	}
	client := &http.Client{Timeout: timeout}

	resp, err := client.Do(req)
	if err != nil {
		logInfo("api", "API call failed", map[string]interface{}{
			"error":    err.Error(),
			"duration": time.Since(startTime).String(),
		})
		return nil, err
	}
	defer resp.Body.Close()
	bodyBytes, _ := io.ReadAll(resp.Body)
	apiDuration := time.Since(startTime)

	// Log the raw response for debugging
	logInfo("summarize_response", "Raw LLM response for summarization", map[string]interface{}{
		"statusCode":   resp.StatusCode,
		"response":     string(bodyBytes),
		"durationMs":   apiDuration.Milliseconds(),
		"msPerArticle": apiDuration.Milliseconds() / int64(len(articles)),
	})

	// Pretty print response JSON for debugging
	var prettyJSON bytes.Buffer
	err = json.Indent(&prettyJSON, bodyBytes, "", " ")
	if err != nil {
		// If we can't pretty print, just show as-is
		logInfo("api", "Pretty print error", map[string]interface{}{
			"error": err.Error(),
		})
	}

	var res struct {
		Message struct {
			Content string `json:"content"`
		} `json:"message"`
	}
	if err := json.Unmarshal(bodyBytes, &res); err != nil {
		logInfo("api", "JSON unmarshal error", map[string]interface{}{
			"error":    err.Error(),
			"duration": apiDuration.String(),
		})
		return nil, err
	}

	if res.Message.Content == "" {
		logInfo("api", "Empty response content", map[string]interface{}{
			"duration": apiDuration.String(),
		})
		return nil, fmt.Errorf("no content in response")
	}

	// Parse the JSON response - handle both single object and array responses
	var summaries []SummaryResult

	// Try to parse as array first
	if err := json.Unmarshal([]byte(res.Message.Content), &summaries); err != nil {
		// Try to extract JSON from the content in case the model wrapped it in markdown or text
		contentStr := res.Message.Content
		jsonStart := strings.Index(contentStr, "[")
		jsonEnd := strings.LastIndex(contentStr, "]")

		if jsonStart >= 0 && jsonEnd > jsonStart {
			// Try to parse as JSON array
			jsonStr := contentStr[jsonStart : jsonEnd+1]
			if err := json.Unmarshal([]byte(jsonStr), &summaries); err != nil {
				// Try parsing as a single object for single article case
				if len(articles) == 1 {
					// Look for { } instead
					jsonStart = strings.Index(contentStr, "{")
					jsonEnd = strings.LastIndex(contentStr, "}")
					if jsonStart >= 0 && jsonEnd > jsonStart {
						jsonStr := contentStr[jsonStart : jsonEnd+1]
						var singleResult struct {
							Summary    string `json:"summary"`
							Importance int    `json:"importance"`
						}
						if err := json.Unmarshal([]byte(jsonStr), &singleResult); err == nil {
							// Create a SummaryResult with the article's ID
							summaries = []SummaryResult{{
								ID:         articles[0].ID,
								Summary:    singleResult.Summary,
								Importance: singleResult.Importance,
							}}
						} else {
							logInfo("api", "JSON parse error for single object", map[string]interface{}{
								"error":    err.Error(),
								"duration": apiDuration.String(),
							})
							return nil, err
						}
					} else {
						logInfo("api", "Could not find JSON object in response", map[string]interface{}{
							"duration": apiDuration.String(),
						})
						return nil, fmt.Errorf("could not find JSON object in response")
					}
				} else {
					logInfo("api", "JSON parse error for array", map[string]interface{}{
						"error":    err.Error(),
						"duration": apiDuration.String(),
					})
					return nil, err
				}
			}
		} else {
			logInfo("api", "Could not find JSON array in response", map[string]interface{}{
				"duration": apiDuration.String(),
			})
			return nil, fmt.Errorf("could not find JSON array in response")
		}
	}

	// Convert to map for easier lookup
	resultMap := make(map[string]SummaryResult)
	for _, summary := range summaries {
		resultMap[summary.ID] = summary
	}

	logInfo("api", "API call successful", map[string]interface{}{
		"duration":     apiDuration.String(),
		"durationMs":   apiDuration.Milliseconds(),
		"resultsCount": len(resultMap),
		"msPerArticle": apiDuration.Milliseconds() / int64(len(articles)),
		"statusCode":   resp.StatusCode,
	})

	return resultMap, nil
}
// checkRedundancy asks the LLM if the candidate message is redundant compared to recent broadcasts
func checkRedundancy(candidate Article, recentBroadcasts []Article) (bool, string, error) {
	if len(recentBroadcasts) == 0 {
		return false, "No previous broadcasts to compare", nil
	}

	// Get the abbreviated source name
	sourceAbbr := sourceAbbreviations[candidate.Source]
	if sourceAbbr == "" {
		if len(candidate.Source) >= 3 {
			sourceAbbr = candidate.Source[:3]
		} else {
			sourceAbbr = candidate.Source
		}
	}

	// Format the candidate message as it would be broadcast
	ts := time.Now().Format("15:04 MST")
	candidateMsg := fmt.Sprintf("%s: [%s] [AI/LLM] %s", ts, sourceAbbr, strings.TrimSpace(candidate.Summary))

	// Format the recent broadcasts
	var recentMessages []string
	for i, article := range recentBroadcasts {
		sourceAbbr := sourceAbbreviations[article.Source]
		if sourceAbbr == "" {
			if len(article.Source) >= 3 {
				sourceAbbr = article.Source[:3]
			} else {
				sourceAbbr = article.Source
			}
		}
		broadcastTime := article.BroadcastTime.Format("15:04 MST")
		msg := fmt.Sprintf("%s: [%s] [AI/LLM] %s", broadcastTime, sourceAbbr, strings.TrimSpace(article.Summary))
		recentMessages = append(recentMessages, fmt.Sprintf("%d. %s", i+1, msg))
	}

	// Create the LLM prompt
	prompt := fmt.Sprintf(`Determine if the following message would be redundant to broadcast given the previous broadcasts.

CANDIDATE MESSAGE TO BROADCAST:
%s

PREVIOUS BROADCASTS (most recent first):
%s

Is the candidate message redundant with any of the previously broadcast messages? A message is considered redundant if:
1. It covers substantially the same event or news story as a previous message
2. It doesn't add significant new information beyond what was already broadcast
3. It's about the same persons/entities in the same context as a previous message

You MUST respond ONLY with a valid JSON object having the following structure, and nothing else:
{"is_redundant": true/false, "reason": "explanation of your decision"}

DO NOT include any other text, explanation, or formatting in your response, ONLY the JSON.`,
		candidateMsg,
		strings.Join(recentMessages, "\n"),
	)

	// Log the complete prompt for debugging purposes
	logInfo("redundancy_prompt", "Complete prompt sent to LLM for redundancy check", map[string]interface{}{
		"candidateID": candidate.ID,
		"prompt":      prompt,
	})

	// Get the URL for the Ollama API
	ollamaURL := os.Getenv("OLLAMA_URL")
	if ollamaURL == "" {
		ollamaURL = "http://localhost:11434" // Default Ollama server URL
	}

	// Use qwen3:32b model for consistency
	ollamaModel := "qwen3:32b"

	payload := map[string]interface{}{
		"model": ollamaModel,
		"messages": []map[string]string{
			{"role": "system", "content": "You are a news redundancy analyzer. Your ONLY job is to determine if a news message is redundant compared to previously broadcast messages. You MUST respond ONLY with a valid JSON object in the format {\"is_redundant\": boolean, \"reason\": \"string\"} and nothing else."},
			{"role": "user", "content": prompt},
		},
		"stream": false,
	}

	logInfo("redundancy", "Checking if article is redundant with previous broadcasts", map[string]interface{}{
		"candidateID":      candidate.ID,
		"recentBroadcasts": len(recentBroadcasts),
	})

	// Convert payload to JSON
	payloadBytes, err := json.Marshal(payload)
	if err != nil {
		return false, "", fmt.Errorf("error marshaling request: %v", err)
	}

	// Log the request payload
	var prettyPayload bytes.Buffer
	if err := json.Indent(&prettyPayload, payloadBytes, "", " "); err == nil {
		logInfo("redundancy_request", "Redundancy check API request", map[string]interface{}{
			"candidateID": candidate.ID,
			"payload":     prettyPayload.String(),
			"url":         ollamaURL + "/api/chat",
		})
	}

	// Create the request
	req, err := http.NewRequest("POST", ollamaURL+"/api/chat", bytes.NewReader(payloadBytes))
	if err != nil {
		return false, "", fmt.Errorf("error creating request: %v", err)
	}
	req.Header.Set("Content-Type", "application/json")

	// Send the request with timeout and retry mechanism
	client := &http.Client{Timeout: 30 * time.Second}

	// Try up to 3 times with backoff
	var resp *http.Response
	var respErr error
	for attempt := 1; attempt <= 3; attempt++ {
		// Reset the request body on each attempt so a retry resends the full
		// payload rather than an already-consumed reader.
		req.Body = io.NopCloser(bytes.NewReader(payloadBytes))
		resp, respErr = client.Do(req)
		if respErr == nil {
			break
		}
		logInfo("redundancy", "Request attempt failed", map[string]interface{}{
			"attempt": attempt,
			"error":   respErr.Error(),
		})
		if attempt < 3 {
			// Wait with increasing backoff before retry
			backoffTime := time.Duration(attempt) * 2 * time.Second
			time.Sleep(backoffTime)
		}
	}

	// If all attempts failed, proceed with broadcasting anyway
	if respErr != nil {
		logInfo("redundancy", "All request attempts failed", map[string]interface{}{
			"error": respErr.Error(),
		})
		return false, "Failed to check redundancy: " + respErr.Error(), nil
	}
	defer resp.Body.Close()

	// Read the response
	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		logInfo("redundancy", "Error reading response", map[string]interface{}{
			"error": err.Error(),
		})
		return false, "Failed to read redundancy check response", nil
	}

	// Log the raw response for debugging
	var prettyResponse bytes.Buffer
	if err := json.Indent(&prettyResponse, bodyBytes, "", " "); err == nil {
		logInfo("redundancy_response", "Raw LLM response for redundancy check", map[string]interface{}{
			"candidateID": candidate.ID,
			"statusCode":  resp.StatusCode,
			"response":    prettyResponse.String(),
		})
	} else {
		// If we can't pretty print, log as is
		logInfo("redundancy_response", "Raw LLM response for redundancy check (not JSON)", map[string]interface{}{
			"candidateID": candidate.ID,
			"statusCode":  resp.StatusCode,
			"response":    string(bodyBytes),
		})
	}

	// Parse the response
	var result struct {
		Message struct {
			Content string `json:"content"`
		} `json:"message"`
	}
	if err := json.Unmarshal(bodyBytes, &result); err != nil {
		logInfo("redundancy", "JSON unmarshal error", map[string]interface{}{
			"error": err.Error(),
			"body":  string(bodyBytes),
		})
		return false, "Failed to parse redundancy check response", nil
	}

	// Extract the JSON part from the response
	content := result.Message.Content
	content = strings.TrimSpace(content)

	// Log the extracted content
	logInfo("redundancy_content", "Extracted content from LLM response", map[string]interface{}{
		"candidateID": candidate.ID,
		"content":     content,
	})

	// Handle case where the response might be wrapped in markdown code blocks
	if strings.HasPrefix(content, "```json") {
		content = strings.TrimPrefix(content, "```json")
		content = strings.TrimPrefix(content, "```")
		if idx := strings.Index(content, "```"); idx > 0 {
			content = content[:idx]
		}
	} else if strings.HasPrefix(content, "```") {
		content = strings.TrimPrefix(content, "```")
		if idx := strings.Index(content, "```"); idx > 0 {
			content = content[:idx]
		}
	}
	content = strings.TrimSpace(content)

	// Check for JSON syntax - find first { and last }
	firstBrace := strings.Index(content, "{")
	lastBrace := strings.LastIndex(content, "}")
	if firstBrace >= 0 && lastBrace > firstBrace {
		// Extract what appears to be valid JSON
		content = content[firstBrace : lastBrace+1]
	} else {
		logInfo("redundancy", "Response doesn't contain valid JSON structure", map[string]interface{}{
			"content": content,
		})
		// Default to non-redundant with explanation
		return false, "Could not parse LLM response as JSON", nil
	}

	// Parse the redundancy check result
	var redundancyResult RedundancyCheckResponse
	if err := json.Unmarshal([]byte(content), &redundancyResult); err != nil {
		logInfo("redundancy", "Failed to unmarshal JSON response", map[string]interface{}{
			"error":   err.Error(),
			"content": content,
		})
		// Default to non-redundant with explanation
		return false, "Error parsing redundancy check result", nil
	}

	logInfo("redundancy", "Redundancy check result", map[string]interface{}{
		"isRedundant": redundancyResult.IsRedundant,
		"reason":      redundancyResult.Reason,
		"candidateID": candidate.ID,
	})

	return redundancyResult.IsRedundant, redundancyResult.Reason, nil
}

// Helper function to extract article IDs for logging
func extractArticleIDs(articles []Article) []string {
	ids := make([]string, len(articles))
	for i, article := range articles {
		ids[i] = article.ID
	}
	return ids
}