diff --git a/README.md b/README.md index 3f6222e..e9f21c8 100644 --- a/README.md +++ b/README.md @@ -19,9 +19,18 @@ Released v1.0.0 2024-06-14. Works as intended. No known bugs. - if output is not a tty, outputs json - supports delivering each log message via a webhook -## Planned Features +## RELP Delivery -- supports delivering logs via tcp RELP (e.g. to remote rsyslog using imrelp) +To deliver logs via RELP to a remote rsyslog server (using `imrelp`), +set the `LOGGER_RELP_URL` environment variable: + +```bash +export LOGGER_RELP_URL=tcp://rsyslog.example.com:2514 +``` + +Messages are formatted as RFC 5424 syslog and delivered reliably with +per-message acknowledgement. The connection is established lazily on +first log and reconnects automatically on failure. ## Installation diff --git a/relp_handler.go b/relp_handler.go new file mode 100644 index 0000000..dbe8327 --- /dev/null +++ b/relp_handler.go @@ -0,0 +1,327 @@ +package simplelog + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "net" + "net/url" + "os" + "strconv" + "strings" + "sync" + "time" +) + +const ( + relpVersion = "1" + relpSoftware = "simplelog,1.0.0,https://sneak.berlin/go/simplelog" + relpDefaultTimeout = 5 * time.Second + relpMaxTxnr = 999_999_999 +) + +// RELPHandler delivers log messages to a remote rsyslog server +// using the RELP (Reliable Event Logging Protocol). +type RELPHandler struct { + mu sync.Mutex + address string + conn net.Conn + txnr int +} + +// NewRELPHandler creates a handler that sends logs via RELP. +// The relpURL should be in the form "tcp://host:port". +func NewRELPHandler(relpURL string) (*RELPHandler, error) { + u, err := url.Parse(relpURL) + if err != nil { + return nil, fmt.Errorf("invalid RELP URL: %w", err) + } + scheme := u.Scheme + if scheme == "" { + scheme = "tcp" + } + if scheme != "tcp" { + return nil, fmt.Errorf( + "unsupported RELP scheme %q, only tcp is supported", + scheme, + ) + } + host := u.Host + if host == "" { + return nil, fmt.Errorf("RELP URL must include a host") + } + if _, _, err := net.SplitHostPort(host); err != nil { + host = net.JoinHostPort(host, "2514") + } + + h := &RELPHandler{ + address: host, + } + return h, nil +} + +func (h *RELPHandler) Enabled( + _ context.Context, + _ slog.Level, +) bool { + return true +} + +func (h *RELPHandler) WithAttrs(_ []slog.Attr) slog.Handler { + return h +} + +func (h *RELPHandler) WithGroup(_ string) slog.Handler { + return h +} + +func (h *RELPHandler) Handle( + _ context.Context, + record slog.Record, +) error { + h.mu.Lock() + defer h.mu.Unlock() + + if err := h.ensureConnected(); err != nil { + return fmt.Errorf("relp connect: %w", err) + } + + msg := h.formatSyslog(record) + if err := h.sendSyslog(msg); err != nil { + // Connection may be broken; close and let next call + // reconnect. + h.closeConn() + return fmt.Errorf("relp syslog: %w", err) + } + return nil +} + +// ensureConnected dials and performs the RELP open handshake if +// no connection exists. +func (h *RELPHandler) ensureConnected() error { + if h.conn != nil { + return nil + } + + conn, err := net.DialTimeout("tcp", h.address, relpDefaultTimeout) + if err != nil { + return err + } + h.conn = conn + h.txnr = 0 + + return h.open() +} + +// open sends the RELP "open" command and reads the server's +// response. +func (h *RELPHandler) open() error { + offers := fmt.Sprintf( + "relp_version=%s\nrelp_software=%s\ncommands=syslog", + relpVersion, + relpSoftware, + ) + if err := h.sendFrame("open", []byte(offers)); err != nil { + return fmt.Errorf("open send: %w", err) + } + _, _, err := h.readResponse() + if err != nil { + return fmt.Errorf("open rsp: %w", err) + } + return nil +} + +// sendSyslog sends a single syslog message and waits for the +// acknowledgement. +func (h *RELPHandler) sendSyslog(msg []byte) error { + if err := h.sendFrame("syslog", msg); err != nil { + return err + } + code, _, err := h.readResponse() + if err != nil { + return err + } + if code != 200 { + return fmt.Errorf("server returned status %d", code) + } + return nil +} + +// Close gracefully shuts down the RELP session. +func (h *RELPHandler) Close() error { + h.mu.Lock() + defer h.mu.Unlock() + + if h.conn == nil { + return nil + } + + // Best-effort close command. + _ = h.sendFrame("close", nil) + _, _, _ = h.readResponse() + return h.closeConn() +} + +func (h *RELPHandler) closeConn() error { + if h.conn == nil { + return nil + } + err := h.conn.Close() + h.conn = nil + h.txnr = 0 + return err +} + +// nextTxnr returns the next transaction number. +func (h *RELPHandler) nextTxnr() int { + h.txnr++ + if h.txnr > relpMaxTxnr { + h.txnr = 1 + } + return h.txnr +} + +// sendFrame writes a RELP frame to the connection. +// Frame format: TXNR SP COMMAND SP DATALEN [SP DATA] LF +func (h *RELPHandler) sendFrame( + command string, + data []byte, +) error { + txnr := h.nextTxnr() + dataLen := len(data) + + var frame []byte + header := fmt.Sprintf("%d %s %d", txnr, command, dataLen) + if dataLen > 0 { + frame = make([]byte, 0, len(header)+1+dataLen+1) + frame = append(frame, header...) + frame = append(frame, ' ') + frame = append(frame, data...) + } else { + frame = make([]byte, 0, len(header)+1) + frame = append(frame, header...) + } + frame = append(frame, '\n') + + _ = h.conn.SetWriteDeadline( + time.Now().Add(relpDefaultTimeout), + ) + _, err := h.conn.Write(frame) + return err +} + +// readResponse reads a RELP response frame from the connection. +// Returns the status code and any extra data. +func (h *RELPHandler) readResponse() (int, string, error) { + _ = h.conn.SetReadDeadline( + time.Now().Add(relpDefaultTimeout), + ) + + // Read until we hit LF. RELP frames are terminated by LF. + buf := make([]byte, 0, 1024) + one := make([]byte, 1) + for { + n, err := h.conn.Read(one) + if err != nil { + return 0, "", fmt.Errorf("read: %w", err) + } + if n == 0 { + continue + } + if one[0] == '\n' { + break + } + buf = append(buf, one[0]) + if len(buf) > 128*1024 { + return 0, "", fmt.Errorf("response too large") + } + } + + // Parse: TXNR SP "rsp" SP DATALEN [SP DATA] + frame := string(buf) + + // Skip TXNR + parts := strings.SplitN(frame, " ", 4) + if len(parts) < 3 { + return 0, "", fmt.Errorf("malformed rsp frame: %q", frame) + } + + dataLen, err := strconv.Atoi(parts[2]) + if err != nil { + return 0, "", fmt.Errorf("bad datalen: %w", err) + } + + var rspData string + if dataLen > 0 && len(parts) >= 4 { + rspData = parts[3] + } + + // rspData format: STATUS SP HUMANTEXT [LF EXTRA] + if rspData == "" { + return 200, "", nil + } + statusStr := rspData + rest := "" + if idx := strings.IndexByte(rspData, ' '); idx >= 0 { + statusStr = rspData[:idx] + rest = rspData[idx+1:] + } + code, err := strconv.Atoi(statusStr) + if err != nil { + return 0, "", fmt.Errorf("bad status code %q: %w", statusStr, err) + } + return code, rest, nil +} + +// formatSyslog formats a slog.Record as an RFC 5424 syslog +// message. +func (h *RELPHandler) formatSyslog(record slog.Record) []byte { + // Map slog levels to syslog severity. + var severity int + switch { + case record.Level >= slog.LevelError: + severity = 3 // err + case record.Level >= slog.LevelWarn: + severity = 4 // warning + case record.Level >= slog.LevelInfo: + severity = 6 // info + default: + severity = 7 // debug + } + + // Facility 1 = user-level + priority := 1*8 + severity + + hostname, _ := os.Hostname() + if hostname == "" { + hostname = "-" + } + + ts := record.Time.UTC().Format(time.RFC3339Nano) + + // Collect structured data from attributes. + attrs := make(map[string]string) + record.Attrs(func(a slog.Attr) bool { + attrs[a.Key] = a.Value.String() + return true + }) + + var sd string + if len(attrs) > 0 { + jsonBytes, _ := json.Marshal(attrs) + sd = string(jsonBytes) + } else { + sd = "-" + } + + msg := fmt.Sprintf( + "<%d>1 %s %s simplelog - - %s %s", + priority, + ts, + hostname, + sd, + record.Message, + ) + return []byte(msg) +} diff --git a/simplelog.go b/simplelog.go index 08316bc..1bf13f0 100644 --- a/simplelog.go +++ b/simplelog.go @@ -15,6 +15,7 @@ import ( var ( webhookURL = os.Getenv("LOGGER_WEBHOOK_URL") + relpURL = os.Getenv("LOGGER_RELP_URL") ) var ourCustomLogger *slog.Logger @@ -44,6 +45,13 @@ func NewMultiplexHandler() slog.Handler { } cl.handlers = append(cl.handlers, handler) } + if relpURL != "" { + handler, err := NewRELPHandler(relpURL) + if err != nil { + log.Fatalf("Failed to initialize RELP handler: %v", err) + } + cl.handlers = append(cl.handlers, handler) + } return cl }