package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"strings"
	"time"
)

// RedundancyCheckResponse is the JSON object the LLM is asked to return from a
// redundancy check.
type RedundancyCheckResponse struct {
	IsRedundant bool   `json:"is_redundant"`
	Reason      string `json:"reason"`
}

// SummaryResult is a single entry in the JSON array the LLM returns when
// summarizing a batch of articles.
type SummaryResult struct {
	ID         string `json:"id"`
	Summary    string `json:"summary"`
	Importance int    `json:"importance"`
}

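// Illustrative only: the batch summarizer expects the model's reply to be a
// JSON array matching SummaryResult; the IDs and values below are made up.
//
//	[
//	  {"id": "a1b2c3d4-...", "summary": "One-sentence neutral summary.", "importance": 7},
//	  {"id": "e5f6a7b8-...", "summary": "Another short summary.", "importance": 4}
//	]
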
// articleSummarizer checks for articles without summaries on every
// SUMMARIZE_INTERVAL tick and processes them in batches until a shutdown
// signal is received.
func articleSummarizer(shutdown chan struct{}, ollamaURL, ollamaModel string) {
	fmt.Fprintf(os.Stderr, "[summarizer] Starting article summarizer (interval: %s)\n", SUMMARIZE_INTERVAL)

	ticker := time.NewTicker(SUMMARIZE_INTERVAL)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			summarizeArticles(ollamaURL, ollamaModel)
		case <-shutdown:
			fmt.Fprintf(os.Stderr, "[summarizer] Shutting down article summarizer\n")
			return
		}
	}
}

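// A minimal sketch of how this worker might be started; the actual wiring in
// main() is not part of this file, and the URL and model below are only the
// defaults used elsewhere in this package.
//
//	shutdown := make(chan struct{})
//	go articleSummarizer(shutdown, "http://localhost:11434", "qwen3:32b")
//	// ... later, to stop the worker:
//	close(shutdown)
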
// summarizeArticles processes articles without summaries in batches
func summarizeArticles(ollamaURL, ollamaModel string) {
	articles := loadArticles()

	// Count articles that already have summaries
	summarizedCount := 0
	for _, article := range articles {
		if article.Summary != "" && article.Importance != 0 {
			summarizedCount++
		}
	}

	// Collect articles that need summarization
	var articlesToSummarize []Article
	for _, article := range articles {
		if article.Summary == "" || article.Importance == 0 {
			articlesToSummarize = append(articlesToSummarize, article)
		}
	}

	if len(articlesToSummarize) == 0 {
		return // No articles to summarize
	}

	logInfo("summarizer", "Processing articles", map[string]interface{}{
		"alreadySummarized": summarizedCount,
		"toSummarize":       len(articlesToSummarize),
		"totalArticles":     len(articles),
	})

	// Process in batches
	for i := 0; i < len(articlesToSummarize); i += BATCH_SIZE {
		end := i + BATCH_SIZE
		if end > len(articlesToSummarize) {
			end = len(articlesToSummarize)
		}

		batch := articlesToSummarize[i:end]
		batchInfo := make([]string, 0, len(batch))
		for _, a := range batch {
			batchInfo = append(batchInfo, a.ID[:8])
		}

		logInfo("summarizer", fmt.Sprintf("Processing batch %d to %d", i+1, end), map[string]interface{}{
			"batchSize": len(batch),
			"batchIds":  strings.Join(batchInfo, ","),
		})

		startTime := time.Now()
		summaries, err := processArticleBatch(ollamaURL, ollamaModel, batch)
		apiDuration := time.Since(startTime)

		if err != nil {
			logInfo("summarizer", "Batch processing error", map[string]interface{}{
				"error":    err.Error(),
				"duration": apiDuration.String(),
			})
			continue
		}

		// Update articles with summaries
		updatedCount := 0
		for id, result := range summaries {
			for _, article := range batch {
				if article.ID == id {
					article.Summary = result.Summary
					article.Importance = result.Importance
					updateArticle(article)
					updatedCount++
					break
				}
			}
		}

		logInfo("summarizer", "Batch processing complete", map[string]interface{}{
			"batchSize":    len(batch),
			"updatedCount": updatedCount,
			"duration":     apiDuration.String(),
			"durationMs":   apiDuration.Milliseconds(),
			"msPerArticle": apiDuration.Milliseconds() / int64(len(batch)),
		})

		// Calculate and log ETA for remaining articles
		remainingCount := len(articlesToSummarize) - (i + len(batch))
		if remainingCount > 0 {
			// Calculate milliseconds per article from this batch
			msPerArticle := apiDuration.Milliseconds() / int64(len(batch))

			// Calculate remaining time
			remainingTimeMs := msPerArticle * int64(remainingCount)
			remainingTime := time.Duration(remainingTimeMs) * time.Millisecond

			// Calculate how many batches remain
			remainingBatches := (remainingCount + BATCH_SIZE - 1) / BATCH_SIZE // Ceiling division

			// Estimated completion time
			eta := time.Now().Add(remainingTime)

			logInfo("summarizer", "Estimated time to completion", map[string]interface{}{
				"remainingArticles": remainingCount,
				"remainingBatches":  remainingBatches,
				"msPerArticle":      msPerArticle,
				"estimatedTimeLeft": remainingTime.String(),
				"eta":               eta.Format("15:04:05 MST"),
			})
		}
	}
}

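// Worked example of the ETA estimate above (numbers are hypothetical): if a
// batch of 5 articles took 40s, msPerArticle is 8000; with 12 articles left,
// the remaining time is 12 * 8000 ms = 96s and, at BATCH_SIZE = 5,
// remainingBatches = (12 + 5 - 1) / 5 = 3.
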
// processArticleBatch sends one batch of articles to the Ollama chat API and
// returns the parsed results keyed by article ID.
func processArticleBatch(ollamaURL, model string, articles []Article) (map[string]SummaryResult, error) {
	// Prepare batch prompt
	var batchPrompt strings.Builder
	batchPrompt.WriteString(ARTICLES_PROMPT)

	for i, article := range articles {
		batchPrompt.WriteString(fmt.Sprintf("Article %d (ID: %s):\n", i+1, article.ID))
		batchPrompt.WriteString(fmt.Sprintf("Title: %s\n", article.Title))
		batchPrompt.WriteString(fmt.Sprintf("Content: %s\n", article.Description))

		// Add original publication date information to help with breaking news ranking
		batchPrompt.WriteString(fmt.Sprintf("Original Publication Date: %s\n", article.OriginalDate.Format(time.RFC3339)))

		// Calculate minutes since publication for clearer guidance
		minutesOld := time.Since(article.OriginalDate).Minutes()
		batchPrompt.WriteString(fmt.Sprintf("Minutes Since Publication: %.0f\n\n", minutesOld))
	}

	payload := map[string]interface{}{
		"model": model,
		"messages": []map[string]string{
			{"role": "system", "content": SYSTEM_PROMPT},
			{"role": "user", "content": batchPrompt.String()},
		},
		"stream": false,
	}

	// Log request details
	requestDetails := map[string]interface{}{
		"model":       model,
		"batchSize":   len(articles),
		"apiEndpoint": ollamaURL + "/api/chat",
	}

	for i, article := range articles {
		requestDetails[fmt.Sprintf("article%d", i+1)] = article.ID
	}

	logInfo("api", "Sending request to Ollama", requestDetails)
	startTime := time.Now()

	// Pretty print request JSON for logging
	payloadBytes, err := json.MarshalIndent(payload, "", " ")
	if err != nil {
		logInfo("api", "JSON encoding error", map[string]interface{}{
			"error": err.Error(),
		})
		return nil, err
	}

	req, err := http.NewRequest("POST", ollamaURL+"/api/chat", bytes.NewReader(payloadBytes))
	if err != nil {
		logInfo("api", "Request creation error", map[string]interface{}{
			"error": err.Error(),
		})
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")

	// Longer timeout for larger batches
	timeout := 30 * time.Second
	if len(articles) > 1 {
		timeout = 120 * time.Second
	}
	client := &http.Client{Timeout: timeout}

	resp, err := client.Do(req)
	if err != nil {
		logInfo("api", "API call failed", map[string]interface{}{
			"error":    err.Error(),
			"duration": time.Since(startTime).String(),
		})
		return nil, err
	}
	defer resp.Body.Close()

	bodyBytes, _ := io.ReadAll(resp.Body)
	apiDuration := time.Since(startTime)

	// Pretty print response JSON for debugging
	var prettyJSON bytes.Buffer
	err = json.Indent(&prettyJSON, bodyBytes, "", " ")
	if err != nil {
		// If we can't pretty print, just show as-is
		logInfo("api", "Pretty print error", map[string]interface{}{
			"error": err.Error(),
		})
	}

	var res struct {
		Message struct {
			Content string `json:"content"`
		} `json:"message"`
	}
	if err := json.Unmarshal(bodyBytes, &res); err != nil {
		logInfo("api", "JSON unmarshal error", map[string]interface{}{
			"error":    err.Error(),
			"duration": apiDuration.String(),
		})
		return nil, err
	}

	if res.Message.Content == "" {
		logInfo("api", "Empty response content", map[string]interface{}{
			"duration": apiDuration.String(),
		})
		return nil, fmt.Errorf("no content in response")
	}

	// Parse the JSON response - handle both single object and array responses
	var summaries []SummaryResult

	// Try to parse as an array first
	if err := json.Unmarshal([]byte(res.Message.Content), &summaries); err != nil {
		// Try to extract JSON from the content in case the model wrapped it in markdown or text
		contentStr := res.Message.Content
		jsonStart := strings.Index(contentStr, "[")
		jsonEnd := strings.LastIndex(contentStr, "]")

		if jsonStart >= 0 && jsonEnd > jsonStart {
			// Try to parse the bracketed span as a JSON array
			jsonStr := contentStr[jsonStart : jsonEnd+1]
			if err := json.Unmarshal([]byte(jsonStr), &summaries); err != nil {
				// Try parsing as a single object for the single-article case
				if len(articles) == 1 {
					// Look for { } instead
					jsonStart = strings.Index(contentStr, "{")
					jsonEnd = strings.LastIndex(contentStr, "}")

					if jsonStart >= 0 && jsonEnd > jsonStart {
						jsonStr := contentStr[jsonStart : jsonEnd+1]
						var singleResult struct {
							Summary    string `json:"summary"`
							Importance int    `json:"importance"`
						}

						if err := json.Unmarshal([]byte(jsonStr), &singleResult); err == nil {
							// Create a SummaryResult with the article's ID
							summaries = []SummaryResult{{
								ID:         articles[0].ID,
								Summary:    singleResult.Summary,
								Importance: singleResult.Importance,
							}}
						} else {
							logInfo("api", "JSON parse error for single object", map[string]interface{}{
								"error":    err.Error(),
								"duration": apiDuration.String(),
							})
							return nil, err
						}
					} else {
						logInfo("api", "Could not find JSON object in response", map[string]interface{}{
							"duration": apiDuration.String(),
						})
						return nil, fmt.Errorf("could not find JSON object in response")
					}
				} else {
					logInfo("api", "JSON parse error for array", map[string]interface{}{
						"error":    err.Error(),
						"duration": apiDuration.String(),
					})
					return nil, err
				}
			}
		} else {
			logInfo("api", "Could not find JSON array in response", map[string]interface{}{
				"duration": apiDuration.String(),
			})
			return nil, fmt.Errorf("could not find JSON array in response")
		}
	}

	// Convert to a map keyed by article ID for easier lookup
	resultMap := make(map[string]SummaryResult)
	for _, summary := range summaries {
		resultMap[summary.ID] = summary
	}

	logInfo("api", "API call successful", map[string]interface{}{
		"duration":     apiDuration.String(),
		"durationMs":   apiDuration.Milliseconds(),
		"resultsCount": len(resultMap),
		"msPerArticle": apiDuration.Milliseconds() / int64(len(articles)),
		"statusCode":   resp.StatusCode,
	})

	return resultMap, nil
}

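// For reference, processArticleBatch only decodes the "message.content" field
// of the Ollama /api/chat reply. A non-streaming reply is assumed to look
// roughly like the sketch below (field set recalled from memory; everything
// other than message.content is ignored here):
//
//	{
//	  "model": "qwen3:32b",
//	  "message": {"role": "assistant", "content": "[{\"id\": \"...\", \"summary\": \"...\", \"importance\": 7}]"},
//	  "done": true
//	}
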
// checkRedundancy asks the LLM if the candidate message is redundant compared to recent broadcasts
func checkRedundancy(candidate Article, recentBroadcasts []Article) (bool, string, error) {
	if len(recentBroadcasts) == 0 {
		return false, "No previous broadcasts to compare", nil
	}

	// Get the abbreviated source name
	sourceAbbr := sourceAbbreviations[candidate.Source]
	if sourceAbbr == "" {
		if len(candidate.Source) >= 3 {
			sourceAbbr = candidate.Source[:3]
		} else {
			sourceAbbr = candidate.Source
		}
	}

	// Format the candidate message as it would be broadcast
	ts := time.Now().Format("15:04 MST")
	candidateMsg := fmt.Sprintf("%s: [%s] [AI/LLM] %s", ts, sourceAbbr, strings.TrimSpace(candidate.Summary))

	// Format the recent broadcasts
	var recentMessages []string
	for i, article := range recentBroadcasts {
		sourceAbbr := sourceAbbreviations[article.Source]
		if sourceAbbr == "" {
			if len(article.Source) >= 3 {
				sourceAbbr = article.Source[:3]
			} else {
				sourceAbbr = article.Source
			}
		}

		broadcastTime := article.BroadcastTime.Format("15:04 MST")
		msg := fmt.Sprintf("%s: [%s] [AI/LLM] %s", broadcastTime, sourceAbbr, strings.TrimSpace(article.Summary))
		recentMessages = append(recentMessages, fmt.Sprintf("%d. %s", i+1, msg))
	}

	// Create the LLM prompt
	prompt := fmt.Sprintf(`Determine if the following message would be redundant to broadcast given the previous broadcasts.

CANDIDATE MESSAGE TO BROADCAST:
%s

PREVIOUS BROADCASTS (most recent first):
%s

Is the candidate message redundant with any of the previously broadcast messages? A message is considered redundant if:
1. It covers substantially the same event or news story as a previous message
2. It doesn't add significant new information beyond what was already broadcast
3. It's about the same persons/entities in the same context as a previous message

You MUST respond ONLY with a valid JSON object having the following structure, and nothing else:
{"is_redundant": true/false, "reason": "explanation of your decision"}

DO NOT include any other text, explanation, or formatting in your response, ONLY the JSON.`,
		candidateMsg,
		strings.Join(recentMessages, "\n"),
	)

	// Get the URL for the Ollama API
	ollamaURL := os.Getenv("OLLAMA_URL")
	if ollamaURL == "" {
		ollamaURL = "http://localhost:11434" // Default Ollama server URL
	}

	// Use the qwen3:32b model for consistency
	ollamaModel := "qwen3:32b"

	payload := map[string]interface{}{
		"model": ollamaModel,
		"messages": []map[string]string{
			{"role": "system", "content": "You are a news redundancy analyzer. Your ONLY job is to determine if a news message is redundant compared to previously broadcast messages. You MUST respond ONLY with a valid JSON object in the format {\"is_redundant\": boolean, \"reason\": \"string\"} and nothing else."},
			{"role": "user", "content": prompt},
		},
		"stream": false,
	}

	logInfo("redundancy", "Checking if article is redundant with previous broadcasts", map[string]interface{}{
		"candidateID":      candidate.ID,
		"recentBroadcasts": len(recentBroadcasts),
	})

	// Convert payload to JSON
	payloadBytes, err := json.Marshal(payload)
	if err != nil {
		return false, "", fmt.Errorf("error marshaling request: %v", err)
	}

	// Send the request with a timeout, trying up to 3 times with backoff
	client := &http.Client{Timeout: 30 * time.Second}

	var resp *http.Response
	var respErr error

	for attempt := 1; attempt <= 3; attempt++ {
		// Build a fresh request per attempt: the body reader may already have
		// been consumed by a failed attempt and cannot be reused.
		req, err := http.NewRequest("POST", ollamaURL+"/api/chat", bytes.NewReader(payloadBytes))
		if err != nil {
			return false, "", fmt.Errorf("error creating request: %v", err)
		}
		req.Header.Set("Content-Type", "application/json")

		resp, respErr = client.Do(req)
		if respErr == nil {
			break
		}

		logInfo("redundancy", "Request attempt failed", map[string]interface{}{
			"attempt": attempt,
			"error":   respErr.Error(),
		})

		if attempt < 3 {
			// Wait with linear backoff (2s, then 4s) before retrying
			backoffTime := time.Duration(attempt) * 2 * time.Second
			time.Sleep(backoffTime)
		}
	}

	// If all attempts failed, proceed with broadcasting anyway
	if respErr != nil {
		logInfo("redundancy", "All request attempts failed", map[string]interface{}{
			"error": respErr.Error(),
		})
		return false, "Failed to check redundancy: " + respErr.Error(), nil
	}
	defer resp.Body.Close()

	// Read the response
	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		logInfo("redundancy", "Error reading response", map[string]interface{}{
			"error": err.Error(),
		})
		return false, "Failed to read redundancy check response", nil
	}

	// Parse the response envelope
	var result struct {
		Message struct {
			Content string `json:"content"`
		} `json:"message"`
	}
	if err := json.Unmarshal(bodyBytes, &result); err != nil {
		logInfo("redundancy", "JSON unmarshal error", map[string]interface{}{
			"error": err.Error(),
			"body":  string(bodyBytes),
		})
		return false, "Failed to parse redundancy check response", nil
	}

	// Extract the JSON part from the response
	content := result.Message.Content
	content = strings.TrimSpace(content)

	// Handle case where the response might be wrapped in markdown code blocks
	if strings.HasPrefix(content, "```json") {
		content = strings.TrimPrefix(content, "```json")
		content = strings.TrimPrefix(content, "```")
		if idx := strings.Index(content, "```"); idx > 0 {
			content = content[:idx]
		}
	} else if strings.HasPrefix(content, "```") {
		content = strings.TrimPrefix(content, "```")
		if idx := strings.Index(content, "```"); idx > 0 {
			content = content[:idx]
		}
	}

	content = strings.TrimSpace(content)

	// Check for JSON syntax - find the first { and the last }
	firstBrace := strings.Index(content, "{")
	lastBrace := strings.LastIndex(content, "}")

	if firstBrace >= 0 && lastBrace > firstBrace {
		// Extract what appears to be valid JSON
		content = content[firstBrace : lastBrace+1]
	} else {
		logInfo("redundancy", "Response doesn't contain valid JSON structure", map[string]interface{}{
			"content": content,
		})
		// Default to non-redundant with explanation
		return false, "Could not parse LLM response as JSON", nil
	}

	// Parse the redundancy check result
	var redundancyResult RedundancyCheckResponse
	if err := json.Unmarshal([]byte(content), &redundancyResult); err != nil {
		logInfo("redundancy", "Failed to unmarshal JSON response", map[string]interface{}{
			"error":   err.Error(),
			"content": content,
		})
		// Default to non-redundant with explanation
		return false, "Error parsing redundancy check result", nil
	}

	logInfo("redundancy", "Redundancy check result", map[string]interface{}{
		"isRedundant": redundancyResult.IsRedundant,
		"reason":      redundancyResult.Reason,
		"candidateID": candidate.ID,
	})

	return redundancyResult.IsRedundant, redundancyResult.Reason, nil
}

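// A hypothetical call site for checkRedundancy; the real broadcast loop lives
// elsewhere in this package and is only assumed here.
//
//	redundant, reason, err := checkRedundancy(candidate, recentBroadcasts)
//	if err == nil && redundant {
//		// Skip broadcasting; the reason string explains the overlap.
//	}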