Compare commits
	
		
			No commits in common. "eda90d96a9344988fc276ab5559793ad51faf920" and "3aef3f9a07cdb994907f8e459579b7307a8d0653" have entirely different histories.
		
	
	
		
			eda90d96a9
			...
			3aef3f9a07
		
	
		
@ -15,9 +15,6 @@ const (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// dirPermissions for creating directories
 | 
						// dirPermissions for creating directories
 | 
				
			||||||
	dirPermissions = 0750 // rwxr-x---
 | 
						dirPermissions = 0750 // rwxr-x---
 | 
				
			||||||
 | 
					 | 
				
			||||||
	// defaultRouteExpirationMinutes is the default route expiration timeout in minutes
 | 
					 | 
				
			||||||
	defaultRouteExpirationMinutes = 5
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Config holds configuration for the entire application
 | 
					// Config holds configuration for the entire application
 | 
				
			||||||
@ -30,10 +27,6 @@ type Config struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// EnableBatchedDatabaseWrites enables batched database operations for better performance
 | 
						// EnableBatchedDatabaseWrites enables batched database operations for better performance
 | 
				
			||||||
	EnableBatchedDatabaseWrites bool
 | 
						EnableBatchedDatabaseWrites bool
 | 
				
			||||||
 | 
					 | 
				
			||||||
	// RouteExpirationTimeout is how long a route can go without being refreshed before expiring
 | 
					 | 
				
			||||||
	// Default is 2 hours which is conservative for BGP (typical BGP hold time is 90-180 seconds)
 | 
					 | 
				
			||||||
	RouteExpirationTimeout time.Duration
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// New creates a new Config with default paths based on the OS
 | 
					// New creates a new Config with default paths based on the OS
 | 
				
			||||||
@ -45,9 +38,8 @@ func New() (*Config, error) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	return &Config{
 | 
						return &Config{
 | 
				
			||||||
		StateDir:                    stateDir,
 | 
							StateDir:                    stateDir,
 | 
				
			||||||
		MaxRuntime:                  0,                                           // Run forever by default
 | 
							MaxRuntime:                  0,    // Run forever by default
 | 
				
			||||||
		EnableBatchedDatabaseWrites: true,                                        // Enable batching by default
 | 
							EnableBatchedDatabaseWrites: true, // Enable batching by default for better performance
 | 
				
			||||||
		RouteExpirationTimeout:      defaultRouteExpirationMinutes * time.Minute, // For active route monitoring
 | 
					 | 
				
			||||||
	}, nil
 | 
						}, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -4,7 +4,6 @@ package database
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"database/sql"
 | 
						"database/sql"
 | 
				
			||||||
	_ "embed"
 | 
						_ "embed"
 | 
				
			||||||
	"encoding/json"
 | 
					 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"os"
 | 
						"os"
 | 
				
			||||||
	"path/filepath"
 | 
						"path/filepath"
 | 
				
			||||||
@ -388,120 +387,5 @@ func (d *Database) GetStats() (Stats, error) {
 | 
				
			|||||||
		stats.FileSizeBytes = fileInfo.Size()
 | 
							stats.FileSizeBytes = fileInfo.Size()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Get live routes count
 | 
					 | 
				
			||||||
	err = d.db.QueryRow("SELECT COUNT(*) FROM live_routes").Scan(&stats.LiveRoutes)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return stats, fmt.Errorf("failed to count live routes: %w", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Get prefix distribution
 | 
					 | 
				
			||||||
	stats.IPv4PrefixDistribution, stats.IPv6PrefixDistribution, err = d.GetPrefixDistribution()
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		// Log but don't fail
 | 
					 | 
				
			||||||
		d.logger.Warn("Failed to get prefix distribution", "error", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return stats, nil
 | 
						return stats, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					 | 
				
			||||||
// UpsertLiveRoute inserts or updates a live route
 | 
					 | 
				
			||||||
func (d *Database) UpsertLiveRoute(route *LiveRoute) error {
 | 
					 | 
				
			||||||
	query := `
 | 
					 | 
				
			||||||
		INSERT INTO live_routes (id, prefix, mask_length, ip_version, origin_asn, peer_ip, as_path, next_hop, last_updated)
 | 
					 | 
				
			||||||
		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
 | 
					 | 
				
			||||||
		ON CONFLICT(prefix, origin_asn, peer_ip) DO UPDATE SET
 | 
					 | 
				
			||||||
			mask_length = excluded.mask_length,
 | 
					 | 
				
			||||||
			ip_version = excluded.ip_version,
 | 
					 | 
				
			||||||
			as_path = excluded.as_path,
 | 
					 | 
				
			||||||
			next_hop = excluded.next_hop,
 | 
					 | 
				
			||||||
			last_updated = excluded.last_updated
 | 
					 | 
				
			||||||
	`
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Encode AS path as JSON
 | 
					 | 
				
			||||||
	pathJSON, err := json.Marshal(route.ASPath)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return fmt.Errorf("failed to encode AS path: %w", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	_, err = d.db.Exec(query,
 | 
					 | 
				
			||||||
		route.ID.String(),
 | 
					 | 
				
			||||||
		route.Prefix,
 | 
					 | 
				
			||||||
		route.MaskLength,
 | 
					 | 
				
			||||||
		route.IPVersion,
 | 
					 | 
				
			||||||
		route.OriginASN,
 | 
					 | 
				
			||||||
		route.PeerIP,
 | 
					 | 
				
			||||||
		string(pathJSON),
 | 
					 | 
				
			||||||
		route.NextHop,
 | 
					 | 
				
			||||||
		route.LastUpdated,
 | 
					 | 
				
			||||||
	)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return err
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// DeleteLiveRoute deletes a live route
 | 
					 | 
				
			||||||
// If originASN is 0, deletes all routes for the prefix/peer combination
 | 
					 | 
				
			||||||
func (d *Database) DeleteLiveRoute(prefix string, originASN int, peerIP string) error {
 | 
					 | 
				
			||||||
	var query string
 | 
					 | 
				
			||||||
	var err error
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if originASN == 0 {
 | 
					 | 
				
			||||||
		// Delete all routes for this prefix from this peer
 | 
					 | 
				
			||||||
		query = `DELETE FROM live_routes WHERE prefix = ? AND peer_ip = ?`
 | 
					 | 
				
			||||||
		_, err = d.db.Exec(query, prefix, peerIP)
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		// Delete specific route
 | 
					 | 
				
			||||||
		query = `DELETE FROM live_routes WHERE prefix = ? AND origin_asn = ? AND peer_ip = ?`
 | 
					 | 
				
			||||||
		_, err = d.db.Exec(query, prefix, originASN, peerIP)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return err
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// GetPrefixDistribution returns the distribution of prefixes by mask length
 | 
					 | 
				
			||||||
func (d *Database) GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error) {
 | 
					 | 
				
			||||||
	// IPv4 distribution
 | 
					 | 
				
			||||||
	query := `
 | 
					 | 
				
			||||||
		SELECT mask_length, COUNT(*) as count
 | 
					 | 
				
			||||||
		FROM live_routes
 | 
					 | 
				
			||||||
		WHERE ip_version = 4
 | 
					 | 
				
			||||||
		GROUP BY mask_length
 | 
					 | 
				
			||||||
		ORDER BY mask_length
 | 
					 | 
				
			||||||
	`
 | 
					 | 
				
			||||||
	rows, err := d.db.Query(query)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return nil, nil, fmt.Errorf("failed to query IPv4 distribution: %w", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	defer func() { _ = rows.Close() }()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for rows.Next() {
 | 
					 | 
				
			||||||
		var dist PrefixDistribution
 | 
					 | 
				
			||||||
		if err := rows.Scan(&dist.MaskLength, &dist.Count); err != nil {
 | 
					 | 
				
			||||||
			return nil, nil, fmt.Errorf("failed to scan IPv4 distribution: %w", err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		ipv4 = append(ipv4, dist)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// IPv6 distribution
 | 
					 | 
				
			||||||
	query = `
 | 
					 | 
				
			||||||
		SELECT mask_length, COUNT(*) as count
 | 
					 | 
				
			||||||
		FROM live_routes
 | 
					 | 
				
			||||||
		WHERE ip_version = 6
 | 
					 | 
				
			||||||
		GROUP BY mask_length
 | 
					 | 
				
			||||||
		ORDER BY mask_length
 | 
					 | 
				
			||||||
	`
 | 
					 | 
				
			||||||
	rows, err = d.db.Query(query)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return nil, nil, fmt.Errorf("failed to query IPv6 distribution: %w", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	defer func() { _ = rows.Close() }()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for rows.Next() {
 | 
					 | 
				
			||||||
		var dist PrefixDistribution
 | 
					 | 
				
			||||||
		if err := rows.Scan(&dist.MaskLength, &dist.Count); err != nil {
 | 
					 | 
				
			||||||
			return nil, nil, fmt.Errorf("failed to scan IPv6 distribution: %w", err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		ipv6 = append(ipv6, dist)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return ipv4, ipv6, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
				
			|||||||
@ -6,15 +6,12 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// Stats contains database statistics
 | 
					// Stats contains database statistics
 | 
				
			||||||
type Stats struct {
 | 
					type Stats struct {
 | 
				
			||||||
	ASNs                   int
 | 
						ASNs          int
 | 
				
			||||||
	Prefixes               int
 | 
						Prefixes      int
 | 
				
			||||||
	IPv4Prefixes           int
 | 
						IPv4Prefixes  int
 | 
				
			||||||
	IPv6Prefixes           int
 | 
						IPv6Prefixes  int
 | 
				
			||||||
	Peerings               int
 | 
						Peerings      int
 | 
				
			||||||
	FileSizeBytes          int64
 | 
						FileSizeBytes int64
 | 
				
			||||||
	LiveRoutes             int
 | 
					 | 
				
			||||||
	IPv4PrefixDistribution []PrefixDistribution
 | 
					 | 
				
			||||||
	IPv6PrefixDistribution []PrefixDistribution
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Store defines the interface for database operations
 | 
					// Store defines the interface for database operations
 | 
				
			||||||
@ -37,11 +34,6 @@ type Store interface {
 | 
				
			|||||||
	// Peer operations
 | 
						// Peer operations
 | 
				
			||||||
	UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error
 | 
						UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Live route operations
 | 
					 | 
				
			||||||
	UpsertLiveRoute(route *LiveRoute) error
 | 
					 | 
				
			||||||
	DeleteLiveRoute(prefix string, originASN int, peerIP string) error
 | 
					 | 
				
			||||||
	GetPrefixDistribution() (ipv4 []PrefixDistribution, ipv6 []PrefixDistribution, err error)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Lifecycle
 | 
						// Lifecycle
 | 
				
			||||||
	Close() error
 | 
						Close() error
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -45,22 +45,3 @@ type ASNPeering struct {
 | 
				
			|||||||
	FirstSeen time.Time `json:"first_seen"`
 | 
						FirstSeen time.Time `json:"first_seen"`
 | 
				
			||||||
	LastSeen  time.Time `json:"last_seen"`
 | 
						LastSeen  time.Time `json:"last_seen"`
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					 | 
				
			||||||
// LiveRoute represents a route in the live routing table
 | 
					 | 
				
			||||||
type LiveRoute struct {
 | 
					 | 
				
			||||||
	ID          uuid.UUID `json:"id"`
 | 
					 | 
				
			||||||
	Prefix      string    `json:"prefix"`
 | 
					 | 
				
			||||||
	MaskLength  int       `json:"mask_length"`
 | 
					 | 
				
			||||||
	IPVersion   int       `json:"ip_version"`
 | 
					 | 
				
			||||||
	OriginASN   int       `json:"origin_asn"`
 | 
					 | 
				
			||||||
	PeerIP      string    `json:"peer_ip"`
 | 
					 | 
				
			||||||
	ASPath      []int     `json:"as_path"`
 | 
					 | 
				
			||||||
	NextHop     string    `json:"next_hop"`
 | 
					 | 
				
			||||||
	LastUpdated time.Time `json:"last_updated"`
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// PrefixDistribution represents the distribution of prefixes by mask length
 | 
					 | 
				
			||||||
type PrefixDistribution struct {
 | 
					 | 
				
			||||||
	MaskLength int `json:"mask_length"`
 | 
					 | 
				
			||||||
	Count      int `json:"count"`
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
				
			|||||||
@ -68,24 +68,4 @@ CREATE INDEX IF NOT EXISTS idx_asns_number ON asns(number);
 | 
				
			|||||||
-- Indexes for bgp_peers table
 | 
					-- Indexes for bgp_peers table
 | 
				
			||||||
CREATE INDEX IF NOT EXISTS idx_bgp_peers_asn ON bgp_peers(peer_asn);
 | 
					CREATE INDEX IF NOT EXISTS idx_bgp_peers_asn ON bgp_peers(peer_asn);
 | 
				
			||||||
CREATE INDEX IF NOT EXISTS idx_bgp_peers_last_seen ON bgp_peers(last_seen);
 | 
					CREATE INDEX IF NOT EXISTS idx_bgp_peers_last_seen ON bgp_peers(last_seen);
 | 
				
			||||||
CREATE INDEX IF NOT EXISTS idx_bgp_peers_ip ON bgp_peers(peer_ip);
 | 
					CREATE INDEX IF NOT EXISTS idx_bgp_peers_ip ON bgp_peers(peer_ip);
 | 
				
			||||||
 | 
					 | 
				
			||||||
-- Live routing table maintained by PrefixHandler
 | 
					 | 
				
			||||||
CREATE TABLE IF NOT EXISTS live_routes (
 | 
					 | 
				
			||||||
	id TEXT PRIMARY KEY,
 | 
					 | 
				
			||||||
	prefix TEXT NOT NULL,
 | 
					 | 
				
			||||||
	mask_length INTEGER NOT NULL, -- CIDR mask length (0-32 for IPv4, 0-128 for IPv6)
 | 
					 | 
				
			||||||
	ip_version INTEGER NOT NULL, -- 4 or 6
 | 
					 | 
				
			||||||
	origin_asn INTEGER NOT NULL,
 | 
					 | 
				
			||||||
	peer_ip TEXT NOT NULL,
 | 
					 | 
				
			||||||
	as_path TEXT NOT NULL, -- JSON array
 | 
					 | 
				
			||||||
	next_hop TEXT NOT NULL,
 | 
					 | 
				
			||||||
	last_updated DATETIME NOT NULL,
 | 
					 | 
				
			||||||
	UNIQUE(prefix, origin_asn, peer_ip)
 | 
					 | 
				
			||||||
);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
-- Indexes for live_routes table
 | 
					 | 
				
			||||||
CREATE INDEX IF NOT EXISTS idx_live_routes_prefix ON live_routes(prefix);
 | 
					 | 
				
			||||||
CREATE INDEX IF NOT EXISTS idx_live_routes_mask_length ON live_routes(mask_length);
 | 
					 | 
				
			||||||
CREATE INDEX IF NOT EXISTS idx_live_routes_ip_version_mask ON live_routes(ip_version, mask_length);
 | 
					 | 
				
			||||||
CREATE INDEX IF NOT EXISTS idx_live_routes_last_updated ON live_routes(last_updated);
 | 
					 | 
				
			||||||
@ -40,19 +40,18 @@ type Dependencies struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// RouteWatch represents the main application instance
 | 
					// RouteWatch represents the main application instance
 | 
				
			||||||
type RouteWatch struct {
 | 
					type RouteWatch struct {
 | 
				
			||||||
	db            database.Store
 | 
						db                 database.Store
 | 
				
			||||||
	routingTable  *routingtable.RoutingTable
 | 
						routingTable       *routingtable.RoutingTable
 | 
				
			||||||
	streamer      *streamer.Streamer
 | 
						streamer           *streamer.Streamer
 | 
				
			||||||
	server        *server.Server
 | 
						server             *server.Server
 | 
				
			||||||
	snapshotter   *snapshotter.Snapshotter
 | 
						snapshotter        *snapshotter.Snapshotter
 | 
				
			||||||
	logger        *logger.Logger
 | 
						logger             *logger.Logger
 | 
				
			||||||
	maxRuntime    time.Duration
 | 
						maxRuntime         time.Duration
 | 
				
			||||||
	shutdown      bool
 | 
						shutdown           bool
 | 
				
			||||||
	mu            sync.Mutex
 | 
						mu                 sync.Mutex
 | 
				
			||||||
	config        *config.Config
 | 
						config             *config.Config
 | 
				
			||||||
	dbHandler     *DBHandler
 | 
						batchedDBHandler   *BatchedDatabaseHandler
 | 
				
			||||||
	peerHandler   *PeerHandler
 | 
						batchedPeerHandler *BatchedPeerHandler
 | 
				
			||||||
	prefixHandler *PrefixHandler
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// isTruthy returns true if the value is considered truthy
 | 
					// isTruthy returns true if the value is considered truthy
 | 
				
			||||||
@ -107,19 +106,17 @@ func (rw *RouteWatch) Run(ctx context.Context) error {
 | 
				
			|||||||
	// Register database handler to process BGP UPDATE messages
 | 
						// Register database handler to process BGP UPDATE messages
 | 
				
			||||||
	if rw.config.EnableBatchedDatabaseWrites {
 | 
						if rw.config.EnableBatchedDatabaseWrites {
 | 
				
			||||||
		rw.logger.Info("Using batched database handlers for improved performance")
 | 
							rw.logger.Info("Using batched database handlers for improved performance")
 | 
				
			||||||
		rw.dbHandler = NewDBHandler(rw.db, rw.logger)
 | 
							rw.batchedDBHandler = NewBatchedDatabaseHandler(rw.db, rw.logger)
 | 
				
			||||||
		rw.streamer.RegisterHandler(rw.dbHandler)
 | 
							rw.streamer.RegisterHandler(rw.batchedDBHandler)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		rw.peerHandler = NewPeerHandler(rw.db, rw.logger)
 | 
							rw.batchedPeerHandler = NewBatchedPeerHandler(rw.db, rw.logger)
 | 
				
			||||||
		rw.streamer.RegisterHandler(rw.peerHandler)
 | 
							rw.streamer.RegisterHandler(rw.batchedPeerHandler)
 | 
				
			||||||
 | 
					 | 
				
			||||||
		rw.prefixHandler = NewPrefixHandler(rw.db, rw.logger)
 | 
					 | 
				
			||||||
		rw.streamer.RegisterHandler(rw.prefixHandler)
 | 
					 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		// Non-batched handlers not implemented yet
 | 
							dbHandler := NewDatabaseHandler(rw.db, rw.logger)
 | 
				
			||||||
		rw.logger.Error("Non-batched handlers not implemented")
 | 
							rw.streamer.RegisterHandler(dbHandler)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		return fmt.Errorf("non-batched handlers not implemented")
 | 
							peerHandler := NewPeerHandler(rw.db, rw.logger)
 | 
				
			||||||
 | 
							rw.streamer.RegisterHandler(peerHandler)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Register routing table handler to maintain in-memory routing table
 | 
						// Register routing table handler to maintain in-memory routing table
 | 
				
			||||||
@ -162,25 +159,18 @@ func (rw *RouteWatch) Shutdown() {
 | 
				
			|||||||
	rw.mu.Unlock()
 | 
						rw.mu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Stop batched handlers first to flush remaining batches
 | 
						// Stop batched handlers first to flush remaining batches
 | 
				
			||||||
	if rw.dbHandler != nil {
 | 
						if rw.batchedDBHandler != nil {
 | 
				
			||||||
		rw.logger.Info("Flushing database handler")
 | 
							rw.logger.Info("Flushing batched database handler")
 | 
				
			||||||
		rw.dbHandler.Stop()
 | 
							rw.batchedDBHandler.Stop()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if rw.peerHandler != nil {
 | 
						if rw.batchedPeerHandler != nil {
 | 
				
			||||||
		rw.logger.Info("Flushing peer handler")
 | 
							rw.logger.Info("Flushing batched peer handler")
 | 
				
			||||||
		rw.peerHandler.Stop()
 | 
							rw.batchedPeerHandler.Stop()
 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if rw.prefixHandler != nil {
 | 
					 | 
				
			||||||
		rw.logger.Info("Flushing prefix handler")
 | 
					 | 
				
			||||||
		rw.prefixHandler.Stop()
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Stop services
 | 
						// Stop services
 | 
				
			||||||
	rw.streamer.Stop()
 | 
						rw.streamer.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Stop routing table expiration
 | 
					 | 
				
			||||||
	rw.routingTable.Stop()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Stop HTTP server with a timeout
 | 
						// Stop HTTP server with a timeout
 | 
				
			||||||
	const serverStopTimeout = 5 * time.Second
 | 
						const serverStopTimeout = 5 * time.Second
 | 
				
			||||||
	stopCtx, cancel := context.WithTimeout(context.Background(), serverStopTimeout)
 | 
						stopCtx, cancel := context.WithTimeout(context.Background(), serverStopTimeout)
 | 
				
			||||||
 | 
				
			|||||||
@ -157,24 +157,6 @@ func (m *mockStore) GetStats() (database.Stats, error) {
 | 
				
			|||||||
	}, nil
 | 
						}, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// UpsertLiveRoute mock implementation
 | 
					 | 
				
			||||||
func (m *mockStore) UpsertLiveRoute(route *database.LiveRoute) error {
 | 
					 | 
				
			||||||
	// Simple mock - just return nil
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// DeleteLiveRoute mock implementation
 | 
					 | 
				
			||||||
func (m *mockStore) DeleteLiveRoute(prefix string, originASN int, peerIP string) error {
 | 
					 | 
				
			||||||
	// Simple mock - just return nil
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// GetPrefixDistribution mock implementation
 | 
					 | 
				
			||||||
func (m *mockStore) GetPrefixDistribution() (ipv4 []database.PrefixDistribution, ipv6 []database.PrefixDistribution, err error) {
 | 
					 | 
				
			||||||
	// Return empty distributions for now
 | 
					 | 
				
			||||||
	return nil, nil, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func TestRouteWatchLiveFeed(t *testing.T) {
 | 
					func TestRouteWatchLiveFeed(t *testing.T) {
 | 
				
			||||||
	// Disable snapshotter for tests
 | 
						// Disable snapshotter for tests
 | 
				
			||||||
	t.Setenv("ROUTEWATCH_DISABLE_SNAPSHOTTER", "1")
 | 
						t.Setenv("ROUTEWATCH_DISABLE_SNAPSHOTTER", "1")
 | 
				
			||||||
@ -193,9 +175,8 @@ func TestRouteWatchLiveFeed(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// Create test config with empty state dir (no snapshot loading)
 | 
						// Create test config with empty state dir (no snapshot loading)
 | 
				
			||||||
	cfg := &config.Config{
 | 
						cfg := &config.Config{
 | 
				
			||||||
		StateDir:                    "",
 | 
							StateDir:   "",
 | 
				
			||||||
		MaxRuntime:                  5 * time.Second,
 | 
							MaxRuntime: 5 * time.Second,
 | 
				
			||||||
		EnableBatchedDatabaseWrites: true,
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Create routing table
 | 
						// Create routing table
 | 
				
			||||||
 | 
				
			|||||||
@ -1,92 +1,47 @@
 | 
				
			|||||||
package routewatch
 | 
					package routewatch
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"sync"
 | 
					 | 
				
			||||||
	"time"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	"git.eeqj.de/sneak/routewatch/internal/database"
 | 
						"git.eeqj.de/sneak/routewatch/internal/database"
 | 
				
			||||||
	"git.eeqj.de/sneak/routewatch/internal/logger"
 | 
						"git.eeqj.de/sneak/routewatch/internal/logger"
 | 
				
			||||||
	"git.eeqj.de/sneak/routewatch/internal/ristypes"
 | 
						"git.eeqj.de/sneak/routewatch/internal/ristypes"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
	// dbHandlerQueueSize is the queue capacity for database operations
 | 
						// databaseHandlerQueueSize is the queue capacity for database operations
 | 
				
			||||||
	dbHandlerQueueSize = 50000
 | 
						databaseHandlerQueueSize = 200
 | 
				
			||||||
 | 
					 | 
				
			||||||
	// batchSize is the number of operations to batch together
 | 
					 | 
				
			||||||
	batchSize = 32000
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// batchTimeout is the maximum time to wait before flushing a batch
 | 
					 | 
				
			||||||
	batchTimeout = 5 * time.Second
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// DBHandler handles BGP messages and stores them in the database using batched operations
 | 
					// DatabaseHandler handles BGP messages and stores them in the database
 | 
				
			||||||
type DBHandler struct {
 | 
					type DatabaseHandler struct {
 | 
				
			||||||
	db     database.Store
 | 
						db     database.Store
 | 
				
			||||||
	logger *logger.Logger
 | 
						logger *logger.Logger
 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Batching
 | 
					 | 
				
			||||||
	mu           sync.Mutex
 | 
					 | 
				
			||||||
	prefixBatch  []prefixOp
 | 
					 | 
				
			||||||
	asnBatch     []asnOp
 | 
					 | 
				
			||||||
	peeringBatch []peeringOp
 | 
					 | 
				
			||||||
	lastFlush    time.Time
 | 
					 | 
				
			||||||
	stopCh       chan struct{}
 | 
					 | 
				
			||||||
	wg           sync.WaitGroup
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type prefixOp struct {
 | 
					// NewDatabaseHandler creates a new database handler
 | 
				
			||||||
	prefix    string
 | 
					func NewDatabaseHandler(
 | 
				
			||||||
	timestamp time.Time
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type asnOp struct {
 | 
					 | 
				
			||||||
	number    int
 | 
					 | 
				
			||||||
	timestamp time.Time
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type peeringOp struct {
 | 
					 | 
				
			||||||
	fromASN   int
 | 
					 | 
				
			||||||
	toASN     int
 | 
					 | 
				
			||||||
	timestamp time.Time
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// NewDBHandler creates a new batched database handler
 | 
					 | 
				
			||||||
func NewDBHandler(
 | 
					 | 
				
			||||||
	db database.Store,
 | 
						db database.Store,
 | 
				
			||||||
	logger *logger.Logger,
 | 
						logger *logger.Logger,
 | 
				
			||||||
) *DBHandler {
 | 
					) *DatabaseHandler {
 | 
				
			||||||
	h := &DBHandler{
 | 
						return &DatabaseHandler{
 | 
				
			||||||
		db:           db,
 | 
							db:     db,
 | 
				
			||||||
		logger:       logger,
 | 
							logger: logger,
 | 
				
			||||||
		prefixBatch:  make([]prefixOp, 0, batchSize),
 | 
					 | 
				
			||||||
		asnBatch:     make([]asnOp, 0, batchSize),
 | 
					 | 
				
			||||||
		peeringBatch: make([]peeringOp, 0, batchSize),
 | 
					 | 
				
			||||||
		lastFlush:    time.Now(),
 | 
					 | 
				
			||||||
		stopCh:       make(chan struct{}),
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Start the flush timer goroutine
 | 
					 | 
				
			||||||
	h.wg.Add(1)
 | 
					 | 
				
			||||||
	go h.flushLoop()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return h
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// WantsMessage returns true if this handler wants to process messages of the given type
 | 
					// WantsMessage returns true if this handler wants to process messages of the given type
 | 
				
			||||||
func (h *DBHandler) WantsMessage(messageType string) bool {
 | 
					func (h *DatabaseHandler) WantsMessage(messageType string) bool {
 | 
				
			||||||
	// We only care about UPDATE messages for the database
 | 
						// We only care about UPDATE messages for the database
 | 
				
			||||||
	return messageType == "UPDATE"
 | 
						return messageType == "UPDATE"
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// QueueCapacity returns the desired queue capacity for this handler
 | 
					// QueueCapacity returns the desired queue capacity for this handler
 | 
				
			||||||
func (h *DBHandler) QueueCapacity() int {
 | 
					func (h *DatabaseHandler) QueueCapacity() int {
 | 
				
			||||||
	// Batching allows us to use a larger queue
 | 
						// Database operations are slow, so use a smaller queue
 | 
				
			||||||
	return dbHandlerQueueSize
 | 
						return databaseHandlerQueueSize
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// HandleMessage processes a RIS message and queues database operations
 | 
					// HandleMessage processes a RIS message and updates the database
 | 
				
			||||||
func (h *DBHandler) HandleMessage(msg *ristypes.RISMessage) {
 | 
					func (h *DatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
 | 
				
			||||||
	// Use the pre-parsed timestamp
 | 
						// Use the pre-parsed timestamp
 | 
				
			||||||
	timestamp := msg.ParsedTimestamp
 | 
						timestamp := msg.ParsedTimestamp
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -96,168 +51,105 @@ func (h *DBHandler) HandleMessage(msg *ristypes.RISMessage) {
 | 
				
			|||||||
		originASN = msg.Path[len(msg.Path)-1]
 | 
							originASN = msg.Path[len(msg.Path)-1]
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	h.mu.Lock()
 | 
						// Process announcements
 | 
				
			||||||
	defer h.mu.Unlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Queue operations for announcements
 | 
					 | 
				
			||||||
	for _, announcement := range msg.Announcements {
 | 
						for _, announcement := range msg.Announcements {
 | 
				
			||||||
		for _, prefix := range announcement.Prefixes {
 | 
							for _, prefix := range announcement.Prefixes {
 | 
				
			||||||
			// Queue prefix operation
 | 
								// Get or create prefix
 | 
				
			||||||
			h.prefixBatch = append(h.prefixBatch, prefixOp{
 | 
								_, err := h.db.GetOrCreatePrefix(prefix, timestamp)
 | 
				
			||||||
				prefix:    prefix,
 | 
								if err != nil {
 | 
				
			||||||
				timestamp: timestamp,
 | 
									h.logger.Error(
 | 
				
			||||||
			})
 | 
										"Failed to get/create prefix",
 | 
				
			||||||
 | 
										"prefix",
 | 
				
			||||||
 | 
										prefix,
 | 
				
			||||||
 | 
										"error",
 | 
				
			||||||
 | 
										err,
 | 
				
			||||||
 | 
									)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			// Queue origin ASN operation
 | 
									continue
 | 
				
			||||||
			if originASN > 0 {
 | 
					 | 
				
			||||||
				h.asnBatch = append(h.asnBatch, asnOp{
 | 
					 | 
				
			||||||
					number:    originASN,
 | 
					 | 
				
			||||||
					timestamp: timestamp,
 | 
					 | 
				
			||||||
				})
 | 
					 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			// Process AS path to queue peering operations
 | 
								// Get or create origin ASN
 | 
				
			||||||
 | 
								_, err = h.db.GetOrCreateASN(originASN, timestamp)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									h.logger.Error(
 | 
				
			||||||
 | 
										"Failed to get/create ASN",
 | 
				
			||||||
 | 
										"asn",
 | 
				
			||||||
 | 
										originASN,
 | 
				
			||||||
 | 
										"error",
 | 
				
			||||||
 | 
										err,
 | 
				
			||||||
 | 
									)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									continue
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								// TODO: Record the announcement in the announcements table
 | 
				
			||||||
 | 
								// Process AS path to update peerings
 | 
				
			||||||
			if len(msg.Path) > 1 {
 | 
								if len(msg.Path) > 1 {
 | 
				
			||||||
				for i := range len(msg.Path) - 1 {
 | 
									for i := range len(msg.Path) - 1 {
 | 
				
			||||||
					fromASN := msg.Path[i]
 | 
										fromASN := msg.Path[i]
 | 
				
			||||||
					toASN := msg.Path[i+1]
 | 
										toASN := msg.Path[i+1]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
					// Queue ASN operations
 | 
										// Get or create both ASNs
 | 
				
			||||||
					h.asnBatch = append(h.asnBatch, asnOp{
 | 
										fromAS, err := h.db.GetOrCreateASN(fromASN, timestamp)
 | 
				
			||||||
						number:    fromASN,
 | 
										if err != nil {
 | 
				
			||||||
						timestamp: timestamp,
 | 
											h.logger.Error(
 | 
				
			||||||
					})
 | 
												"Failed to get/create from ASN",
 | 
				
			||||||
					h.asnBatch = append(h.asnBatch, asnOp{
 | 
												"asn",
 | 
				
			||||||
						number:    toASN,
 | 
												fromASN,
 | 
				
			||||||
						timestamp: timestamp,
 | 
												"error",
 | 
				
			||||||
					})
 | 
												err,
 | 
				
			||||||
 | 
											)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
					// Queue peering operation
 | 
											continue
 | 
				
			||||||
					h.peeringBatch = append(h.peeringBatch, peeringOp{
 | 
										}
 | 
				
			||||||
						fromASN:   fromASN,
 | 
					
 | 
				
			||||||
						toASN:     toASN,
 | 
										toAS, err := h.db.GetOrCreateASN(toASN, timestamp)
 | 
				
			||||||
						timestamp: timestamp,
 | 
										if err != nil {
 | 
				
			||||||
					})
 | 
											h.logger.Error(
 | 
				
			||||||
 | 
												"Failed to get/create to ASN",
 | 
				
			||||||
 | 
												"asn",
 | 
				
			||||||
 | 
												toASN,
 | 
				
			||||||
 | 
												"error",
 | 
				
			||||||
 | 
												err,
 | 
				
			||||||
 | 
											)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
											continue
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
										// Record the peering
 | 
				
			||||||
 | 
										err = h.db.RecordPeering(
 | 
				
			||||||
 | 
											fromAS.ID.String(),
 | 
				
			||||||
 | 
											toAS.ID.String(),
 | 
				
			||||||
 | 
											timestamp,
 | 
				
			||||||
 | 
										)
 | 
				
			||||||
 | 
										if err != nil {
 | 
				
			||||||
 | 
											h.logger.Error("Failed to record peering",
 | 
				
			||||||
 | 
												"from_asn", fromASN,
 | 
				
			||||||
 | 
												"to_asn", toASN,
 | 
				
			||||||
 | 
												"error", err,
 | 
				
			||||||
 | 
											)
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Queue operations for withdrawals
 | 
						// Process withdrawals
 | 
				
			||||||
	for _, prefix := range msg.Withdrawals {
 | 
						for _, prefix := range msg.Withdrawals {
 | 
				
			||||||
		h.prefixBatch = append(h.prefixBatch, prefixOp{
 | 
							// Get prefix
 | 
				
			||||||
			prefix:    prefix,
 | 
							_, err := h.db.GetOrCreatePrefix(prefix, timestamp)
 | 
				
			||||||
			timestamp: timestamp,
 | 
					 | 
				
			||||||
		})
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Check if we need to flush
 | 
					 | 
				
			||||||
	if len(h.prefixBatch) >= batchSize || len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize {
 | 
					 | 
				
			||||||
		h.flushBatchesLocked()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// flushLoop runs in a goroutine and periodically flushes batches
 | 
					 | 
				
			||||||
func (h *DBHandler) flushLoop() {
 | 
					 | 
				
			||||||
	defer h.wg.Done()
 | 
					 | 
				
			||||||
	ticker := time.NewTicker(batchTimeout)
 | 
					 | 
				
			||||||
	defer ticker.Stop()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for {
 | 
					 | 
				
			||||||
		select {
 | 
					 | 
				
			||||||
		case <-ticker.C:
 | 
					 | 
				
			||||||
			h.mu.Lock()
 | 
					 | 
				
			||||||
			if time.Since(h.lastFlush) >= batchTimeout {
 | 
					 | 
				
			||||||
				h.flushBatchesLocked()
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			h.mu.Unlock()
 | 
					 | 
				
			||||||
		case <-h.stopCh:
 | 
					 | 
				
			||||||
			// Final flush
 | 
					 | 
				
			||||||
			h.mu.Lock()
 | 
					 | 
				
			||||||
			h.flushBatchesLocked()
 | 
					 | 
				
			||||||
			h.mu.Unlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			return
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// flushBatchesLocked flushes all batches to the database (must be called with mutex held)
 | 
					 | 
				
			||||||
func (h *DBHandler) flushBatchesLocked() {
 | 
					 | 
				
			||||||
	if len(h.prefixBatch) == 0 && len(h.asnBatch) == 0 && len(h.peeringBatch) == 0 {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Process ASNs first (deduped)
 | 
					 | 
				
			||||||
	asnMap := make(map[int]time.Time)
 | 
					 | 
				
			||||||
	for _, op := range h.asnBatch {
 | 
					 | 
				
			||||||
		if existing, ok := asnMap[op.number]; !ok || op.timestamp.After(existing) {
 | 
					 | 
				
			||||||
			asnMap[op.number] = op.timestamp
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	asnCache := make(map[int]*database.ASN)
 | 
					 | 
				
			||||||
	for asn, ts := range asnMap {
 | 
					 | 
				
			||||||
		asnObj, err := h.db.GetOrCreateASN(asn, ts)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err)
 | 
								h.logger.Error(
 | 
				
			||||||
 | 
									"Failed to get prefix for withdrawal",
 | 
				
			||||||
 | 
									"prefix",
 | 
				
			||||||
 | 
									prefix,
 | 
				
			||||||
 | 
									"error",
 | 
				
			||||||
 | 
									err,
 | 
				
			||||||
 | 
								)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			continue
 | 
								continue
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		asnCache[asn] = asnObj
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Process prefixes (deduped)
 | 
							// TODO: Record the withdrawal in the announcements table as a withdrawal
 | 
				
			||||||
	prefixMap := make(map[string]time.Time)
 | 
					 | 
				
			||||||
	for _, op := range h.prefixBatch {
 | 
					 | 
				
			||||||
		if existing, ok := prefixMap[op.prefix]; !ok || op.timestamp.After(existing) {
 | 
					 | 
				
			||||||
			prefixMap[op.prefix] = op.timestamp
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	for prefix, ts := range prefixMap {
 | 
					 | 
				
			||||||
		_, err := h.db.GetOrCreatePrefix(prefix, ts)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Process peerings (deduped)
 | 
					 | 
				
			||||||
	type peeringKey struct {
 | 
					 | 
				
			||||||
		from, to int
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	peeringMap := make(map[peeringKey]time.Time)
 | 
					 | 
				
			||||||
	for _, op := range h.peeringBatch {
 | 
					 | 
				
			||||||
		key := peeringKey{from: op.fromASN, to: op.toASN}
 | 
					 | 
				
			||||||
		if existing, ok := peeringMap[key]; !ok || op.timestamp.After(existing) {
 | 
					 | 
				
			||||||
			peeringMap[key] = op.timestamp
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for key, ts := range peeringMap {
 | 
					 | 
				
			||||||
		fromAS := asnCache[key.from]
 | 
					 | 
				
			||||||
		toAS := asnCache[key.to]
 | 
					 | 
				
			||||||
		if fromAS != nil && toAS != nil {
 | 
					 | 
				
			||||||
			err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), ts)
 | 
					 | 
				
			||||||
			if err != nil {
 | 
					 | 
				
			||||||
				h.logger.Error("Failed to record peering",
 | 
					 | 
				
			||||||
					"from_asn", key.from,
 | 
					 | 
				
			||||||
					"to_asn", key.to,
 | 
					 | 
				
			||||||
					"error", err,
 | 
					 | 
				
			||||||
				)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Clear batches
 | 
					 | 
				
			||||||
	h.prefixBatch = h.prefixBatch[:0]
 | 
					 | 
				
			||||||
	h.asnBatch = h.asnBatch[:0]
 | 
					 | 
				
			||||||
	h.peeringBatch = h.peeringBatch[:0]
 | 
					 | 
				
			||||||
	h.lastFlush = time.Now()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Stop gracefully stops the handler and flushes remaining batches
 | 
					 | 
				
			||||||
func (h *DBHandler) Stop() {
 | 
					 | 
				
			||||||
	close(h.stopCh)
 | 
					 | 
				
			||||||
	h.wg.Wait()
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										272
									
								
								internal/routewatch/dbhandler_batched.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										272
									
								
								internal/routewatch/dbhandler_batched.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,272 @@
 | 
				
			|||||||
 | 
					package routewatch
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"git.eeqj.de/sneak/routewatch/internal/database"
 | 
				
			||||||
 | 
						"git.eeqj.de/sneak/routewatch/internal/logger"
 | 
				
			||||||
 | 
						"git.eeqj.de/sneak/routewatch/internal/ristypes"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						// batchedDatabaseHandlerQueueSize is the queue capacity for database operations
 | 
				
			||||||
 | 
						batchedDatabaseHandlerQueueSize = 1000
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// batchSize is the number of operations to batch together
 | 
				
			||||||
 | 
						batchSize = 500
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// batchTimeout is the maximum time to wait before flushing a batch
 | 
				
			||||||
 | 
						batchTimeout = 5 * time.Second
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// BatchedDatabaseHandler handles BGP messages and stores them in the database using batched operations
 | 
				
			||||||
 | 
					type BatchedDatabaseHandler struct {
 | 
				
			||||||
 | 
						db     database.Store
 | 
				
			||||||
 | 
						logger *logger.Logger
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Batching
 | 
				
			||||||
 | 
						mu           sync.Mutex
 | 
				
			||||||
 | 
						prefixBatch  []prefixOp
 | 
				
			||||||
 | 
						asnBatch     []asnOp
 | 
				
			||||||
 | 
						peeringBatch []peeringOp
 | 
				
			||||||
 | 
						lastFlush    time.Time
 | 
				
			||||||
 | 
						stopCh       chan struct{}
 | 
				
			||||||
 | 
						wg           sync.WaitGroup
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type prefixOp struct {
 | 
				
			||||||
 | 
						prefix    string
 | 
				
			||||||
 | 
						timestamp time.Time
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type asnOp struct {
 | 
				
			||||||
 | 
						number    int
 | 
				
			||||||
 | 
						timestamp time.Time
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type peeringOp struct {
 | 
				
			||||||
 | 
						fromASN   int
 | 
				
			||||||
 | 
						toASN     int
 | 
				
			||||||
 | 
						timestamp time.Time
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewBatchedDatabaseHandler creates a new batched database handler
 | 
				
			||||||
 | 
					func NewBatchedDatabaseHandler(
 | 
				
			||||||
 | 
						db database.Store,
 | 
				
			||||||
 | 
						logger *logger.Logger,
 | 
				
			||||||
 | 
					) *BatchedDatabaseHandler {
 | 
				
			||||||
 | 
						h := &BatchedDatabaseHandler{
 | 
				
			||||||
 | 
							db:           db,
 | 
				
			||||||
 | 
							logger:       logger,
 | 
				
			||||||
 | 
							prefixBatch:  make([]prefixOp, 0, batchSize),
 | 
				
			||||||
 | 
							asnBatch:     make([]asnOp, 0, batchSize),
 | 
				
			||||||
 | 
							peeringBatch: make([]peeringOp, 0, batchSize),
 | 
				
			||||||
 | 
							lastFlush:    time.Now(),
 | 
				
			||||||
 | 
							stopCh:       make(chan struct{}),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Start the flush timer goroutine
 | 
				
			||||||
 | 
						h.wg.Add(1)
 | 
				
			||||||
 | 
						go h.flushLoop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return h
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// WantsMessage returns true if this handler wants to process messages of the given type
 | 
				
			||||||
 | 
					func (h *BatchedDatabaseHandler) WantsMessage(messageType string) bool {
 | 
				
			||||||
 | 
						// We only care about UPDATE messages for the database
 | 
				
			||||||
 | 
						return messageType == "UPDATE"
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// QueueCapacity returns the desired queue capacity for this handler
 | 
				
			||||||
 | 
					func (h *BatchedDatabaseHandler) QueueCapacity() int {
 | 
				
			||||||
 | 
						// Batching allows us to use a larger queue
 | 
				
			||||||
 | 
						return batchedDatabaseHandlerQueueSize
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// HandleMessage processes a RIS message and queues database operations
 | 
				
			||||||
 | 
					func (h *BatchedDatabaseHandler) HandleMessage(msg *ristypes.RISMessage) {
 | 
				
			||||||
 | 
						// Use the pre-parsed timestamp
 | 
				
			||||||
 | 
						timestamp := msg.ParsedTimestamp
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Get origin ASN from path (last element)
 | 
				
			||||||
 | 
						var originASN int
 | 
				
			||||||
 | 
						if len(msg.Path) > 0 {
 | 
				
			||||||
 | 
							originASN = msg.Path[len(msg.Path)-1]
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						h.mu.Lock()
 | 
				
			||||||
 | 
						defer h.mu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Queue operations for announcements
 | 
				
			||||||
 | 
						for _, announcement := range msg.Announcements {
 | 
				
			||||||
 | 
							for _, prefix := range announcement.Prefixes {
 | 
				
			||||||
 | 
								// Queue prefix operation
 | 
				
			||||||
 | 
								h.prefixBatch = append(h.prefixBatch, prefixOp{
 | 
				
			||||||
 | 
									prefix:    prefix,
 | 
				
			||||||
 | 
									timestamp: timestamp,
 | 
				
			||||||
 | 
								})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								// Queue origin ASN operation
 | 
				
			||||||
 | 
								if originASN > 0 {
 | 
				
			||||||
 | 
									h.asnBatch = append(h.asnBatch, asnOp{
 | 
				
			||||||
 | 
										number:    originASN,
 | 
				
			||||||
 | 
										timestamp: timestamp,
 | 
				
			||||||
 | 
									})
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								// Process AS path to queue peering operations
 | 
				
			||||||
 | 
								if len(msg.Path) > 1 {
 | 
				
			||||||
 | 
									for i := range len(msg.Path) - 1 {
 | 
				
			||||||
 | 
										fromASN := msg.Path[i]
 | 
				
			||||||
 | 
										toASN := msg.Path[i+1]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
										// Queue ASN operations
 | 
				
			||||||
 | 
										h.asnBatch = append(h.asnBatch, asnOp{
 | 
				
			||||||
 | 
											number:    fromASN,
 | 
				
			||||||
 | 
											timestamp: timestamp,
 | 
				
			||||||
 | 
										})
 | 
				
			||||||
 | 
										h.asnBatch = append(h.asnBatch, asnOp{
 | 
				
			||||||
 | 
											number:    toASN,
 | 
				
			||||||
 | 
											timestamp: timestamp,
 | 
				
			||||||
 | 
										})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
										// Queue peering operation
 | 
				
			||||||
 | 
										h.peeringBatch = append(h.peeringBatch, peeringOp{
 | 
				
			||||||
 | 
											fromASN:   fromASN,
 | 
				
			||||||
 | 
											toASN:     toASN,
 | 
				
			||||||
 | 
											timestamp: timestamp,
 | 
				
			||||||
 | 
										})
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Queue operations for withdrawals
 | 
				
			||||||
 | 
						for _, prefix := range msg.Withdrawals {
 | 
				
			||||||
 | 
							h.prefixBatch = append(h.prefixBatch, prefixOp{
 | 
				
			||||||
 | 
								prefix:    prefix,
 | 
				
			||||||
 | 
								timestamp: timestamp,
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Check if we need to flush
 | 
				
			||||||
 | 
						if len(h.prefixBatch) >= batchSize || len(h.asnBatch) >= batchSize || len(h.peeringBatch) >= batchSize {
 | 
				
			||||||
 | 
							h.flushBatchesLocked()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// flushLoop runs in a goroutine and periodically flushes batches
 | 
				
			||||||
 | 
					func (h *BatchedDatabaseHandler) flushLoop() {
 | 
				
			||||||
 | 
						defer h.wg.Done()
 | 
				
			||||||
 | 
						ticker := time.NewTicker(batchTimeout)
 | 
				
			||||||
 | 
						defer ticker.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for {
 | 
				
			||||||
 | 
							select {
 | 
				
			||||||
 | 
							case <-ticker.C:
 | 
				
			||||||
 | 
								h.mu.Lock()
 | 
				
			||||||
 | 
								if time.Since(h.lastFlush) >= batchTimeout {
 | 
				
			||||||
 | 
									h.flushBatchesLocked()
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								h.mu.Unlock()
 | 
				
			||||||
 | 
							case <-h.stopCh:
 | 
				
			||||||
 | 
								// Final flush
 | 
				
			||||||
 | 
								h.mu.Lock()
 | 
				
			||||||
 | 
								h.flushBatchesLocked()
 | 
				
			||||||
 | 
								h.mu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// flushBatchesLocked flushes all batches to the database (must be called with mutex held)
 | 
				
			||||||
 | 
					func (h *BatchedDatabaseHandler) flushBatchesLocked() {
 | 
				
			||||||
 | 
						if len(h.prefixBatch) == 0 && len(h.asnBatch) == 0 && len(h.peeringBatch) == 0 {
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						start := time.Now()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Process ASNs first (deduped)
 | 
				
			||||||
 | 
						asnMap := make(map[int]time.Time)
 | 
				
			||||||
 | 
						for _, op := range h.asnBatch {
 | 
				
			||||||
 | 
							if existing, ok := asnMap[op.number]; !ok || op.timestamp.After(existing) {
 | 
				
			||||||
 | 
								asnMap[op.number] = op.timestamp
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						asnCache := make(map[int]*database.ASN)
 | 
				
			||||||
 | 
						for asn, ts := range asnMap {
 | 
				
			||||||
 | 
							asnObj, err := h.db.GetOrCreateASN(asn, ts)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							asnCache[asn] = asnObj
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Process prefixes (deduped)
 | 
				
			||||||
 | 
						prefixMap := make(map[string]time.Time)
 | 
				
			||||||
 | 
						for _, op := range h.prefixBatch {
 | 
				
			||||||
 | 
							if existing, ok := prefixMap[op.prefix]; !ok || op.timestamp.After(existing) {
 | 
				
			||||||
 | 
								prefixMap[op.prefix] = op.timestamp
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for prefix, ts := range prefixMap {
 | 
				
			||||||
 | 
							_, err := h.db.GetOrCreatePrefix(prefix, ts)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								h.logger.Error("Failed to get/create prefix", "prefix", prefix, "error", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Process peerings (deduped)
 | 
				
			||||||
 | 
						type peeringKey struct {
 | 
				
			||||||
 | 
							from, to int
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						peeringMap := make(map[peeringKey]time.Time)
 | 
				
			||||||
 | 
						for _, op := range h.peeringBatch {
 | 
				
			||||||
 | 
							key := peeringKey{from: op.fromASN, to: op.toASN}
 | 
				
			||||||
 | 
							if existing, ok := peeringMap[key]; !ok || op.timestamp.After(existing) {
 | 
				
			||||||
 | 
								peeringMap[key] = op.timestamp
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for key, ts := range peeringMap {
 | 
				
			||||||
 | 
							fromAS := asnCache[key.from]
 | 
				
			||||||
 | 
							toAS := asnCache[key.to]
 | 
				
			||||||
 | 
							if fromAS != nil && toAS != nil {
 | 
				
			||||||
 | 
								err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), ts)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									h.logger.Error("Failed to record peering",
 | 
				
			||||||
 | 
										"from_asn", key.from,
 | 
				
			||||||
 | 
										"to_asn", key.to,
 | 
				
			||||||
 | 
										"error", err,
 | 
				
			||||||
 | 
									)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Clear batches
 | 
				
			||||||
 | 
						h.prefixBatch = h.prefixBatch[:0]
 | 
				
			||||||
 | 
						h.asnBatch = h.asnBatch[:0]
 | 
				
			||||||
 | 
						h.peeringBatch = h.peeringBatch[:0]
 | 
				
			||||||
 | 
						h.lastFlush = time.Now()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						h.logger.Debug("Flushed database batches",
 | 
				
			||||||
 | 
							"duration", time.Since(start),
 | 
				
			||||||
 | 
							"asns", len(asnMap),
 | 
				
			||||||
 | 
							"prefixes", len(prefixMap),
 | 
				
			||||||
 | 
							"peerings", len(peeringMap),
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Stop gracefully stops the handler and flushes remaining batches
 | 
				
			||||||
 | 
					func (h *BatchedDatabaseHandler) Stop() {
 | 
				
			||||||
 | 
						close(h.stopCh)
 | 
				
			||||||
 | 
						h.wg.Wait()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -2,8 +2,6 @@ package routewatch
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
	"sync"
 | 
					 | 
				
			||||||
	"time"
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"git.eeqj.de/sneak/routewatch/internal/database"
 | 
						"git.eeqj.de/sneak/routewatch/internal/database"
 | 
				
			||||||
	"git.eeqj.de/sneak/routewatch/internal/logger"
 | 
						"git.eeqj.de/sneak/routewatch/internal/logger"
 | 
				
			||||||
@ -12,50 +10,21 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
	// peerHandlerQueueSize is the queue capacity for peer tracking operations
 | 
						// peerHandlerQueueSize is the queue capacity for peer tracking operations
 | 
				
			||||||
	peerHandlerQueueSize = 50000
 | 
						peerHandlerQueueSize = 500
 | 
				
			||||||
 | 
					 | 
				
			||||||
	// peerBatchSize is the number of peer updates to batch together
 | 
					 | 
				
			||||||
	peerBatchSize = 500
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// peerBatchTimeout is the maximum time to wait before flushing a batch
 | 
					 | 
				
			||||||
	peerBatchTimeout = 5 * time.Second
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// PeerHandler tracks BGP peers from all message types using batched operations
 | 
					// PeerHandler tracks BGP peers from all message types
 | 
				
			||||||
type PeerHandler struct {
 | 
					type PeerHandler struct {
 | 
				
			||||||
	db     database.Store
 | 
						db     database.Store
 | 
				
			||||||
	logger *logger.Logger
 | 
						logger *logger.Logger
 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Batching
 | 
					 | 
				
			||||||
	mu        sync.Mutex
 | 
					 | 
				
			||||||
	peerBatch []peerUpdate
 | 
					 | 
				
			||||||
	lastFlush time.Time
 | 
					 | 
				
			||||||
	stopCh    chan struct{}
 | 
					 | 
				
			||||||
	wg        sync.WaitGroup
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type peerUpdate struct {
 | 
					// NewPeerHandler creates a new peer tracking handler
 | 
				
			||||||
	peerIP      string
 | 
					 | 
				
			||||||
	peerASN     int
 | 
					 | 
				
			||||||
	messageType string
 | 
					 | 
				
			||||||
	timestamp   time.Time
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// NewPeerHandler creates a new batched peer tracking handler
 | 
					 | 
				
			||||||
func NewPeerHandler(db database.Store, logger *logger.Logger) *PeerHandler {
 | 
					func NewPeerHandler(db database.Store, logger *logger.Logger) *PeerHandler {
 | 
				
			||||||
	h := &PeerHandler{
 | 
						return &PeerHandler{
 | 
				
			||||||
		db:        db,
 | 
							db:     db,
 | 
				
			||||||
		logger:    logger,
 | 
							logger: logger,
 | 
				
			||||||
		peerBatch: make([]peerUpdate, 0, peerBatchSize),
 | 
					 | 
				
			||||||
		lastFlush: time.Now(),
 | 
					 | 
				
			||||||
		stopCh:    make(chan struct{}),
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Start the flush timer goroutine
 | 
					 | 
				
			||||||
	h.wg.Add(1)
 | 
					 | 
				
			||||||
	go h.flushLoop()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return h
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// WantsMessage returns true for all message types since we track peers from all messages
 | 
					// WantsMessage returns true for all message types since we track peers from all messages
 | 
				
			||||||
@ -65,7 +34,7 @@ func (h *PeerHandler) WantsMessage(_ string) bool {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// QueueCapacity returns the desired queue capacity for this handler
 | 
					// QueueCapacity returns the desired queue capacity for this handler
 | 
				
			||||||
func (h *PeerHandler) QueueCapacity() int {
 | 
					func (h *PeerHandler) QueueCapacity() int {
 | 
				
			||||||
	// Batching allows us to use a larger queue
 | 
						// Peer tracking is lightweight but involves database ops, use moderate queue
 | 
				
			||||||
	return peerHandlerQueueSize
 | 
						return peerHandlerQueueSize
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -79,81 +48,13 @@ func (h *PeerHandler) HandleMessage(msg *ristypes.RISMessage) {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	h.mu.Lock()
 | 
						// Update peer in database
 | 
				
			||||||
	defer h.mu.Unlock()
 | 
						if err := h.db.UpdatePeer(msg.Peer, peerASN, msg.Type, msg.ParsedTimestamp); err != nil {
 | 
				
			||||||
 | 
							h.logger.Error("Failed to update peer",
 | 
				
			||||||
	// Add to batch
 | 
								"peer", msg.Peer,
 | 
				
			||||||
	h.peerBatch = append(h.peerBatch, peerUpdate{
 | 
								"peer_asn", peerASN,
 | 
				
			||||||
		peerIP:      msg.Peer,
 | 
								"message_type", msg.Type,
 | 
				
			||||||
		peerASN:     peerASN,
 | 
								"error", err,
 | 
				
			||||||
		messageType: msg.Type,
 | 
							)
 | 
				
			||||||
		timestamp:   msg.ParsedTimestamp,
 | 
					 | 
				
			||||||
	})
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Check if we need to flush
 | 
					 | 
				
			||||||
	if len(h.peerBatch) >= peerBatchSize {
 | 
					 | 
				
			||||||
		h.flushBatchLocked()
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					 | 
				
			||||||
// flushLoop runs in a goroutine and periodically flushes batches
 | 
					 | 
				
			||||||
func (h *PeerHandler) flushLoop() {
 | 
					 | 
				
			||||||
	defer h.wg.Done()
 | 
					 | 
				
			||||||
	ticker := time.NewTicker(peerBatchTimeout)
 | 
					 | 
				
			||||||
	defer ticker.Stop()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for {
 | 
					 | 
				
			||||||
		select {
 | 
					 | 
				
			||||||
		case <-ticker.C:
 | 
					 | 
				
			||||||
			h.mu.Lock()
 | 
					 | 
				
			||||||
			if time.Since(h.lastFlush) >= peerBatchTimeout {
 | 
					 | 
				
			||||||
				h.flushBatchLocked()
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			h.mu.Unlock()
 | 
					 | 
				
			||||||
		case <-h.stopCh:
 | 
					 | 
				
			||||||
			// Final flush
 | 
					 | 
				
			||||||
			h.mu.Lock()
 | 
					 | 
				
			||||||
			h.flushBatchLocked()
 | 
					 | 
				
			||||||
			h.mu.Unlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			return
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// flushBatchLocked flushes the peer batch to the database (must be called with mutex held)
 | 
					 | 
				
			||||||
func (h *PeerHandler) flushBatchLocked() {
 | 
					 | 
				
			||||||
	if len(h.peerBatch) == 0 {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Deduplicate by peer IP, keeping the latest update for each peer
 | 
					 | 
				
			||||||
	peerMap := make(map[string]peerUpdate)
 | 
					 | 
				
			||||||
	for _, update := range h.peerBatch {
 | 
					 | 
				
			||||||
		if existing, ok := peerMap[update.peerIP]; !ok || update.timestamp.After(existing.timestamp) {
 | 
					 | 
				
			||||||
			peerMap[update.peerIP] = update
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Apply updates
 | 
					 | 
				
			||||||
	for _, update := range peerMap {
 | 
					 | 
				
			||||||
		if err := h.db.UpdatePeer(update.peerIP, update.peerASN, update.messageType, update.timestamp); err != nil {
 | 
					 | 
				
			||||||
			h.logger.Error("Failed to update peer",
 | 
					 | 
				
			||||||
				"peer", update.peerIP,
 | 
					 | 
				
			||||||
				"peer_asn", update.peerASN,
 | 
					 | 
				
			||||||
				"message_type", update.messageType,
 | 
					 | 
				
			||||||
				"error", err,
 | 
					 | 
				
			||||||
			)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Clear batch
 | 
					 | 
				
			||||||
	h.peerBatch = h.peerBatch[:0]
 | 
					 | 
				
			||||||
	h.lastFlush = time.Now()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Stop gracefully stops the handler and flushes remaining batches
 | 
					 | 
				
			||||||
func (h *PeerHandler) Stop() {
 | 
					 | 
				
			||||||
	close(h.stopCh)
 | 
					 | 
				
			||||||
	h.wg.Wait()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										170
									
								
								internal/routewatch/peerhandler_batched.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										170
									
								
								internal/routewatch/peerhandler_batched.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,170 @@
 | 
				
			|||||||
 | 
					package routewatch
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"strconv"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"git.eeqj.de/sneak/routewatch/internal/database"
 | 
				
			||||||
 | 
						"git.eeqj.de/sneak/routewatch/internal/logger"
 | 
				
			||||||
 | 
						"git.eeqj.de/sneak/routewatch/internal/ristypes"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						// batchedPeerHandlerQueueSize is the queue capacity for peer tracking operations
 | 
				
			||||||
 | 
						batchedPeerHandlerQueueSize = 2000
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// peerBatchSize is the number of peer updates to batch together
 | 
				
			||||||
 | 
						peerBatchSize = 500
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// peerBatchTimeout is the maximum time to wait before flushing a batch
 | 
				
			||||||
 | 
						peerBatchTimeout = 5 * time.Second
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// BatchedPeerHandler tracks BGP peers from all message types using batched operations
 | 
				
			||||||
 | 
					type BatchedPeerHandler struct {
 | 
				
			||||||
 | 
						db     database.Store
 | 
				
			||||||
 | 
						logger *logger.Logger
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Batching
 | 
				
			||||||
 | 
						mu        sync.Mutex
 | 
				
			||||||
 | 
						peerBatch []peerUpdate
 | 
				
			||||||
 | 
						lastFlush time.Time
 | 
				
			||||||
 | 
						stopCh    chan struct{}
 | 
				
			||||||
 | 
						wg        sync.WaitGroup
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type peerUpdate struct {
 | 
				
			||||||
 | 
						peerIP      string
 | 
				
			||||||
 | 
						peerASN     int
 | 
				
			||||||
 | 
						messageType string
 | 
				
			||||||
 | 
						timestamp   time.Time
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewBatchedPeerHandler creates a new batched peer tracking handler
 | 
				
			||||||
 | 
					func NewBatchedPeerHandler(db database.Store, logger *logger.Logger) *BatchedPeerHandler {
 | 
				
			||||||
 | 
						h := &BatchedPeerHandler{
 | 
				
			||||||
 | 
							db:        db,
 | 
				
			||||||
 | 
							logger:    logger,
 | 
				
			||||||
 | 
							peerBatch: make([]peerUpdate, 0, peerBatchSize),
 | 
				
			||||||
 | 
							lastFlush: time.Now(),
 | 
				
			||||||
 | 
							stopCh:    make(chan struct{}),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Start the flush timer goroutine
 | 
				
			||||||
 | 
						h.wg.Add(1)
 | 
				
			||||||
 | 
						go h.flushLoop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return h
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// WantsMessage returns true for all message types since we track peers from all messages
 | 
				
			||||||
 | 
					func (h *BatchedPeerHandler) WantsMessage(_ string) bool {
 | 
				
			||||||
 | 
						return true
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// QueueCapacity returns the desired queue capacity for this handler
 | 
				
			||||||
 | 
					func (h *BatchedPeerHandler) QueueCapacity() int {
 | 
				
			||||||
 | 
						// Batching allows us to use a larger queue
 | 
				
			||||||
 | 
						return batchedPeerHandlerQueueSize
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// HandleMessage processes a message to track peer information
 | 
				
			||||||
 | 
					func (h *BatchedPeerHandler) HandleMessage(msg *ristypes.RISMessage) {
 | 
				
			||||||
 | 
						// Parse peer ASN from string
 | 
				
			||||||
 | 
						peerASN := 0
 | 
				
			||||||
 | 
						if msg.PeerASN != "" {
 | 
				
			||||||
 | 
							if asn, err := strconv.Atoi(msg.PeerASN); err == nil {
 | 
				
			||||||
 | 
								peerASN = asn
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						h.mu.Lock()
 | 
				
			||||||
 | 
						defer h.mu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Add to batch
 | 
				
			||||||
 | 
						h.peerBatch = append(h.peerBatch, peerUpdate{
 | 
				
			||||||
 | 
							peerIP:      msg.Peer,
 | 
				
			||||||
 | 
							peerASN:     peerASN,
 | 
				
			||||||
 | 
							messageType: msg.Type,
 | 
				
			||||||
 | 
							timestamp:   msg.ParsedTimestamp,
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Check if we need to flush
 | 
				
			||||||
 | 
						if len(h.peerBatch) >= peerBatchSize {
 | 
				
			||||||
 | 
							h.flushBatchLocked()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// flushLoop runs in a goroutine and periodically flushes batches
 | 
				
			||||||
 | 
					func (h *BatchedPeerHandler) flushLoop() {
 | 
				
			||||||
 | 
						defer h.wg.Done()
 | 
				
			||||||
 | 
						ticker := time.NewTicker(peerBatchTimeout)
 | 
				
			||||||
 | 
						defer ticker.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for {
 | 
				
			||||||
 | 
							select {
 | 
				
			||||||
 | 
							case <-ticker.C:
 | 
				
			||||||
 | 
								h.mu.Lock()
 | 
				
			||||||
 | 
								if time.Since(h.lastFlush) >= peerBatchTimeout {
 | 
				
			||||||
 | 
									h.flushBatchLocked()
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								h.mu.Unlock()
 | 
				
			||||||
 | 
							case <-h.stopCh:
 | 
				
			||||||
 | 
								// Final flush
 | 
				
			||||||
 | 
								h.mu.Lock()
 | 
				
			||||||
 | 
								h.flushBatchLocked()
 | 
				
			||||||
 | 
								h.mu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// flushBatchLocked flushes the peer batch to the database (must be called with mutex held)
 | 
				
			||||||
 | 
					func (h *BatchedPeerHandler) flushBatchLocked() {
 | 
				
			||||||
 | 
						if len(h.peerBatch) == 0 {
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						start := time.Now()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Deduplicate by peer IP, keeping the latest update for each peer
 | 
				
			||||||
 | 
						peerMap := make(map[string]peerUpdate)
 | 
				
			||||||
 | 
						for _, update := range h.peerBatch {
 | 
				
			||||||
 | 
							if existing, ok := peerMap[update.peerIP]; !ok || update.timestamp.After(existing.timestamp) {
 | 
				
			||||||
 | 
								peerMap[update.peerIP] = update
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Apply updates
 | 
				
			||||||
 | 
						successCount := 0
 | 
				
			||||||
 | 
						for _, update := range peerMap {
 | 
				
			||||||
 | 
							if err := h.db.UpdatePeer(update.peerIP, update.peerASN, update.messageType, update.timestamp); err != nil {
 | 
				
			||||||
 | 
								h.logger.Error("Failed to update peer",
 | 
				
			||||||
 | 
									"peer", update.peerIP,
 | 
				
			||||||
 | 
									"peer_asn", update.peerASN,
 | 
				
			||||||
 | 
									"message_type", update.messageType,
 | 
				
			||||||
 | 
									"error", err,
 | 
				
			||||||
 | 
								)
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								successCount++
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Clear batch
 | 
				
			||||||
 | 
						h.peerBatch = h.peerBatch[:0]
 | 
				
			||||||
 | 
						h.lastFlush = time.Now()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						h.logger.Debug("Flushed peer batch",
 | 
				
			||||||
 | 
							"duration", time.Since(start),
 | 
				
			||||||
 | 
							"total_updates", len(peerMap),
 | 
				
			||||||
 | 
							"successful", successCount,
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Stop gracefully stops the handler and flushes remaining batches
 | 
				
			||||||
 | 
					func (h *BatchedPeerHandler) Stop() {
 | 
				
			||||||
 | 
						close(h.stopCh)
 | 
				
			||||||
 | 
						h.wg.Wait()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -1,273 +0,0 @@
 | 
				
			|||||||
package routewatch
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
import (
 | 
					 | 
				
			||||||
	"net"
 | 
					 | 
				
			||||||
	"strings"
 | 
					 | 
				
			||||||
	"sync"
 | 
					 | 
				
			||||||
	"time"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	"git.eeqj.de/sneak/routewatch/internal/database"
 | 
					 | 
				
			||||||
	"git.eeqj.de/sneak/routewatch/internal/logger"
 | 
					 | 
				
			||||||
	"git.eeqj.de/sneak/routewatch/internal/ristypes"
 | 
					 | 
				
			||||||
	"github.com/google/uuid"
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
const (
 | 
					 | 
				
			||||||
	// prefixHandlerQueueSize is the queue capacity for prefix tracking operations
 | 
					 | 
				
			||||||
	prefixHandlerQueueSize = 50000
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// prefixBatchSize is the number of prefix updates to batch together
 | 
					 | 
				
			||||||
	prefixBatchSize = 2000
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// prefixBatchTimeout is the maximum time to wait before flushing a batch
 | 
					 | 
				
			||||||
	prefixBatchTimeout = 5 * time.Second
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// IP version constants
 | 
					 | 
				
			||||||
	ipv4Version = 4
 | 
					 | 
				
			||||||
	ipv6Version = 6
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// PrefixHandler tracks BGP prefixes and maintains a live routing table in the database.
 | 
					 | 
				
			||||||
// Routes are added on announcement and deleted on withdrawal.
 | 
					 | 
				
			||||||
type PrefixHandler struct {
 | 
					 | 
				
			||||||
	db     database.Store
 | 
					 | 
				
			||||||
	logger *logger.Logger
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Batching
 | 
					 | 
				
			||||||
	mu        sync.Mutex
 | 
					 | 
				
			||||||
	batch     []prefixUpdate
 | 
					 | 
				
			||||||
	lastFlush time.Time
 | 
					 | 
				
			||||||
	stopCh    chan struct{}
 | 
					 | 
				
			||||||
	wg        sync.WaitGroup
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type prefixUpdate struct {
 | 
					 | 
				
			||||||
	prefix      string
 | 
					 | 
				
			||||||
	originASN   int
 | 
					 | 
				
			||||||
	peer        string
 | 
					 | 
				
			||||||
	messageType string
 | 
					 | 
				
			||||||
	timestamp   time.Time
 | 
					 | 
				
			||||||
	path        []int
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// NewPrefixHandler creates a new batched prefix tracking handler
 | 
					 | 
				
			||||||
func NewPrefixHandler(db database.Store, logger *logger.Logger) *PrefixHandler {
 | 
					 | 
				
			||||||
	h := &PrefixHandler{
 | 
					 | 
				
			||||||
		db:        db,
 | 
					 | 
				
			||||||
		logger:    logger,
 | 
					 | 
				
			||||||
		batch:     make([]prefixUpdate, 0, prefixBatchSize),
 | 
					 | 
				
			||||||
		lastFlush: time.Now(),
 | 
					 | 
				
			||||||
		stopCh:    make(chan struct{}),
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Start the flush timer goroutine
 | 
					 | 
				
			||||||
	h.wg.Add(1)
 | 
					 | 
				
			||||||
	go h.flushLoop()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return h
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// WantsMessage returns true if this handler wants to process messages of the given type
 | 
					 | 
				
			||||||
func (h *PrefixHandler) WantsMessage(messageType string) bool {
 | 
					 | 
				
			||||||
	// We only care about UPDATE messages for the routing table
 | 
					 | 
				
			||||||
	return messageType == "UPDATE"
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// QueueCapacity returns the desired queue capacity for this handler
 | 
					 | 
				
			||||||
func (h *PrefixHandler) QueueCapacity() int {
 | 
					 | 
				
			||||||
	// Batching allows us to use a larger queue
 | 
					 | 
				
			||||||
	return prefixHandlerQueueSize
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// HandleMessage processes a message to track prefix information
 | 
					 | 
				
			||||||
func (h *PrefixHandler) HandleMessage(msg *ristypes.RISMessage) {
 | 
					 | 
				
			||||||
	// Use the pre-parsed timestamp
 | 
					 | 
				
			||||||
	timestamp := msg.ParsedTimestamp
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Get origin ASN from path (last element)
 | 
					 | 
				
			||||||
	var originASN int
 | 
					 | 
				
			||||||
	if len(msg.Path) > 0 {
 | 
					 | 
				
			||||||
		originASN = msg.Path[len(msg.Path)-1]
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	h.mu.Lock()
 | 
					 | 
				
			||||||
	defer h.mu.Unlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Process announcements
 | 
					 | 
				
			||||||
	for _, announcement := range msg.Announcements {
 | 
					 | 
				
			||||||
		for _, prefix := range announcement.Prefixes {
 | 
					 | 
				
			||||||
			h.batch = append(h.batch, prefixUpdate{
 | 
					 | 
				
			||||||
				prefix:      prefix,
 | 
					 | 
				
			||||||
				originASN:   originASN,
 | 
					 | 
				
			||||||
				peer:        msg.Peer,
 | 
					 | 
				
			||||||
				messageType: "announcement",
 | 
					 | 
				
			||||||
				timestamp:   timestamp,
 | 
					 | 
				
			||||||
				path:        msg.Path,
 | 
					 | 
				
			||||||
			})
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Process withdrawals
 | 
					 | 
				
			||||||
	for _, prefix := range msg.Withdrawals {
 | 
					 | 
				
			||||||
		h.batch = append(h.batch, prefixUpdate{
 | 
					 | 
				
			||||||
			prefix:      prefix,
 | 
					 | 
				
			||||||
			originASN:   originASN, // Use the originASN from path if available
 | 
					 | 
				
			||||||
			peer:        msg.Peer,
 | 
					 | 
				
			||||||
			messageType: "withdrawal",
 | 
					 | 
				
			||||||
			timestamp:   timestamp,
 | 
					 | 
				
			||||||
			path:        msg.Path,
 | 
					 | 
				
			||||||
		})
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Check if we need to flush
 | 
					 | 
				
			||||||
	if len(h.batch) >= prefixBatchSize {
 | 
					 | 
				
			||||||
		h.flushBatchLocked()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// flushLoop runs in a goroutine and periodically flushes batches
 | 
					 | 
				
			||||||
func (h *PrefixHandler) flushLoop() {
 | 
					 | 
				
			||||||
	defer h.wg.Done()
 | 
					 | 
				
			||||||
	ticker := time.NewTicker(prefixBatchTimeout)
 | 
					 | 
				
			||||||
	defer ticker.Stop()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for {
 | 
					 | 
				
			||||||
		select {
 | 
					 | 
				
			||||||
		case <-ticker.C:
 | 
					 | 
				
			||||||
			h.mu.Lock()
 | 
					 | 
				
			||||||
			if time.Since(h.lastFlush) >= prefixBatchTimeout {
 | 
					 | 
				
			||||||
				h.flushBatchLocked()
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			h.mu.Unlock()
 | 
					 | 
				
			||||||
		case <-h.stopCh:
 | 
					 | 
				
			||||||
			// Final flush
 | 
					 | 
				
			||||||
			h.mu.Lock()
 | 
					 | 
				
			||||||
			h.flushBatchLocked()
 | 
					 | 
				
			||||||
			h.mu.Unlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			return
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// flushBatchLocked flushes the prefix batch to the database (must be called with mutex held)
 | 
					 | 
				
			||||||
func (h *PrefixHandler) flushBatchLocked() {
 | 
					 | 
				
			||||||
	if len(h.batch) == 0 {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Group updates by prefix to deduplicate
 | 
					 | 
				
			||||||
	// For each prefix, keep the latest update
 | 
					 | 
				
			||||||
	prefixMap := make(map[string]prefixUpdate)
 | 
					 | 
				
			||||||
	for _, update := range h.batch {
 | 
					 | 
				
			||||||
		key := update.prefix
 | 
					 | 
				
			||||||
		if existing, ok := prefixMap[key]; !ok || update.timestamp.After(existing.timestamp) {
 | 
					 | 
				
			||||||
			prefixMap[key] = update
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Apply updates to database
 | 
					 | 
				
			||||||
	for _, update := range prefixMap {
 | 
					 | 
				
			||||||
		// Get or create prefix
 | 
					 | 
				
			||||||
		prefix, err := h.db.GetOrCreatePrefix(update.prefix, update.timestamp)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			h.logger.Error("Failed to get/create prefix",
 | 
					 | 
				
			||||||
				"prefix", update.prefix,
 | 
					 | 
				
			||||||
				"error", err,
 | 
					 | 
				
			||||||
			)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			continue
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		// For announcements, get ASN info and create announcement record
 | 
					 | 
				
			||||||
		if update.messageType == "announcement" && update.originASN > 0 {
 | 
					 | 
				
			||||||
			h.processAnnouncement(prefix, update)
 | 
					 | 
				
			||||||
		} else if update.messageType == "withdrawal" {
 | 
					 | 
				
			||||||
			h.processWithdrawal(prefix, update)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Clear batch
 | 
					 | 
				
			||||||
	h.batch = h.batch[:0]
 | 
					 | 
				
			||||||
	h.lastFlush = time.Now()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// parseCIDR extracts the mask length and IP version from a prefix string
 | 
					 | 
				
			||||||
func parseCIDR(prefix string) (maskLength int, ipVersion int, err error) {
 | 
					 | 
				
			||||||
	_, ipNet, err := net.ParseCIDR(prefix)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return 0, 0, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	ones, _ := ipNet.Mask.Size()
 | 
					 | 
				
			||||||
	if strings.Contains(prefix, ":") {
 | 
					 | 
				
			||||||
		return ones, ipv6Version, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return ones, ipv4Version, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// processAnnouncement handles storing an announcement in the database
 | 
					 | 
				
			||||||
func (h *PrefixHandler) processAnnouncement(_ *database.Prefix, update prefixUpdate) {
 | 
					 | 
				
			||||||
	// Parse CIDR to get mask length
 | 
					 | 
				
			||||||
	maskLength, ipVersion, err := parseCIDR(update.prefix)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		h.logger.Error("Failed to parse CIDR",
 | 
					 | 
				
			||||||
			"prefix", update.prefix,
 | 
					 | 
				
			||||||
			"error", err,
 | 
					 | 
				
			||||||
		)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Create live route record
 | 
					 | 
				
			||||||
	liveRoute := &database.LiveRoute{
 | 
					 | 
				
			||||||
		ID:          uuid.New(),
 | 
					 | 
				
			||||||
		Prefix:      update.prefix,
 | 
					 | 
				
			||||||
		MaskLength:  maskLength,
 | 
					 | 
				
			||||||
		IPVersion:   ipVersion,
 | 
					 | 
				
			||||||
		OriginASN:   update.originASN,
 | 
					 | 
				
			||||||
		PeerIP:      update.peer,
 | 
					 | 
				
			||||||
		ASPath:      update.path,
 | 
					 | 
				
			||||||
		NextHop:     update.peer, // Using peer as next hop
 | 
					 | 
				
			||||||
		LastUpdated: update.timestamp,
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if err := h.db.UpsertLiveRoute(liveRoute); err != nil {
 | 
					 | 
				
			||||||
		h.logger.Error("Failed to upsert live route",
 | 
					 | 
				
			||||||
			"prefix", update.prefix,
 | 
					 | 
				
			||||||
			"error", err,
 | 
					 | 
				
			||||||
		)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// processWithdrawal handles removing a route from the live routing table
 | 
					 | 
				
			||||||
func (h *PrefixHandler) processWithdrawal(_ *database.Prefix, update prefixUpdate) {
 | 
					 | 
				
			||||||
	// For withdrawals, we need to delete the route from live_routes
 | 
					 | 
				
			||||||
	// Since we have the origin ASN from the update, we can delete the specific route
 | 
					 | 
				
			||||||
	if update.originASN > 0 {
 | 
					 | 
				
			||||||
		if err := h.db.DeleteLiveRoute(update.prefix, update.originASN, update.peer); err != nil {
 | 
					 | 
				
			||||||
			h.logger.Error("Failed to delete live route",
 | 
					 | 
				
			||||||
				"prefix", update.prefix,
 | 
					 | 
				
			||||||
				"origin_asn", update.originASN,
 | 
					 | 
				
			||||||
				"peer", update.peer,
 | 
					 | 
				
			||||||
				"error", err,
 | 
					 | 
				
			||||||
			)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		// If no origin ASN, just delete all routes for this prefix from this peer
 | 
					 | 
				
			||||||
		if err := h.db.DeleteLiveRoute(update.prefix, 0, update.peer); err != nil {
 | 
					 | 
				
			||||||
			h.logger.Error("Failed to delete live route (no origin ASN)",
 | 
					 | 
				
			||||||
				"prefix", update.prefix,
 | 
					 | 
				
			||||||
				"peer", update.peer,
 | 
					 | 
				
			||||||
				"error", err,
 | 
					 | 
				
			||||||
			)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Stop gracefully stops the handler and flushes remaining batches
 | 
					 | 
				
			||||||
func (h *PrefixHandler) Stop() {
 | 
					 | 
				
			||||||
	close(h.stopCh)
 | 
					 | 
				
			||||||
	h.wg.Wait()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
@ -64,26 +64,18 @@ type RoutingTable struct {
 | 
				
			|||||||
	lastMetricsReset time.Time
 | 
						lastMetricsReset time.Time
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Configuration
 | 
						// Configuration
 | 
				
			||||||
	snapshotDir            string
 | 
						snapshotDir string
 | 
				
			||||||
	routeExpirationTimeout time.Duration
 | 
					 | 
				
			||||||
	logger                 *logger.Logger
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Expiration management
 | 
					 | 
				
			||||||
	stopExpiration chan struct{}
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// New creates a new routing table, loading from snapshot if available
 | 
					// New creates a new routing table, loading from snapshot if available
 | 
				
			||||||
func New(cfg *config.Config, logger *logger.Logger) *RoutingTable {
 | 
					func New(cfg *config.Config, logger *logger.Logger) *RoutingTable {
 | 
				
			||||||
	rt := &RoutingTable{
 | 
						rt := &RoutingTable{
 | 
				
			||||||
		routes:                 make(map[RouteKey]*Route),
 | 
							routes:           make(map[RouteKey]*Route),
 | 
				
			||||||
		byPrefix:               make(map[uuid.UUID]map[RouteKey]*Route),
 | 
							byPrefix:         make(map[uuid.UUID]map[RouteKey]*Route),
 | 
				
			||||||
		byOriginASN:            make(map[uuid.UUID]map[RouteKey]*Route),
 | 
							byOriginASN:      make(map[uuid.UUID]map[RouteKey]*Route),
 | 
				
			||||||
		byPeerASN:              make(map[int]map[RouteKey]*Route),
 | 
							byPeerASN:        make(map[int]map[RouteKey]*Route),
 | 
				
			||||||
		lastMetricsReset:       time.Now(),
 | 
							lastMetricsReset: time.Now(),
 | 
				
			||||||
		snapshotDir:            cfg.GetStateDir(),
 | 
							snapshotDir:      cfg.GetStateDir(),
 | 
				
			||||||
		routeExpirationTimeout: cfg.RouteExpirationTimeout,
 | 
					 | 
				
			||||||
		logger:                 logger,
 | 
					 | 
				
			||||||
		stopExpiration:         make(chan struct{}),
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Try to load from snapshot
 | 
						// Try to load from snapshot
 | 
				
			||||||
@ -91,9 +83,6 @@ func New(cfg *config.Config, logger *logger.Logger) *RoutingTable {
 | 
				
			|||||||
		logger.Warn("Failed to load routing table from snapshot", "error", err)
 | 
							logger.Warn("Failed to load routing table from snapshot", "error", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Start expiration goroutine
 | 
					 | 
				
			||||||
	go rt.expireRoutesLoop()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return rt
 | 
						return rt
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -533,72 +522,3 @@ func (rt *RoutingTable) loadFromSnapshot(logger *logger.Logger) error {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					 | 
				
			||||||
// expireRoutesLoop periodically removes expired routes
 | 
					 | 
				
			||||||
func (rt *RoutingTable) expireRoutesLoop() {
 | 
					 | 
				
			||||||
	// Run every minute to check for expired routes
 | 
					 | 
				
			||||||
	ticker := time.NewTicker(1 * time.Minute)
 | 
					 | 
				
			||||||
	defer ticker.Stop()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for {
 | 
					 | 
				
			||||||
		select {
 | 
					 | 
				
			||||||
		case <-ticker.C:
 | 
					 | 
				
			||||||
			rt.expireStaleRoutes()
 | 
					 | 
				
			||||||
		case <-rt.stopExpiration:
 | 
					 | 
				
			||||||
			return
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// expireStaleRoutes removes routes that haven't been updated recently
 | 
					 | 
				
			||||||
func (rt *RoutingTable) expireStaleRoutes() {
 | 
					 | 
				
			||||||
	rt.mu.Lock()
 | 
					 | 
				
			||||||
	defer rt.mu.Unlock()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	now := time.Now().UTC()
 | 
					 | 
				
			||||||
	cutoffTime := now.Add(-rt.routeExpirationTimeout)
 | 
					 | 
				
			||||||
	expiredCount := 0
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Collect keys to delete (can't delete while iterating)
 | 
					 | 
				
			||||||
	var keysToDelete []RouteKey
 | 
					 | 
				
			||||||
	for key, route := range rt.routes {
 | 
					 | 
				
			||||||
		// Use AnnouncedAt as the last update time
 | 
					 | 
				
			||||||
		if route.AnnouncedAt.Before(cutoffTime) {
 | 
					 | 
				
			||||||
			keysToDelete = append(keysToDelete, key)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Delete expired routes
 | 
					 | 
				
			||||||
	for _, key := range keysToDelete {
 | 
					 | 
				
			||||||
		route, exists := rt.routes[key]
 | 
					 | 
				
			||||||
		if !exists {
 | 
					 | 
				
			||||||
			continue
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		rt.removeFromIndexes(key, route)
 | 
					 | 
				
			||||||
		delete(rt.routes, key)
 | 
					 | 
				
			||||||
		expiredCount++
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		// Update metrics
 | 
					 | 
				
			||||||
		if isIPv6(route.Prefix) {
 | 
					 | 
				
			||||||
			rt.ipv6Routes--
 | 
					 | 
				
			||||||
		} else {
 | 
					 | 
				
			||||||
			rt.ipv4Routes--
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if expiredCount > 0 {
 | 
					 | 
				
			||||||
		rt.logger.Info("Expired stale routes",
 | 
					 | 
				
			||||||
			"count", expiredCount,
 | 
					 | 
				
			||||||
			"timeout", rt.routeExpirationTimeout,
 | 
					 | 
				
			||||||
			"remaining_routes", len(rt.routes),
 | 
					 | 
				
			||||||
		)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Stop gracefully stops the routing table background tasks
 | 
					 | 
				
			||||||
func (rt *RoutingTable) Stop() {
 | 
					 | 
				
			||||||
	if rt.stopExpiration != nil {
 | 
					 | 
				
			||||||
		close(rt.stopExpiration)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
				
			|||||||
@ -114,25 +114,23 @@ func (s *Server) handleRoot() http.HandlerFunc {
 | 
				
			|||||||
func (s *Server) handleStatusJSON() http.HandlerFunc {
 | 
					func (s *Server) handleStatusJSON() http.HandlerFunc {
 | 
				
			||||||
	// Stats represents the statistics response
 | 
						// Stats represents the statistics response
 | 
				
			||||||
	type Stats struct {
 | 
						type Stats struct {
 | 
				
			||||||
		Uptime                 string                        `json:"uptime"`
 | 
							Uptime            string  `json:"uptime"`
 | 
				
			||||||
		TotalMessages          uint64                        `json:"total_messages"`
 | 
							TotalMessages     uint64  `json:"total_messages"`
 | 
				
			||||||
		TotalBytes             uint64                        `json:"total_bytes"`
 | 
							TotalBytes        uint64  `json:"total_bytes"`
 | 
				
			||||||
		MessagesPerSec         float64                       `json:"messages_per_sec"`
 | 
							MessagesPerSec    float64 `json:"messages_per_sec"`
 | 
				
			||||||
		MbitsPerSec            float64                       `json:"mbits_per_sec"`
 | 
							MbitsPerSec       float64 `json:"mbits_per_sec"`
 | 
				
			||||||
		Connected              bool                          `json:"connected"`
 | 
							Connected         bool    `json:"connected"`
 | 
				
			||||||
		ASNs                   int                           `json:"asns"`
 | 
							ASNs              int     `json:"asns"`
 | 
				
			||||||
		Prefixes               int                           `json:"prefixes"`
 | 
							Prefixes          int     `json:"prefixes"`
 | 
				
			||||||
		IPv4Prefixes           int                           `json:"ipv4_prefixes"`
 | 
							IPv4Prefixes      int     `json:"ipv4_prefixes"`
 | 
				
			||||||
		IPv6Prefixes           int                           `json:"ipv6_prefixes"`
 | 
							IPv6Prefixes      int     `json:"ipv6_prefixes"`
 | 
				
			||||||
		Peerings               int                           `json:"peerings"`
 | 
							Peerings          int     `json:"peerings"`
 | 
				
			||||||
		DatabaseSizeBytes      int64                         `json:"database_size_bytes"`
 | 
							DatabaseSizeBytes int64   `json:"database_size_bytes"`
 | 
				
			||||||
		LiveRoutes             int                           `json:"live_routes"`
 | 
							LiveRoutes        int     `json:"live_routes"`
 | 
				
			||||||
		IPv4Routes             int                           `json:"ipv4_routes"`
 | 
							IPv4Routes        int     `json:"ipv4_routes"`
 | 
				
			||||||
		IPv6Routes             int                           `json:"ipv6_routes"`
 | 
							IPv6Routes        int     `json:"ipv6_routes"`
 | 
				
			||||||
		IPv4UpdatesPerSec      float64                       `json:"ipv4_updates_per_sec"`
 | 
							IPv4UpdatesPerSec float64 `json:"ipv4_updates_per_sec"`
 | 
				
			||||||
		IPv6UpdatesPerSec      float64                       `json:"ipv6_updates_per_sec"`
 | 
							IPv6UpdatesPerSec float64 `json:"ipv6_updates_per_sec"`
 | 
				
			||||||
		IPv4PrefixDistribution []database.PrefixDistribution `json:"ipv4_prefix_distribution"`
 | 
					 | 
				
			||||||
		IPv6PrefixDistribution []database.PrefixDistribution `json:"ipv6_prefix_distribution"`
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return func(w http.ResponseWriter, r *http.Request) {
 | 
						return func(w http.ResponseWriter, r *http.Request) {
 | 
				
			||||||
@ -147,6 +145,7 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
 | 
				
			|||||||
		errChan := make(chan error)
 | 
							errChan := make(chan error)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		go func() {
 | 
							go func() {
 | 
				
			||||||
 | 
								s.logger.Debug("Starting database stats query")
 | 
				
			||||||
			dbStats, err := s.db.GetStats()
 | 
								dbStats, err := s.db.GetStats()
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				s.logger.Debug("Database stats query failed", "error", err)
 | 
									s.logger.Debug("Database stats query failed", "error", err)
 | 
				
			||||||
@ -154,6 +153,7 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
				return
 | 
									return
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
								s.logger.Debug("Database stats query completed")
 | 
				
			||||||
			statsChan <- dbStats
 | 
								statsChan <- dbStats
 | 
				
			||||||
		}()
 | 
							}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -200,25 +200,23 @@ func (s *Server) handleStatusJSON() http.HandlerFunc {
 | 
				
			|||||||
		rtStats := s.routingTable.GetDetailedStats()
 | 
							rtStats := s.routingTable.GetDetailedStats()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		stats := Stats{
 | 
							stats := Stats{
 | 
				
			||||||
			Uptime:                 uptime,
 | 
								Uptime:            uptime,
 | 
				
			||||||
			TotalMessages:          metrics.TotalMessages,
 | 
								TotalMessages:     metrics.TotalMessages,
 | 
				
			||||||
			TotalBytes:             metrics.TotalBytes,
 | 
								TotalBytes:        metrics.TotalBytes,
 | 
				
			||||||
			MessagesPerSec:         metrics.MessagesPerSec,
 | 
								MessagesPerSec:    metrics.MessagesPerSec,
 | 
				
			||||||
			MbitsPerSec:            metrics.BitsPerSec / bitsPerMegabit,
 | 
								MbitsPerSec:       metrics.BitsPerSec / bitsPerMegabit,
 | 
				
			||||||
			Connected:              metrics.Connected,
 | 
								Connected:         metrics.Connected,
 | 
				
			||||||
			ASNs:                   dbStats.ASNs,
 | 
								ASNs:              dbStats.ASNs,
 | 
				
			||||||
			Prefixes:               dbStats.Prefixes,
 | 
								Prefixes:          dbStats.Prefixes,
 | 
				
			||||||
			IPv4Prefixes:           dbStats.IPv4Prefixes,
 | 
								IPv4Prefixes:      dbStats.IPv4Prefixes,
 | 
				
			||||||
			IPv6Prefixes:           dbStats.IPv6Prefixes,
 | 
								IPv6Prefixes:      dbStats.IPv6Prefixes,
 | 
				
			||||||
			Peerings:               dbStats.Peerings,
 | 
								Peerings:          dbStats.Peerings,
 | 
				
			||||||
			DatabaseSizeBytes:      dbStats.FileSizeBytes,
 | 
								DatabaseSizeBytes: dbStats.FileSizeBytes,
 | 
				
			||||||
			LiveRoutes:             dbStats.LiveRoutes,
 | 
								LiveRoutes:        rtStats.TotalRoutes,
 | 
				
			||||||
			IPv4Routes:             rtStats.IPv4Routes,
 | 
								IPv4Routes:        rtStats.IPv4Routes,
 | 
				
			||||||
			IPv6Routes:             rtStats.IPv6Routes,
 | 
								IPv6Routes:        rtStats.IPv6Routes,
 | 
				
			||||||
			IPv4UpdatesPerSec:      rtStats.IPv4UpdatesRate,
 | 
								IPv4UpdatesPerSec: rtStats.IPv4UpdatesRate,
 | 
				
			||||||
			IPv6UpdatesPerSec:      rtStats.IPv6UpdatesRate,
 | 
								IPv6UpdatesPerSec: rtStats.IPv6UpdatesRate,
 | 
				
			||||||
			IPv4PrefixDistribution: dbStats.IPv4PrefixDistribution,
 | 
					 | 
				
			||||||
			IPv6PrefixDistribution: dbStats.IPv6PrefixDistribution,
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		w.Header().Set("Content-Type", "application/json")
 | 
							w.Header().Set("Content-Type", "application/json")
 | 
				
			||||||
@ -248,26 +246,24 @@ func (s *Server) handleStats() http.HandlerFunc {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// StatsResponse represents the API statistics response
 | 
						// StatsResponse represents the API statistics response
 | 
				
			||||||
	type StatsResponse struct {
 | 
						type StatsResponse struct {
 | 
				
			||||||
		Uptime                 string                        `json:"uptime"`
 | 
							Uptime            string             `json:"uptime"`
 | 
				
			||||||
		TotalMessages          uint64                        `json:"total_messages"`
 | 
							TotalMessages     uint64             `json:"total_messages"`
 | 
				
			||||||
		TotalBytes             uint64                        `json:"total_bytes"`
 | 
							TotalBytes        uint64             `json:"total_bytes"`
 | 
				
			||||||
		MessagesPerSec         float64                       `json:"messages_per_sec"`
 | 
							MessagesPerSec    float64            `json:"messages_per_sec"`
 | 
				
			||||||
		MbitsPerSec            float64                       `json:"mbits_per_sec"`
 | 
							MbitsPerSec       float64            `json:"mbits_per_sec"`
 | 
				
			||||||
		Connected              bool                          `json:"connected"`
 | 
							Connected         bool               `json:"connected"`
 | 
				
			||||||
		ASNs                   int                           `json:"asns"`
 | 
							ASNs              int                `json:"asns"`
 | 
				
			||||||
		Prefixes               int                           `json:"prefixes"`
 | 
							Prefixes          int                `json:"prefixes"`
 | 
				
			||||||
		IPv4Prefixes           int                           `json:"ipv4_prefixes"`
 | 
							IPv4Prefixes      int                `json:"ipv4_prefixes"`
 | 
				
			||||||
		IPv6Prefixes           int                           `json:"ipv6_prefixes"`
 | 
							IPv6Prefixes      int                `json:"ipv6_prefixes"`
 | 
				
			||||||
		Peerings               int                           `json:"peerings"`
 | 
							Peerings          int                `json:"peerings"`
 | 
				
			||||||
		DatabaseSizeBytes      int64                         `json:"database_size_bytes"`
 | 
							DatabaseSizeBytes int64              `json:"database_size_bytes"`
 | 
				
			||||||
		LiveRoutes             int                           `json:"live_routes"`
 | 
							LiveRoutes        int                `json:"live_routes"`
 | 
				
			||||||
		IPv4Routes             int                           `json:"ipv4_routes"`
 | 
							IPv4Routes        int                `json:"ipv4_routes"`
 | 
				
			||||||
		IPv6Routes             int                           `json:"ipv6_routes"`
 | 
							IPv6Routes        int                `json:"ipv6_routes"`
 | 
				
			||||||
		IPv4UpdatesPerSec      float64                       `json:"ipv4_updates_per_sec"`
 | 
							IPv4UpdatesPerSec float64            `json:"ipv4_updates_per_sec"`
 | 
				
			||||||
		IPv6UpdatesPerSec      float64                       `json:"ipv6_updates_per_sec"`
 | 
							IPv6UpdatesPerSec float64            `json:"ipv6_updates_per_sec"`
 | 
				
			||||||
		HandlerStats           []HandlerStatsInfo            `json:"handler_stats"`
 | 
							HandlerStats      []HandlerStatsInfo `json:"handler_stats"`
 | 
				
			||||||
		IPv4PrefixDistribution []database.PrefixDistribution `json:"ipv4_prefix_distribution"`
 | 
					 | 
				
			||||||
		IPv6PrefixDistribution []database.PrefixDistribution `json:"ipv6_prefix_distribution"`
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return func(w http.ResponseWriter, r *http.Request) {
 | 
						return func(w http.ResponseWriter, r *http.Request) {
 | 
				
			||||||
@ -291,6 +287,7 @@ func (s *Server) handleStats() http.HandlerFunc {
 | 
				
			|||||||
		errChan := make(chan error)
 | 
							errChan := make(chan error)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		go func() {
 | 
							go func() {
 | 
				
			||||||
 | 
								s.logger.Debug("Starting database stats query")
 | 
				
			||||||
			dbStats, err := s.db.GetStats()
 | 
								dbStats, err := s.db.GetStats()
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				s.logger.Debug("Database stats query failed", "error", err)
 | 
									s.logger.Debug("Database stats query failed", "error", err)
 | 
				
			||||||
@ -298,6 +295,7 @@ func (s *Server) handleStats() http.HandlerFunc {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
				return
 | 
									return
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
								s.logger.Debug("Database stats query completed")
 | 
				
			||||||
			statsChan <- dbStats
 | 
								statsChan <- dbStats
 | 
				
			||||||
		}()
 | 
							}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -345,26 +343,24 @@ func (s *Server) handleStats() http.HandlerFunc {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		stats := StatsResponse{
 | 
							stats := StatsResponse{
 | 
				
			||||||
			Uptime:                 uptime,
 | 
								Uptime:            uptime,
 | 
				
			||||||
			TotalMessages:          metrics.TotalMessages,
 | 
								TotalMessages:     metrics.TotalMessages,
 | 
				
			||||||
			TotalBytes:             metrics.TotalBytes,
 | 
								TotalBytes:        metrics.TotalBytes,
 | 
				
			||||||
			MessagesPerSec:         metrics.MessagesPerSec,
 | 
								MessagesPerSec:    metrics.MessagesPerSec,
 | 
				
			||||||
			MbitsPerSec:            metrics.BitsPerSec / bitsPerMegabit,
 | 
								MbitsPerSec:       metrics.BitsPerSec / bitsPerMegabit,
 | 
				
			||||||
			Connected:              metrics.Connected,
 | 
								Connected:         metrics.Connected,
 | 
				
			||||||
			ASNs:                   dbStats.ASNs,
 | 
								ASNs:              dbStats.ASNs,
 | 
				
			||||||
			Prefixes:               dbStats.Prefixes,
 | 
								Prefixes:          dbStats.Prefixes,
 | 
				
			||||||
			IPv4Prefixes:           dbStats.IPv4Prefixes,
 | 
								IPv4Prefixes:      dbStats.IPv4Prefixes,
 | 
				
			||||||
			IPv6Prefixes:           dbStats.IPv6Prefixes,
 | 
								IPv6Prefixes:      dbStats.IPv6Prefixes,
 | 
				
			||||||
			Peerings:               dbStats.Peerings,
 | 
								Peerings:          dbStats.Peerings,
 | 
				
			||||||
			DatabaseSizeBytes:      dbStats.FileSizeBytes,
 | 
								DatabaseSizeBytes: dbStats.FileSizeBytes,
 | 
				
			||||||
			LiveRoutes:             dbStats.LiveRoutes,
 | 
								LiveRoutes:        rtStats.TotalRoutes,
 | 
				
			||||||
			IPv4Routes:             rtStats.IPv4Routes,
 | 
								IPv4Routes:        rtStats.IPv4Routes,
 | 
				
			||||||
			IPv6Routes:             rtStats.IPv6Routes,
 | 
								IPv6Routes:        rtStats.IPv6Routes,
 | 
				
			||||||
			IPv4UpdatesPerSec:      rtStats.IPv4UpdatesRate,
 | 
								IPv4UpdatesPerSec: rtStats.IPv4UpdatesRate,
 | 
				
			||||||
			IPv6UpdatesPerSec:      rtStats.IPv6UpdatesRate,
 | 
								IPv6UpdatesPerSec: rtStats.IPv6UpdatesRate,
 | 
				
			||||||
			HandlerStats:           handlerStatsInfo,
 | 
								HandlerStats:      handlerStatsInfo,
 | 
				
			||||||
			IPv4PrefixDistribution: dbStats.IPv4PrefixDistribution,
 | 
					 | 
				
			||||||
			IPv6PrefixDistribution: dbStats.IPv6PrefixDistribution,
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		w.Header().Set("Content-Type", "application/json")
 | 
							w.Header().Set("Content-Type", "application/json")
 | 
				
			||||||
 | 
				
			|||||||
@ -153,22 +153,6 @@
 | 
				
			|||||||
        </div>
 | 
					        </div>
 | 
				
			||||||
    </div>
 | 
					    </div>
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
    <div class="status-grid">
 | 
					 | 
				
			||||||
        <div class="status-card">
 | 
					 | 
				
			||||||
            <h2>IPv4 Prefix Distribution</h2>
 | 
					 | 
				
			||||||
            <div id="ipv4-prefix-distribution">
 | 
					 | 
				
			||||||
                <!-- Will be populated dynamically -->
 | 
					 | 
				
			||||||
            </div>
 | 
					 | 
				
			||||||
        </div>
 | 
					 | 
				
			||||||
        
 | 
					 | 
				
			||||||
        <div class="status-card">
 | 
					 | 
				
			||||||
            <h2>IPv6 Prefix Distribution</h2>
 | 
					 | 
				
			||||||
            <div id="ipv6-prefix-distribution">
 | 
					 | 
				
			||||||
                <!-- Will be populated dynamically -->
 | 
					 | 
				
			||||||
            </div>
 | 
					 | 
				
			||||||
        </div>
 | 
					 | 
				
			||||||
    </div>
 | 
					 | 
				
			||||||
    
 | 
					 | 
				
			||||||
    <div id="handler-stats-container" class="status-grid">
 | 
					    <div id="handler-stats-container" class="status-grid">
 | 
				
			||||||
        <!-- Handler stats will be dynamically added here -->
 | 
					        <!-- Handler stats will be dynamically added here -->
 | 
				
			||||||
    </div>
 | 
					    </div>
 | 
				
			||||||
@ -186,29 +170,6 @@
 | 
				
			|||||||
            return num.toLocaleString();
 | 
					            return num.toLocaleString();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        
 | 
					        
 | 
				
			||||||
        function updatePrefixDistribution(elementId, distribution) {
 | 
					 | 
				
			||||||
            const container = document.getElementById(elementId);
 | 
					 | 
				
			||||||
            container.innerHTML = '';
 | 
					 | 
				
			||||||
            
 | 
					 | 
				
			||||||
            if (!distribution || distribution.length === 0) {
 | 
					 | 
				
			||||||
                container.innerHTML = '<div class="metric"><span class="metric-label">No data</span></div>';
 | 
					 | 
				
			||||||
                return;
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
            
 | 
					 | 
				
			||||||
            // Sort by mask length
 | 
					 | 
				
			||||||
            distribution.sort((a, b) => a.mask_length - b.mask_length);
 | 
					 | 
				
			||||||
            
 | 
					 | 
				
			||||||
            distribution.forEach(item => {
 | 
					 | 
				
			||||||
                const metric = document.createElement('div');
 | 
					 | 
				
			||||||
                metric.className = 'metric';
 | 
					 | 
				
			||||||
                metric.innerHTML = `
 | 
					 | 
				
			||||||
                    <span class="metric-label">/${item.mask_length}</span>
 | 
					 | 
				
			||||||
                    <span class="metric-value">${formatNumber(item.count)}</span>
 | 
					 | 
				
			||||||
                `;
 | 
					 | 
				
			||||||
                container.appendChild(metric);
 | 
					 | 
				
			||||||
            });
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        
 | 
					 | 
				
			||||||
        function updateHandlerStats(handlerStats) {
 | 
					        function updateHandlerStats(handlerStats) {
 | 
				
			||||||
            const container = document.getElementById('handler-stats-container');
 | 
					            const container = document.getElementById('handler-stats-container');
 | 
				
			||||||
            container.innerHTML = '';
 | 
					            container.innerHTML = '';
 | 
				
			||||||
@ -288,10 +249,6 @@
 | 
				
			|||||||
                    // Update handler stats
 | 
					                    // Update handler stats
 | 
				
			||||||
                    updateHandlerStats(data.handler_stats || []);
 | 
					                    updateHandlerStats(data.handler_stats || []);
 | 
				
			||||||
                    
 | 
					                    
 | 
				
			||||||
                    // Update prefix distribution
 | 
					 | 
				
			||||||
                    updatePrefixDistribution('ipv4-prefix-distribution', data.ipv4_prefix_distribution);
 | 
					 | 
				
			||||||
                    updatePrefixDistribution('ipv6-prefix-distribution', data.ipv6_prefix_distribution);
 | 
					 | 
				
			||||||
                    
 | 
					 | 
				
			||||||
                    // Clear any errors
 | 
					                    // Clear any errors
 | 
				
			||||||
                    document.getElementById('error').style.display = 'none';
 | 
					                    document.getElementById('error').style.display = 'none';
 | 
				
			||||||
                })
 | 
					                })
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
		Reference in New Issue
	
	Block a user