// gomeshalerter/rss.go

package main

import (
    "net/http"
    "sync"
    "time"

    "github.com/mmcdole/gofeed"
)

// rssFeedChecker checks all RSS feeds every RSS_CHECK_INTERVAL (15 minutes)
// and adds new articles to the database. It runs until the shutdown channel
// is closed. ollamaURL and ollamaModel are currently unused in this function.
func rssFeedChecker(shutdown chan struct{}, ollamaURL, ollamaModel string) {
    logInfo("rss", "Starting RSS feed checker", map[string]interface{}{
        "interval": RSS_CHECK_INTERVAL.String(),
    })

    // Run immediately on startup, then on every tick.
    checkRSSFeeds()

    ticker := time.NewTicker(RSS_CHECK_INTERVAL)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            checkRSSFeeds()
        case <-shutdown:
            logInfo("rss", "Shutting down RSS feed checker", nil)
            return
        }
    }
}
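
// A minimal sketch of how this worker is expected to be started (the actual
// wiring lives elsewhere in package main; the variable names here are
// illustrative, not taken from this file):
//
//     shutdown := make(chan struct{})
//     go rssFeedChecker(shutdown, ollamaURL, ollamaModel)
//     // ...
//     close(shutdown) // stops the checker on its next select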

// checkRSSFeeds fetches all RSS feeds in parallel and stores any article
// whose link is not already in the database.
func checkRSSFeeds() {
    articles := loadArticles()
    oldCount := len(articles)

    logInfo("rss", "Checking RSS feeds", map[string]interface{}{
        "time":                time.Now().Format("15:04:05"),
        "articlesBeforeFetch": oldCount,
    })

    now := time.Now()
    newArticles := fetchAllFeedsParallel(now)

    newCount := 0
    for _, a := range newArticles {
        // Articles are keyed by link, so refetching the same item is a no-op.
        if _, ok := articles[a.Link]; !ok {
            if a.ID == "" {
                a.ID = generateID(a.Link)
            }
            articles[a.Link] = a
            if err := saveArticle(a); err != nil {
                logInfo("rss", "Error saving article", map[string]interface{}{
                    "id":    a.ID,
                    "error": err.Error(),
                })
            }
            newCount++
            logInfo("new", "Found new article", map[string]interface{}{
                "id":        a.ID,
                "title":     a.Title,
                "source":    a.Source,
                "published": a.Published.Format(time.RFC3339),
            })
        }
    }

    logInfo("rss", "Completed RSS check", map[string]interface{}{
        "articlesBeforeFetch": oldCount,
        "articlesAfterFetch":  oldCount + newCount,
        "newArticles":         newCount,
    })
}
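
// The helpers used above are defined elsewhere in the package. Their
// contracts, as inferred from this file alone, are:
//
//     loadArticles() map[string]Article // existing articles keyed by link
//     saveArticle(a Article) error      // persists one article
//     generateID(link string) string    // stable ID derived from the link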

// fetchAllFeedsParallel fetches every configured feed concurrently and
// returns the articles published within the last seven days.
func fetchAllFeedsParallel(now time.Time) []Article {
    type fetchResult struct {
        Source     string
        URL        string
        Feed       *gofeed.Feed
        Err        error
        HTTPStatus int
        Duration   time.Duration
    }

    var wg sync.WaitGroup
    // Buffered to len(feeds) so every goroutine can send its result without
    // blocking, even though results are only drained after wg.Wait().
    results := make(chan fetchResult, len(feeds))
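
    // One goroutine per feed: each fetch gets its own HTTP client with a
    // 20-second timeout, and both successes and failures are reported on the
    // results channel so the fan-in below sees every feed exactly once.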
    for source, url := range feeds {
        wg.Add(1)
        go func(source, url string) {
            defer wg.Done()
            start := time.Now()
            fp := gofeed.NewParser()
            client := &http.Client{Timeout: 20 * time.Second}

            var httpStatus int
            var feed *gofeed.Feed
            resp, err := client.Get(url)
            if err == nil {
                httpStatus = resp.StatusCode
                defer resp.Body.Close()
                feed, err = fp.Parse(resp.Body)
            }
            duration := time.Since(start)

            details := map[string]interface{}{
                "source":   source,
                "url":      url,
                "status":   httpStatus,
                "duration": duration.Seconds(),
            }
            if err != nil {
                details["error"] = err.Error()
            }
            logEvent("rss_fetch_result", details)

            if err != nil {
                logInfo("rss", "Feed fetch failed", map[string]interface{}{
                    "source":   source,
                    "url":      url,
                    "duration": duration.Seconds(),
                    "error":    err.Error(),
                })
                results <- fetchResult{Source: source, URL: url, Err: err, Duration: duration, HTTPStatus: httpStatus}
                return
            }

            logInfo("rss", "Feed fetch succeeded", map[string]interface{}{
                "source":   source,
                "url":      url,
                "duration": duration.Seconds(),
                "status":   httpStatus,
                "items":    len(feed.Items),
            })
            results <- fetchResult{
                Source:     source,
                URL:        url,
                Feed:       feed,
                Duration:   duration,
                HTTPStatus: httpStatus,
            }
        }(source, url)
    }

    wg.Wait()
    close(results)
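
    // Fan-in: with the channel closed, range drains every buffered result
    // and then terminates; failed fetches are skipped below.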
    var all []Article
    for result := range results {
        if result.Err != nil || result.Feed == nil {
            continue
        }
        for _, item := range result.Feed.Items {
            // Use the feed's publication date when present; otherwise fall
            // back to the fetch time.
            published := now
            if item.PublishedParsed != nil {
                published = *item.PublishedParsed
            }
            originalDate := published

            // Skip articles whose publication date is more than 7 days old.
            cutoffDate := now.AddDate(0, 0, -7)
            if published.Before(cutoffDate) {
                continue
            }

            all = append(all, Article{
                Title:        item.Title,
                Description:  item.Description,
                Link:         item.Link,
                Published:    now,          // when we first saw the article
                OriginalDate: originalDate, // publication date from the feed
                Source:       result.Source,
                FirstSeen:    now,
                ID:           generateID(item.Link),
            })
        }
    }
    return all
}
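
// The feeds registry iterated above is assumed to be a package-level
// map[string]string of source name to feed URL, defined elsewhere in the
// package; a hypothetical example for illustration only:
//
//     var feeds = map[string]string{
//         "example": "https://example.com/rss.xml",
//     }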