initial
This commit is contained in:
169
relp_handler.go
Normal file
169
relp_handler.go
Normal file
@@ -0,0 +1,169 @@
|
||||
package simplelog
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
cacheDir = "/var/cache/relploggercache"
|
||||
diskBufferLimit = 100 // Write to disk after accumulating 100 failed events
|
||||
diskWriteInterval = time.Second // Or write every second, whichever comes first
|
||||
)
|
||||
|
||||
type RELPHandler struct {
|
||||
relpServerURL string
|
||||
conn net.Conn
|
||||
ch chan Event
|
||||
done chan struct{}
|
||||
failedCh chan Event
|
||||
timer *time.Timer
|
||||
}
|
||||
|
||||
func NewRELPHandler(relpURL string) (*RELPHandler, error) {
|
||||
parsedURL, err := url.Parse(relpURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error parsing RELP URL: %v", err)
|
||||
}
|
||||
if parsedURL.Scheme != "tcp" {
|
||||
return nil, fmt.Errorf("RELP URL must have the tcp scheme, got %s", parsedURL.Scheme)
|
||||
}
|
||||
if err := os.MkdirAll(cacheDir, 0755); err != nil {
|
||||
return nil, fmt.Errorf("Failed to create cache directory: %v", err)
|
||||
}
|
||||
return &RELPHandler{
|
||||
relpServerURL: parsedURL.Host,
|
||||
ch: make(chan Event, diskBufferLimit),
|
||||
done: make(chan struct{}),
|
||||
failedCh: make(chan Event, diskBufferLimit),
|
||||
timer: time.NewTimer(diskWriteInterval),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *RELPHandler) Startup() error {
|
||||
var err error
|
||||
r.conn, err = net.Dial("tcp", r.relpServerURL)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to establish TCP connection: %v", err)
|
||||
}
|
||||
go r.receiveEventsFromChannel()
|
||||
go r.processFailedEvents()
|
||||
go r.watchDirectoryForFailedEventFiles()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RELPHandler) Log(event Event) error {
|
||||
select {
|
||||
case r.ch <- event:
|
||||
return nil // Successfully sent event to channel
|
||||
default:
|
||||
return fmt.Errorf("failed to log event: channel is full")
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RELPHandler) receiveEventsFromChannel() {
|
||||
for {
|
||||
select {
|
||||
case event := <-r.ch:
|
||||
if err := r.sendEventToRELPServer(event); err != nil {
|
||||
r.failedCh <- event
|
||||
}
|
||||
case <-r.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RELPHandler) processFailedEvents() {
|
||||
for {
|
||||
select {
|
||||
case event := <-r.failedCh:
|
||||
if err := r.sendEventToRELPServer(event); err != nil {
|
||||
// If still failing, write to disk
|
||||
r.writeFailedToDisk(event)
|
||||
}
|
||||
case <-r.timer.C:
|
||||
// Flush all events in failedCh to disk
|
||||
for len(r.failedCh) > 0 {
|
||||
event := <-r.failedCh
|
||||
r.writeFailedToDisk(event)
|
||||
}
|
||||
case <-r.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RELPHandler) sendEventToRELPServer(event Event) error {
|
||||
jsonData, _ := json.Marshal(event)
|
||||
_, err := r.conn.Write(jsonData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Implement TCP read for the acknowledgment with a timeout
|
||||
r.conn.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
ack := make([]byte, 256)
|
||||
n, err := r.conn.Read(ack)
|
||||
if err != nil || string(ack[:n]) != "ACK" {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RELPHandler) watchDirectoryForFailedEventFiles() {
|
||||
for {
|
||||
select {
|
||||
case <-r.done:
|
||||
return
|
||||
default:
|
||||
if r.isConnected() {
|
||||
files, err := ioutil.ReadDir(cacheDir)
|
||||
if err != nil {
|
||||
log.Printf("Error reading cache directory: %v", err)
|
||||
continue
|
||||
}
|
||||
for _, file := range files {
|
||||
filePath := filepath.Join(cacheDir, file.Name())
|
||||
data, err := ioutil.ReadFile(filePath)
|
||||
if err != nil {
|
||||
log.Printf("Error reading file %s: %v", file.Name(), err)
|
||||
continue
|
||||
}
|
||||
var event Event
|
||||
if err := json.Unmarshal(data, &event); err != nil {
|
||||
log.Printf("Error unmarshalling event from file %s: %v", file.Name(), err)
|
||||
continue
|
||||
}
|
||||
r.ch <- event
|
||||
os.Remove(filePath) // Remove file after processing
|
||||
}
|
||||
}
|
||||
time.Sleep(10 * time.Second) // Check disk every 10 seconds
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RELPHandler) isConnected() bool {
|
||||
_, err := r.conn.Write([]byte{})
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (r *RELPHandler) Shutdown() error {
|
||||
close(r.done)
|
||||
return r.conn.Close()
|
||||
}
|
||||
|
||||
func (r *RELPHandler) writeFailedToDisk(event Event) {
|
||||
fileName := filepath.Join(cacheDir, uuid.New().String()+".logevent")
|
||||
data, _ := json.Marshal(event)
|
||||
ioutil.WriteFile(fileName, data, 0644)
|
||||
}
|
||||
Reference in New Issue
Block a user