diff --git a/internal/database/database.go b/internal/database/database.go index a05a6fa..c898715 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -254,7 +254,20 @@ func (d *Database) RecordAnnouncement(announcement *Announcement) error { } // RecordPeering records a peering relationship between two ASNs. -func (d *Database) RecordPeering(fromASNID, toASNID string, timestamp time.Time) error { +func (d *Database) RecordPeering(asA, asB int, timestamp time.Time) error { + // Validate ASNs + if asA <= 0 || asB <= 0 { + return fmt.Errorf("invalid ASN: asA=%d, asB=%d", asA, asB) + } + if asA == asB { + return fmt.Errorf("cannot create peering with same ASN: %d", asA) + } + + // Normalize: ensure asA < asB + if asA > asB { + asA, asB = asB, asA + } + tx, err := d.beginTx() if err != nil { return err @@ -266,20 +279,20 @@ func (d *Database) RecordPeering(fromASNID, toASNID string, timestamp time.Time) }() var exists bool - err = tx.QueryRow("SELECT EXISTS(SELECT 1 FROM asn_peerings WHERE from_asn_id = ? AND to_asn_id = ?)", - fromASNID, toASNID).Scan(&exists) + err = tx.QueryRow("SELECT EXISTS(SELECT 1 FROM peerings WHERE as_a = ? AND as_b = ?)", + asA, asB).Scan(&exists) if err != nil { return err } if exists { - _, err = tx.Exec("UPDATE asn_peerings SET last_seen = ? WHERE from_asn_id = ? AND to_asn_id = ?", - timestamp, fromASNID, toASNID) + _, err = tx.Exec("UPDATE peerings SET last_seen = ? WHERE as_a = ? AND as_b = ?", + timestamp, asA, asB) } else { _, err = tx.Exec(` - INSERT INTO asn_peerings (id, from_asn_id, to_asn_id, first_seen, last_seen) + INSERT INTO peerings (id, as_a, as_b, first_seen, last_seen) VALUES (?, ?, ?, ?, ?)`, - generateUUID().String(), fromASNID, toASNID, timestamp, timestamp) + generateUUID().String(), asA, asB, timestamp, timestamp) } if err != nil { return err @@ -287,8 +300,8 @@ func (d *Database) RecordPeering(fromASNID, toASNID string, timestamp time.Time) if err = tx.Commit(); err != nil { d.logger.Error("Failed to commit transaction for peering", - "from_asn_id", fromASNID, - "to_asn_id", toASNID, + "as_a", asA, + "as_b", asB, "error", err, ) @@ -374,7 +387,7 @@ func (d *Database) GetStats() (Stats, error) { } // Count peerings - err = d.queryRow("SELECT COUNT(*) FROM asn_peerings").Scan(&stats.Peerings) + err = d.queryRow("SELECT COUNT(*) FROM peerings").Scan(&stats.Peerings) if err != nil { return stats, err } diff --git a/internal/database/interface.go b/internal/database/interface.go index 1f76b1a..c2d4a3a 100644 --- a/internal/database/interface.go +++ b/internal/database/interface.go @@ -29,7 +29,7 @@ type Store interface { RecordAnnouncement(announcement *Announcement) error // Peering operations - RecordPeering(fromASNID, toASNID string, timestamp time.Time) error + RecordPeering(asA, asB int, timestamp time.Time) error // Statistics GetStats() (Stats, error) diff --git a/internal/database/schema.sql b/internal/database/schema.sql index 02c7f10..cc0b8af 100644 --- a/internal/database/schema.sql +++ b/internal/database/schema.sql @@ -29,15 +29,13 @@ CREATE TABLE IF NOT EXISTS announcements ( FOREIGN KEY (origin_asn_id) REFERENCES asns(id) ); -CREATE TABLE IF NOT EXISTS asn_peerings ( +CREATE TABLE IF NOT EXISTS peerings ( id TEXT PRIMARY KEY, - from_asn_id TEXT NOT NULL, - to_asn_id TEXT NOT NULL, + as_a INTEGER NOT NULL, + as_b INTEGER NOT NULL, first_seen DATETIME NOT NULL, last_seen DATETIME NOT NULL, - FOREIGN KEY (from_asn_id) REFERENCES asns(id), - FOREIGN KEY (to_asn_id) REFERENCES asns(id), - UNIQUE(from_asn_id, to_asn_id) + UNIQUE(as_a, as_b) ); -- BGP peers that send us messages @@ -55,9 +53,9 @@ CREATE INDEX IF NOT EXISTS idx_prefixes_version_prefix ON prefixes(ip_version, p CREATE INDEX IF NOT EXISTS idx_announcements_timestamp ON announcements(timestamp); CREATE INDEX IF NOT EXISTS idx_announcements_prefix_id ON announcements(prefix_id); CREATE INDEX IF NOT EXISTS idx_announcements_asn_id ON announcements(asn_id); -CREATE INDEX IF NOT EXISTS idx_asn_peerings_from_asn ON asn_peerings(from_asn_id); -CREATE INDEX IF NOT EXISTS idx_asn_peerings_to_asn ON asn_peerings(to_asn_id); -CREATE INDEX IF NOT EXISTS idx_asn_peerings_lookup ON asn_peerings(from_asn_id, to_asn_id); +CREATE INDEX IF NOT EXISTS idx_peerings_as_a ON peerings(as_a); +CREATE INDEX IF NOT EXISTS idx_peerings_as_b ON peerings(as_b); +CREATE INDEX IF NOT EXISTS idx_peerings_lookup ON peerings(as_a, as_b); -- Additional indexes for prefixes table CREATE INDEX IF NOT EXISTS idx_prefixes_prefix ON prefixes(prefix); diff --git a/internal/routewatch/app_integration_test.go b/internal/routewatch/app_integration_test.go index 1cf02c9..cb655bc 100644 --- a/internal/routewatch/app_integration_test.go +++ b/internal/routewatch/app_integration_test.go @@ -2,6 +2,7 @@ package routewatch import ( "context" + "fmt" "strings" "sync" "testing" @@ -119,11 +120,16 @@ func (m *mockStore) RecordAnnouncement(_ *database.Announcement) error { } // RecordPeering mock implementation -func (m *mockStore) RecordPeering(fromASNID, toASNID string, _ time.Time) error { +func (m *mockStore) RecordPeering(asA, asB int, _ time.Time) error { m.mu.Lock() defer m.mu.Unlock() - key := fromASNID + "_" + toASNID + // Normalize + if asA > asB { + asA, asB = asB, asA + } + + key := fmt.Sprintf("%d_%d", asA, asB) if !m.Peerings[key] { m.Peerings[key] = true m.PeeringCount++ diff --git a/internal/routewatch/peeringhandler.go b/internal/routewatch/peeringhandler.go index 62d5475..476e3b8 100644 --- a/internal/routewatch/peeringhandler.go +++ b/internal/routewatch/peeringhandler.go @@ -33,9 +33,8 @@ type PeeringHandler struct { logger *logger.Logger // In-memory AS path tracking - mu sync.RWMutex - asPaths map[string]time.Time // key is JSON-encoded AS path - asnCache map[int]*database.ASN + mu sync.RWMutex + asPaths map[string]time.Time // key is JSON-encoded AS path stopCh chan struct{} } @@ -43,11 +42,10 @@ type PeeringHandler struct { // NewPeeringHandler creates a new batched peering handler func NewPeeringHandler(db database.Store, logger *logger.Logger) *PeeringHandler { h := &PeeringHandler{ - db: db, - logger: logger, - asPaths: make(map[string]time.Time), - asnCache: make(map[int]*database.ASN), - stopCh: make(chan struct{}), + db: db, + logger: logger, + asPaths: make(map[string]time.Time), + stopCh: make(chan struct{}), } // Start the periodic processing goroutines @@ -167,7 +165,6 @@ func (h *PeeringHandler) processPeerings() { low, high int } peerings := make(map[peeringKey]time.Time) - uniqueASNs := make(map[int]struct{}) for pathJSON, timestamp := range pathsCopy { var path []int @@ -182,6 +179,11 @@ func (h *PeeringHandler) processPeerings() { asn1 := path[i] asn2 := path[i+1] + // Skip invalid ASNs + if asn1 <= 0 || asn2 <= 0 || asn1 == asn2 { + continue + } + // Normalize: lower AS number first low, high := asn1, asn2 if low > high { @@ -193,22 +195,6 @@ func (h *PeeringHandler) processPeerings() { if existing, ok := peerings[key]; !ok || timestamp.After(existing) { peerings[key] = timestamp } - - uniqueASNs[asn1] = struct{}{} - uniqueASNs[asn2] = struct{}{} - } - } - - // Get or create ASNs - for asn := range uniqueASNs { - if _, ok := h.asnCache[asn]; !ok { - asnObj, err := h.db.GetOrCreateASN(asn, time.Now()) - if err != nil { - h.logger.Error("Failed to get/create ASN", "asn", asn, "error", err) - - continue - } - h.asnCache[asn] = asnObj } } @@ -216,19 +202,15 @@ func (h *PeeringHandler) processPeerings() { start := time.Now() successCount := 0 for key, ts := range peerings { - fromAS := h.asnCache[key.low] - toAS := h.asnCache[key.high] - if fromAS != nil && toAS != nil { - err := h.db.RecordPeering(fromAS.ID.String(), toAS.ID.String(), ts) - if err != nil { - h.logger.Error("Failed to record peering", - "from_asn", key.low, - "to_asn", key.high, - "error", err, - ) - } else { - successCount++ - } + err := h.db.RecordPeering(key.low, key.high, ts) + if err != nil { + h.logger.Error("Failed to record peering", + "as_a", key.low, + "as_b", key.high, + "error", err, + ) + } else { + successCount++ } }