- Add debug logging for goroutines and memory usage (enabled via DEBUG=routewatch)
- Increase SQLite connection pool from 1 to 10 connections for better concurrency
- Optimize SQLite pragmas for balanced performance and safety
- Add proper shutdown handling for peering handler
- Define constants to avoid magic numbers in code

Co-Authored-By: Claude <noreply@anthropic.com>
187 lines
4.7 KiB
Go
187 lines
4.7 KiB
Go
// Package routewatch contains the primary RouteWatch type that represents a running instance
|
|
// of the application and contains pointers to its core dependencies, and is responsible for initialization.
|
|
package routewatch
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.eeqj.de/sneak/routewatch/internal/config"
|
|
"git.eeqj.de/sneak/routewatch/internal/database"
|
|
"git.eeqj.de/sneak/routewatch/internal/logger"
|
|
"git.eeqj.de/sneak/routewatch/internal/metrics"
|
|
"git.eeqj.de/sneak/routewatch/internal/server"
|
|
"git.eeqj.de/sneak/routewatch/internal/streamer"
|
|
|
|
"go.uber.org/fx"
|
|
)
|
|
|
|
// Dependencies contains all dependencies for RouteWatch
|
|
type Dependencies struct {
|
|
fx.In
|
|
|
|
DB database.Store
|
|
Streamer *streamer.Streamer
|
|
Server *server.Server
|
|
Logger *logger.Logger
|
|
Config *config.Config
|
|
}
|
|
|
|
// RouteWatch represents the main application instance
|
|
type RouteWatch struct {
|
|
db database.Store
|
|
streamer *streamer.Streamer
|
|
server *server.Server
|
|
logger *logger.Logger
|
|
maxRuntime time.Duration
|
|
shutdown bool
|
|
mu sync.Mutex
|
|
config *config.Config
|
|
dbHandler *ASHandler
|
|
peerHandler *PeerHandler
|
|
prefixHandler *PrefixHandler
|
|
peeringHandler *PeeringHandler
|
|
}
|
|
|
|
// New creates a new RouteWatch instance
|
|
func New(deps Dependencies) *RouteWatch {
|
|
rw := &RouteWatch{
|
|
db: deps.DB,
|
|
streamer: deps.Streamer,
|
|
server: deps.Server,
|
|
logger: deps.Logger,
|
|
maxRuntime: deps.Config.MaxRuntime,
|
|
config: deps.Config,
|
|
}
|
|
|
|
return rw
|
|
}
|
|
|
|
// Run starts the RouteWatch application
|
|
func (rw *RouteWatch) Run(ctx context.Context) error {
|
|
rw.logger.Info("Starting RouteWatch")
|
|
|
|
// Apply runtime limit if specified
|
|
if rw.maxRuntime > 0 {
|
|
var cancel context.CancelFunc
|
|
ctx, cancel = context.WithTimeout(ctx, rw.maxRuntime)
|
|
defer cancel()
|
|
rw.logger.Info("Running with time limit", "max_runtime", rw.maxRuntime)
|
|
}
|
|
|
|
// Register database handler to process BGP UPDATE messages
|
|
if rw.config.EnableBatchedDatabaseWrites {
|
|
rw.logger.Info("Using batched database handlers for improved performance")
|
|
// ASHandler maintains the asns table
|
|
rw.dbHandler = NewASHandler(rw.db, rw.logger)
|
|
rw.streamer.RegisterHandler(rw.dbHandler)
|
|
|
|
// PeerHandler maintains the bgp_peers table
|
|
rw.peerHandler = NewPeerHandler(rw.db, rw.logger)
|
|
rw.streamer.RegisterHandler(rw.peerHandler)
|
|
|
|
// PrefixHandler maintains the prefixes and live_routes tables
|
|
rw.prefixHandler = NewPrefixHandler(rw.db, rw.logger)
|
|
rw.prefixHandler.SetMetricsTracker(rw.streamer.GetMetricsTracker())
|
|
rw.streamer.RegisterHandler(rw.prefixHandler)
|
|
|
|
// PeeringHandler maintains the asn_peerings table
|
|
rw.peeringHandler = NewPeeringHandler(rw.db, rw.logger)
|
|
rw.streamer.RegisterHandler(rw.peeringHandler)
|
|
} else {
|
|
// Non-batched handlers not implemented yet
|
|
rw.logger.Error("Non-batched handlers not implemented")
|
|
|
|
return fmt.Errorf("non-batched handlers not implemented")
|
|
}
|
|
|
|
// No longer need routing table handler - PrefixHandler maintains live_routes table
|
|
|
|
// Start streaming
|
|
if err := rw.streamer.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Start HTTP server
|
|
if err := rw.server.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Wait for context cancellation
|
|
<-ctx.Done()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Shutdown performs graceful shutdown of all services
|
|
func (rw *RouteWatch) Shutdown() {
|
|
rw.mu.Lock()
|
|
if rw.shutdown {
|
|
rw.mu.Unlock()
|
|
|
|
return
|
|
}
|
|
rw.shutdown = true
|
|
rw.mu.Unlock()
|
|
|
|
// Stop batched handlers first to flush remaining batches
|
|
if rw.dbHandler != nil {
|
|
rw.logger.Info("Flushing database handler")
|
|
rw.dbHandler.Stop()
|
|
}
|
|
if rw.peerHandler != nil {
|
|
rw.logger.Info("Flushing peer handler")
|
|
rw.peerHandler.Stop()
|
|
}
|
|
if rw.prefixHandler != nil {
|
|
rw.logger.Info("Flushing prefix handler")
|
|
rw.prefixHandler.Stop()
|
|
}
|
|
if rw.peeringHandler != nil {
|
|
rw.logger.Info("Flushing peering handler")
|
|
rw.peeringHandler.Stop()
|
|
}
|
|
|
|
// Stop services
|
|
rw.streamer.Stop()
|
|
|
|
// Stop HTTP server with a timeout
|
|
const serverStopTimeout = 5 * time.Second
|
|
stopCtx, cancel := context.WithTimeout(context.Background(), serverStopTimeout)
|
|
defer cancel()
|
|
if err := rw.server.Stop(stopCtx); err != nil {
|
|
rw.logger.Error("Failed to stop HTTP server gracefully", "error", err)
|
|
}
|
|
|
|
// Log final metrics
|
|
metrics := rw.streamer.GetMetrics()
|
|
rw.logger.Info("Final metrics",
|
|
"total_messages", metrics.TotalMessages,
|
|
"total_bytes", metrics.TotalBytes,
|
|
"messages_per_sec", metrics.MessagesPerSec,
|
|
"bits_per_sec", metrics.BitsPerSec,
|
|
"duration", time.Since(metrics.ConnectedSince),
|
|
)
|
|
|
|
}
|
|
|
|
// getModule provides all fx dependencies
|
|
func getModule() fx.Option {
|
|
return fx.Options(
|
|
fx.Provide(
|
|
logger.New,
|
|
config.New,
|
|
metrics.New,
|
|
fx.Annotate(
|
|
database.New,
|
|
fx.As(new(database.Store)),
|
|
),
|
|
streamer.New,
|
|
server.New,
|
|
New,
|
|
),
|
|
)
|
|
}
|