Implement routing table snapshotter with automatic loading on startup

- Create snapshotter package with periodic (10 min) and on-demand snapshots
- Add JSON serialization with gzip compression and atomic file writes
- Update routing table to track AddedAt time for each route
- Load snapshots on startup, filtering out stale routes (>30 minutes old)
- Add ROUTEWATCH_DISABLE_SNAPSHOTTER env var for tests
- Use OS-appropriate state directories (macOS: ~/Library/Application Support, Linux: /var/lib or XDG_STATE_HOME)
This commit is contained in:
2025-07-28 00:03:19 +02:00
parent 283f2ddbf2
commit ae2ef2ae0c
6 changed files with 258 additions and 32 deletions

View File

@@ -14,6 +14,7 @@ import (
"git.eeqj.de/sneak/routewatch/internal/metrics"
"git.eeqj.de/sneak/routewatch/internal/routingtable"
"git.eeqj.de/sneak/routewatch/internal/server"
"git.eeqj.de/sneak/routewatch/internal/snapshotter"
"git.eeqj.de/sneak/routewatch/internal/streamer"
"go.uber.org/fx"
@@ -54,13 +55,14 @@ type RouteWatch struct {
routingTable *routingtable.RoutingTable
streamer *streamer.Streamer
server *server.Server
snapshotter *snapshotter.Snapshotter
logger *slog.Logger
maxRuntime time.Duration
}
// New creates a new RouteWatch instance
func New(deps Dependencies) *RouteWatch {
return &RouteWatch{
rw := &RouteWatch{
db: deps.DB,
routingTable: deps.RoutingTable,
streamer: deps.Streamer,
@@ -68,6 +70,19 @@ func New(deps Dependencies) *RouteWatch {
logger: deps.Logger,
maxRuntime: deps.Config.MaxRuntime,
}
// Create snapshotter unless disabled (for tests)
if os.Getenv("ROUTEWATCH_DISABLE_SNAPSHOTTER") != "1" {
snapshotter, err := snapshotter.New(deps.RoutingTable, deps.Logger)
if err != nil {
deps.Logger.Error("Failed to create snapshotter", "error", err)
// Continue without snapshotter
} else {
rw.snapshotter = snapshotter
}
}
return rw
}
// Run starts the RouteWatch application
@@ -97,6 +112,11 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
// Start periodic routing table stats logging
go rw.logRoutingTableStats(ctx)
// Start snapshotter if available
if rw.snapshotter != nil {
rw.snapshotter.Start(ctx)
}
// Start streaming
if err := rw.streamer.Start(); err != nil {
return err
@@ -131,6 +151,13 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
"duration", time.Since(metrics.ConnectedSince),
)
// Take final snapshot before shutdown if snapshotter is available
if rw.snapshotter != nil {
if err := rw.snapshotter.Shutdown(); err != nil {
rw.logger.Error("Failed to shutdown snapshotter", "error", err)
}
}
return nil
}
@@ -199,10 +226,7 @@ func getModule() fx.Option {
),
routingtable.New,
streamer.New,
fx.Annotate(
server.New,
fx.ParamTags(``, ``, ``, ``),
),
server.New,
New,
),
)

View File

@@ -156,6 +156,9 @@ func (m *mockStore) GetStats() (database.Stats, error) {
}
func TestRouteWatchLiveFeed(t *testing.T) {
// Disable snapshotter for tests
t.Setenv("ROUTEWATCH_DISABLE_SNAPSHOTTER", "1")
// Create mock database
mockDB := newMockStore()
defer mockDB.Close()
@@ -169,7 +172,7 @@ func TestRouteWatchLiveFeed(t *testing.T) {
s := streamer.New(logger, metricsTracker)
// Create routing table
rt := routingtable.New()
rt := routingtable.New(logger)
// Create server
srv := server.New(mockDB, rt, s, logger)

View File

@@ -9,7 +9,7 @@ import (
const (
// databaseHandlerQueueSize is the queue capacity for database operations
databaseHandlerQueueSize = 100
databaseHandlerQueueSize = 200
)
// DatabaseHandler handles BGP messages and stores them in the database
@@ -19,7 +19,10 @@ type DatabaseHandler struct {
}
// NewDatabaseHandler creates a new database handler
func NewDatabaseHandler(db database.Store, logger *slog.Logger) *DatabaseHandler {
func NewDatabaseHandler(
db database.Store,
logger *slog.Logger,
) *DatabaseHandler {
return &DatabaseHandler{
db: db,
logger: logger,
@@ -55,7 +58,13 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
// Get or create prefix
_, err := h.db.GetOrCreatePrefix(prefix, timestamp)
if err != nil {
h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err)
h.logger.Error(
"Failed to get/create prefix",
"prefix",
prefix,
"error",
err,
)
continue
}
@@ -63,7 +72,13 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
// Get or create origin ASN
_, err = h.db.GetOrCreateASN(originASN, timestamp)
if err != nil {
h.logger.Error("Failed to get/create ASN", "asn", originASN, "error", err)
h.logger.Error(
"Failed to get/create ASN",
"asn",
originASN,
"error",
err,
)
continue
}
@@ -78,20 +93,36 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
// Get or create both ASNs
fromAS, err := h.db.GetOrCreateASN(fromASN, timestamp)
if err != nil {
h.logger.Error("Failed to get/create from ASN", "asn", fromASN, "error", err)
h.logger.Error(
"Failed to get/create from ASN",
"asn",
fromASN,
"error",
err,
)
continue
}
toAS, err := h.db.GetOrCreateASN(toASN, timestamp)
if err != nil {
h.logger.Error("Failed to get/create to ASN", "asn", toASN, "error", err)
h.logger.Error(
"Failed to get/create to ASN",
"asn",
toASN,
"error",
err,
)
continue
}
// Record the peering
err = h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), timestamp)
err = h.db.RecordPeering(
fromAS.ID.String(),
toAS.ID.String(),
timestamp,
)
if err != nil {
h.logger.Error("Failed to record peering",
"from_asn", fromASN,
@@ -109,7 +140,13 @@ func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
// Get prefix
_, err := h.db.GetOrCreatePrefix(prefix, timestamp)
if err != nil {
h.logger.Error("Failed to get prefix for withdrawal", "prefix", prefix, "error", err)
h.logger.Error(
"Failed to get prefix for withdrawal",
"prefix",
prefix,
"error",
err,
)
continue
}