gomeshalerter/llm.go

639 lines
20 KiB
Go

package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"
)
// RedundancyCheckResponse is the JSON object checkRedundancy decodes from the
// model's reply when asking whether a candidate message repeats an earlier
// broadcast.
type RedundancyCheckResponse struct {
	// IsRedundant is read from the "is_redundant" key.
	IsRedundant bool `json:"is_redundant"`
	// Reason is read from the "reason" key and carries the model's explanation.
	Reason string `json:"reason"`
}
// SummaryResult is one per-article entry decoded from the model's summarization
// reply.
type SummaryResult struct {
	// ID is read from the "id" key and is used to match the result to an article.
	ID string `json:"id"`
	// Summary is read from the "summary" key.
	Summary string `json:"summary"`
	// Importance is read from the "importance" key.
	Importance int `json:"importance"`
}
// articleSummarizer checks for articles without summaries every 10 seconds and processes them in batches
func articleSummarizer(shutdown chan struct{}, ollamaURL, ollamaModel string) {
logInfo("summarizer", "Starting article summarizer", map[string]interface{}{
"interval": SUMMARIZE_INTERVAL.String(),
})
ticker := time.NewTicker(SUMMARIZE_INTERVAL)
defer ticker.Stop()
for {
select {
case <-ticker.C:
summarizeArticles(ollamaURL, ollamaModel)
case <-shutdown:
logInfo("summarizer", "Shutting down article summarizer", nil)
return
}
}
}
// summarizeArticles loads all articles, queues those that still lack a summary
// or an importance score, and sends them to the LLM in groups of at most
// BATCH_SIZE.
//
// An article counts as already summarized only when Summary is non-empty AND
// Importance is non-zero; every other article is queued. A batch whose request
// fails is logged and skipped, and the remaining batches are still attempted.
// After each successful batch the returned summaries are persisted through
// updateArticle and, if work remains, a rough ETA is logged.
func summarizeArticles(ollamaURL, ollamaModel string) {
	articles := loadArticles()

	// Single pass: the two categories are exact complements of each other.
	summarizedCount := 0
	var articlesToSummarize []Article
	for _, article := range articles {
		if article.Summary != "" && article.Importance != 0 {
			summarizedCount++
			continue
		}
		articlesToSummarize = append(articlesToSummarize, article)
	}
	if len(articlesToSummarize) == 0 {
		return // No articles to summarize
	}

	logInfo("summarizer", "Processing articles", map[string]interface{}{
		"alreadySummarized": summarizedCount,
		"toSummarize":       len(articlesToSummarize),
		"totalArticles":     len(articles),
	})

	// Process in batches
	for i := 0; i < len(articlesToSummarize); i += BATCH_SIZE {
		end := i + BATCH_SIZE
		if end > len(articlesToSummarize) {
			end = len(articlesToSummarize)
		}
		batch := articlesToSummarize[i:end]

		// Only a short ID prefix goes into the batch log line. Guard the slice so
		// an ID shorter than the prefix length cannot cause a panic.
		const idPrefixLen = 8
		batchInfo := make([]string, 0, len(batch))
		for _, a := range batch {
			prefix := a.ID
			if len(prefix) > idPrefixLen {
				prefix = prefix[:idPrefixLen]
			}
			batchInfo = append(batchInfo, prefix)
		}
		logInfo("summarizer", "Processing batch", map[string]interface{}{
			"batchSize": len(batch),
			"batchIds":  strings.Join(batchInfo, ","),
			"startItem": i + 1,
			"endItem":   end,
		})

		startTime := time.Now()
		summaries, err := processArticleBatch(ollamaURL, ollamaModel, batch)
		apiDuration := time.Since(startTime)
		if err != nil {
			logInfo("summarizer", "Batch processing error", map[string]interface{}{
				"error":    err.Error(),
				"duration": apiDuration.String(),
			})
			continue
		}

		// Walk the batch in order and look each article up in the result map.
		// This is a direct lookup rather than a nested scan, and it makes the
		// order of log lines and updates deterministic.
		updatedCount := 0
		for _, article := range batch {
			result, ok := summaries[article.ID]
			if !ok {
				continue // the model returned nothing for this article
			}
			// Log the summary details for each article
			logInfo("summary_result", "LLM generated summary", map[string]interface{}{
				"id":         article.ID,
				"title":      article.Title,
				"summary":    result.Summary,
				"importance": result.Importance,
				"source":     article.Source,
				"length":     len(result.Summary),
			})
			// article is the loop's copy; the change is persisted by updateArticle.
			article.Summary = result.Summary
			article.Importance = result.Importance
			if err := updateArticle(article); err != nil {
				logInfo("summarizer", "Error updating article with summary", map[string]interface{}{
					"id":    article.ID,
					"error": err.Error(),
				})
				continue
			}
			updatedCount++
		}

		// batch is never empty inside this loop, so the division is safe.
		msPerArticle := apiDuration.Milliseconds() / int64(len(batch))
		logInfo("summarizer", "Batch processing complete", map[string]interface{}{
			"batchSize":    len(batch),
			"updatedCount": updatedCount,
			"duration":     apiDuration.String(),
			"durationMs":   apiDuration.Milliseconds(),
			"msPerArticle": msPerArticle,
		})

		// Calculate and log ETA for remaining articles, extrapolating from the
		// per-article time of the batch that just finished.
		remainingCount := len(articlesToSummarize) - (i + len(batch))
		if remainingCount > 0 {
			remainingTime := time.Duration(msPerArticle*int64(remainingCount)) * time.Millisecond
			remainingBatches := (remainingCount + BATCH_SIZE - 1) / BATCH_SIZE // Ceiling division
			eta := time.Now().Add(remainingTime)
			logInfo("summarizer", "Estimated time to completion", map[string]interface{}{
				"remainingArticles": remainingCount,
				"remainingBatches":  remainingBatches,
				"msPerArticle":      msPerArticle,
				"estimatedTimeLeft": remainingTime.String(),
				"eta":               eta.Format("15:04:05 MST"),
			})
		}
	}
}
// processArticleBatch sends a single non-streaming chat request to
// ollamaURL + "/api/chat" asking the model to summarize and rank every article
// in articles, and returns the parsed results keyed by the ID the model
// reported for each entry.
//
// The reply's message content is parsed leniently, in this order:
//  1. the whole content as a JSON array of SummaryResult;
//  2. the text between the first '[' and the last ']' as such an array;
//  3. for a one-article batch only, the text between the first '{' and the
//     last '}' as a single object, which is assigned that article's ID.
//
// An empty articles slice returns an empty map without contacting the server.
// An error is returned when the request cannot be built or sent, the body
// cannot be read, the server answers with a non-2xx status, the envelope is not
// valid JSON, the message content is empty, or none of the parsing strategies
// above succeeds.
func processArticleBatch(ollamaURL, model string, articles []Article) (map[string]SummaryResult, error) {
	if len(articles) == 0 {
		// Nothing to ask the model. This also guarantees the per-article
		// divisions below never divide by zero.
		return map[string]SummaryResult{}, nil
	}

	endpoint := ollamaURL + "/api/chat"

	// Prepare batch prompt
	var batchPrompt strings.Builder
	batchPrompt.WriteString(ARTICLES_PROMPT)
	for i, article := range articles {
		fmt.Fprintf(&batchPrompt, "Article %d (ID: %s):\n", i+1, article.ID)
		fmt.Fprintf(&batchPrompt, "Title: %s\n", article.Title)
		fmt.Fprintf(&batchPrompt, "Content: %s\n", article.Description)
		// Add original publication date information to help with breaking news ranking
		fmt.Fprintf(&batchPrompt, "Original Publication Date: %s\n", article.OriginalDate.Format(time.RFC3339))
		// Calculate minutes since publication for clearer guidance
		minutesOld := time.Since(article.OriginalDate).Minutes()
		fmt.Fprintf(&batchPrompt, "Minutes Since Publication: %.0f\n\n", minutesOld)
	}
	prompt := batchPrompt.String()

	payload := map[string]interface{}{
		"model": model,
		"messages": []map[string]string{
			{"role": "system", "content": SYSTEM_PROMPT},
			{"role": "user", "content": prompt},
		},
		"stream": false,
	}

	// Log request details
	requestDetails := map[string]interface{}{
		"model":       model,
		"batchSize":   len(articles),
		"apiEndpoint": endpoint,
	}
	for i, article := range articles {
		requestDetails[fmt.Sprintf("article%d", i+1)] = article.ID
	}
	logInfo("api", "Sending request to Ollama", requestDetails)
	startTime := time.Now()

	// Log the complete batch prompt for debugging
	logInfo("summarize_prompt", "Complete prompt sent to LLM for summarization", map[string]interface{}{
		"batchSize": len(articles),
		"prompt":    prompt,
	})

	// The payload is indented so the logged request is readable; the same bytes
	// are sent to the server.
	payloadBytes, err := json.MarshalIndent(payload, "", " ")
	if err != nil {
		logInfo("api", "JSON encoding error", map[string]interface{}{
			"error": err.Error(),
		})
		return nil, err
	}

	// Log the request payload
	logInfo("summarize_request", "Summarization API request", map[string]interface{}{
		"model":       model,
		"batchSize":   len(articles),
		"apiEndpoint": endpoint,
		"payload":     string(payloadBytes),
		"articleIDs":  extractArticleIDs(articles),
	})

	req, err := http.NewRequest("POST", endpoint, bytes.NewReader(payloadBytes))
	if err != nil {
		logInfo("api", "Request creation error", map[string]interface{}{
			"error": err.Error(),
		})
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")

	// Longer timeout for larger batches
	timeout := 30 * time.Second
	if len(articles) > 1 {
		timeout = 120 * time.Second
	}
	client := &http.Client{Timeout: timeout}
	resp, err := client.Do(req)
	if err != nil {
		logInfo("api", "API call failed", map[string]interface{}{
			"error":    err.Error(),
			"duration": time.Since(startTime).String(),
		})
		return nil, err
	}
	defer resp.Body.Close()

	bodyBytes, err := io.ReadAll(resp.Body)
	apiDuration := time.Since(startTime)
	if err != nil {
		logInfo("api", "Error reading response body", map[string]interface{}{
			"error":    err.Error(),
			"duration": apiDuration.String(),
		})
		return nil, fmt.Errorf("reading response body: %w", err)
	}

	// Log the raw response for debugging
	logInfo("summarize_response", "Raw LLM response for summarization", map[string]interface{}{
		"statusCode":   resp.StatusCode,
		"response":     string(bodyBytes),
		"durationMs":   apiDuration.Milliseconds(),
		"msPerArticle": apiDuration.Milliseconds() / int64(len(articles)),
	})

	// A non-2xx reply is not a chat completion; report the status instead of
	// letting it surface later as a confusing parse error.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		logInfo("api", "Unexpected HTTP status", map[string]interface{}{
			"statusCode": resp.StatusCode,
			"duration":   apiDuration.String(),
		})
		return nil, fmt.Errorf("unexpected HTTP status %d from %s", resp.StatusCode, endpoint)
	}

	var res struct {
		Message struct {
			Content string `json:"content"`
		} `json:"message"`
	}
	if err := json.Unmarshal(bodyBytes, &res); err != nil {
		logInfo("api", "JSON unmarshal error", map[string]interface{}{
			"error":    err.Error(),
			"duration": apiDuration.String(),
		})
		return nil, err
	}
	content := res.Message.Content
	if content == "" {
		logInfo("api", "Empty response content", map[string]interface{}{
			"duration": apiDuration.String(),
		})
		return nil, fmt.Errorf("no content in response")
	}

	// Parse the model's answer - handle both array and single-object replies.
	var summaries []SummaryResult
	if err := json.Unmarshal([]byte(content), &summaries); err != nil {
		// A failed decode may leave partial data behind; start clean.
		summaries = nil

		// The model may have wrapped the array in markdown or prose, so try the
		// outermost [...] span.
		arrStart := strings.Index(content, "[")
		arrEnd := strings.LastIndex(content, "]")
		haveArray := arrStart >= 0 && arrEnd > arrStart
		var arrayErr error
		if haveArray {
			arrayErr = json.Unmarshal([]byte(content[arrStart:arrEnd+1]), &summaries)
			if arrayErr != nil {
				summaries = nil
			}
		}

		switch {
		case haveArray && arrayErr == nil:
			// Recovered the array from the surrounding text.

		case len(articles) == 1:
			// Single-article batch: accept a bare object whether or not the
			// content also happened to contain square brackets.
			objStart := strings.Index(content, "{")
			objEnd := strings.LastIndex(content, "}")
			if objStart < 0 || objEnd <= objStart {
				logInfo("api", "Could not find JSON object in response", map[string]interface{}{
					"duration": apiDuration.String(),
				})
				return nil, fmt.Errorf("could not find JSON object in response")
			}
			var singleResult struct {
				Summary    string `json:"summary"`
				Importance int    `json:"importance"`
			}
			if err := json.Unmarshal([]byte(content[objStart:objEnd+1]), &singleResult); err != nil {
				logInfo("api", "JSON parse error for single object", map[string]interface{}{
					"error":    err.Error(),
					"duration": apiDuration.String(),
				})
				return nil, err
			}
			// The object carries no trusted ID; attribute it to the only article.
			summaries = []SummaryResult{{
				ID:         articles[0].ID,
				Summary:    singleResult.Summary,
				Importance: singleResult.Importance,
			}}

		case haveArray:
			logInfo("api", "JSON parse error for array", map[string]interface{}{
				"error":    arrayErr.Error(),
				"duration": apiDuration.String(),
			})
			return nil, arrayErr

		default:
			logInfo("api", "Could not find JSON array in response", map[string]interface{}{
				"duration": apiDuration.String(),
			})
			return nil, fmt.Errorf("could not find JSON array in response")
		}
	}

	// Convert to map for easier lookup; a repeated ID keeps the last entry.
	resultMap := make(map[string]SummaryResult, len(summaries))
	for _, summary := range summaries {
		resultMap[summary.ID] = summary
	}
	logInfo("api", "API call successful", map[string]interface{}{
		"duration":     apiDuration.String(),
		"durationMs":   apiDuration.Milliseconds(),
		"resultsCount": len(resultMap),
		"msPerArticle": apiDuration.Milliseconds() / int64(len(articles)),
		"statusCode":   resp.StatusCode,
	})
	return resultMap, nil
}
// checkRedundancy asks the LLM whether candidate would repeat one of
// recentBroadcasts and returns (isRedundant, reason, err).
//
// The check fails open: if the request cannot be completed after three
// attempts, the server answers with a non-2xx status, or the reply cannot be
// parsed, the function returns false with an explanatory reason and a nil
// error. A non-nil error is returned only when the request payload cannot be
// marshaled or the HTTP request cannot be constructed. With no recent
// broadcasts the function returns false without contacting the server.
//
// The server URL is read from the OLLAMA_URL environment variable and defaults
// to http://localhost:11434; the model name is fixed in this function.
func checkRedundancy(candidate Article, recentBroadcasts []Article) (bool, string, error) {
	if len(recentBroadcasts) == 0 {
		return false, "No previous broadcasts to compare", nil
	}

	// abbreviate returns the configured abbreviation for a source, falling back
	// to the first three bytes of the name (or the whole name if shorter).
	abbreviate := func(source string) string {
		if abbr := sourceAbbreviations[source]; abbr != "" {
			return abbr
		}
		if len(source) >= 3 {
			return source[:3]
		}
		return source
	}

	// Format the candidate message as it would be broadcast
	ts := time.Now().Format("15:04 MST")
	candidateMsg := fmt.Sprintf("%s: [%s] [AI/LLM] %s", ts, abbreviate(candidate.Source), strings.TrimSpace(candidate.Summary))

	// Format the recent broadcasts as a numbered list
	recentMessages := make([]string, 0, len(recentBroadcasts))
	for i, article := range recentBroadcasts {
		broadcastTime := article.BroadcastTime.Format("15:04 MST")
		msg := fmt.Sprintf("%s: [%s] [AI/LLM] %s", broadcastTime, abbreviate(article.Source), strings.TrimSpace(article.Summary))
		recentMessages = append(recentMessages, fmt.Sprintf("%d. %s", i+1, msg))
	}

	// Create the LLM prompt
	prompt := fmt.Sprintf(`Determine if the following message would be redundant to broadcast given the previous broadcasts.
CANDIDATE MESSAGE TO BROADCAST:
%s
PREVIOUS BROADCASTS (most recent first):
%s
Is the candidate message redundant with any of the previously broadcast messages? A message is considered redundant if:
1. It covers substantially the same event or news story as a previous message
2. It doesn't add significant new information beyond what was already broadcast
3. It's about the same persons/entities in the same context as a previous message
You MUST respond ONLY with a valid JSON object having the following structure, and nothing else:
{"is_redundant": true/false, "reason": "explanation of your decision"}
DO NOT include any other text, explanation, or formatting in your response, ONLY the JSON.`,
		candidateMsg,
		strings.Join(recentMessages, "\n"),
	)

	// Log the complete prompt for debugging purposes
	logInfo("redundancy_prompt", "Complete prompt sent to LLM for redundancy check", map[string]interface{}{
		"candidateID": candidate.ID,
		"prompt":      prompt,
	})

	// Get the URL for the Ollama API
	ollamaURL := os.Getenv("OLLAMA_URL")
	if ollamaURL == "" {
		ollamaURL = "http://localhost:11434" // Default Ollama server URL
	}
	endpoint := ollamaURL + "/api/chat"

	// Use qwen3:32b model for consistency
	ollamaModel := "qwen3:32b"
	payload := map[string]interface{}{
		"model": ollamaModel,
		"messages": []map[string]string{
			{"role": "system", "content": "You are a news redundancy analyzer. Your ONLY job is to determine if a news message is redundant compared to previously broadcast messages. You MUST respond ONLY with a valid JSON object in the format {\"is_redundant\": boolean, \"reason\": \"string\"} and nothing else."},
			{"role": "user", "content": prompt},
		},
		"stream": false,
	}
	logInfo("redundancy", "Checking if article is redundant with previous broadcasts", map[string]interface{}{
		"candidateID":      candidate.ID,
		"recentBroadcasts": len(recentBroadcasts),
	})

	// Convert payload to JSON
	payloadBytes, err := json.Marshal(payload)
	if err != nil {
		return false, "", fmt.Errorf("error marshaling request: %w", err)
	}

	// Log the request payload
	var prettyPayload bytes.Buffer
	if err := json.Indent(&prettyPayload, payloadBytes, "", " "); err == nil {
		logInfo("redundancy_request", "Redundancy check API request", map[string]interface{}{
			"candidateID": candidate.ID,
			"payload":     prettyPayload.String(),
			"url":         endpoint,
		})
	}

	// Try up to maxAttempts times, waiting 2s and then 4s between attempts.
	const maxAttempts = 3
	client := &http.Client{Timeout: 30 * time.Second}
	var resp *http.Response
	var respErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		// Build a fresh request for every attempt: a request's body reader is
		// consumed by the first send, so reusing the request would transmit an
		// empty body on retry.
		req, err := http.NewRequest("POST", endpoint, bytes.NewReader(payloadBytes))
		if err != nil {
			return false, "", fmt.Errorf("error creating request: %w", err)
		}
		req.Header.Set("Content-Type", "application/json")

		resp, respErr = client.Do(req)
		if respErr == nil {
			break
		}
		logInfo("redundancy", "Request attempt failed", map[string]interface{}{
			"attempt": attempt,
			"error":   respErr.Error(),
		})
		if attempt < maxAttempts {
			time.Sleep(time.Duration(attempt) * 2 * time.Second)
		}
	}

	// If all attempts failed, proceed with broadcasting anyway
	if respErr != nil {
		logInfo("redundancy", "All request attempts failed", map[string]interface{}{
			"error": respErr.Error(),
		})
		return false, "Failed to check redundancy: " + respErr.Error(), nil
	}
	defer resp.Body.Close()

	// Read the response
	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		logInfo("redundancy", "Error reading response", map[string]interface{}{
			"error": err.Error(),
		})
		return false, "Failed to read redundancy check response", nil
	}

	// Log the raw response for debugging
	var prettyResponse bytes.Buffer
	if err := json.Indent(&prettyResponse, bodyBytes, "", " "); err == nil {
		logInfo("redundancy_response", "Raw LLM response for redundancy check", map[string]interface{}{
			"candidateID": candidate.ID,
			"statusCode":  resp.StatusCode,
			"response":    prettyResponse.String(),
		})
	} else {
		// If we can't pretty print, log as is
		logInfo("redundancy_response", "Raw LLM response for redundancy check (not JSON)", map[string]interface{}{
			"candidateID": candidate.ID,
			"statusCode":  resp.StatusCode,
			"response":    string(bodyBytes),
		})
	}

	// A non-2xx reply is not a chat completion; fail open with a clear reason.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		logInfo("redundancy", "Unexpected HTTP status", map[string]interface{}{
			"candidateID": candidate.ID,
			"statusCode":  resp.StatusCode,
		})
		return false, fmt.Sprintf("Redundancy check returned HTTP status %d", resp.StatusCode), nil
	}

	// Parse the response envelope
	var result struct {
		Message struct {
			Content string `json:"content"`
		} `json:"message"`
	}
	if err := json.Unmarshal(bodyBytes, &result); err != nil {
		logInfo("redundancy", "JSON unmarshal error", map[string]interface{}{
			"error": err.Error(),
			"body":  string(bodyBytes),
		})
		return false, "Failed to parse redundancy check response", nil
	}

	// Extract the JSON part from the response
	content := strings.TrimSpace(result.Message.Content)

	// Log the extracted content
	logInfo("redundancy_content", "Extracted content from LLM response", map[string]interface{}{
		"candidateID": candidate.ID,
		"content":     content,
	})

	// Handle case where the response might be wrapped in markdown code blocks:
	// drop the opening fence and everything from the closing fence onward.
	if strings.HasPrefix(content, "```json") {
		content = strings.TrimPrefix(content, "```json")
		content = strings.TrimPrefix(content, "```")
		if idx := strings.Index(content, "```"); idx > 0 {
			content = content[:idx]
		}
	} else if strings.HasPrefix(content, "```") {
		content = strings.TrimPrefix(content, "```")
		if idx := strings.Index(content, "```"); idx > 0 {
			content = content[:idx]
		}
	}
	content = strings.TrimSpace(content)

	// Keep only the span from the first '{' to the last '}' so surrounding prose
	// does not break decoding.
	firstBrace := strings.Index(content, "{")
	lastBrace := strings.LastIndex(content, "}")
	if firstBrace < 0 || lastBrace <= firstBrace {
		logInfo("redundancy", "Response doesn't contain valid JSON structure", map[string]interface{}{
			"content": content,
		})
		// Default to non-redundant with explanation
		return false, "Could not parse LLM response as JSON", nil
	}
	content = content[firstBrace : lastBrace+1]

	// Parse the redundancy check result
	var redundancyResult RedundancyCheckResponse
	if err := json.Unmarshal([]byte(content), &redundancyResult); err != nil {
		logInfo("redundancy", "Failed to unmarshal JSON response", map[string]interface{}{
			"error":   err.Error(),
			"content": content,
		})
		// Default to non-redundant with explanation
		return false, "Error parsing redundancy check result", nil
	}
	logInfo("redundancy", "Redundancy check result", map[string]interface{}{
		"isRedundant": redundancyResult.IsRedundant,
		"reason":      redundancyResult.Reason,
		"candidateID": candidate.ID,
	})
	return redundancyResult.IsRedundant, redundancyResult.Reason, nil
}
// extractArticleIDs returns the ID of each article in input order. The result
// is always a non-nil slice, even when articles is empty.
func extractArticleIDs(articles []Article) []string {
	collected := make([]string, 0, len(articles))
	for idx := range articles {
		collected = append(collected, articles[idx].ID)
	}
	return collected
}