Compare commits: d547f472ca ... 68ce2c88d2

10 Commits:
68ce2c88d2
c6f9c7e560
e9f9003f1b
e769487555
9958c4e352
e64ec45054
b8f2ea7b7b
634cc45f3a
916e5f8610
a07bb67a33
.gitignore (vendored): 1 change

@@ -3,3 +3,4 @@
 .env
 articles.db*
 gomeshalerter
+*.bak
Makefile: 5 changes

@@ -1,10 +1,13 @@
 default: clean run

 clean:
-	rm -f gomeshalerter
+	rm -f gomeshalerter *.json

 run: gomeshalerter
 	exec ./gomeshalerter

 gomeshalerter: *.go
 	go build -o gomeshalerter .
+
+lint:
+	golangci-lint run
@@ -10,15 +10,19 @@ import (
 	"time"
 )

-// broadcaster runs on startup and every hour to select and broadcast the most important article
+// broadcaster runs on startup and frequently checks if a broadcast is due
 func broadcaster(shutdown chan struct{}, dryRun bool) {
-	logInfo("broadcaster", fmt.Sprintf("Starting broadcaster (waiting %d seconds before first broadcast)", int(STARTUP_DELAY.Seconds())), map[string]interface{}{
-		"interval": BROADCAST_INTERVAL.String(),
-		"dryRun":   dryRun,
+	logInfo("broadcaster", "Starting broadcaster", map[string]interface{}{
+		"startupDelay":    int(STARTUP_DELAY.Seconds()),
+		"checkInterval":   BROADCAST_CHECK_INTERVAL.String(),
+		"broadcastWindow": BROADCAST_INTERVAL.String(),
+		"dryRun":          dryRun,
 	})

 	// Sleep on startup
-	logInfo("broadcaster", fmt.Sprintf("Sleeping for %d seconds before first broadcast", int(STARTUP_DELAY.Seconds())), nil)
+	logInfo("broadcaster", "Sleeping before first broadcast check", map[string]interface{}{
+		"seconds": int(STARTUP_DELAY.Seconds()),
+	})
 	select {
 	case <-time.After(STARTUP_DELAY):
 		// Continue after sleep

@@ -27,17 +31,32 @@ func broadcaster(shutdown chan struct{}, dryRun bool) {
 		return
 	}

-	// Run immediately after initial sleep
-	checkAndBroadcast(dryRun)
+	// Track when the last broadcast happened
+	var lastBroadcastTime time.Time

-	// Then run on interval
-	ticker := time.NewTicker(BROADCAST_INTERVAL)
+	// Run checks frequently
+	ticker := time.NewTicker(BROADCAST_CHECK_INTERVAL)
 	defer ticker.Stop()

 	for {
 		select {
 		case <-ticker.C:
-			checkAndBroadcast(dryRun)
+			now := time.Now()
+			timeSinceLastBroadcast := now.Sub(lastBroadcastTime)
+
+			// If it's been at least BROADCAST_INTERVAL since last broadcast
+			// or if we haven't broadcast yet (lastBroadcastTime is zero)
+			if lastBroadcastTime.IsZero() || timeSinceLastBroadcast >= BROADCAST_INTERVAL {
+				logInfo("broadcaster", "Broadcast window reached, checking conditions", map[string]interface{}{
+					"timeSinceLastBroadcast": timeSinceLastBroadcast.String(),
+					"requiredInterval":       BROADCAST_INTERVAL.String(),
+				})
+
+				// Only update lastBroadcastTime if we actually broadcast something
+				if didBroadcast := checkAndBroadcast(dryRun); didBroadcast {
+					lastBroadcastTime = now
+				}
+			}
 		case <-shutdown:
 			logInfo("broadcaster", "Shutting down broadcaster", nil)
 			return

@@ -46,7 +65,8 @@ func broadcaster(shutdown chan struct{}, dryRun bool) {
 }

 // checkAndBroadcast checks if there are any unsummarized articles before broadcasting
-func checkAndBroadcast(dryRun bool) {
+// Returns true if a broadcast was made
+func checkAndBroadcast(dryRun bool) bool {
 	// Check if there are any unsummarized articles
 	articles := loadArticles()
 	unsummarizedCount := 0

@@ -62,15 +82,16 @@ func checkAndBroadcast(dryRun bool) {
 			"unsummarizedCount": unsummarizedCount,
 			"totalArticles":     len(articles),
 		})
-		return
+		return false
 	}

 	// No unsummarized articles, proceed with broadcast
-	broadcastWithRedundancyCheck(dryRun)
+	return broadcastWithRedundancyCheck(dryRun)
 }

 // broadcastWithRedundancyCheck attempts to find a non-redundant article to broadcast
-func broadcastWithRedundancyCheck(dryRun bool) {
+// Returns true if a broadcast was made
+func broadcastWithRedundancyCheck(dryRun bool) bool {
 	articles := loadArticles()
 	now := time.Now()
 	cutoff := now.Add(-ARTICLE_FRESHNESS_WINDOW) // Time window for considering articles fresh

@@ -87,7 +108,7 @@ func broadcastWithRedundancyCheck(dryRun bool) {
 		logInfo("broadcaster", "No fresh articles found for broadcasting", map[string]interface{}{
 			"totalArticles": len(articles),
 		})
-		return
+		return false
 	}

 	// Sort by importance

@@ -111,7 +132,7 @@ func broadcastWithRedundancyCheck(dryRun bool) {
 			})
 			// Continue with this candidate despite the error
 			broadcastArticle(candidate, dryRun)
-			return
+			return true
 		}

 		if isRedundant {

@@ -138,13 +159,14 @@ func broadcastWithRedundancyCheck(dryRun bool) {
 			"candidateNumber": i + 1,
 		})
 		broadcastArticle(candidate, dryRun)
-		return
+		return true
 	}

 	// If we got here, all candidates were redundant
 	logInfo("broadcaster", "All candidates were deemed redundant, no broadcast", map[string]interface{}{
 		"candidatesChecked": len(candidates),
 	})
+	return false
 }

 // broadcastArticle broadcasts the chosen article

@@ -169,7 +191,8 @@ func broadcastArticle(chosen Article, dryRun bool) {
 		maxSummaryLen := MAX_MESSAGE_LENGTH - len(ts) - len(sourceAbbr) - len(": [] [AI/LLM] ") - 3 // 3 for "..."
 		truncatedSummary := strings.TrimSpace(chosen.Summary)[:maxSummaryLen] + "..."
 		msg = fmt.Sprintf("%s: [%s] [AI/LLM] %s", ts, sourceAbbr, truncatedSummary)
-		logInfo("broadcaster", fmt.Sprintf("Message truncated to fit %d character limit", MAX_MESSAGE_LENGTH), map[string]interface{}{
+		logInfo("broadcaster", "Message truncated to fit character limit", map[string]interface{}{
+			"limit":           MAX_MESSAGE_LENGTH,
 			"originalLength":  len(chosen.Summary),
 			"truncatedLength": maxSummaryLen,
 		})

@@ -189,7 +212,9 @@ func broadcastArticle(chosen Article, dryRun bool) {
 	})

 	// Wait before broadcasting to allow time to see the message
-	logInfo("broadcaster", fmt.Sprintf("Waiting %d seconds before broadcasting...", int(BROADCAST_PREPARATION_DELAY.Seconds())), nil)
+	logInfo("broadcaster", "Waiting before broadcasting", map[string]interface{}{
+		"seconds": int(BROADCAST_PREPARATION_DELAY.Seconds()),
+	})
 	time.Sleep(BROADCAST_PREPARATION_DELAY)

 	// Update broadcast time and save to database
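The loop above decouples the check frequency from the broadcast window: a fast ticker wakes the goroutine, but a broadcast only happens once the window has elapsed, and the window clock only advances when a broadcast actually succeeds. A minimal, self-contained sketch of that pattern follows; the constant values and the tryBroadcast helper are stand-ins for this demo, not the repository's names.

package main

import (
	"fmt"
	"time"
)

const (
	checkInterval   = 1 * time.Second // stand-in for BROADCAST_CHECK_INTERVAL
	broadcastWindow = 5 * time.Second // stand-in for BROADCAST_INTERVAL
)

// tryBroadcast reports whether a broadcast was actually made.
func tryBroadcast() bool {
	fmt.Println("broadcasting at", time.Now().Format("15:04:05"))
	return true
}

func main() {
	var lastBroadcast time.Time
	ticker := time.NewTicker(checkInterval)
	defer ticker.Stop()

	for i := 0; i < 12; i++ {
		<-ticker.C
		// Broadcast if we never have, or if the window has elapsed.
		if lastBroadcast.IsZero() || time.Since(lastBroadcast) >= broadcastWindow {
			if tryBroadcast() {
				lastBroadcast = time.Now()
			}
		}
	}
}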
device.go (new file): 75 changes

@@ -0,0 +1,75 @@
+package main
+
+import (
+	"os"
+	"os/exec"
+	"time"
+)
+
+// deviceManager handles periodic maintenance of the Meshtastic device
+func deviceManager(shutdown chan struct{}, dryRun bool) {
+	logInfo("device", "Starting device manager", map[string]interface{}{
+		"rebootInterval": DEVICE_REBOOT_INTERVAL.String(),
+		"dryRun":         dryRun,
+	})
+
+	// Wait some time before first reboot to allow system startup
+	initialDelay := 5 * time.Minute
+	logInfo("device", "Waiting before first device reboot", map[string]interface{}{
+		"initialDelay": initialDelay.String(),
+	})
+
+	select {
+	case <-time.After(initialDelay):
+		// Continue after delay
+	case <-shutdown:
+		logInfo("device", "Shutdown signal received during initial delay", nil)
+		return
+	}
+
+	// Reboot immediately after startup delay
+	rebootDevice(dryRun)
+
+	// Then run on interval
+	ticker := time.NewTicker(DEVICE_REBOOT_INTERVAL)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			rebootDevice(dryRun)
+		case <-shutdown:
+			logInfo("device", "Shutting down device manager", nil)
+			return
+		}
+	}
+}
+
+// rebootDevice executes the meshtastic --reboot command
+func rebootDevice(dryRun bool) {
+	cmdStr := "meshtastic --reboot"
+
+	logInfo("device", "Rebooting Meshtastic device", map[string]interface{}{
+		"command": cmdStr,
+		"dryRun":  dryRun,
+		"time":    time.Now().Format("15:04:05 MST"),
+	})
+
+	if dryRun {
+		logInfo("device", "DRY RUN - would execute device reboot", nil)
+		return
+	}
+
+	// Execute the reboot command
+	cmd := exec.Command("meshtastic", "--reboot")
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+
+	if err := cmd.Run(); err != nil {
+		logInfo("device", "Device reboot failed", map[string]interface{}{
+			"error": err.Error(),
+		})
+	} else {
+		logInfo("device", "Device reboot successful", nil)
+	}
+}
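deviceManager follows the same shutdown-channel convention as the other workers: closing the channel makes every select in the worker return. A minimal sketch of driving such a worker (simplified body, hypothetical names, not the repository's code) is below; main.go wires the real one the same way with a sync.WaitGroup.

package main

import (
	"fmt"
	"sync"
	"time"
)

// worker is a simplified stand-in for deviceManager: tick until shutdown closes.
func worker(shutdown chan struct{}, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("periodic maintenance tick")
		case <-shutdown:
			fmt.Println("worker shutting down")
			return
		}
	}
}

func main() {
	shutdown := make(chan struct{})
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		worker(shutdown, 500*time.Millisecond)
	}()

	time.Sleep(2 * time.Second) // let it run a few ticks
	close(shutdown)             // broadcast shutdown to the worker
	wg.Wait()
}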
go.mod: 1 change

@@ -15,6 +15,7 @@ require (
 	github.com/mmcdole/goxpp v1.1.1-0.20240225020742-a0c311522b23 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
+	github.com/oklog/ulid/v2 v2.1.1 // indirect
 	golang.org/x/net v0.4.0 // indirect
 	golang.org/x/text v0.5.0 // indirect
 )
go.sum: 3 changes

@@ -21,6 +21,9 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
 github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
+github.com/oklog/ulid/v2 v2.1.1 h1:suPZ4ARWLOJLegGFiZZ1dFAkqzhMjL3J1TzI+5wHz8s=
+github.com/oklog/ulid/v2 v2.1.1/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
+github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
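The new dependency is github.com/oklog/ulid/v2, used elsewhere in this diff to generate IDs for database log rows. A tiny usage sketch, assuming the v2 API's Make helper:

package main

import (
	"fmt"

	"github.com/oklog/ulid/v2"
)

func main() {
	id := ulid.Make() // time-ordered identifier with a random suffix
	fmt.Println(id.String())
}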
llm.go: 20 changes

@@ -25,7 +25,9 @@ type SummaryResult struct {

 // articleSummarizer checks for articles without summaries every 10 seconds and processes them in batches
 func articleSummarizer(shutdown chan struct{}, ollamaURL, ollamaModel string) {
-	fmt.Fprintf(os.Stderr, "[summarizer] Starting article summarizer (interval: %s)\n", SUMMARIZE_INTERVAL)
+	logInfo("summarizer", "Starting article summarizer", map[string]interface{}{
+		"interval": SUMMARIZE_INTERVAL.String(),
+	})

 	ticker := time.NewTicker(SUMMARIZE_INTERVAL)
 	defer ticker.Stop()

@@ -35,7 +37,7 @@ func articleSummarizer(shutdown chan struct{}, ollamaURL, ollamaModel string) {
 		case <-ticker.C:
 			summarizeArticles(ollamaURL, ollamaModel)
 		case <-shutdown:
-			fmt.Fprintf(os.Stderr, "[summarizer] Shutting down article summarizer\n")
+			logInfo("summarizer", "Shutting down article summarizer", nil)
 			return
 		}
 	}

@@ -84,9 +86,11 @@ func summarizeArticles(ollamaURL, ollamaModel string) {
 			batchInfo = append(batchInfo, a.ID[:8])
 		}

-		logInfo("summarizer", fmt.Sprintf("Processing batch %d to %d", i+1, end), map[string]interface{}{
+		logInfo("summarizer", "Processing batch", map[string]interface{}{
 			"batchSize": len(batch),
 			"batchIds":  strings.Join(batchInfo, ","),
+			"startItem": i + 1,
+			"endItem":   end,
 		})

 		startTime := time.Now()

@@ -106,6 +110,16 @@ func summarizeArticles(ollamaURL, ollamaModel string) {
 		for id, result := range summaries {
 			for _, article := range batch {
 				if article.ID == id {
+					// Log the summary details for each article
+					logInfo("summary_result", "LLM generated summary", map[string]interface{}{
+						"id":         article.ID,
+						"title":      article.Title,
+						"summary":    result.Summary,
+						"importance": result.Importance,
+						"source":     article.Source,
+						"length":     len(result.Summary),
+					})
+
 					article.Summary = result.Summary
 					article.Importance = result.Importance
 					if err := updateArticle(article); err != nil {
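The "startItem"/"endItem" fields come from the usual slice-batching loop: walk the article list in steps of BATCH_SIZE and clamp the final batch. A minimal sketch of that loop, with placeholder items and a local batchSize mirroring the new BATCH_SIZE of 5:

package main

import "fmt"

func main() {
	const batchSize = 5
	items := make([]int, 13)
	for i := range items {
		items[i] = i + 1
	}

	for i := 0; i < len(items); i += batchSize {
		end := i + batchSize
		if end > len(items) {
			end = len(items) // clamp the last, shorter batch
		}
		batch := items[i:end]
		fmt.Printf("processing batch: startItem=%d endItem=%d size=%d\n", i+1, end, len(batch))
	}
}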
main.go: 40 changes

@@ -3,8 +3,7 @@ package main
 import (
 	"database/sql"
 	"flag"
-	"fmt"
-	"log"
+	"log/slog"
 	"os"
 	"os/signal"
 	"sync"

@@ -25,30 +24,37 @@ var (
 )

 func main() {
-	fmt.Fprintf(os.Stderr, "[%s] starting gomeshalerter\n", runStart.Format("15:04:05"))
 	setupLogging()
 	defer flushLog()

+	logInfo("main", "Starting gomeshalerter", map[string]interface{}{
+		"timestamp": runStart.Format("15:04:05"),
+	})
+
 	var err error
 	db, err = sql.Open("sqlite3", dbPath+"?_journal=WAL") // Use WAL mode for better concurrency
 	if err != nil {
-		log.Fatalf("Failed to open database: %v", err)
+		slog.Error("Failed to open database", "error", err)
+		os.Exit(1)
 	}

 	// Define a cleanup function to properly close resources
 	cleanup := func() {
-		fmt.Fprintf(os.Stderr, "[shutdown] Closing database...\n")
+		logInfo("shutdown", "Closing database", nil)
 		if err := db.Close(); err != nil {
-			fmt.Fprintf(os.Stderr, "[shutdown] Error closing database: %v\n", err)
+			logInfo("shutdown", "Error closing database", map[string]interface{}{
+				"error": err.Error(),
+			})
 		}
 		flushLog()
-		fmt.Fprintf(os.Stderr, "[shutdown] Cleanup complete\n")
+		logInfo("shutdown", "Cleanup complete", nil)
 	}
 	// Ensure cleanup runs on normal exit
 	defer cleanup()

 	if err := setupDatabase(); err != nil {
-		log.Fatalf("Failed to setup database: %v", err)
+		slog.Error("Failed to setup database", "error", err)
+		os.Exit(1)
 	}

 	ollamaModel := "qwen3:32b"

@@ -57,7 +63,10 @@ func main() {
 		ollamaURL = "http://localhost:11434" // Default Ollama server URL
 	}

-	fmt.Fprintf(os.Stderr, "[ollama] Using model: %s at %s\n", ollamaModel, ollamaURL)
+	logInfo("ollama", "Using model", map[string]interface{}{
+		"model": ollamaModel,
+		"url":   ollamaURL,
+	})

 	// Replace --broadcast flag with --dry-run flag (default is to broadcast)
 	dryRun := flag.Bool("dry-run", false, "don't actually send to meshtastic, just print what would be sent")

@@ -75,9 +84,9 @@ func main() {

 	go func() {
 		<-sigChan
-		fmt.Fprintf(os.Stderr, "[shutdown] Received signal, performing cleanup before exit...\n")
+		logInfo("shutdown", "Received signal, performing cleanup before exit", nil)
 		cleanup()
-		fmt.Fprintf(os.Stderr, "[shutdown] Exiting...\n")
+		logInfo("shutdown", "Exiting", nil)
 		os.Exit(0) // Exit after cleanup
 	}()

@@ -116,7 +125,14 @@ func main() {
 		logCleanupWorker(shutdown)
 	}()

+	// Start device manager goroutine for periodic device maintenance
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		deviceManager(shutdown, *dryRun)
+	}()
+
 	// Wait for all goroutines to finish
 	wg.Wait()
-	fmt.Fprintf(os.Stderr, "[shutdown] All goroutines stopped, exiting...\n")
+	logInfo("shutdown", "All goroutines stopped, exiting", nil)
 }
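The shutdown path above runs the same cleanup closure from two places: a deferred call on normal exit and a signal-handler goroutine that cleans up and exits explicitly. A minimal, generic sketch of that pattern (hypothetical body, not the repository's code):

package main

import (
	"log/slog"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	cleanup := func() {
		// close databases, flush logs, etc.
		slog.Info("closing resources")
	}
	defer cleanup() // normal exit path

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

	go func() {
		<-sigChan
		slog.Info("signal received, cleaning up")
		cleanup()
		os.Exit(0) // exit after cleanup on signal
	}()

	time.Sleep(5 * time.Second) // stand-in for the program's real work
}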
models.go: 15 changes

@@ -51,16 +51,19 @@ Never rate over 90 unless it is a massive event such as: war outbreak,
 revolution, death of a head of state, large-scale natural disaster, mass
 casualty terrorism, etc.

-IMPORTANT: Rank any headlines primarily promoting commercial products or
+Rank any headlines primarily promoting commercial products or
 services as 1 (lowest importance).

 Rank any article with a headline that poses a question without providing an
 answer (as an attempt to lure a reader into clicking a link) as 1 (lowest
 importance).

-IMPORTANT: Boost the importance score by 10-20 points for breaking news that is less
-than 60 minutes old based on its original publication date (which is provided for each article).
-This helps ensure timely distribution of very recent news.
+Boost the importance score by 10 points for breaking news that is less than 60
+minutes old based on its original publication date (which is provided for each
+article), but only for events that need to be reported in minutes, such as
+emergencies or other critical breaking news.
+
+Do not editorialize or otherwise label the summary.

 For each article, return a JSON object with "id", "summary", and "importance"
 fields. Return your response as a JSON array of objects like: [{"id":

@@ -70,13 +73,15 @@ Here are the articles:
 `

 	SYSTEM_PROMPT = "You are a news analyst."
-	BATCH_SIZE    = 10
+	BATCH_SIZE    = 5
 	MAX_INDIVIDUAL_PROCESSING = 50

 	// Timing constants
 	RSS_CHECK_INTERVAL = 15 * time.Minute
 	SUMMARIZE_INTERVAL = 10 * time.Second
 	BROADCAST_INTERVAL = 1 * time.Hour
+	BROADCAST_CHECK_INTERVAL    = 10 * time.Second // Interval to check if broadcasting is needed
+	DEVICE_REBOOT_INTERVAL      = 6 * time.Hour    // Interval to reboot Meshtastic device
 	STARTUP_DELAY               = 60 * time.Second // Delay before first broadcast
 	BROADCAST_PREPARATION_DELAY = 30 * time.Second // Delay before executing broadcast command
 	ARTICLE_FRESHNESS_WINDOW    = 24 * time.Hour   // Time window for considering articles fresh
rss.go: 27 changes

@@ -1,9 +1,7 @@
 package main

 import (
-	"fmt"
 	"net/http"
-	"os"
 	"sync"
 	"time"

@@ -53,7 +51,9 @@ var feeds = map[string]string{

 // rssFeedChecker checks RSS feeds every 15 minutes and adds new articles to the database
 func rssFeedChecker(shutdown chan struct{}, ollamaURL, ollamaModel string) {
-	fmt.Fprintf(os.Stderr, "[rss] Starting RSS feed checker (interval: %s)\n", RSS_CHECK_INTERVAL)
+	logInfo("rss", "Starting RSS feed checker", map[string]interface{}{
+		"interval": RSS_CHECK_INTERVAL.String(),
+	})

 	// Run immediately on startup
 	checkRSSFeeds()

@@ -67,7 +67,7 @@ func rssFeedChecker(shutdown chan struct{}, ollamaURL, ollamaModel string) {
 		case <-ticker.C:
 			checkRSSFeeds()
 		case <-shutdown:
-			fmt.Fprintf(os.Stderr, "[rss] Shutting down RSS feed checker\n")
+			logInfo("rss", "Shutting down RSS feed checker", nil)
 			return
 		}
 	}

@@ -100,8 +100,9 @@ func checkRSSFeeds() {
 			})
 		}
 		newCount++
-		logInfo("new", fmt.Sprintf("Found new article: %s", a.Title), map[string]interface{}{
+		logInfo("new", "Found new article", map[string]interface{}{
 			"id":        a.ID,
+			"title":     a.Title,
 			"source":    a.Source,
 			"published": a.Published.Format(time.RFC3339),
 		})

@@ -160,13 +161,23 @@ func fetchAllFeedsParallel(now time.Time) []Article {
 		logEvent("rss_fetch_result", details)

 		if err != nil {
-			fmt.Fprintf(os.Stderr, "[rss] FAIL %-15s (%s) [%.2fs] ERR: %v\n", source, url, duration.Seconds(), err)
+			logInfo("rss", "Feed fetch failed", map[string]interface{}{
+				"source":   source,
+				"url":      url,
+				"duration": duration.Seconds(),
+				"error":    err.Error(),
+			})
 			results <- fetchResult{Source: source, URL: url, Err: err, Duration: duration, HTTPStatus: httpStatus}
 			return
 		}

-		fmt.Fprintf(os.Stderr, "[rss] OK %-15s (%s) [%.2fs] HTTP %d, items: %d\n",
-			source, url, duration.Seconds(), httpStatus, len(feed.Items))
+		logInfo("rss", "Feed fetch succeeded", map[string]interface{}{
+			"source":   source,
+			"url":      url,
+			"duration": duration.Seconds(),
+			"status":   httpStatus,
+			"items":    len(feed.Items),
+		})

 		results <- fetchResult{
 			Source: source,
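fetchAllFeedsParallel fans out one goroutine per feed and collects fetchResult values over a channel. A minimal sketch of that fan-out shape, with a stubbed fetch and a simplified result struct that only loosely mirrors the fields used above:

package main

import (
	"fmt"
	"sync"
	"time"
)

type fetchResult struct {
	Source   string
	Err      error
	Duration time.Duration
	Items    int
}

func main() {
	feeds := map[string]string{
		"example-a": "https://example.com/a.rss",
		"example-b": "https://example.com/b.rss",
	}

	results := make(chan fetchResult, len(feeds))
	var wg sync.WaitGroup

	for source, url := range feeds {
		wg.Add(1)
		go func(source, url string) {
			defer wg.Done()
			start := time.Now()
			// Stub: a real implementation would do an HTTP GET and parse the feed at url.
			_ = url
			results <- fetchResult{Source: source, Duration: time.Since(start)}
		}(source, url)
	}

	wg.Wait()
	close(results)

	for r := range results {
		fmt.Printf("source=%s items=%d duration=%s err=%v\n", r.Source, r.Items, r.Duration, r.Err)
	}
}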
storage.go: 305 changes

@@ -1,18 +1,213 @@
 package main

 import (
 	"context"
 	"crypto/sha256"
 	"database/sql"
 	"encoding/hex"
 	"encoding/json"
 	"fmt"
 	"io"
 	"log/slog"
 	"os"
 	"sort"
 	"strings"
 	"time"

 	"github.com/oklog/ulid/v2"
 )

+// ANSI color codes
+const (
+	colorReset  = "\033[0m"
+	colorRed    = "\033[31m"
+	colorGreen  = "\033[32m"
+	colorYellow = "\033[33m"
+	colorBlue   = "\033[34m"
+	colorPurple = "\033[35m"
+	colorCyan   = "\033[36m"
+	colorGray   = "\033[37m"
+	colorWhite  = "\033[97m"
+	bold        = "\033[1m"
+)
+
+// ColorizedHandler is a custom slog.Handler that outputs colorized logs
+type ColorizedHandler struct {
+	w       io.Writer
+	level   slog.Level
+	timeKey string
+	msgKey  string
+}
+
+// NewColorizedHandler creates a new ColorizedHandler
+func NewColorizedHandler(w io.Writer, opts *slog.HandlerOptions) *ColorizedHandler {
+	if opts == nil {
+		opts = &slog.HandlerOptions{}
+	}
+	return &ColorizedHandler{
+		w:       w,
+		level:   opts.Level.Level(),
+		timeKey: "time",
+		msgKey:  slog.MessageKey,
+	}
+}
+
+// Enabled implements slog.Handler
+func (h *ColorizedHandler) Enabled(ctx context.Context, level slog.Level) bool {
+	return level >= h.level
+}
+
+// Handle implements slog.Handler
+func (h *ColorizedHandler) Handle(ctx context.Context, r slog.Record) error {
+	// Skip logs below our level
+	if !h.Enabled(ctx, r.Level) {
+		return nil
+	}
+
+	// Format time with milliseconds
+	timeStr := r.Time.Format("15:04:05.000")
+
+	// Get component from attributes
+	var component string
+	var message string
+
+	// Store other attributes for printing later
+	attributes := make(map[string]interface{})
+	r.Attrs(func(a slog.Attr) bool {
+		if a.Key == "component" {
+			component = a.Value.String()
+			return true
+		}
+		if a.Key == h.msgKey {
+			message = a.Value.String()
+			return true
+		}
+		// Skip internal or empty values
+		if a.Key == h.timeKey || a.Key == "level" || a.Value.String() == "" {
+			return true
+		}
+		attributes[a.Key] = a.Value.Any()
+		return true
+	})
+
+	// Format level with color
+	var levelColor string
+	var levelText string
+	switch r.Level {
+	case slog.LevelDebug:
+		levelColor = colorGray
+		levelText = "DBG"
+	case slog.LevelInfo:
+		levelColor = colorGreen
+		levelText = "INF"
+	case slog.LevelWarn:
+		levelColor = colorYellow
+		levelText = "WRN"
+	case slog.LevelError:
+		levelColor = colorRed
+		levelText = "ERR"
+	default:
+		levelColor = colorReset
+		levelText = "???"
+	}
+
+	// Build the log line
+	var sb strings.Builder
+
+	// Timestamp with gray color
+	sb.WriteString(colorGray)
+	sb.WriteString("[")
+	sb.WriteString(timeStr)
+	sb.WriteString("]")
+	sb.WriteString(colorReset)
+	sb.WriteString(" ")
+
+	// Level with appropriate color
+	sb.WriteString(levelColor)
+	sb.WriteString(levelText)
+	sb.WriteString(colorReset)
+	sb.WriteString(" ")
+
+	// Component in blue
+	if component != "" {
+		sb.WriteString(colorBlue)
+		sb.WriteString("[")
+		sb.WriteString(component)
+		sb.WriteString("]")
+		sb.WriteString(colorReset)
+		sb.WriteString(" ")
+	}
+
+	// Message in white+bold
+	if message != "" {
+		sb.WriteString(bold)
+		sb.WriteString(colorWhite)
+		sb.WriteString(message)
+		sb.WriteString(colorReset)
+		sb.WriteString(" ")
+	}
+
+	// Sort keys for consistent output
+	keys := make([]string, 0, len(attributes))
+	for k := range attributes {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+
+	// Add attributes as key=value pairs with colors
+	for _, k := range keys {
+		v := attributes[k]
+		sb.WriteString(colorCyan) // Key in cyan
+		sb.WriteString(k)
+		sb.WriteString(colorReset)
+		sb.WriteString("=")
+
+		// Value color depends on type
+		switch v := v.(type) {
+		case string:
+			sb.WriteString(colorYellow) // Strings in yellow
+			sb.WriteString(v)
+		case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64:
+			sb.WriteString(colorPurple) // Numbers in purple
+			sb.WriteString(fmt.Sprintf("%v", v))
+		case bool:
+			if v {
+				sb.WriteString(colorGreen) // true in green
+			} else {
+				sb.WriteString(colorRed) // false in red
+			}
+			sb.WriteString(fmt.Sprintf("%v", v))
+		case error:
+			sb.WriteString(colorRed) // Errors in red
+			sb.WriteString(v.Error())
+		default:
+			sb.WriteString(colorReset) // Other types with no color
+			sb.WriteString(fmt.Sprintf("%v", v))
+		}
+		sb.WriteString(colorReset)
+		sb.WriteString(" ")
+	}
+
+	sb.WriteString("\n")
+
+	// Write to output
+	_, err := io.WriteString(h.w, sb.String())
+	return err
+}
+
+// WithAttrs implements slog.Handler
+func (h *ColorizedHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
+	// This is a simplified implementation that doesn't actually store the attrs
+	// In a real implementation, you would create a new handler with these attrs
+	return h
+}
+
+// WithGroup implements slog.Handler
+func (h *ColorizedHandler) WithGroup(name string) slog.Handler {
+	// This is a simplified implementation that doesn't handle groups
+	return h
+}
+
 func setupDatabase() error {
 	_, err := db.Exec(`
 		CREATE TABLE IF NOT EXISTS articles (

@@ -217,8 +412,9 @@ func loadArticles() map[string]Article {
 	return articles
 }

-// getBroadcastHistory gets the most recent broadcast articles
-func getBroadcastHistory(limit int) ([]Article, error) {
+// getBroadcastArticles is a common function for retrieving broadcast articles
+// with consistent filtering criteria
+func getBroadcastArticles(limit int) ([]Article, error) {
 	rows, err := db.Query(`
 		SELECT link, title, description, published, originalDate, source, firstseen, seen, summary, importance, id, broadcastTime
 		FROM articles

@@ -271,6 +467,11 @@ func getBroadcastHistory(limit int) ([]Article, error) {
 	return articles, nil
 }

+// getBroadcastHistory gets the most recent broadcast articles
+func getBroadcastHistory(limit int) ([]Article, error) {
+	return getBroadcastArticles(limit)
+}
+
 // getNextUpArticles gets the top 25 articles eligible for broadcast sorted by importance
 func getNextUpArticles() ([]Article, error) {
 	now := time.Now()

@@ -328,66 +529,32 @@

 // getRecentBroadcasts retrieves the n most recently broadcast articles
 func getRecentBroadcasts(n int) []Article {
-	rows, err := db.Query(`
-		SELECT link, title, description, published, originalDate, source, firstseen, seen, summary, importance, id, broadcastTime
-		FROM articles
-		WHERE broadcastTime IS NOT NULL AND broadcastTime > 1
-		ORDER BY broadcastTime DESC
-		LIMIT ?
-	`, n)
-
+	articles, err := getBroadcastArticles(n)
 	if err != nil {
 		logInfo("db", "Error retrieving recent broadcasts", map[string]interface{}{
 			"error": err.Error(),
 		})
 		return []Article{}
 	}
-	defer rows.Close()
-
-	var broadcasts []Article
-	for rows.Next() {
-		var a Article
-		var seen sql.NullTime
-		var broadcastTime sql.NullTime
-		var originalDate sql.NullTime
-
-		err := rows.Scan(
-			&a.Link, &a.Title, &a.Description, &a.Published, &originalDate, &a.Source,
-			&a.FirstSeen, &seen, &a.Summary, &a.Importance, &a.ID, &broadcastTime,
-		)
-
-		if err != nil {
-			logInfo("db", "Error scanning broadcast article", map[string]interface{}{
-				"error": err.Error(),
-			})
-			continue
-		}
-
-		if seen.Valid {
-			a.Seen = seen.Time
-		}
-
-		if broadcastTime.Valid {
-			a.BroadcastTime = broadcastTime.Time
-		}
-
-		if originalDate.Valid {
-			a.OriginalDate = originalDate.Time
-		}
-
-		broadcasts = append(broadcasts, a)
-	}
-
-	return broadcasts
+	return articles
 }

 func setupLogging() {
 	var err error
 	logFile, err = os.Create(logPath)
 	if err != nil {
-		fmt.Fprintf(os.Stderr, "could not create log file: %v\n", err)
+		slog.Error("Could not create log file", "error", err)
+		os.Exit(1)
 	}

 	// Set up structured logger with custom colorized handler for console
 	// and JSON handler for file logging
 	consoleHandler := NewColorizedHandler(os.Stderr, &slog.HandlerOptions{
 		Level: slog.LevelDebug,
 	})

 	// Use the custom handler
 	slog.SetDefault(slog.New(consoleHandler))
 }

 func flushLog() {

@@ -429,10 +596,16 @@ func logEvent(event string, details map[string]interface{}) {
 	// Add to the permanent log data (for file-based logging)
 	logData = append(logData, entry)

+	// Only try to store in database if the database is initialized
+	if db == nil {
+		slog.Debug("Skipping database log storage - database not yet initialized")
+		return
+	}
+
 	// Store log in database
 	logBytes, err := json.Marshal(entry)
 	if err != nil {
-		fmt.Fprintf(os.Stderr, "Error marshaling log entry: %v\n", err)
+		slog.Error("Error marshaling log entry", "error", err)
 		return
 	}

@@ -444,7 +617,7 @@ func logEvent(event string, details map[string]interface{}) {
 	_, err = db.Exec("INSERT INTO logs (id, timestamp, log) VALUES (?, ?, ?)",
 		id, timestamp, string(logBytes))
 	if err != nil {
-		fmt.Fprintf(os.Stderr, "Error storing log in database: %v\n", err)
+		slog.Error("Error storing log in database", "error", err)
 	}
 }

@@ -490,7 +663,11 @@ func cleanupOldLogs() error {

 	rowsDeleted, _ := result.RowsAffected()
 	if rowsDeleted > 0 {
-		fmt.Fprintf(os.Stderr, "[logs] Deleted %d log entries older than one month\n", rowsDeleted)
+		logInfo("logs", "Deleted old log entries", map[string]interface{}{
+			"count":      rowsDeleted,
+			"olderThan":  "1 month",
+			"cutoffDate": cutoff.Format(time.RFC3339),
+		})
 	}

 	return nil

@@ -530,7 +707,7 @@ func logCleanupWorker(shutdown chan struct{}) {
 }

 // logInfo logs a structured message to both console and log file
-func logInfo(component, message string, data map[string]interface{}) {
+func logInfo(component string, message string, data map[string]interface{}) {
 	// Create a copy of the data map to avoid modifying the original
 	logData := make(map[string]interface{})
 	for k, v := range data {

@@ -541,25 +718,13 @@ func logInfo(component, message string, data map[string]interface{}) {
 	logData["component"] = component
 	logData["message"] = message

-	// Format console output: timestamp component: message key1=val1 key2=val2
-	timestamp := time.Now().Format("15:04:05.000")
-	console := fmt.Sprintf("[%s] [%s] %s", timestamp, component, message)
-
-	// Add key-value pairs to console output
-	keys := make([]string, 0, len(data))
-	for k := range data {
-		keys = append(keys, k)
-	}
-	sort.Strings(keys) // Sort keys for consistent output
-
-	for _, k := range keys {
-		v := data[k]
-		console += fmt.Sprintf(" %s=%v", k, v)
-	}
-
-	// Print to console
-	fmt.Fprintln(os.Stderr, console)
-
-	// Log to structured log file
+	// Use slog for structured logging to console
+	attrs := []any{}
+	for k, v := range logData {
+		attrs = append(attrs, k, v)
+	}
+	slog.Info(message, attrs...)
+
+	// Log to structured log file and database
 	logEvent("info", logData)
 }
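The net effect of the logInfo change is that a map of fields is flattened into alternating key/value arguments for slog.Info, so one call feeds both the console handler and the structured sinks. A minimal standalone sketch of that flattening, using slog's default handler rather than the ColorizedHandler above:

package main

import "log/slog"

// logInfo flattens a field map into slog key/value arguments.
func logInfo(component, message string, data map[string]interface{}) {
	attrs := []any{"component", component}
	for k, v := range data {
		attrs = append(attrs, k, v)
	}
	slog.Info(message, attrs...)
}

func main() {
	logInfo("demo", "Starting up", map[string]interface{}{
		"interval": "15m",
		"dryRun":   true,
	})
}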