Simplify peerings table to store AS numbers directly
- Rename asn_peerings table to peerings
- Change columns from from_asn_id/to_asn_id to as_a/as_b (integers)
- Remove foreign key constraints to asns table
- Update RecordPeering to use AS numbers directly
- Add validation in RecordPeering (see the sketch below) to ensure:
  - Both AS numbers are > 0
  - AS numbers are different
  - as_a is always lower than as_b (normalized)
- Update PeeringHandler to no longer need the ASN cache
- Simplify the code by removing unnecessary ASN lookups
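For reference, here is a minimal standalone sketch (not part of the commit; the helper name normalizePeering is hypothetical) of the validation and normalization rules that RecordPeering now applies before writing to the peerings table:

package main

import "fmt"

// normalizePeering validates a pair of AS numbers and returns them ordered
// so that the lower ASN comes first, matching the as_a < as_b invariant.
func normalizePeering(asA, asB int) (int, int, error) {
	if asA <= 0 || asB <= 0 {
		return 0, 0, fmt.Errorf("invalid ASN: asA=%d, asB=%d", asA, asB)
	}
	if asA == asB {
		return 0, 0, fmt.Errorf("cannot create peering with same ASN: %d", asA)
	}
	if asA > asB {
		asA, asB = asB, asA
	}
	return asA, asB, nil
}

func main() {
	// The same link reported in either direction normalizes to one pair,
	// so UNIQUE(as_a, as_b) can deduplicate it.
	a, b, err := normalizePeering(65001, 64512)
	fmt.Println(a, b, err) // 64512 65001 <nil>
}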
commit 54bb0ba1cb (parent 1157003db7)
@@ -254,7 +254,20 @@ func (d *Database) RecordAnnouncement(announcement *Announcement) error {
 }

 // RecordPeering records a peering relationship between two ASNs.
-func (d *Database) RecordPeering(fromASNID, toASNID string, timestamp time.Time) error {
+func (d *Database) RecordPeering(asA, asB int, timestamp time.Time) error {
+	// Validate ASNs
+	if asA <= 0 || asB <= 0 {
+		return fmt.Errorf("invalid ASN: asA=%d, asB=%d", asA, asB)
+	}
+	if asA == asB {
+		return fmt.Errorf("cannot create peering with same ASN: %d", asA)
+	}
+
+	// Normalize: ensure asA < asB
+	if asA > asB {
+		asA, asB = asB, asA
+	}
+
 	tx, err := d.beginTx()
 	if err != nil {
 		return err
@@ -266,20 +279,20 @@ func (d *Database) RecordPeering(fromASNID, toASNID string, timestamp time.Time)
 	}()

 	var exists bool
-	err = tx.QueryRow("SELECT EXISTS(SELECT 1 FROM asn_peerings WHERE from_asn_id = ? AND to_asn_id = ?)",
-		fromASNID, toASNID).Scan(&exists)
+	err = tx.QueryRow("SELECT EXISTS(SELECT 1 FROM peerings WHERE as_a = ? AND as_b = ?)",
+		asA, asB).Scan(&exists)
 	if err != nil {
 		return err
 	}

 	if exists {
-		_, err = tx.Exec("UPDATE asn_peerings SET last_seen = ? WHERE from_asn_id = ? AND to_asn_id = ?",
-			timestamp, fromASNID, toASNID)
+		_, err = tx.Exec("UPDATE peerings SET last_seen = ? WHERE as_a = ? AND as_b = ?",
+			timestamp, asA, asB)
 	} else {
 		_, err = tx.Exec(`
-			INSERT INTO asn_peerings (id, from_asn_id, to_asn_id, first_seen, last_seen)
+			INSERT INTO peerings (id, as_a, as_b, first_seen, last_seen)
 			VALUES (?, ?, ?, ?, ?)`,
-			generateUUID().String(), fromASNID, toASNID, timestamp, timestamp)
+			generateUUID().String(), asA, asB, timestamp, timestamp)
 	}
 	if err != nil {
 		return err
@@ -287,8 +300,8 @@ func (d *Database) RecordPeering(fromASNID, toASNID string, timestamp time.Time)

 	if err = tx.Commit(); err != nil {
 		d.logger.Error("Failed to commit transaction for peering",
-			"from_asn_id", fromASNID,
-			"to_asn_id", toASNID,
+			"as_a", asA,
+			"as_b", asB,
 			"error", err,
 		)

@@ -374,7 +387,7 @@ func (d *Database) GetStats() (Stats, error) {
 	}

 	// Count peerings
-	err = d.queryRow("SELECT COUNT(*) FROM asn_peerings").Scan(&stats.Peerings)
+	err = d.queryRow("SELECT COUNT(*) FROM peerings").Scan(&stats.Peerings)
 	if err != nil {
 		return stats, err
 	}
@@ -29,7 +29,7 @@ type Store interface {
 	RecordAnnouncement(announcement *Announcement) error

 	// Peering operations
-	RecordPeering(fromASNID, toASNID string, timestamp time.Time) error
+	RecordPeering(asA, asB int, timestamp time.Time) error

 	// Statistics
 	GetStats() (Stats, error)
@@ -29,15 +29,13 @@ CREATE TABLE IF NOT EXISTS announcements (
     FOREIGN KEY (origin_asn_id) REFERENCES asns(id)
 );

-CREATE TABLE IF NOT EXISTS asn_peerings (
+CREATE TABLE IF NOT EXISTS peerings (
     id TEXT PRIMARY KEY,
-    from_asn_id TEXT NOT NULL,
-    to_asn_id TEXT NOT NULL,
+    as_a INTEGER NOT NULL,
+    as_b INTEGER NOT NULL,
     first_seen DATETIME NOT NULL,
     last_seen DATETIME NOT NULL,
-    FOREIGN KEY (from_asn_id) REFERENCES asns(id),
-    FOREIGN KEY (to_asn_id) REFERENCES asns(id),
-    UNIQUE(from_asn_id, to_asn_id)
+    UNIQUE(as_a, as_b)
 );

 -- BGP peers that send us messages
@@ -55,9 +53,9 @@ CREATE INDEX IF NOT EXISTS idx_prefixes_version_prefix ON prefixes(ip_version, p
 CREATE INDEX IF NOT EXISTS idx_announcements_timestamp ON announcements(timestamp);
 CREATE INDEX IF NOT EXISTS idx_announcements_prefix_id ON announcements(prefix_id);
 CREATE INDEX IF NOT EXISTS idx_announcements_asn_id ON announcements(asn_id);
-CREATE INDEX IF NOT EXISTS idx_asn_peerings_from_asn ON asn_peerings(from_asn_id);
-CREATE INDEX IF NOT EXISTS idx_asn_peerings_to_asn ON asn_peerings(to_asn_id);
-CREATE INDEX IF NOT EXISTS idx_asn_peerings_lookup ON asn_peerings(from_asn_id, to_asn_id);
+CREATE INDEX IF NOT EXISTS idx_peerings_as_a ON peerings(as_a);
+CREATE INDEX IF NOT EXISTS idx_peerings_as_b ON peerings(as_b);
+CREATE INDEX IF NOT EXISTS idx_peerings_lookup ON peerings(as_a, as_b);

 -- Additional indexes for prefixes table
 CREATE INDEX IF NOT EXISTS idx_prefixes_prefix ON prefixes(prefix);
@@ -2,6 +2,7 @@ package routewatch

 import (
 	"context"
+	"fmt"
 	"strings"
 	"sync"
 	"testing"
@@ -119,11 +120,16 @@ func (m *mockStore) RecordAnnouncement(_ *database.Announcement) error {
 }

 // RecordPeering mock implementation
-func (m *mockStore) RecordPeering(fromASNID, toASNID string, _ time.Time) error {
+func (m *mockStore) RecordPeering(asA, asB int, _ time.Time) error {
 	m.mu.Lock()
 	defer m.mu.Unlock()

-	key := fromASNID + "_" + toASNID
+	// Normalize
+	if asA > asB {
+		asA, asB = asB, asA
+	}
+
+	key := fmt.Sprintf("%d_%d", asA, asB)
 	if !m.Peerings[key] {
 		m.Peerings[key] = true
 		m.PeeringCount++
@@ -33,9 +33,8 @@ type PeeringHandler struct {
 	logger *logger.Logger

 	// In-memory AS path tracking
 	mu sync.RWMutex
 	asPaths map[string]time.Time // key is JSON-encoded AS path
-	asnCache map[int]*database.ASN

 	stopCh chan struct{}
 }
@@ -43,11 +42,10 @@ type PeeringHandler struct {
 // NewPeeringHandler creates a new batched peering handler
 func NewPeeringHandler(db database.Store, logger *logger.Logger) *PeeringHandler {
 	h := &PeeringHandler{
 		db: db,
 		logger: logger,
 		asPaths: make(map[string]time.Time),
-		asnCache: make(map[int]*database.ASN),
-		stopCh: make(chan struct{}),
+		stopCh: make(chan struct{}),
 	}

 	// Start the periodic processing goroutines
@@ -167,7 +165,6 @@ func (h *PeeringHandler) processPeerings() {
 		low, high int
 	}
 	peerings := make(map[peeringKey]time.Time)
-	uniqueASNs := make(map[int]struct{})

 	for pathJSON, timestamp := range pathsCopy {
 		var path []int
@@ -182,6 +179,11 @@ func (h *PeeringHandler) processPeerings() {
 			asn1 := path[i]
 			asn2 := path[i+1]

+			// Skip invalid ASNs
+			if asn1 <= 0 || asn2 <= 0 || asn1 == asn2 {
+				continue
+			}
+
 			// Normalize: lower AS number first
 			low, high := asn1, asn2
 			if low > high {
@@ -193,22 +195,6 @@ func (h *PeeringHandler) processPeerings() {
 			if existing, ok := peerings[key]; !ok || timestamp.After(existing) {
 				peerings[key] = timestamp
 			}
-
-			uniqueASNs[asn1] = struct{}{}
-			uniqueASNs[asn2] = struct{}{}
-		}
-	}
-
-	// Get or create ASNs
-	for asn := range uniqueASNs {
-		if _, ok := h.asnCache[asn]; !ok {
-			asnObj, err := h.db.GetOrCreateASN(asn, time.Now())
-			if err != nil {
-				h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err)
-
-				continue
-			}
-			h.asnCache[asn] = asnObj
 		}
 	}

@@ -216,19 +202,15 @@ func (h *PeeringHandler) processPeerings() {
 	start := time.Now()
 	successCount := 0
 	for key, ts := range peerings {
-		fromAS := h.asnCache[key.low]
-		toAS := h.asnCache[key.high]
-		if fromAS != nil && toAS != nil {
-			err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), ts)
-			if err != nil {
-				h.logger.Error("Failed to record peering",
-					"from_asn", key.low,
-					"to_asn", key.high,
-					"error", err,
-				)
-			} else {
-				successCount++
-			}
+		err := h.db.RecordPeering(key.low, key.high, ts)
+		if err != nil {
+			h.logger.Error("Failed to record peering",
+				"as_a", key.low,
+				"as_b", key.high,
+				"error", err,
+			)
+		} else {
+			successCount++
 		}
 	}

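To make the effect of the normalization concrete, here is a small self-contained sketch (illustration only; it mirrors the mock store's keying rather than the real SQLite-backed store): recording the same link in both directions counts it once, which is the same guarantee UNIQUE(as_a, as_b) gives the peerings table.

package main

import "fmt"

func main() {
	peerings := map[string]bool{}
	count := 0

	// record mimics the mock store: normalize so the lower ASN is first,
	// then key the map on the normalized pair.
	record := func(asA, asB int) {
		if asA > asB {
			asA, asB = asB, asA
		}
		key := fmt.Sprintf("%d_%d", asA, asB)
		if !peerings[key] {
			peerings[key] = true
			count++
		}
	}

	record(64512, 65001)
	record(65001, 64512) // same peering seen from the other direction
	fmt.Println(count)   // 1
}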