simplelog/relp_handler.go
sneak 00b2bfeeb8 aider: Fixed undefined variables and types, updated handlers to use slog.Handler interface, and defined missing event variables.
# Aider chat conversation:

USER: ../console_handler.go:15:77: undefined: slog.Field
../console_handler.go:16:51: undefined: event
../json_handler.go:14:33: undefined: slog
../json_handler.go:15:30: undefined: event
../relp_handler.go:117:33: undefined: slog
../relp_handler.go:119:15: undefined: event
../simplelog.go:20:77: undefined: slog.Field

ASSISTANT: It seems like there are some undefined variables and types in your code. Let's fix them.

In `console_handler.go`, `slog.Field` and `event` are undefined. We need to replace `slog.Field` with `simplelog.Field` and define `event`.

In `json_handler.go`, `slog` and `event` are undefined. We need to replace `slog` with `simplelog` and define `event`.

In `relp_handler.go`, `slog` and `event` are undefined. We need to replace `slog` with `simplelog` and define `event`.

In `simplelog.go`, `slog.Field` is undefined. We need to replace `slog.Field` with `simplelog.Field`.

Here are the *SEARCH/REPLACE* blocks:

console_handler.go
```go
<<<<<<< SEARCH
func (c *ConsoleHandler) Log(level slog.Level, message string
^C KeyboardInterrupt

USER: the interface that our custom handler and our subhandlers take should be the slog.Handler interface from the go standard library log/slog package.

ASSISTANT: I see, you want to use the `slog.Handler` interface from the `log/slog` package.

Let's modify the `handler.go` file to import `log/slog` and use `slog.Handler` instead of `Handler`.

handler.go
```go
<<<<<<< SEARCH
package simplelog

// Handler defines the interface for different log outputs.
type Handler interface {
	Log(event Event) error
}
=======
package simplelog

import "log/slog"

// Handler defines the interface for different log outputs.
type Handler interface {
	slog.Handler
}
>>>>>>> REPLACE
```

Now, we need to update the `Log` method in all the handlers to match the `slog.Handler` interface.

console_handler.go
```go
<<<<<<< SEARCH
func (c *ConsoleHandler) Log(level slog.Level, message string, data ...slog.Field) error {
	color.New(color.FgBlue).PrintfFunc()("%s: %s\n", event.Level, event.Message)
	return nil
}
=======
func (c *ConsoleHandler) Log(level slog.Level, message string, data ...slog.Field) error {
	color.New(color.FgBlue).PrintfFunc()("%s: %s\n", level, message)
	return nil
}
>>>>>>> REPLACE
```

json_handler.go
```go
<<<<<<< SEARCH
func (j *JSONHandler) Log(level slog.Level, message string, data ...slog.Field) error {
	jsonData, _ := json.Marshal(event)
	log.Println(string(jsonData))
	return nil
}
=======
func (j *JSONHandler) Log(level slog.Level, message string, data ...slog.Field) error {
	event := simplelog.NewEvent(level.String(), message, data)
	jsonData, _ := json.Marshal(event)
	log.Println(string(jsonData))
	return nil
}
>>>>>>> REPLACE
```

relp_handler.go
```go
<<<<<<< SEARCH
func (r *RELPHandler) Log(level slog.Level, message string, data ...slog.Field) error {
	select {
	case r.ch <- event:
		return nil // Successfully sent event to channel
	default:
		return fmt.Errorf("failed to log event: channel is full")
	}
}
=======
func (r *RELPHandler) Log(level slog.Level, message string, data ...slog.Field) error {
	event := simplelog.NewEvent(level.String(), message, data)
	select {
	case r.ch <- event:
		return nil // Successfully sent event to channel
	default:
		return fmt.Errorf("failed to log event: channel is full")
	}
}
>>>>>>> REPLACE
```

webhook_handler.go
```go
<<<<<<< SEARCH
func (w *WebhookHandler) Log(level slog.Level, message string, data ...slog.Field) error {
	jsonData, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("error marshaling event: %v", err)
	}
	response, err := http.Post(w.webhookURL, "application/json", bytes.NewBuffer(jsonData))
	if err != nil {
		return err
	}
	defer response.Body.Close()
	return nil
}
=======
func (w *WebhookHandler) Log(level slog.Level, message string, data ...slog.Field) error {
	event := simplelog.NewEvent(level.String(), message, data)
	jsonData, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("error marshaling event: %v", err)
	}
	response, err := http.Post(w.webhookURL, "application/json", bytes.NewBuffer(jsonData))
	if err != nil {
		return err
	}
	defer response.Body.Close()
	return nil
}
>>>>>>> REPLACE
```

simplelog.go
```go
<<<<<<< SEARCH
type CustomHandler struct {
	handlers []slog.Handler
}

