Compare commits
17 Commits
29c31d44f8
...
feat/relp-
| Author | SHA1 | Date | |
|---|---|---|---|
| f311d9c781 | |||
| 9121da9aae | |||
| 74ce052b77 | |||
| 1eef38a5fa | |||
| 97a82e9b2c | |||
|
|
90032493b7 | ||
|
|
b20bc2bd4e | ||
|
|
869b7ca4c3 | ||
| 31c9ed52cb | |||
| 28d0d041b0 | |||
| 278cb73053 | |||
| ea0c84547f | |||
| a852d938e7 | |||
| a660203e8f | |||
| 000fe293ee | |||
| d8f35dd031 | |||
| 60cb410c32 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1 +1,2 @@
|
|||||||
.aider*
|
.aider*
|
||||||
|
cmd/example/example
|
||||||
|
|||||||
39
Dockerfile
Normal file
39
Dockerfile
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
# First stage: Use the golangci-lint image to run the linter
|
||||||
|
FROM golangci/golangci-lint:latest as lint
|
||||||
|
|
||||||
|
# Set the Current Working Directory inside the container
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
# Copy the go.mod file and the rest of the application code
|
||||||
|
COPY go.mod ./
|
||||||
|
COPY . .
|
||||||
|
|
||||||
|
# Run golangci-lint
|
||||||
|
RUN golangci-lint run
|
||||||
|
|
||||||
|
RUN sh -c 'test -z "$(gofmt -l .)"'
|
||||||
|
|
||||||
|
# Second stage: Use the official Golang image to run tests
|
||||||
|
FROM golang:1.22 as test
|
||||||
|
|
||||||
|
# Set the Current Working Directory inside the container
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
# Copy the go.mod file and the rest of the application code
|
||||||
|
COPY go.mod ./
|
||||||
|
COPY . .
|
||||||
|
|
||||||
|
# Run tests
|
||||||
|
RUN go test -v ./...
|
||||||
|
|
||||||
|
# Final stage: Combine the linting and testing stages
|
||||||
|
FROM golang:1.22 as final
|
||||||
|
|
||||||
|
# Ensure that the linting stage succeeded
|
||||||
|
WORKDIR /app
|
||||||
|
COPY --from=lint /app .
|
||||||
|
COPY --from=test /app .
|
||||||
|
|
||||||
|
# Set the final CMD to something minimal since we only needed to verify lint and tests during build
|
||||||
|
CMD ["echo", "Build and tests passed successfully!"]
|
||||||
|
|
||||||
14
LICENSE
Normal file
14
LICENSE
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
|
||||||
|
Version 2, December 2004
|
||||||
|
|
||||||
|
Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
|
||||||
|
|
||||||
|
Everyone is permitted to copy and distribute verbatim or modified
|
||||||
|
copies of this license document, and changing it is allowed as long
|
||||||
|
as the name is changed.
|
||||||
|
|
||||||
|
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
|
||||||
|
TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
|
||||||
|
|
||||||
|
0. You just DO WHAT THE FUCK YOU WANT TO.
|
||||||
|
|
||||||
3
Makefile
3
Makefile
@@ -12,3 +12,6 @@ fmt:
|
|||||||
lint:
|
lint:
|
||||||
golangci-lint run
|
golangci-lint run
|
||||||
sh -c 'test -z "$$(gofmt -l .)"'
|
sh -c 'test -z "$$(gofmt -l .)"'
|
||||||
|
|
||||||
|
docker:
|
||||||
|
docker build --progress plain .
|
||||||
|
|||||||
38
README.md
38
README.md
@@ -1,5 +1,7 @@
|
|||||||
# simplelog
|
# simplelog
|
||||||
|
|
||||||
|
## Summary
|
||||||
|
|
||||||
simplelog is an opinionated logging package designed to facilitate easy and
|
simplelog is an opinionated logging package designed to facilitate easy and
|
||||||
structured logging in Go applications with an absolute minimum of
|
structured logging in Go applications with an absolute minimum of
|
||||||
boilerplate.
|
boilerplate.
|
||||||
@@ -7,12 +9,28 @@ boilerplate.
|
|||||||
The idea is that you can add a single import line which replaces the
|
The idea is that you can add a single import line which replaces the
|
||||||
stdlib `log/slog` default handler, and solve the 90% case for logging.
|
stdlib `log/slog` default handler, and solve the 90% case for logging.
|
||||||
|
|
||||||
|
## Current Status
|
||||||
|
|
||||||
|
Released v1.0.0 2024-06-14. Works as intended. No known bugs.
|
||||||
|
|
||||||
## Features
|
## Features
|
||||||
|
|
||||||
* if output is a tty, outputs pretty color logs
|
- if output is a tty, outputs pretty color logs
|
||||||
* if output is not a tty, outputs json
|
- if output is not a tty, outputs json
|
||||||
* supports delivering logs via tcp RELP (e.g. to remote rsyslog using imrelp)
|
- supports delivering each log message via a webhook
|
||||||
* supports delivering each log message via a webhook
|
|
||||||
|
## RELP Delivery
|
||||||
|
|
||||||
|
To deliver logs via RELP to a remote rsyslog server (using `imrelp`),
|
||||||
|
set the `LOGGER_RELP_URL` environment variable:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
export LOGGER_RELP_URL=tcp://rsyslog.example.com:2514
|
||||||
|
```
|
||||||
|
|
||||||
|
Messages are formatted as RFC 5424 syslog and delivered reliably with
|
||||||
|
per-message acknowledgement. The connection is established lazily on
|
||||||
|
first log and reconnects automatically on failure.
|
||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
|
|
||||||
@@ -25,19 +43,21 @@ go mod init your_project_name
|
|||||||
Then, add SimpleLog to your project:
|
Then, add SimpleLog to your project:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
go get git.eeqj.de/sneak/go-simplelog
|
go get sneak.berlin/go/simplelog
|
||||||
```
|
```
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
Below is an example of how to use SimpleLog in a Go application. This example is provided in the form of a `main.go` file, which demonstrates logging at various levels using structured logging syntax.
|
Below is an example of how to use SimpleLog in a Go application. This
|
||||||
|
example is provided in the form of a `main.go` file, which demonstrates
|
||||||
|
logging at various levels using structured logging syntax.
|
||||||
|
|
||||||
```go
|
```go
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log/slog"
|
"log/slog"
|
||||||
_ "git.eeqj.de/sneak/go-simplelog"
|
_ "sneak.berlin/go/simplelog"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@@ -48,3 +68,7 @@ func main() {
|
|||||||
slog.Error("Failed to save data", slog.String("reason", "permission denied"))
|
slog.Error("Failed to save data", slog.String("reason", "permission denied"))
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## License
|
||||||
|
|
||||||
|
[WTFPL](./LICENSE)
|
||||||
|
|||||||
26
cmd/example/main.go
Normal file
26
cmd/example/main.go
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log/slog"
|
||||||
|
|
||||||
|
_ "sneak.berlin/go/simplelog"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
|
||||||
|
// log structured data with slog as usual:
|
||||||
|
slog.Info(
|
||||||
|
"User login attempt",
|
||||||
|
slog.String("user", "JohnDoe"),
|
||||||
|
slog.Int("attempt", 3),
|
||||||
|
)
|
||||||
|
slog.Warn(
|
||||||
|
"Configuration mismatch",
|
||||||
|
slog.String("expected", "config.json"),
|
||||||
|
slog.String("found", "config.dev.json"),
|
||||||
|
)
|
||||||
|
slog.Error(
|
||||||
|
"Failed to save data",
|
||||||
|
slog.String("reason", "permission denied"),
|
||||||
|
)
|
||||||
|
}
|
||||||
@@ -16,7 +16,10 @@ func NewConsoleHandler() *ConsoleHandler {
|
|||||||
return &ConsoleHandler{}
|
return &ConsoleHandler{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConsoleHandler) Handle(ctx context.Context, record slog.Record) error {
|
func (c *ConsoleHandler) Handle(
|
||||||
|
ctx context.Context,
|
||||||
|
record slog.Record,
|
||||||
|
) error {
|
||||||
timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z07:00")
|
timestamp := time.Now().UTC().Format("2006-01-02T15:04:05.000Z07:00")
|
||||||
var colorFunc func(format string, a ...interface{}) string
|
var colorFunc func(format string, a ...interface{}) string
|
||||||
|
|
||||||
@@ -32,16 +35,28 @@ func (c *ConsoleHandler) Handle(ctx context.Context, record slog.Record) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get the caller information
|
// Get the caller information
|
||||||
_, file, line, ok := runtime.Caller(5)
|
_, file, line, ok := runtime.Caller(4)
|
||||||
if !ok {
|
if !ok {
|
||||||
file = "???"
|
file = "???"
|
||||||
line = 0
|
line = 0
|
||||||
}
|
}
|
||||||
fmt.Println(colorFunc("%s [%s] %s:%d: %s", timestamp, record.Level, file, line, record.Message))
|
fmt.Println(
|
||||||
|
colorFunc(
|
||||||
|
"%s [%s] %s:%d: %s",
|
||||||
|
timestamp,
|
||||||
|
record.Level,
|
||||||
|
file,
|
||||||
|
line,
|
||||||
|
record.Message,
|
||||||
|
),
|
||||||
|
)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConsoleHandler) Enabled(ctx context.Context, level slog.Level) bool {
|
func (c *ConsoleHandler) Enabled(
|
||||||
|
ctx context.Context,
|
||||||
|
level slog.Level,
|
||||||
|
) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
1
event.go
1
event.go
@@ -15,7 +15,6 @@ type Event struct {
|
|||||||
Data json.RawMessage `json:"data"`
|
Data json.RawMessage `json:"data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func NewEvent(level, message string, data json.RawMessage) Event {
|
func NewEvent(level, message string, data json.RawMessage) Event {
|
||||||
return Event{
|
return Event{
|
||||||
ID: uuid.New(),
|
ID: uuid.New(),
|
||||||
|
|||||||
@@ -3,8 +3,9 @@ package simplelog
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"log"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
type JSONHandler struct{}
|
type JSONHandler struct{}
|
||||||
@@ -15,7 +16,7 @@ func NewJSONHandler() *JSONHandler {
|
|||||||
|
|
||||||
func (j *JSONHandler) Handle(ctx context.Context, record slog.Record) error {
|
func (j *JSONHandler) Handle(ctx context.Context, record slog.Record) error {
|
||||||
jsonData, _ := json.Marshal(record)
|
jsonData, _ := json.Marshal(record)
|
||||||
log.Println(string(jsonData))
|
fmt.Fprintln(os.Stdout, string(jsonData))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
33
json_handler_test.go
Normal file
33
json_handler_test.go
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
package simplelog
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log/slog"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestJSONHandlerDeadlock verifies that JSONHandler.Handle does not deadlock
|
||||||
|
// when the default slog handler routes log.Println back through slog.
|
||||||
|
// On the unfixed code this test will hang (deadlock); with the fix it completes.
|
||||||
|
func TestJSONHandlerDeadlock(t *testing.T) {
|
||||||
|
handler := NewJSONHandler()
|
||||||
|
|
||||||
|
// Set our handler as the default so log.Println routes through slog
|
||||||
|
logger := slog.New(handler)
|
||||||
|
slog.SetDefault(logger)
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
// This call deadlocks on unfixed code because Handle() calls
|
||||||
|
// log.Println() which re-enters slog → Handle() → log.Println() …
|
||||||
|
slog.Info("test message")
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
// success
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatal("JSONHandler.Handle deadlocked: timed out after 5 seconds")
|
||||||
|
}
|
||||||
|
}
|
||||||
525
relp_handler.go
525
relp_handler.go
@@ -4,271 +4,324 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
|
||||||
"log"
|
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net"
|
"net"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
|
||||||
"runtime"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func getEnvAsInt(name string, defaultVal int) int {
|
const (
|
||||||
valStr := os.Getenv(name)
|
relpVersion = "1"
|
||||||
if val, err := strconv.Atoi(valStr); err == nil {
|
relpSoftware = "simplelog,1.0.0,https://sneak.berlin/go/simplelog"
|
||||||
return val
|
relpDefaultTimeout = 5 * time.Second
|
||||||
}
|
relpMaxTxnr = 999_999_999
|
||||||
return defaultVal
|
|
||||||
}
|
|
||||||
|
|
||||||
func getEnvAsDuration(name string, defaultVal time.Duration) time.Duration {
|
|
||||||
valStr := os.Getenv(name)
|
|
||||||
if val, err := time.ParseDuration(valStr); err == nil {
|
|
||||||
return val
|
|
||||||
}
|
|
||||||
return defaultVal
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
cacheDir, _ = os.UserCacheDir()
|
|
||||||
diskBufferLimit = getEnvAsInt("LOGGER_DISK_BUFFER_LIMIT", 100)
|
|
||||||
diskWriteInterval = getEnvAsDuration("LOGGER_DISK_WRITE_INTERVAL", time.Second)
|
|
||||||
relpDebug = os.Getenv("RELP_DEBUG") != ""
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// RELPHandler delivers log messages to a remote rsyslog server
|
||||||
|
// using the RELP (Reliable Event Logging Protocol).
|
||||||
type RELPHandler struct {
|
type RELPHandler struct {
|
||||||
relpServerURL string
|
mu sync.Mutex
|
||||||
relpHost string
|
address string
|
||||||
relpPort string
|
conn net.Conn
|
||||||
conn net.Conn
|
txnr int
|
||||||
ch chan Event
|
|
||||||
done chan struct{}
|
|
||||||
failedCh chan Event
|
|
||||||
timer *time.Timer
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRELPHandler(relpURL string) (ExtendedHandler, error) {
|
// NewRELPHandler creates a handler that sends logs via RELP.
|
||||||
parsedURL, err := url.Parse(relpURL)
|
// The relpURL should be in the form "tcp://host:port".
|
||||||
|
func NewRELPHandler(relpURL string) (*RELPHandler, error) {
|
||||||
|
u, err := url.Parse(relpURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error parsing RELP URL: %v", err)
|
return nil, fmt.Errorf("invalid RELP URL: %w", err)
|
||||||
}
|
}
|
||||||
if parsedURL.Scheme != "tcp" {
|
scheme := u.Scheme
|
||||||
return nil, fmt.Errorf("the RELP URL must have the tcp scheme, got %s", parsedURL.Scheme)
|
if scheme == "" {
|
||||||
|
scheme = "tcp"
|
||||||
|
}
|
||||||
|
if scheme != "tcp" {
|
||||||
|
return nil, fmt.Errorf(
|
||||||
|
"unsupported RELP scheme %q, only tcp is supported",
|
||||||
|
scheme,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
host := u.Host
|
||||||
|
if host == "" {
|
||||||
|
return nil, fmt.Errorf("RELP URL must include a host")
|
||||||
|
}
|
||||||
|
if _, _, err := net.SplitHostPort(host); err != nil {
|
||||||
|
host = net.JoinHostPort(host, "2514")
|
||||||
}
|
}
|
||||||
|
|
||||||
host, port, err := net.SplitHostPort(parsedURL.Host)
|
h := &RELPHandler{
|
||||||
if err != nil {
|
address: host,
|
||||||
return nil, fmt.Errorf("Error splitting host and port: %v", err)
|
|
||||||
}
|
}
|
||||||
|
return h, nil
|
||||||
if err := os.MkdirAll(filepath.Join(cacheDir, "simplelog"), 0755); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create cache directory: %v", err)
|
|
||||||
}
|
|
||||||
r := &RELPHandler{
|
|
||||||
ch: make(chan Event, diskBufferLimit),
|
|
||||||
done: make(chan struct{}),
|
|
||||||
failedCh: make(chan Event, diskBufferLimit),
|
|
||||||
timer: time.NewTimer(diskWriteInterval),
|
|
||||||
relpHost: host,
|
|
||||||
relpPort: port,
|
|
||||||
}
|
|
||||||
if relpDebug {
|
|
||||||
log.Printf("Created new RELP handler for server at %s", r.relpServerURL)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = r.Startup()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return r, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RELPHandler) connectToRELPServer() (net.Conn, error) {
|
func (h *RELPHandler) Enabled(
|
||||||
conn, err := net.Dial("tcp", net.JoinHostPort(r.relpHost, r.relpPort))
|
_ context.Context,
|
||||||
if err != nil {
|
_ slog.Level,
|
||||||
if relpDebug {
|
) bool {
|
||||||
log.Printf("Failed to connect to RELP server at %s: %v", net.JoinHostPort(r.relpHost, r.relpPort), err)
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if relpDebug {
|
|
||||||
log.Printf("Successfully connected to RELP server at %s", net.JoinHostPort(r.relpHost, r.relpPort))
|
|
||||||
}
|
|
||||||
return conn, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RELPHandler) Startup() error {
|
|
||||||
var err error
|
|
||||||
r.conn, err = r.connectToRELPServer()
|
|
||||||
if err != nil {
|
|
||||||
if relpDebug {
|
|
||||||
log.Printf("Failed to establish TCP connection to RELP server: %v", err)
|
|
||||||
}
|
|
||||||
return fmt.Errorf("Failed to establish TCP connection to RELP server: %v", err)
|
|
||||||
}
|
|
||||||
if relpDebug {
|
|
||||||
log.Printf("Successfully connected to RELP server at %s", r.relpServerURL)
|
|
||||||
}
|
|
||||||
go r.receiveEventsFromChannel()
|
|
||||||
go r.processFailedEvents()
|
|
||||||
go r.watchDirectoryForFailedEventFiles()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RELPHandler) Enabled(ctx context.Context, level slog.Level) bool {
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RELPHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
|
func (h *RELPHandler) WithAttrs(_ []slog.Attr) slog.Handler {
|
||||||
return r
|
return h
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RELPHandler) WithGroup(name string) slog.Handler {
|
func (h *RELPHandler) WithGroup(_ string) slog.Handler {
|
||||||
return r
|
return h
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RELPHandler) Handle(ctx context.Context, record slog.Record) error {
|
func (h *RELPHandler) Handle(
|
||||||
attrs := make(map[string]interface{})
|
_ context.Context,
|
||||||
record.Attrs(func(attr slog.Attr) bool {
|
record slog.Record,
|
||||||
attrs[attr.Key] = attr.Value
|
) error {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
|
||||||
|
if err := h.ensureConnected(); err != nil {
|
||||||
|
return fmt.Errorf("relp connect: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := h.formatSyslog(record)
|
||||||
|
if err := h.sendSyslog(msg); err != nil {
|
||||||
|
// Connection may be broken; close and let next call
|
||||||
|
// reconnect.
|
||||||
|
h.closeConn()
|
||||||
|
return fmt.Errorf("relp syslog: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensureConnected dials and performs the RELP open handshake if
|
||||||
|
// no connection exists.
|
||||||
|
func (h *RELPHandler) ensureConnected() error {
|
||||||
|
if h.conn != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := net.DialTimeout("tcp", h.address, relpDefaultTimeout)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
h.conn = conn
|
||||||
|
h.txnr = 0
|
||||||
|
|
||||||
|
return h.open()
|
||||||
|
}
|
||||||
|
|
||||||
|
// open sends the RELP "open" command and reads the server's
|
||||||
|
// response.
|
||||||
|
func (h *RELPHandler) open() error {
|
||||||
|
offers := fmt.Sprintf(
|
||||||
|
"relp_version=%s\nrelp_software=%s\ncommands=syslog",
|
||||||
|
relpVersion,
|
||||||
|
relpSoftware,
|
||||||
|
)
|
||||||
|
if err := h.sendFrame("open", []byte(offers)); err != nil {
|
||||||
|
return fmt.Errorf("open send: %w", err)
|
||||||
|
}
|
||||||
|
_, _, err := h.readResponse()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("open rsp: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendSyslog sends a single syslog message and waits for the
|
||||||
|
// acknowledgement.
|
||||||
|
func (h *RELPHandler) sendSyslog(msg []byte) error {
|
||||||
|
if err := h.sendFrame("syslog", msg); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
code, _, err := h.readResponse()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if code != 200 {
|
||||||
|
return fmt.Errorf("server returned status %d", code)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close gracefully shuts down the RELP session.
|
||||||
|
func (h *RELPHandler) Close() error {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
|
||||||
|
if h.conn == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Best-effort close command.
|
||||||
|
_ = h.sendFrame("close", nil)
|
||||||
|
_, _, _ = h.readResponse()
|
||||||
|
return h.closeConn()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *RELPHandler) closeConn() error {
|
||||||
|
if h.conn == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
err := h.conn.Close()
|
||||||
|
h.conn = nil
|
||||||
|
h.txnr = 0
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// nextTxnr returns the next transaction number.
|
||||||
|
func (h *RELPHandler) nextTxnr() int {
|
||||||
|
h.txnr++
|
||||||
|
if h.txnr > relpMaxTxnr {
|
||||||
|
h.txnr = 1
|
||||||
|
}
|
||||||
|
return h.txnr
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendFrame writes a RELP frame to the connection.
|
||||||
|
// Frame format: TXNR SP COMMAND SP DATALEN [SP DATA] LF
|
||||||
|
func (h *RELPHandler) sendFrame(
|
||||||
|
command string,
|
||||||
|
data []byte,
|
||||||
|
) error {
|
||||||
|
txnr := h.nextTxnr()
|
||||||
|
dataLen := len(data)
|
||||||
|
|
||||||
|
var frame []byte
|
||||||
|
header := fmt.Sprintf("%d %s %d", txnr, command, dataLen)
|
||||||
|
if dataLen > 0 {
|
||||||
|
frame = make([]byte, 0, len(header)+1+dataLen+1)
|
||||||
|
frame = append(frame, header...)
|
||||||
|
frame = append(frame, ' ')
|
||||||
|
frame = append(frame, data...)
|
||||||
|
} else {
|
||||||
|
frame = make([]byte, 0, len(header)+1)
|
||||||
|
frame = append(frame, header...)
|
||||||
|
}
|
||||||
|
frame = append(frame, '\n')
|
||||||
|
|
||||||
|
_ = h.conn.SetWriteDeadline(
|
||||||
|
time.Now().Add(relpDefaultTimeout),
|
||||||
|
)
|
||||||
|
_, err := h.conn.Write(frame)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// readResponse reads a RELP response frame from the connection.
|
||||||
|
// Returns the status code and any extra data.
|
||||||
|
func (h *RELPHandler) readResponse() (int, string, error) {
|
||||||
|
_ = h.conn.SetReadDeadline(
|
||||||
|
time.Now().Add(relpDefaultTimeout),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Read until we hit LF. RELP frames are terminated by LF.
|
||||||
|
buf := make([]byte, 0, 1024)
|
||||||
|
one := make([]byte, 1)
|
||||||
|
for {
|
||||||
|
n, err := h.conn.Read(one)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", fmt.Errorf("read: %w", err)
|
||||||
|
}
|
||||||
|
if n == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if one[0] == '\n' {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
buf = append(buf, one[0])
|
||||||
|
if len(buf) > 128*1024 {
|
||||||
|
return 0, "", fmt.Errorf("response too large")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse: TXNR SP "rsp" SP DATALEN [SP DATA]
|
||||||
|
frame := string(buf)
|
||||||
|
|
||||||
|
// Skip TXNR
|
||||||
|
parts := strings.SplitN(frame, " ", 4)
|
||||||
|
if len(parts) < 3 {
|
||||||
|
return 0, "", fmt.Errorf("malformed rsp frame: %q", frame)
|
||||||
|
}
|
||||||
|
|
||||||
|
dataLen, err := strconv.Atoi(parts[2])
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", fmt.Errorf("bad datalen: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var rspData string
|
||||||
|
if dataLen > 0 && len(parts) >= 4 {
|
||||||
|
rspData = parts[3]
|
||||||
|
}
|
||||||
|
|
||||||
|
// rspData format: STATUS SP HUMANTEXT [LF EXTRA]
|
||||||
|
if rspData == "" {
|
||||||
|
return 200, "", nil
|
||||||
|
}
|
||||||
|
statusStr := rspData
|
||||||
|
rest := ""
|
||||||
|
if idx := strings.IndexByte(rspData, ' '); idx >= 0 {
|
||||||
|
statusStr = rspData[:idx]
|
||||||
|
rest = rspData[idx+1:]
|
||||||
|
}
|
||||||
|
code, err := strconv.Atoi(statusStr)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", fmt.Errorf("bad status code %q: %w", statusStr, err)
|
||||||
|
}
|
||||||
|
return code, rest, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// formatSyslog formats a slog.Record as an RFC 5424 syslog
|
||||||
|
// message.
|
||||||
|
func (h *RELPHandler) formatSyslog(record slog.Record) []byte {
|
||||||
|
// Map slog levels to syslog severity.
|
||||||
|
var severity int
|
||||||
|
switch {
|
||||||
|
case record.Level >= slog.LevelError:
|
||||||
|
severity = 3 // err
|
||||||
|
case record.Level >= slog.LevelWarn:
|
||||||
|
severity = 4 // warning
|
||||||
|
case record.Level >= slog.LevelInfo:
|
||||||
|
severity = 6 // info
|
||||||
|
default:
|
||||||
|
severity = 7 // debug
|
||||||
|
}
|
||||||
|
|
||||||
|
// Facility 1 = user-level
|
||||||
|
priority := 1*8 + severity
|
||||||
|
|
||||||
|
hostname, _ := os.Hostname()
|
||||||
|
if hostname == "" {
|
||||||
|
hostname = "-"
|
||||||
|
}
|
||||||
|
|
||||||
|
ts := record.Time.UTC().Format(time.RFC3339Nano)
|
||||||
|
|
||||||
|
// Collect structured data from attributes.
|
||||||
|
attrs := make(map[string]string)
|
||||||
|
record.Attrs(func(a slog.Attr) bool {
|
||||||
|
attrs[a.Key] = a.Value.String()
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
jsonData, err := json.Marshal(attrs)
|
|
||||||
if err != nil {
|
var sd string
|
||||||
return fmt.Errorf("error marshaling attributes: %v", err)
|
if len(attrs) > 0 {
|
||||||
}
|
jsonBytes, _ := json.Marshal(attrs)
|
||||||
// Get the caller information
|
sd = string(jsonBytes)
|
||||||
_, file, line, ok := runtime.Caller(5)
|
} else {
|
||||||
if !ok {
|
sd = "-"
|
||||||
file = "???"
|
|
||||||
line = 0
|
|
||||||
}
|
}
|
||||||
|
|
||||||
event := NewExtendedEvent(record.Level.String(), record.Message, jsonData, file, line)
|
msg := fmt.Sprintf(
|
||||||
for _, handler := range cl.handlers {
|
"<%d>1 %s %s simplelog - - %s %s",
|
||||||
if err := handler.Handle(ctx, event); err != nil {
|
priority,
|
||||||
return err
|
ts,
|
||||||
}
|
hostname,
|
||||||
}
|
sd,
|
||||||
return nil
|
record.Message,
|
||||||
}
|
)
|
||||||
|
return []byte(msg)
|
||||||
func (r *RELPHandler) receiveEventsFromChannel() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case event := <-r.ch:
|
|
||||||
if err := r.sendEventToRELPServer(event); err != nil {
|
|
||||||
r.failedCh <- event
|
|
||||||
}
|
|
||||||
case <-r.done:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RELPHandler) processFailedEvents() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case event := <-r.failedCh:
|
|
||||||
if err := r.sendEventToRELPServer(event); err != nil {
|
|
||||||
// If still failing, write to disk
|
|
||||||
r.writeFailedToDisk(event)
|
|
||||||
}
|
|
||||||
case <-r.timer.C:
|
|
||||||
// Flush all events in failedCh to disk
|
|
||||||
for len(r.failedCh) > 0 {
|
|
||||||
event := <-r.failedCh
|
|
||||||
r.writeFailedToDisk(event)
|
|
||||||
}
|
|
||||||
case <-r.done:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RELPHandler) sendEventToRELPServer(event Event) error {
|
|
||||||
jsonData, err := json.Marshal(event)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error marshaling event: %v", err)
|
|
||||||
}
|
|
||||||
_, err = r.conn.Write(jsonData)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// Implement TCP read for the acknowledgment with a timeout
|
|
||||||
r.conn.SetReadDeadline(time.Now().Add(2 * time.Second))
|
|
||||||
ack := make([]byte, 256)
|
|
||||||
n, err := r.conn.Read(ack)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if string(ack[:n]) != "ACK" {
|
|
||||||
return fmt.Errorf("expected ACK from server, got %s", string(ack[:n]))
|
|
||||||
}
|
|
||||||
if relpDebug {
|
|
||||||
log.Printf("Received ACK from RELP server for event %s", event.ID)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RELPHandler) watchDirectoryForFailedEventFiles() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-r.done:
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
if r.isConnected() {
|
|
||||||
if relpDebug {
|
|
||||||
log.Printf("Reconnected to RELP server at %s", r.relpServerURL)
|
|
||||||
}
|
|
||||||
files, err := ioutil.ReadDir(cacheDir)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Error reading cache directory: %v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
for _, file := range files {
|
|
||||||
filePath := filepath.Join(cacheDir, file.Name())
|
|
||||||
data, err := ioutil.ReadFile(filePath)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Error reading event cache file %s: %v", file.Name(), err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var event Event
|
|
||||||
if err := json.Unmarshal(data, &event); err != nil {
|
|
||||||
log.Printf("Error unmarshalling event from file %s: %v", file.Name(), err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
r.ch <- event
|
|
||||||
os.Remove(filePath) // Remove file after processing
|
|
||||||
}
|
|
||||||
}
|
|
||||||
time.Sleep(10 * time.Second) // Check disk every 10 seconds
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RELPHandler) isConnected() bool {
|
|
||||||
_, err := r.conn.Write([]byte{})
|
|
||||||
return err == nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RELPHandler) Shutdown() error {
|
|
||||||
close(r.done)
|
|
||||||
if r.conn != nil {
|
|
||||||
if err := r.conn.Close(); err != nil {
|
|
||||||
return fmt.Errorf("error closing TCP connection: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RELPHandler) writeFailedToDisk(event Event) {
|
|
||||||
fileName := filepath.Join(cacheDir, "simplelog", uuid.New().String()+".logevent")
|
|
||||||
data, _ := json.Marshal(event)
|
|
||||||
ioutil.WriteFile(fileName, data, 0600)
|
|
||||||
}
|
}
|
||||||
|
|||||||
34
simplelog.go
34
simplelog.go
@@ -2,17 +2,20 @@ package simplelog
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"log"
|
"log"
|
||||||
"runtime"
|
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
|
||||||
"github.com/mattn/go-isatty"
|
"github.com/mattn/go-isatty"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
relpServerURL = os.Getenv("LOGGER_RELP_URL")
|
webhookURL = os.Getenv("LOGGER_WEBHOOK_URL")
|
||||||
webhookURL = os.Getenv("LOGGER_WEBHOOK_URL")
|
relpURL = os.Getenv("LOGGER_RELP_URL")
|
||||||
)
|
)
|
||||||
|
|
||||||
var ourCustomLogger *slog.Logger
|
var ourCustomLogger *slog.Logger
|
||||||
@@ -35,13 +38,6 @@ func NewMultiplexHandler() slog.Handler {
|
|||||||
} else {
|
} else {
|
||||||
cl.handlers = append(cl.handlers, NewJSONHandler())
|
cl.handlers = append(cl.handlers, NewJSONHandler())
|
||||||
}
|
}
|
||||||
if relpServerURL != "" {
|
|
||||||
handler, err := NewRELPHandler(relpServerURL)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to initialize RELP handler: %v", err)
|
|
||||||
}
|
|
||||||
cl.handlers = append(cl.handlers, handler)
|
|
||||||
}
|
|
||||||
if webhookURL != "" {
|
if webhookURL != "" {
|
||||||
handler, err := NewWebhookHandler(webhookURL)
|
handler, err := NewWebhookHandler(webhookURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -49,10 +45,20 @@ func NewMultiplexHandler() slog.Handler {
|
|||||||
}
|
}
|
||||||
cl.handlers = append(cl.handlers, handler)
|
cl.handlers = append(cl.handlers, handler)
|
||||||
}
|
}
|
||||||
|
if relpURL != "" {
|
||||||
|
handler, err := NewRELPHandler(relpURL)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to initialize RELP handler: %v", err)
|
||||||
|
}
|
||||||
|
cl.handlers = append(cl.handlers, handler)
|
||||||
|
}
|
||||||
return cl
|
return cl
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cl *MultiplexHandler) Handle(ctx context.Context, record slog.Record) error {
|
func (cl *MultiplexHandler) Handle(
|
||||||
|
ctx context.Context,
|
||||||
|
record slog.Record,
|
||||||
|
) error {
|
||||||
for _, handler := range cl.handlers {
|
for _, handler := range cl.handlers {
|
||||||
if err := handler.Handle(ctx, record); err != nil {
|
if err := handler.Handle(ctx, record); err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -61,7 +67,10 @@ func (cl *MultiplexHandler) Handle(ctx context.Context, record slog.Record) erro
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cl *MultiplexHandler) Enabled(ctx context.Context, level slog.Level) bool {
|
func (cl *MultiplexHandler) Enabled(
|
||||||
|
ctx context.Context,
|
||||||
|
level slog.Level,
|
||||||
|
) bool {
|
||||||
// send us all events
|
// send us all events
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
@@ -81,6 +90,7 @@ func (cl *MultiplexHandler) WithGroup(name string) slog.Handler {
|
|||||||
}
|
}
|
||||||
return &MultiplexHandler{handlers: newHandlers}
|
return &MultiplexHandler{handlers: newHandlers}
|
||||||
}
|
}
|
||||||
|
|
||||||
type ExtendedEvent interface {
|
type ExtendedEvent interface {
|
||||||
GetID() uuid.UUID
|
GetID() uuid.UUID
|
||||||
GetTimestamp() time.Time
|
GetTimestamp() time.Time
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
|
||||||
_ "git.eeqj.de/sneak/go-simplelog" // Using underscore to only invoke init()
|
_ "sneak.berlin/go/simplelog" // Using underscore to only invoke init()
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|||||||
Reference in New Issue
Block a user