|
|
|
|
@@ -10,8 +10,8 @@ import (
|
|
|
|
|
"net"
|
|
|
|
|
"net/url"
|
|
|
|
|
"os"
|
|
|
|
|
"os"
|
|
|
|
|
"path/filepath"
|
|
|
|
|
"runtime"
|
|
|
|
|
"strconv"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
@@ -43,6 +43,8 @@ var (
|
|
|
|
|
|
|
|
|
|
type RELPHandler struct {
|
|
|
|
|
relpServerURL string
|
|
|
|
|
relpHost string
|
|
|
|
|
relpPort string
|
|
|
|
|
conn net.Conn
|
|
|
|
|
ch chan Event
|
|
|
|
|
done chan struct{}
|
|
|
|
|
@@ -50,51 +52,52 @@ type RELPHandler struct {
|
|
|
|
|
timer *time.Timer
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewRELPHandler(relpURL string) (*RELPHandler, error) {
|
|
|
|
|
func NewRELPHandler(relpURL string) (ExtendedHandler, error) {
|
|
|
|
|
parsedURL, err := url.Parse(relpURL)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("Error parsing RELP URL: %v", err)
|
|
|
|
|
return nil, fmt.Errorf("error parsing RELP URL: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if parsedURL.Scheme != "tcp" {
|
|
|
|
|
return nil, fmt.Errorf("RELP URL must have the tcp scheme, got %s", parsedURL.Scheme)
|
|
|
|
|
return nil, fmt.Errorf("the RELP URL must have the tcp scheme, got %s", parsedURL.Scheme)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
host, port, err := net.SplitHostPort(parsedURL.Host)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("Error splitting host and port: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := os.MkdirAll(filepath.Join(cacheDir, "simplelog"), 0755); err != nil {
|
|
|
|
|
return nil, fmt.Errorf("Failed to create cache directory: %v", err)
|
|
|
|
|
return nil, fmt.Errorf("failed to create cache directory: %v", err)
|
|
|
|
|
}
|
|
|
|
|
r := &RELPHandler{
|
|
|
|
|
relpServerURL: parsedURL.Host,
|
|
|
|
|
ch: make(chan Event, diskBufferLimit),
|
|
|
|
|
done: make(chan struct{}),
|
|
|
|
|
failedCh: make(chan Event, diskBufferLimit),
|
|
|
|
|
timer: time.NewTimer(diskWriteInterval),
|
|
|
|
|
ch: make(chan Event, diskBufferLimit),
|
|
|
|
|
done: make(chan struct{}),
|
|
|
|
|
failedCh: make(chan Event, diskBufferLimit),
|
|
|
|
|
timer: time.NewTimer(diskWriteInterval),
|
|
|
|
|
relpHost: host,
|
|
|
|
|
relpPort: port,
|
|
|
|
|
}
|
|
|
|
|
if relpDebug {
|
|
|
|
|
log.Printf("Created new RELP handler for server at %s", r.relpServerURL)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err = r.Startup()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return r, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *RELPHandler) connectToRELPServer() (net.Conn, error) {
|
|
|
|
|
parsedURL, err := url.Parse(r.relpServerURL)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("Error parsing RELP URL: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if parsedURL.Scheme != "tcp" {
|
|
|
|
|
return nil, fmt.Errorf("RELP URL must have the tcp scheme, got %s", parsedURL.Scheme)
|
|
|
|
|
}
|
|
|
|
|
host, port, err := net.SplitHostPort(parsedURL.Host)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("Error splitting host and port: %v", err)
|
|
|
|
|
}
|
|
|
|
|
conn, err := net.Dial("tcp", net.JoinHostPort(host, port))
|
|
|
|
|
conn, err := net.Dial("tcp", net.JoinHostPort(r.relpHost, r.relpPort))
|
|
|
|
|
if err != nil {
|
|
|
|
|
if relpDebug {
|
|
|
|
|
log.Printf("Failed to connect to RELP server at %s: %v", net.JoinHostPort(host, port), err)
|
|
|
|
|
log.Printf("Failed to connect to RELP server at %s: %v", net.JoinHostPort(r.relpHost, r.relpPort), err)
|
|
|
|
|
}
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if relpDebug {
|
|
|
|
|
log.Printf("Successfully connected to RELP server at %s", net.JoinHostPort(host, port))
|
|
|
|
|
log.Printf("Successfully connected to RELP server at %s", net.JoinHostPort(r.relpHost, r.relpPort))
|
|
|
|
|
}
|
|
|
|
|
return conn, nil
|
|
|
|
|
}
|
|
|
|
|
@@ -139,13 +142,20 @@ func (r *RELPHandler) Handle(ctx context.Context, record slog.Record) error {
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("error marshaling attributes: %v", err)
|
|
|
|
|
}
|
|
|
|
|
event := NewEvent(record.Level.String(), record.Message, jsonData)
|
|
|
|
|
select {
|
|
|
|
|
case r.ch <- event:
|
|
|
|
|
return nil // Successfully sent event to channel
|
|
|
|
|
default:
|
|
|
|
|
return fmt.Errorf("failed to log event: channel is full")
|
|
|
|
|
// Get the caller information
|
|
|
|
|
_, file, line, ok := runtime.Caller(5)
|
|
|
|
|
if !ok {
|
|
|
|
|
file = "???"
|
|
|
|
|
line = 0
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
event := NewExtendedEvent(record.Level.String(), record.Message, jsonData, file, line)
|
|
|
|
|
for _, handler := range cl.handlers {
|
|
|
|
|
if err := handler.Handle(ctx, event); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *RELPHandler) receiveEventsFromChannel() {
|
|
|
|
|
|