- Create internal/logger package with Logger wrapper around slog - Logger automatically adds source file, line number, and function name to all log entries - Use golang.org/x/term to properly detect if stdout is a terminal - Replace all slog.Logger usage with logger.Logger throughout the codebase - Remove verbose logging from database GetStats() method - Update all constructors and dependencies to use the new logger
156 lines
3.4 KiB
Go
156 lines
3.4 KiB
Go
package routewatch
|
|
|
|
import (
|
|
"git.eeqj.de/sneak/routewatch/internal/database"
|
|
"git.eeqj.de/sneak/routewatch/internal/logger"
|
|
"git.eeqj.de/sneak/routewatch/internal/ristypes"
|
|
)
|
|
|
|
// databaseHandlerQueueSize is the number of messages the database handler
// asks to have queued for it; it is the value returned by QueueCapacity.
const databaseHandlerQueueSize = 200
|
|
|
|
// DatabaseHandler handles BGP messages and stores them in the database
|
|
type DatabaseHandler struct {
|
|
db database.Store
|
|
logger *logger.Logger
|
|
}
|
|
|
|
// NewDatabaseHandler creates a new database handler
|
|
func NewDatabaseHandler(
|
|
db database.Store,
|
|
logger *logger.Logger,
|
|
) *DatabaseHandler {
|
|
return &DatabaseHandler{
|
|
db: db,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// WantsMessage returns true if this handler wants to process messages of the given type
|
|
func (h *DatabaseHandler) WantsMessage(messageType string) bool {
|
|
// We only care about UPDATE messages for the database
|
|
return messageType == "UPDATE"
|
|
}
|
|
|
|
// QueueCapacity returns the desired queue capacity for this handler
|
|
func (h *DatabaseHandler) QueueCapacity() int {
|
|
// Database operations are slow, so use a smaller queue
|
|
return databaseHandlerQueueSize
|
|
}
|
|
|
|
// HandleMessage processes a RIS message and updates the database
|
|
func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
|
|
// Use the pre-parsed timestamp
|
|
timestamp := msg.ParsedTimestamp
|
|
|
|
// Get origin ASN from path (last element)
|
|
var originASN int
|
|
if len(msg.Path) > 0 {
|
|
originASN = msg.Path[len(msg.Path)-1]
|
|
}
|
|
|
|
// Process announcements
|
|
for _, announcement := range msg.Announcements {
|
|
for _, prefix := range announcement.Prefixes {
|
|
// Get or create prefix
|
|
_, err := h.db.GetOrCreatePrefix(prefix, timestamp)
|
|
if err != nil {
|
|
h.logger.Error(
|
|
"Failed to get/create prefix",
|
|
"prefix",
|
|
prefix,
|
|
"error",
|
|
err,
|
|
)
|
|
|
|
continue
|
|
}
|
|
|
|
// Get or create origin ASN
|
|
_, err = h.db.GetOrCreateASN(originASN, timestamp)
|
|
if err != nil {
|
|
h.logger.Error(
|
|
"Failed to get/create ASN",
|
|
"asn",
|
|
originASN,
|
|
"error",
|
|
err,
|
|
)
|
|
|
|
continue
|
|
}
|
|
|
|
// TODO: Record the announcement in the announcements table
|
|
// Process AS path to update peerings
|
|
if len(msg.Path) > 1 {
|
|
for i := range len(msg.Path) - 1 {
|
|
fromASN := msg.Path[i]
|
|
toASN := msg.Path[i+1]
|
|
|
|
// Get or create both ASNs
|
|
fromAS, err := h.db.GetOrCreateASN(fromASN, timestamp)
|
|
if err != nil {
|
|
h.logger.Error(
|
|
"Failed to get/create from ASN",
|
|
"asn",
|
|
fromASN,
|
|
"error",
|
|
err,
|
|
)
|
|
|
|
continue
|
|
}
|
|
|
|
toAS, err := h.db.GetOrCreateASN(toASN, timestamp)
|
|
if err != nil {
|
|
h.logger.Error(
|
|
"Failed to get/create to ASN",
|
|
"asn",
|
|
toASN,
|
|
"error",
|
|
err,
|
|
)
|
|
|
|
continue
|
|
}
|
|
|
|
// Record the peering
|
|
err = h.db.RecordPeering(
|
|
fromAS.ID.String(),
|
|
toAS.ID.String(),
|
|
timestamp,
|
|
)
|
|
if err != nil {
|
|
h.logger.Error("Failed to record peering",
|
|
"from_asn", fromASN,
|
|
"to_asn", toASN,
|
|
"error", err,
|
|
)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process withdrawals
|
|
for _, prefix := range msg.Withdrawals {
|
|
// Get prefix
|
|
_, err := h.db.GetOrCreatePrefix(prefix, timestamp)
|
|
if err != nil {
|
|
h.logger.Error(
|
|
"Failed to get prefix for withdrawal",
|
|
"prefix",
|
|
prefix,
|
|
"error",
|
|
err,
|
|
)
|
|
|
|
continue
|
|
}
|
|
|
|
// TODO: Record the withdrawal in the announcements table as a withdrawal
|
|
}
|
|
}
|