diff --git a/relp_handler.go b/relp_handler.go deleted file mode 100644 index 168e0cd..0000000 --- a/relp_handler.go +++ /dev/null @@ -1,274 +0,0 @@ -package simplelog - -import ( - "context" - "encoding/json" - "fmt" - "io/ioutil" - "log" - "log/slog" - "net" - "net/url" - "os" - "path/filepath" - "runtime" - "strconv" - "time" - - "github.com/google/uuid" -) - -func getEnvAsInt(name string, defaultVal int) int { - valStr := os.Getenv(name) - if val, err := strconv.Atoi(valStr); err == nil { - return val - } - return defaultVal -} - -func getEnvAsDuration(name string, defaultVal time.Duration) time.Duration { - valStr := os.Getenv(name) - if val, err := time.ParseDuration(valStr); err == nil { - return val - } - return defaultVal -} - -var ( - cacheDir, _ = os.UserCacheDir() - diskBufferLimit = getEnvAsInt("LOGGER_DISK_BUFFER_LIMIT", 100) - diskWriteInterval = getEnvAsDuration("LOGGER_DISK_WRITE_INTERVAL", time.Second) - relpDebug = os.Getenv("RELP_DEBUG") != "" -) - -type RELPHandler struct { - relpServerURL string - relpHost string - relpPort string - conn net.Conn - ch chan Event - done chan struct{} - failedCh chan Event - timer *time.Timer -} - -func NewRELPHandler(relpURL string) (ExtendedHandler, error) { - parsedURL, err := url.Parse(relpURL) - if err != nil { - return nil, fmt.Errorf("error parsing RELP URL: %v", err) - } - if parsedURL.Scheme != "tcp" { - return nil, fmt.Errorf("the RELP URL must have the tcp scheme, got %s", parsedURL.Scheme) - } - - host, port, err := net.SplitHostPort(parsedURL.Host) - if err != nil { - return nil, fmt.Errorf("Error splitting host and port: %v", err) - } - - if err := os.MkdirAll(filepath.Join(cacheDir, "simplelog"), 0755); err != nil { - return nil, fmt.Errorf("failed to create cache directory: %v", err) - } - r := &RELPHandler{ - ch: make(chan Event, diskBufferLimit), - done: make(chan struct{}), - failedCh: make(chan Event, diskBufferLimit), - timer: time.NewTimer(diskWriteInterval), - relpHost: host, - relpPort: port, - } - if relpDebug { - log.Printf("Created new RELP handler for server at %s", r.relpServerURL) - } - - err = r.Startup() - if err != nil { - return nil, err - } - return r, nil -} - -func (r *RELPHandler) connectToRELPServer() (net.Conn, error) { - conn, err := net.Dial("tcp", net.JoinHostPort(r.relpHost, r.relpPort)) - if err != nil { - if relpDebug { - log.Printf("Failed to connect to RELP server at %s: %v", net.JoinHostPort(r.relpHost, r.relpPort), err) - } - return nil, err - } - if relpDebug { - log.Printf("Successfully connected to RELP server at %s", net.JoinHostPort(r.relpHost, r.relpPort)) - } - return conn, nil -} - -func (r *RELPHandler) Startup() error { - var err error - r.conn, err = r.connectToRELPServer() - if err != nil { - if relpDebug { - log.Printf("Failed to establish TCP connection to RELP server: %v", err) - } - return fmt.Errorf("Failed to establish TCP connection to RELP server: %v", err) - } - if relpDebug { - log.Printf("Successfully connected to RELP server at %s", r.relpServerURL) - } - go r.receiveEventsFromChannel() - go r.processFailedEvents() - go r.watchDirectoryForFailedEventFiles() - return nil -} - -func (r *RELPHandler) Enabled(ctx context.Context, level slog.Level) bool { - return true -} - -func (r *RELPHandler) WithAttrs(attrs []slog.Attr) slog.Handler { - return r -} - -func (r *RELPHandler) WithGroup(name string) slog.Handler { - return r -} - -func (r *RELPHandler) Handle(ctx context.Context, record slog.Record) error { - attrs := make(map[string]interface{}) - record.Attrs(func(attr slog.Attr) bool { - attrs[attr.Key] = attr.Value - return true - }) - jsonData, err := json.Marshal(attrs) - if err != nil { - return fmt.Errorf("error marshaling attributes: %v", err) - } - // Get the caller information - _, file, line, ok := runtime.Caller(5) - if !ok { - file = "???" - line = 0 - } - - event := NewExtendedEvent(record.Level.String(), record.Message, jsonData, file, line) - for _, handler := range cl.handlers { - if err := handler.Handle(ctx, event); err != nil { - return err - } - } - return nil -} - -func (r *RELPHandler) receiveEventsFromChannel() { - for { - select { - case event := <-r.ch: - if err := r.sendEventToRELPServer(event); err != nil { - r.failedCh <- event - } - case <-r.done: - return - } - } -} - -func (r *RELPHandler) processFailedEvents() { - for { - select { - case event := <-r.failedCh: - if err := r.sendEventToRELPServer(event); err != nil { - // If still failing, write to disk - r.writeFailedToDisk(event) - } - case <-r.timer.C: - // Flush all events in failedCh to disk - for len(r.failedCh) > 0 { - event := <-r.failedCh - r.writeFailedToDisk(event) - } - case <-r.done: - return - } - } -} - -func (r *RELPHandler) sendEventToRELPServer(event Event) error { - jsonData, err := json.Marshal(event) - if err != nil { - return fmt.Errorf("error marshaling event: %v", err) - } - _, err = r.conn.Write(jsonData) - if err != nil { - return err - } - // Implement TCP read for the acknowledgment with a timeout - r.conn.SetReadDeadline(time.Now().Add(2 * time.Second)) - ack := make([]byte, 256) - n, err := r.conn.Read(ack) - if err != nil { - return err - } - if string(ack[:n]) != "ACK" { - return fmt.Errorf("expected ACK from server, got %s", string(ack[:n])) - } - if relpDebug { - log.Printf("Received ACK from RELP server for event %s", event.ID) - } - return nil -} - -func (r *RELPHandler) watchDirectoryForFailedEventFiles() { - for { - select { - case <-r.done: - return - default: - if r.isConnected() { - if relpDebug { - log.Printf("Reconnected to RELP server at %s", r.relpServerURL) - } - files, err := ioutil.ReadDir(cacheDir) - if err != nil { - log.Printf("Error reading cache directory: %v", err) - continue - } - for _, file := range files { - filePath := filepath.Join(cacheDir, file.Name()) - data, err := ioutil.ReadFile(filePath) - if err != nil { - log.Printf("Error reading event cache file %s: %v", file.Name(), err) - continue - } - var event Event - if err := json.Unmarshal(data, &event); err != nil { - log.Printf("Error unmarshalling event from file %s: %v", file.Name(), err) - continue - } - r.ch <- event - os.Remove(filePath) // Remove file after processing - } - } - time.Sleep(10 * time.Second) // Check disk every 10 seconds - } - } -} - -func (r *RELPHandler) isConnected() bool { - _, err := r.conn.Write([]byte{}) - return err == nil -} - -func (r *RELPHandler) Shutdown() error { - close(r.done) - if r.conn != nil { - if err := r.conn.Close(); err != nil { - return fmt.Errorf("error closing TCP connection: %v", err) - } - } - return nil -} - -func (r *RELPHandler) writeFailedToDisk(event Event) { - fileName := filepath.Join(cacheDir, "simplelog", uuid.New().String()+".logevent") - data, _ := json.Marshal(event) - ioutil.WriteFile(fileName, data, 0600) -} diff --git a/simplelog.go b/simplelog.go index 1294256..08316bc 100644 --- a/simplelog.go +++ b/simplelog.go @@ -14,8 +14,7 @@ import ( ) var ( - relpServerURL = os.Getenv("LOGGER_RELP_URL") - webhookURL = os.Getenv("LOGGER_WEBHOOK_URL") + webhookURL = os.Getenv("LOGGER_WEBHOOK_URL") ) var ourCustomLogger *slog.Logger @@ -38,13 +37,6 @@ func NewMultiplexHandler() slog.Handler { } else { cl.handlers = append(cl.handlers, NewJSONHandler()) } - if relpServerURL != "" { - handler, err := NewRELPHandler(relpServerURL) - if err != nil { - log.Fatalf("Failed to initialize RELP handler: %v", err) - } - cl.handlers = append(cl.handlers, handler) - } if webhookURL != "" { handler, err := NewWebhookHandler(webhookURL) if err != nil { @@ -55,7 +47,10 @@ func NewMultiplexHandler() slog.Handler { return cl } -func (cl *MultiplexHandler) Handle(ctx context.Context, record slog.Record) error { +func (cl *MultiplexHandler) Handle( + ctx context.Context, + record slog.Record, +) error { for _, handler := range cl.handlers { if err := handler.Handle(ctx, record); err != nil { return err @@ -64,7 +59,10 @@ func (cl *MultiplexHandler) Handle(ctx context.Context, record slog.Record) erro return nil } -func (cl *MultiplexHandler) Enabled(ctx context.Context, level slog.Level) bool { +func (cl *MultiplexHandler) Enabled( + ctx context.Context, + level slog.Level, +) bool { // send us all events return true }