package simplelog import ( "encoding/json" "fmt" "io/ioutil" "log" "net" "net/url" "os" "path/filepath" "time" "github.com/google/uuid" ) var ( cacheDir = os.Getenv("LOGGER_CACHE_DIR") diskBufferLimit = getEnvAsInt("LOGGER_DISK_BUFFER_LIMIT", 100) diskWriteInterval = getEnvAsDuration("LOGGER_DISK_WRITE_INTERVAL", time.Second) ) type RELPHandler struct { relpServerURL string conn net.Conn ch chan Event done chan struct{} failedCh chan Event timer *time.Timer } func NewRELPHandler(relpURL string) (*RELPHandler, error) { parsedURL, err := url.Parse(relpURL) if err != nil { return nil, fmt.Errorf("Error parsing RELP URL: %v", err) } if parsedURL.Scheme != "tcp" { return nil, fmt.Errorf("RELP URL must have the tcp scheme, got %s", parsedURL.Scheme) } if err := os.MkdirAll(cacheDir, 0755); err != nil { return nil, fmt.Errorf("Failed to create cache directory: %v", err) } return &RELPHandler{ relpServerURL: parsedURL.Host, ch: make(chan Event, diskBufferLimit), done: make(chan struct{}), failedCh: make(chan Event, diskBufferLimit), timer: time.NewTimer(diskWriteInterval), }, nil } func (r *RELPHandler) Startup() error { var err error r.conn, err = r.connectToRELPServer() if err != nil { return fmt.Errorf("Failed to establish TCP connection: %v", err) } go r.receiveEventsFromChannel() go r.processFailedEvents() go r.watchDirectoryForFailedEventFiles() return nil } func (r *RELPHandler) Log(event Event) error { select { case r.ch <- event: return nil // Successfully sent event to channel default: return fmt.Errorf("failed to log event: channel is full") } } func (r *RELPHandler) receiveEventsFromChannel() { for { select { case event := <-r.ch: if err := r.sendEventToRELPServer(event); err != nil { r.failedCh <- event } case <-r.done: return } } } func (r *RELPHandler) processFailedEvents() { for { select { case event := <-r.failedCh: if err := r.sendEventToRELPServer(event); err != nil { // If still failing, write to disk r.writeFailedToDisk(event) } case <-r.timer.C: // Flush all events in failedCh to disk for len(r.failedCh) > 0 { event := <-r.failedCh r.writeFailedToDisk(event) } case <-r.done: return } } } func (r *RELPHandler) sendEventToRELPServer(event Event) error { jsonData, err := json.Marshal(event) if err != nil { return fmt.Errorf("error marshaling event: %v", err) } _, err = r.conn.Write(jsonData) if err != nil { return err } // Implement TCP read for the acknowledgment with a timeout r.conn.SetReadDeadline(time.Now().Add(2 * time.Second)) ack := make([]byte, 256) n, err := r.conn.Read(ack) if err != nil || string(ack[:n]) != "ACK" { return err } return nil } func (r *RELPHandler) watchDirectoryForFailedEventFiles() { for { select { case <-r.done: return default: if r.isConnected() { files, err := ioutil.ReadDir(cacheDir) if err != nil { log.Printf("Error reading cache directory: %v", err) continue } for _, file := range files { filePath := filepath.Join(cacheDir, file.Name()) data, err := ioutil.ReadFile(filePath) if err != nil { log.Printf("Error reading file %s: %v", file.Name(), err) continue } var event Event if err := json.Unmarshal(data, &event); err != nil { log.Printf("Error unmarshalling event from file %s: %v", file.Name(), err) continue } r.ch <- event os.Remove(filePath) // Remove file after processing } } time.Sleep(10 * time.Second) // Check disk every 10 seconds } } } func (r *RELPHandler) isConnected() bool { _, err := r.conn.Write([]byte{}) return err == nil } func (r *RELPHandler) Shutdown() error { close(r.done) if r.conn != nil { if err := r.conn.Close(); err != nil { return fmt.Errorf("error closing TCP connection: %v", err) } } return nil } func (r *RELPHandler) writeFailedToDisk(event Event) { fileName := filepath.Join(cacheDir, uuid.New().String()+".logevent") data, _ := json.Marshal(event) ioutil.WriteFile(fileName, data, 0600) }