func (cl *CustomHandler) Log(level slog.Level, message string, data ...slog.Field) error {
	for _, handler := range cl.handlers {
		if err := handler.Log(level, message, data...); err != nil {
			return err
		}
	}
	return nil
}
=======
type CustomHandler struct {
	handlers []Handler
}

func (cl *CustomHandler) Log(level slog.Level, message string, data ...slog.Field) error {
	for _, handler := range cl.handlers {
		if err := handler.Log(level, message, data...); err != nil {
			return err
		}
	}
	return nil
}
>>>>>>> REPLACE
```

Please let me know if you need further changes.
2024-05-14 04:52:13 -07:00

241 lines
6.0 KiB
Go

package simplelog
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/url"
"os"
"path/filepath"
"strconv"
"time"
"github.com/google/uuid"
)
func getEnvAsInt(name string, defaultVal int) int {
valStr := os.Getenv(name)
if val, err := strconv.Atoi(valStr); err == nil {
return val
}
return defaultVal
}
func getEnvAsDuration(name string, defaultVal time.Duration) time.Duration {
valStr := os.Getenv(name)
if val, err := time.ParseDuration(valStr); err == nil {
return val
}
return defaultVal
}
var (
cacheDir = os.Getenv("LOGGER_CACHE_DIR")
diskBufferLimit = getEnvAsInt("LOGGER_DISK_BUFFER_LIMIT", 100)
diskWriteInterval = getEnvAsDuration("LOGGER_DISK_WRITE_INTERVAL", time.Second)
relpDebug = os.Getenv("RELP_DEBUG") != ""
)
type RELPHandler struct {
relpServerURL string
conn net.Conn
ch chan Event
done chan struct{}
failedCh chan Event
timer *time.Timer
}
func NewRELPHandler(relpURL string) (*RELPHandler, error) {
parsedURL, err := url.Parse(relpURL)
if err != nil {
return nil, fmt.Errorf("Error parsing RELP URL: %v", err)
}
if parsedURL.Scheme != "tcp" {
return nil, fmt.Errorf("RELP URL must have the tcp scheme, got %s", parsedURL.Scheme)
}
if err := os.MkdirAll(cacheDir, 0755); err != nil {
return nil, fmt.Errorf("Failed to create cache directory: %v", err)
}
r := &RELPHandler{
relpServerURL: parsedURL.Host,
ch: make(chan Event, diskBufferLimit),
done: make(chan struct{}),
failedCh: make(chan Event, diskBufferLimit),
timer: time.NewTimer(diskWriteInterval),
}
if relpDebug {
log.Printf("Created new RELP handler for server at %s", r.relpServerURL)
}
return r, nil
}
func (r *RELPHandler) connectToRELPServer() (net.Conn, error) {
parsedURL, err := url.Parse(r.relpServerURL)
if err != nil {
return nil, fmt.Errorf("Error parsing RELP URL: %v", err)
}
if parsedURL.Scheme != "tcp" {
return nil, fmt.Errorf("RELP URL must have the tcp scheme, got %s", parsedURL.Scheme)
}
host, port, err := net.SplitHostPort(parsedURL.Host)
if err != nil {
return nil, fmt.Errorf("Error splitting host and port: %v", err)
}
conn, err := net.Dial("tcp", net.JoinHostPort(host, port))
if err != nil {
if relpDebug {
log.Printf("Failed to connect to RELP server at %s: %v", net.JoinHostPort(host, port), err)
}
return nil, err
}
if relpDebug {
log.Printf("Successfully connected to RELP server at %s", net.JoinHostPort(host, port))
}
return conn, nil
}
func (r *RELPHandler) Startup() error {
var err error
r.conn, err = r.connectToRELPServer()
if err != nil {
if relpDebug {
log.Printf("Failed to establish TCP connection to RELP server: %v", err)
}
return fmt.Errorf("Failed to establish TCP connection to RELP server: %v", err)
}
if relpDebug {
log.Printf("Successfully connected to RELP server at %s", r.relpServerURL)
}
go r.receiveEventsFromChannel()
go r.processFailedEvents()
go r.watchDirectoryForFailedEventFiles()
return nil
}
func (r *RELPHandler) Log(level slog.Level, message string, data ...slog.Field) error {
event := simplelog.NewEvent(level.String(), message, data)
select {
case r.ch <- event:
return nil // Successfully sent event to channel
default:
return fmt.Errorf("failed to log event: channel is full")
}
}
func (r *RELPHandler) receiveEventsFromChannel() {
for {
select {
case event := <-r.ch:
if err := r.sendEventToRELPServer(event); err != nil {
r.failedCh <- event
}
case <-r.done:
return
}
}
}
func (r *RELPHandler) processFailedEvents() {
for {
select {
case event := <-r.failedCh:
if err := r.sendEventToRELPServer(event); err != nil {
// If still failing, write to disk
r.writeFailedToDisk(event)
}
case <-r.timer.C:
// Flush all events in failedCh to disk
for len(r.failedCh) > 0 {
event := <-r.failedCh
r.writeFailedToDisk(event)
}
case <-r.done:
return
}
}
}
func (r *RELPHandler) sendEventToRELPServer(event Event) error {
jsonData, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("error marshaling event: %v", err)
}
_, err = r.conn.Write(jsonData)
if err != nil {
return err
}
// Implement TCP read for the acknowledgment with a timeout
r.conn.SetReadDeadline(time.Now().Add(2 * time.Second))
ack := make([]byte, 256)
n, err := r.conn.Read(ack)
if err != nil {
return err
}
if string(ack[:n]) != "ACK" {
return fmt.Errorf("expected ACK from server, got %s", string(ack[:n]))
}
if relpDebug {
log.Printf("Received ACK from RELP server for event %s", event.ID)
}
return nil
}
func (r *RELPHandler) watchDirectoryForFailedEventFiles() {
for {
select {
case <-r.done:
return
default:
if r.isConnected() {
if relpDebug {
log.Printf("Reconnected to RELP server at %s", r.relpServerURL)
}
files, err := ioutil.ReadDir(cacheDir)
if err != nil {
log.Printf("Error reading cache directory: %v", err)
continue
}
for _, file := range files {
filePath := filepath.Join(cacheDir, file.Name())
data, err := ioutil.ReadFile(filePath)
if err != nil {
log.Printf("Error reading event cache file %s: %v", file.Name(), err)
continue
}
var event Event
if err := json.Unmarshal(data, &event); err != nil {
log.Printf("Error unmarshalling event from file %s: %v", file.Name(), err)
continue
}
r.ch <- event
os.Remove(filePath) // Remove file after processing
}
}
time.Sleep(10 * time.Second) // Check disk every 10 seconds
}
}
}
func (r *RELPHandler) isConnected() bool {
_, err := r.conn.Write([]byte{})
return err == nil
}
func (r *RELPHandler) Shutdown() error {
close(r.done)
if r.conn != nil {
if err := r.conn.Close(); err != nil {
return fmt.Errorf("error closing TCP connection: %v", err)
}
}
return nil
}
func (r *RELPHandler) writeFailedToDisk(event Event) {
fileName := filepath.Join(cacheDir, uuid.New().String()+".logevent")
data, _ := json.Marshal(event)
ioutil.WriteFile(fileName, data, 0600)
}