package simplelog import ( "context" "encoding/json" "fmt" "io/ioutil" "log" "log/slog" "net" "net/url" "os" "path/filepath" "runtime" "strconv" "time" "github.com/google/uuid" ) func getEnvAsInt(name string, defaultVal int) int { valStr := os.Getenv(name) if val, err := strconv.Atoi(valStr); err == nil { return val } return defaultVal } func getEnvAsDuration(name string, defaultVal time.Duration) time.Duration { valStr := os.Getenv(name) if val, err := time.ParseDuration(valStr); err == nil { return val } return defaultVal } var ( cacheDir, _ = os.UserCacheDir() diskBufferLimit = getEnvAsInt("LOGGER_DISK_BUFFER_LIMIT", 100) diskWriteInterval = getEnvAsDuration("LOGGER_DISK_WRITE_INTERVAL", time.Second) relpDebug = os.Getenv("RELP_DEBUG") != "" ) type RELPHandler struct { relpServerURL string relpHost string relpPort string conn net.Conn ch chan Event done chan struct{} failedCh chan Event timer *time.Timer } func NewRELPHandler(relpURL string) (ExtendedHandler, error) { parsedURL, err := url.Parse(relpURL) if err != nil { return nil, fmt.Errorf("error parsing RELP URL: %v", err) } if parsedURL.Scheme != "tcp" { return nil, fmt.Errorf("the RELP URL must have the tcp scheme, got %s", parsedURL.Scheme) } host, port, err := net.SplitHostPort(parsedURL.Host) if err != nil { return nil, fmt.Errorf("Error splitting host and port: %v", err) } if err := os.MkdirAll(filepath.Join(cacheDir, "simplelog"), 0755); err != nil { return nil, fmt.Errorf("failed to create cache directory: %v", err) } r := &RELPHandler{ ch: make(chan Event, diskBufferLimit), done: make(chan struct{}), failedCh: make(chan Event, diskBufferLimit), timer: time.NewTimer(diskWriteInterval), relpHost: host, relpPort: port, } if relpDebug { log.Printf("Created new RELP handler for server at %s", r.relpServerURL) } err = r.Startup() if err != nil { return nil, err } return r, nil } func (r *RELPHandler) connectToRELPServer() (net.Conn, error) { conn, err := net.Dial("tcp", net.JoinHostPort(r.relpHost, r.relpPort)) if err != nil { if relpDebug { log.Printf("Failed to connect to RELP server at %s: %v", net.JoinHostPort(r.relpHost, r.relpPort), err) } return nil, err } if relpDebug { log.Printf("Successfully connected to RELP server at %s", net.JoinHostPort(r.relpHost, r.relpPort)) } return conn, nil } func (r *RELPHandler) Startup() error { var err error r.conn, err = r.connectToRELPServer() if err != nil { if relpDebug { log.Printf("Failed to establish TCP connection to RELP server: %v", err) } return fmt.Errorf("Failed to establish TCP connection to RELP server: %v", err) } if relpDebug { log.Printf("Successfully connected to RELP server at %s", r.relpServerURL) } go r.receiveEventsFromChannel() go r.processFailedEvents() go r.watchDirectoryForFailedEventFiles() return nil } func (r *RELPHandler) Enabled(ctx context.Context, level slog.Level) bool { return true } func (r *RELPHandler) WithAttrs(attrs []slog.Attr) slog.Handler { return r } func (r *RELPHandler) WithGroup(name string) slog.Handler { return r } func (r *RELPHandler) Handle(ctx context.Context, record slog.Record) error { attrs := make(map[string]interface{}) record.Attrs(func(attr slog.Attr) bool { attrs[attr.Key] = attr.Value return true }) jsonData, err := json.Marshal(attrs) if err != nil { return fmt.Errorf("error marshaling attributes: %v", err) } // Get the caller information _, file, line, ok := runtime.Caller(5) if !ok { file = "???" line = 0 } event := NewExtendedEvent(record.Level.String(), record.Message, jsonData, file, line) for _, handler := range cl.handlers { if err := handler.Handle(ctx, event); err != nil { return err } } return nil } func (r *RELPHandler) receiveEventsFromChannel() { for { select { case event := <-r.ch: if err := r.sendEventToRELPServer(event); err != nil { r.failedCh <- event } case <-r.done: return } } } func (r *RELPHandler) processFailedEvents() { for { select { case event := <-r.failedCh: if err := r.sendEventToRELPServer(event); err != nil { // If still failing, write to disk r.writeFailedToDisk(event) } case <-r.timer.C: // Flush all events in failedCh to disk for len(r.failedCh) > 0 { event := <-r.failedCh r.writeFailedToDisk(event) } case <-r.done: return } } } func (r *RELPHandler) sendEventToRELPServer(event Event) error { jsonData, err := json.Marshal(event) if err != nil { return fmt.Errorf("error marshaling event: %v", err) } _, err = r.conn.Write(jsonData) if err != nil { return err } // Implement TCP read for the acknowledgment with a timeout r.conn.SetReadDeadline(time.Now().Add(2 * time.Second)) ack := make([]byte, 256) n, err := r.conn.Read(ack) if err != nil { return err } if string(ack[:n]) != "ACK" { return fmt.Errorf("expected ACK from server, got %s", string(ack[:n])) } if relpDebug { log.Printf("Received ACK from RELP server for event %s", event.ID) } return nil } func (r *RELPHandler) watchDirectoryForFailedEventFiles() { for { select { case <-r.done: return default: if r.isConnected() { if relpDebug { log.Printf("Reconnected to RELP server at %s", r.relpServerURL) } files, err := ioutil.ReadDir(cacheDir) if err != nil { log.Printf("Error reading cache directory: %v", err) continue } for _, file := range files { filePath := filepath.Join(cacheDir, file.Name()) data, err := ioutil.ReadFile(filePath) if err != nil { log.Printf("Error reading event cache file %s: %v", file.Name(), err) continue } var event Event if err := json.Unmarshal(data, &event); err != nil { log.Printf("Error unmarshalling event from file %s: %v", file.Name(), err) continue } r.ch <- event os.Remove(filePath) // Remove file after processing } } time.Sleep(10 * time.Second) // Check disk every 10 seconds } } } func (r *RELPHandler) isConnected() bool { _, err := r.conn.Write([]byte{}) return err == nil } func (r *RELPHandler) Shutdown() error { close(r.done) if r.conn != nil { if err := r.conn.Close(); err != nil { return fmt.Errorf("error closing TCP connection: %v", err) } } return nil } func (r *RELPHandler) writeFailedToDisk(event Event) { fileName := filepath.Join(cacheDir, "simplelog", uuid.New().String()+".logevent") data, _ := json.Marshal(event) ioutil.WriteFile(fileName, data, 0600) }