feat: add RELP handler for reliable log delivery to rsyslog (closes #5) #6

Open
clawbot wants to merge 2 commits from clawbot/simplelog:feat/relp-handler into main
3 changed files with 346 additions and 2 deletions
Showing only changes of commit 90032493b7 - Show all commits

View File

@ -19,9 +19,18 @@ Released v1.0.0 2024-06-14. Works as intended. No known bugs.
- if output is not a tty, outputs json
- supports delivering each log message via a webhook
## Planned Features
## RELP Delivery
- supports delivering logs via tcp RELP (e.g. to remote rsyslog using imrelp)
To deliver logs via RELP to a remote rsyslog server (using `imrelp`),
set the `LOGGER_RELP_URL` environment variable:
```bash
export LOGGER_RELP_URL=tcp://rsyslog.example.com:2514
```
Messages are formatted as RFC 5424 syslog and delivered reliably with
per-message acknowledgement. The connection is established lazily on
first log and reconnects automatically on failure.
## Installation

327
relp_handler.go Normal file
View File

@ -0,0 +1,327 @@
package simplelog
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net"
"net/url"
"os"
"strconv"
"strings"
"sync"
"time"
)
const (
relpVersion = "1"
relpSoftware = "simplelog,1.0.0,https://sneak.berlin/go/simplelog"
relpDefaultTimeout = 5 * time.Second
relpMaxTxnr = 999_999_999
)
// RELPHandler delivers log messages to a remote rsyslog server
// using the RELP (Reliable Event Logging Protocol).
type RELPHandler struct {
mu sync.Mutex
address string
conn net.Conn
txnr int
}
// NewRELPHandler creates a handler that sends logs via RELP.
// The relpURL should be in the form "tcp://host:port".
func NewRELPHandler(relpURL string) (*RELPHandler, error) {
u, err := url.Parse(relpURL)
if err != nil {
return nil, fmt.Errorf("invalid RELP URL: %w", err)
}
scheme := u.Scheme
if scheme == "" {
scheme = "tcp"
}
if scheme != "tcp" {
return nil, fmt.Errorf(
"unsupported RELP scheme %q, only tcp is supported",
scheme,
)
}
host := u.Host
if host == "" {
return nil, fmt.Errorf("RELP URL must include a host")
}
if _, _, err := net.SplitHostPort(host); err != nil {
host = net.JoinHostPort(host, "2514")
}
h := &RELPHandler{
address: host,
}
return h, nil
}
func (h *RELPHandler) Enabled(
_ context.Context,
_ slog.Level,
) bool {
return true
}
func (h *RELPHandler) WithAttrs(_ []slog.Attr) slog.Handler {
return h
}
func (h *RELPHandler) WithGroup(_ string) slog.Handler {
return h
}
func (h *RELPHandler) Handle(
_ context.Context,
record slog.Record,
) error {
h.mu.Lock()
defer h.mu.Unlock()
if err := h.ensureConnected(); err != nil {
return fmt.Errorf("relp connect: %w", err)
}
msg := h.formatSyslog(record)
if err := h.sendSyslog(msg); err != nil {
// Connection may be broken; close and let next call
// reconnect.
h.closeConn()
return fmt.Errorf("relp syslog: %w", err)
}
return nil
}
// ensureConnected dials and performs the RELP open handshake if
// no connection exists.
func (h *RELPHandler) ensureConnected() error {
if h.conn != nil {
return nil
}
conn, err := net.DialTimeout("tcp", h.address, relpDefaultTimeout)
if err != nil {
return err
}
h.conn = conn
h.txnr = 0
return h.open()
}
// open sends the RELP "open" command and reads the server's
// response.
func (h *RELPHandler) open() error {
offers := fmt.Sprintf(
"relp_version=%s\nrelp_software=%s\ncommands=syslog",
relpVersion,
relpSoftware,
)
if err := h.sendFrame("open", []byte(offers)); err != nil {
return fmt.Errorf("open send: %w", err)
}
_, _, err := h.readResponse()
if err != nil {
return fmt.Errorf("open rsp: %w", err)
}
return nil
}
// sendSyslog sends a single syslog message and waits for the
// acknowledgement.
func (h *RELPHandler) sendSyslog(msg []byte) error {
if err := h.sendFrame("syslog", msg); err != nil {
return err
}
code, _, err := h.readResponse()
if err != nil {
return err
}
if code != 200 {
return fmt.Errorf("server returned status %d", code)
}
return nil
}
// Close gracefully shuts down the RELP session.
func (h *RELPHandler) Close() error {
h.mu.Lock()
defer h.mu.Unlock()
if h.conn == nil {
return nil
}
// Best-effort close command.
_ = h.sendFrame("close", nil)
_, _, _ = h.readResponse()
return h.closeConn()
}
func (h *RELPHandler) closeConn() error {
if h.conn == nil {
return nil
}
err := h.conn.Close()
h.conn = nil
h.txnr = 0
return err
}
// nextTxnr returns the next transaction number.
func (h *RELPHandler) nextTxnr() int {
h.txnr++
if h.txnr > relpMaxTxnr {
h.txnr = 1
}
return h.txnr
}
// sendFrame writes a RELP frame to the connection.
// Frame format: TXNR SP COMMAND SP DATALEN [SP DATA] LF
func (h *RELPHandler) sendFrame(
command string,
data []byte,
) error {
txnr := h.nextTxnr()
dataLen := len(data)
var frame []byte
header := fmt.Sprintf("%d %s %d", txnr, command, dataLen)
if dataLen > 0 {
frame = make([]byte, 0, len(header)+1+dataLen+1)
frame = append(frame, header...)
frame = append(frame, ' ')
frame = append(frame, data...)
} else {
frame = make([]byte, 0, len(header)+1)
frame = append(frame, header...)
}
frame = append(frame, '\n')
_ = h.conn.SetWriteDeadline(
time.Now().Add(relpDefaultTimeout),
)
_, err := h.conn.Write(frame)
return err
}
// readResponse reads a RELP response frame from the connection.
// Returns the status code and any extra data.
func (h *RELPHandler) readResponse() (int, string, error) {
_ = h.conn.SetReadDeadline(
time.Now().Add(relpDefaultTimeout),
)
// Read until we hit LF. RELP frames are terminated by LF.
buf := make([]byte, 0, 1024)
one := make([]byte, 1)
for {
n, err := h.conn.Read(one)
if err != nil {
return 0, "", fmt.Errorf("read: %w", err)
}
if n == 0 {
continue
}
if one[0] == '\n' {
break
}
buf = append(buf, one[0])
if len(buf) > 128*1024 {
return 0, "", fmt.Errorf("response too large")
}
}
// Parse: TXNR SP "rsp" SP DATALEN [SP DATA]
frame := string(buf)
// Skip TXNR
parts := strings.SplitN(frame, " ", 4)
if len(parts) < 3 {
return 0, "", fmt.Errorf("malformed rsp frame: %q", frame)
}
dataLen, err := strconv.Atoi(parts[2])
if err != nil {
return 0, "", fmt.Errorf("bad datalen: %w", err)
}
var rspData string
if dataLen > 0 && len(parts) >= 4 {
rspData = parts[3]
}
// rspData format: STATUS SP HUMANTEXT [LF EXTRA]
if rspData == "" {
return 200, "", nil
}
statusStr := rspData
rest := ""
if idx := strings.IndexByte(rspData, ' '); idx >= 0 {
statusStr = rspData[:idx]
rest = rspData[idx+1:]
}
code, err := strconv.Atoi(statusStr)
if err != nil {
return 0, "", fmt.Errorf("bad status code %q: %w", statusStr, err)
}
return code, rest, nil
}
// formatSyslog formats a slog.Record as an RFC 5424 syslog
// message.
func (h *RELPHandler) formatSyslog(record slog.Record) []byte {
// Map slog levels to syslog severity.
var severity int
switch {
case record.Level >= slog.LevelError:
severity = 3 // err
case record.Level >= slog.LevelWarn:
severity = 4 // warning
case record.Level >= slog.LevelInfo:
severity = 6 // info
default:
severity = 7 // debug
}
// Facility 1 = user-level
priority := 1*8 + severity
hostname, _ := os.Hostname()
if hostname == "" {
hostname = "-"
}
ts := record.Time.UTC().Format(time.RFC3339Nano)
// Collect structured data from attributes.
attrs := make(map[string]string)
record.Attrs(func(a slog.Attr) bool {
attrs[a.Key] = a.Value.String()
return true
})
var sd string
if len(attrs) > 0 {
jsonBytes, _ := json.Marshal(attrs)
sd = string(jsonBytes)
} else {
sd = "-"
}
msg := fmt.Sprintf(
"<%d>1 %s %s simplelog - - %s %s",
priority,
ts,
hostname,
sd,
record.Message,
)
return []byte(msg)
}

View File

@ -15,6 +15,7 @@ import (
var (
webhookURL = os.Getenv("LOGGER_WEBHOOK_URL")
relpURL = os.Getenv("LOGGER_RELP_URL")
)
var ourCustomLogger *slog.Logger
@ -44,6 +45,13 @@ func NewMultiplexHandler() slog.Handler {
}
cl.handlers = append(cl.handlers, handler)
}
if relpURL != "" {
handler, err := NewRELPHandler(relpURL)
if err != nil {
log.Fatalf("Failed to initialize RELP handler: %v", err)
}
cl.handlers = append(cl.handlers, handler)
}
return cl
}