forked from sneak/simplelog
Compare commits
No commits in common. "90032493b70a9c5993735aded173bb08bd9d78ea" and "31c9ed52cb8f68d690dc57caecc1b8f7a0e5f0bc" have entirely different histories.
90032493b7
...
31c9ed52cb
13
README.md
13
README.md
@ -19,18 +19,9 @@ Released v1.0.0 2024-06-14. Works as intended. No known bugs.
|
|||||||
- if output is not a tty, outputs json
|
- if output is not a tty, outputs json
|
||||||
- supports delivering each log message via a webhook
|
- supports delivering each log message via a webhook
|
||||||
|
|
||||||
## RELP Delivery
|
## Planned Features
|
||||||
|
|
||||||
To deliver logs via RELP to a remote rsyslog server (using `imrelp`),
|
- supports delivering logs via tcp RELP (e.g. to remote rsyslog using imrelp)
|
||||||
set the `LOGGER_RELP_URL` environment variable:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
export LOGGER_RELP_URL=tcp://rsyslog.example.com:2514
|
|
||||||
```
|
|
||||||
|
|
||||||
Messages are formatted as RFC 5424 syslog and delivered reliably with
|
|
||||||
per-message acknowledgement. The connection is established lazily on
|
|
||||||
first log and reconnects automatically on failure.
|
|
||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
|
|
||||||
|
|||||||
@ -3,9 +3,8 @@ package simplelog
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"log"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"os"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type JSONHandler struct{}
|
type JSONHandler struct{}
|
||||||
@ -16,7 +15,7 @@ func NewJSONHandler() *JSONHandler {
|
|||||||
|
|
||||||
func (j *JSONHandler) Handle(ctx context.Context, record slog.Record) error {
|
func (j *JSONHandler) Handle(ctx context.Context, record slog.Record) error {
|
||||||
jsonData, _ := json.Marshal(record)
|
jsonData, _ := json.Marshal(record)
|
||||||
fmt.Fprintln(os.Stdout, string(jsonData))
|
log.Println(string(jsonData))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
327
relp_handler.go
327
relp_handler.go
@ -1,327 +0,0 @@
|
|||||||
package simplelog
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"log/slog"
|
|
||||||
"net"
|
|
||||||
"net/url"
|
|
||||||
"os"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
relpVersion = "1"
|
|
||||||
relpSoftware = "simplelog,1.0.0,https://sneak.berlin/go/simplelog"
|
|
||||||
relpDefaultTimeout = 5 * time.Second
|
|
||||||
relpMaxTxnr = 999_999_999
|
|
||||||
)
|
|
||||||
|
|
||||||
// RELPHandler delivers log messages to a remote rsyslog server
|
|
||||||
// using the RELP (Reliable Event Logging Protocol).
|
|
||||||
type RELPHandler struct {
|
|
||||||
mu sync.Mutex
|
|
||||||
address string
|
|
||||||
conn net.Conn
|
|
||||||
txnr int
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewRELPHandler creates a handler that sends logs via RELP.
|
|
||||||
// The relpURL should be in the form "tcp://host:port".
|
|
||||||
func NewRELPHandler(relpURL string) (*RELPHandler, error) {
|
|
||||||
u, err := url.Parse(relpURL)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("invalid RELP URL: %w", err)
|
|
||||||
}
|
|
||||||
scheme := u.Scheme
|
|
||||||
if scheme == "" {
|
|
||||||
scheme = "tcp"
|
|
||||||
}
|
|
||||||
if scheme != "tcp" {
|
|
||||||
return nil, fmt.Errorf(
|
|
||||||
"unsupported RELP scheme %q, only tcp is supported",
|
|
||||||
scheme,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
host := u.Host
|
|
||||||
if host == "" {
|
|
||||||
return nil, fmt.Errorf("RELP URL must include a host")
|
|
||||||
}
|
|
||||||
if _, _, err := net.SplitHostPort(host); err != nil {
|
|
||||||
host = net.JoinHostPort(host, "2514")
|
|
||||||
}
|
|
||||||
|
|
||||||
h := &RELPHandler{
|
|
||||||
address: host,
|
|
||||||
}
|
|
||||||
return h, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *RELPHandler) Enabled(
|
|
||||||
_ context.Context,
|
|
||||||
_ slog.Level,
|
|
||||||
) bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *RELPHandler) WithAttrs(_ []slog.Attr) slog.Handler {
|
|
||||||
return h
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *RELPHandler) WithGroup(_ string) slog.Handler {
|
|
||||||
return h
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *RELPHandler) Handle(
|
|
||||||
_ context.Context,
|
|
||||||
record slog.Record,
|
|
||||||
) error {
|
|
||||||
h.mu.Lock()
|
|
||||||
defer h.mu.Unlock()
|
|
||||||
|
|
||||||
if err := h.ensureConnected(); err != nil {
|
|
||||||
return fmt.Errorf("relp connect: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
msg := h.formatSyslog(record)
|
|
||||||
if err := h.sendSyslog(msg); err != nil {
|
|
||||||
// Connection may be broken; close and let next call
|
|
||||||
// reconnect.
|
|
||||||
h.closeConn()
|
|
||||||
return fmt.Errorf("relp syslog: %w", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ensureConnected dials and performs the RELP open handshake if
|
|
||||||
// no connection exists.
|
|
||||||
func (h *RELPHandler) ensureConnected() error {
|
|
||||||
if h.conn != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
conn, err := net.DialTimeout("tcp", h.address, relpDefaultTimeout)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
h.conn = conn
|
|
||||||
h.txnr = 0
|
|
||||||
|
|
||||||
return h.open()
|
|
||||||
}
|
|
||||||
|
|
||||||
// open sends the RELP "open" command and reads the server's
|
|
||||||
// response.
|
|
||||||
func (h *RELPHandler) open() error {
|
|
||||||
offers := fmt.Sprintf(
|
|
||||||
"relp_version=%s\nrelp_software=%s\ncommands=syslog",
|
|
||||||
relpVersion,
|
|
||||||
relpSoftware,
|
|
||||||
)
|
|
||||||
if err := h.sendFrame("open", []byte(offers)); err != nil {
|
|
||||||
return fmt.Errorf("open send: %w", err)
|
|
||||||
}
|
|
||||||
_, _, err := h.readResponse()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("open rsp: %w", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// sendSyslog sends a single syslog message and waits for the
|
|
||||||
// acknowledgement.
|
|
||||||
func (h *RELPHandler) sendSyslog(msg []byte) error {
|
|
||||||
if err := h.sendFrame("syslog", msg); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
code, _, err := h.readResponse()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if code != 200 {
|
|
||||||
return fmt.Errorf("server returned status %d", code)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close gracefully shuts down the RELP session.
|
|
||||||
func (h *RELPHandler) Close() error {
|
|
||||||
h.mu.Lock()
|
|
||||||
defer h.mu.Unlock()
|
|
||||||
|
|
||||||
if h.conn == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Best-effort close command.
|
|
||||||
_ = h.sendFrame("close", nil)
|
|
||||||
_, _, _ = h.readResponse()
|
|
||||||
return h.closeConn()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *RELPHandler) closeConn() error {
|
|
||||||
if h.conn == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
err := h.conn.Close()
|
|
||||||
h.conn = nil
|
|
||||||
h.txnr = 0
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// nextTxnr returns the next transaction number.
|
|
||||||
func (h *RELPHandler) nextTxnr() int {
|
|
||||||
h.txnr++
|
|
||||||
if h.txnr > relpMaxTxnr {
|
|
||||||
h.txnr = 1
|
|
||||||
}
|
|
||||||
return h.txnr
|
|
||||||
}
|
|
||||||
|
|
||||||
// sendFrame writes a RELP frame to the connection.
|
|
||||||
// Frame format: TXNR SP COMMAND SP DATALEN [SP DATA] LF
|
|
||||||
func (h *RELPHandler) sendFrame(
|
|
||||||
command string,
|
|
||||||
data []byte,
|
|
||||||
) error {
|
|
||||||
txnr := h.nextTxnr()
|
|
||||||
dataLen := len(data)
|
|
||||||
|
|
||||||
var frame []byte
|
|
||||||
header := fmt.Sprintf("%d %s %d", txnr, command, dataLen)
|
|
||||||
if dataLen > 0 {
|
|
||||||
frame = make([]byte, 0, len(header)+1+dataLen+1)
|
|
||||||
frame = append(frame, header...)
|
|
||||||
frame = append(frame, ' ')
|
|
||||||
frame = append(frame, data...)
|
|
||||||
} else {
|
|
||||||
frame = make([]byte, 0, len(header)+1)
|
|
||||||
frame = append(frame, header...)
|
|
||||||
}
|
|
||||||
frame = append(frame, '\n')
|
|
||||||
|
|
||||||
_ = h.conn.SetWriteDeadline(
|
|
||||||
time.Now().Add(relpDefaultTimeout),
|
|
||||||
)
|
|
||||||
_, err := h.conn.Write(frame)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// readResponse reads a RELP response frame from the connection.
|
|
||||||
// Returns the status code and any extra data.
|
|
||||||
func (h *RELPHandler) readResponse() (int, string, error) {
|
|
||||||
_ = h.conn.SetReadDeadline(
|
|
||||||
time.Now().Add(relpDefaultTimeout),
|
|
||||||
)
|
|
||||||
|
|
||||||
// Read until we hit LF. RELP frames are terminated by LF.
|
|
||||||
buf := make([]byte, 0, 1024)
|
|
||||||
one := make([]byte, 1)
|
|
||||||
for {
|
|
||||||
n, err := h.conn.Read(one)
|
|
||||||
if err != nil {
|
|
||||||
return 0, "", fmt.Errorf("read: %w", err)
|
|
||||||
}
|
|
||||||
if n == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if one[0] == '\n' {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
buf = append(buf, one[0])
|
|
||||||
if len(buf) > 128*1024 {
|
|
||||||
return 0, "", fmt.Errorf("response too large")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Parse: TXNR SP "rsp" SP DATALEN [SP DATA]
|
|
||||||
frame := string(buf)
|
|
||||||
|
|
||||||
// Skip TXNR
|
|
||||||
parts := strings.SplitN(frame, " ", 4)
|
|
||||||
if len(parts) < 3 {
|
|
||||||
return 0, "", fmt.Errorf("malformed rsp frame: %q", frame)
|
|
||||||
}
|
|
||||||
|
|
||||||
dataLen, err := strconv.Atoi(parts[2])
|
|
||||||
if err != nil {
|
|
||||||
return 0, "", fmt.Errorf("bad datalen: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var rspData string
|
|
||||||
if dataLen > 0 && len(parts) >= 4 {
|
|
||||||
rspData = parts[3]
|
|
||||||
}
|
|
||||||
|
|
||||||
// rspData format: STATUS SP HUMANTEXT [LF EXTRA]
|
|
||||||
if rspData == "" {
|
|
||||||
return 200, "", nil
|
|
||||||
}
|
|
||||||
statusStr := rspData
|
|
||||||
rest := ""
|
|
||||||
if idx := strings.IndexByte(rspData, ' '); idx >= 0 {
|
|
||||||
statusStr = rspData[:idx]
|
|
||||||
rest = rspData[idx+1:]
|
|
||||||
}
|
|
||||||
code, err := strconv.Atoi(statusStr)
|
|
||||||
if err != nil {
|
|
||||||
return 0, "", fmt.Errorf("bad status code %q: %w", statusStr, err)
|
|
||||||
}
|
|
||||||
return code, rest, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// formatSyslog formats a slog.Record as an RFC 5424 syslog
|
|
||||||
// message.
|
|
||||||
func (h *RELPHandler) formatSyslog(record slog.Record) []byte {
|
|
||||||
// Map slog levels to syslog severity.
|
|
||||||
var severity int
|
|
||||||
switch {
|
|
||||||
case record.Level >= slog.LevelError:
|
|
||||||
severity = 3 // err
|
|
||||||
case record.Level >= slog.LevelWarn:
|
|
||||||
severity = 4 // warning
|
|
||||||
case record.Level >= slog.LevelInfo:
|
|
||||||
severity = 6 // info
|
|
||||||
default:
|
|
||||||
severity = 7 // debug
|
|
||||||
}
|
|
||||||
|
|
||||||
// Facility 1 = user-level
|
|
||||||
priority := 1*8 + severity
|
|
||||||
|
|
||||||
hostname, _ := os.Hostname()
|
|
||||||
if hostname == "" {
|
|
||||||
hostname = "-"
|
|
||||||
}
|
|
||||||
|
|
||||||
ts := record.Time.UTC().Format(time.RFC3339Nano)
|
|
||||||
|
|
||||||
// Collect structured data from attributes.
|
|
||||||
attrs := make(map[string]string)
|
|
||||||
record.Attrs(func(a slog.Attr) bool {
|
|
||||||
attrs[a.Key] = a.Value.String()
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
var sd string
|
|
||||||
if len(attrs) > 0 {
|
|
||||||
jsonBytes, _ := json.Marshal(attrs)
|
|
||||||
sd = string(jsonBytes)
|
|
||||||
} else {
|
|
||||||
sd = "-"
|
|
||||||
}
|
|
||||||
|
|
||||||
msg := fmt.Sprintf(
|
|
||||||
"<%d>1 %s %s simplelog - - %s %s",
|
|
||||||
priority,
|
|
||||||
ts,
|
|
||||||
hostname,
|
|
||||||
sd,
|
|
||||||
record.Message,
|
|
||||||
)
|
|
||||||
return []byte(msg)
|
|
||||||
}
|
|
||||||
@ -15,7 +15,6 @@ import (
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
webhookURL = os.Getenv("LOGGER_WEBHOOK_URL")
|
webhookURL = os.Getenv("LOGGER_WEBHOOK_URL")
|
||||||
relpURL = os.Getenv("LOGGER_RELP_URL")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var ourCustomLogger *slog.Logger
|
var ourCustomLogger *slog.Logger
|
||||||
@ -45,13 +44,6 @@ func NewMultiplexHandler() slog.Handler {
|
|||||||
}
|
}
|
||||||
cl.handlers = append(cl.handlers, handler)
|
cl.handlers = append(cl.handlers, handler)
|
||||||
}
|
}
|
||||||
if relpURL != "" {
|
|
||||||
handler, err := NewRELPHandler(relpURL)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to initialize RELP handler: %v", err)
|
|
||||||
}
|
|
||||||
cl.handlers = append(cl.handlers, handler)
|
|
||||||
}
|
|
||||||
return cl
|
return cl
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user