Major schema refactoring: simplify ASN and prefix tracking

- Remove UUID primary keys from ASNs table, use ASN number as primary key
- Update announcements table to reference ASN numbers directly
- Rename asns.number column to asns.asn for consistency
- Add prefix tracking to PrefixHandler to populate prefixes_v4/v6 tables
- Add UpdatePrefixesBatch method for efficient batch updates
- Update all database methods and models to use new schema
- Fix all references in code to use ASN field instead of Number
- Update test mocks to match new interfaces
This commit is contained in:
2025-07-28 22:58:55 +02:00
parent a165ecf759
commit c9da20e630
8 changed files with 201 additions and 53 deletions

View File

@@ -367,6 +367,100 @@ func (d *Database) DeleteLiveRouteBatch(deletions []LiveRouteDeletion) error {
return nil
}
// UpdatePrefixesBatch updates the last_seen time for multiple prefixes in a single transaction
func (d *Database) UpdatePrefixesBatch(prefixes map[string]time.Time) error {
if len(prefixes) == 0 {
return nil
}
d.lock("UpdatePrefixesBatch")
defer d.unlock()
tx, err := d.beginTx()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer func() {
if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
d.logger.Error("Failed to rollback transaction", "error", err)
}
}()
// Prepare statements for both IPv4 and IPv6 tables
selectV4Stmt, err := tx.Prepare("SELECT id FROM prefixes_v4 WHERE prefix = ?")
if err != nil {
return fmt.Errorf("failed to prepare IPv4 select statement: %w", err)
}
defer func() { _ = selectV4Stmt.Close() }()
updateV4Stmt, err := tx.Prepare("UPDATE prefixes_v4 SET last_seen = ? WHERE prefix = ?")
if err != nil {
return fmt.Errorf("failed to prepare IPv4 update statement: %w", err)
}
defer func() { _ = updateV4Stmt.Close() }()
insertV4Stmt, err := tx.Prepare("INSERT INTO prefixes_v4 (id, prefix, first_seen, last_seen) VALUES (?, ?, ?, ?)")
if err != nil {
return fmt.Errorf("failed to prepare IPv4 insert statement: %w", err)
}
defer func() { _ = insertV4Stmt.Close() }()
selectV6Stmt, err := tx.Prepare("SELECT id FROM prefixes_v6 WHERE prefix = ?")
if err != nil {
return fmt.Errorf("failed to prepare IPv6 select statement: %w", err)
}
defer func() { _ = selectV6Stmt.Close() }()
updateV6Stmt, err := tx.Prepare("UPDATE prefixes_v6 SET last_seen = ? WHERE prefix = ?")
if err != nil {
return fmt.Errorf("failed to prepare IPv6 update statement: %w", err)
}
defer func() { _ = updateV6Stmt.Close() }()
insertV6Stmt, err := tx.Prepare("INSERT INTO prefixes_v6 (id, prefix, first_seen, last_seen) VALUES (?, ?, ?, ?)")
if err != nil {
return fmt.Errorf("failed to prepare IPv6 insert statement: %w", err)
}
defer func() { _ = insertV6Stmt.Close() }()
for prefix, timestamp := range prefixes {
ipVersion := detectIPVersion(prefix)
var selectStmt, updateStmt, insertStmt *sql.Stmt
if ipVersion == ipVersionV4 {
selectStmt, updateStmt, insertStmt = selectV4Stmt, updateV4Stmt, insertV4Stmt
} else {
selectStmt, updateStmt, insertStmt = selectV6Stmt, updateV6Stmt, insertV6Stmt
}
var id string
err = selectStmt.QueryRow(prefix).Scan(&id)
switch err {
case nil:
// Prefix exists, update last_seen
_, err = updateStmt.Exec(timestamp, prefix)
if err != nil {
return fmt.Errorf("failed to update prefix %s: %w", prefix, err)
}
case sql.ErrNoRows:
// Prefix doesn't exist, create it
_, err = insertStmt.Exec(generateUUID().String(), prefix, timestamp, timestamp)
if err != nil {
return fmt.Errorf("failed to insert prefix %s: %w", prefix, err)
}
default:
return fmt.Errorf("failed to query prefix %s: %w", prefix, err)
}
}
if err = tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}
// GetOrCreateASNBatch creates or updates multiple ASNs in a single transaction
func (d *Database) GetOrCreateASNBatch(asns map[int]time.Time) error {
if len(asns) == 0 {
@@ -388,20 +482,20 @@ func (d *Database) GetOrCreateASNBatch(asns map[int]time.Time) error {
// Prepare statements
selectStmt, err := tx.Prepare(
"SELECT id, number, handle, description, first_seen, last_seen FROM asns WHERE number = ?")
"SELECT asn, handle, description, first_seen, last_seen FROM asns WHERE asn = ?")
if err != nil {
return fmt.Errorf("failed to prepare select statement: %w", err)
}
defer func() { _ = selectStmt.Close() }()
updateStmt, err := tx.Prepare("UPDATE asns SET last_seen = ? WHERE id = ?")
updateStmt, err := tx.Prepare("UPDATE asns SET last_seen = ? WHERE asn = ?")
if err != nil {
return fmt.Errorf("failed to prepare update statement: %w", err)
}
defer func() { _ = updateStmt.Close() }()
insertStmt, err := tx.Prepare(
"INSERT INTO asns (id, number, handle, description, first_seen, last_seen) VALUES (?, ?, ?, ?, ?, ?)")
"INSERT INTO asns (asn, handle, description, first_seen, last_seen) VALUES (?, ?, ?, ?, ?)")
if err != nil {
return fmt.Errorf("failed to prepare insert statement: %w", err)
}
@@ -409,15 +503,13 @@ func (d *Database) GetOrCreateASNBatch(asns map[int]time.Time) error {
for number, timestamp := range asns {
var asn ASN
var idStr string
var handle, description sql.NullString
err = selectStmt.QueryRow(number).Scan(&idStr, &asn.Number, &handle, &description, &asn.FirstSeen, &asn.LastSeen)
err = selectStmt.QueryRow(number).Scan(&asn.ASN, &handle, &description, &asn.FirstSeen, &asn.LastSeen)
if err == nil {
// ASN exists, update last_seen
asn.ID, _ = uuid.Parse(idStr)
_, err = updateStmt.Exec(timestamp, asn.ID.String())
_, err = updateStmt.Exec(timestamp, number)
if err != nil {
return fmt.Errorf("failed to update ASN %d: %w", number, err)
}
@@ -428,8 +520,7 @@ func (d *Database) GetOrCreateASNBatch(asns map[int]time.Time) error {
if err == sql.ErrNoRows {
// ASN doesn't exist, create it
asn = ASN{
ID: generateUUID(),
Number: number,
ASN: number,
FirstSeen: timestamp,
LastSeen: timestamp,
}
@@ -440,7 +531,7 @@ func (d *Database) GetOrCreateASNBatch(asns map[int]time.Time) error {
asn.Description = info.Description
}
_, err = insertStmt.Exec(asn.ID.String(), asn.Number, asn.Handle, asn.Description, asn.FirstSeen, asn.LastSeen)
_, err = insertStmt.Exec(asn.ASN, asn.Handle, asn.Description, asn.FirstSeen, asn.LastSeen)
if err != nil {
return fmt.Errorf("failed to insert ASN %d: %w", number, err)
}
@@ -476,17 +567,15 @@ func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error)
}()
var asn ASN
var idStr string
var handle, description sql.NullString
err = tx.QueryRow("SELECT id, number, handle, description, first_seen, last_seen FROM asns WHERE number = ?", number).
Scan(&idStr, &asn.Number, &handle, &description, &asn.FirstSeen, &asn.LastSeen)
err = tx.QueryRow("SELECT asn, handle, description, first_seen, last_seen FROM asns WHERE asn = ?", number).
Scan(&asn.ASN, &handle, &description, &asn.FirstSeen, &asn.LastSeen)
if err == nil {
// ASN exists, update last_seen
asn.ID, _ = uuid.Parse(idStr)
asn.Handle = handle.String
asn.Description = description.String
_, err = tx.Exec("UPDATE asns SET last_seen = ? WHERE id = ?", timestamp, asn.ID.String())
_, err = tx.Exec("UPDATE asns SET last_seen = ? WHERE asn = ?", timestamp, number)
if err != nil {
return nil, err
}
@@ -507,8 +596,7 @@ func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error)
// ASN doesn't exist, create it with ASN info lookup
asn = ASN{
ID: generateUUID(),
Number: number,
ASN: number,
FirstSeen: timestamp,
LastSeen: timestamp,
}
@@ -519,8 +607,8 @@ func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error)
asn.Description = info.Description
}
_, err = tx.Exec("INSERT INTO asns (id, number, handle, description, first_seen, last_seen) VALUES (?, ?, ?, ?, ?, ?)",
asn.ID.String(), asn.Number, asn.Handle, asn.Description, asn.FirstSeen, asn.LastSeen)
_, err = tx.Exec("INSERT INTO asns (asn, handle, description, first_seen, last_seen) VALUES (?, ?, ?, ?, ?)",
asn.ASN, asn.Handle, asn.Description, asn.FirstSeen, asn.LastSeen)
if err != nil {
return nil, err
}
@@ -615,10 +703,10 @@ func (d *Database) RecordAnnouncement(announcement *Announcement) error {
defer d.unlock()
err := d.exec(`
INSERT INTO announcements (id, prefix_id, asn_id, origin_asn_id, path, next_hop, timestamp, is_withdrawal)
INSERT INTO announcements (id, prefix_id, peer_asn, origin_asn, path, next_hop, timestamp, is_withdrawal)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
announcement.ID.String(), announcement.PrefixID.String(),
announcement.ASNID.String(), announcement.OriginASNID.String(),
announcement.PeerASN, announcement.OriginASN,
announcement.Path, announcement.NextHop, announcement.Timestamp, announcement.IsWithdrawal)
return err
@@ -815,13 +903,13 @@ func (d *Database) GetStatsContext(ctx context.Context) (Stats, error) {
return stats, err
}
// Count unique prefixes from live routes tables
err = d.db.QueryRowContext(ctx, "SELECT COUNT(DISTINCT prefix) FROM live_routes_v4").Scan(&stats.IPv4Prefixes)
// Count prefixes from both tables
err = d.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM prefixes_v4").Scan(&stats.IPv4Prefixes)
if err != nil {
return stats, err
}
err = d.db.QueryRowContext(ctx, "SELECT COUNT(DISTINCT prefix) FROM live_routes_v6").Scan(&stats.IPv6Prefixes)
err = d.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM prefixes_v6").Scan(&stats.IPv6Prefixes)
if err != nil {
return stats, err
}
@@ -1246,12 +1334,11 @@ func (d *Database) GetASDetails(asn int) (*ASN, []LiveRoute, error) {
func (d *Database) GetASDetailsContext(ctx context.Context, asn int) (*ASN, []LiveRoute, error) {
// Get AS information
var asnInfo ASN
var idStr string
var handle, description sql.NullString
err := d.db.QueryRowContext(ctx,
"SELECT id, number, handle, description, first_seen, last_seen FROM asns WHERE number = ?",
"SELECT asn, handle, description, first_seen, last_seen FROM asns WHERE asn = ?",
asn,
).Scan(&idStr, &asnInfo.Number, &handle, &description, &asnInfo.FirstSeen, &asnInfo.LastSeen)
).Scan(&asnInfo.ASN, &handle, &description, &asnInfo.FirstSeen, &asnInfo.LastSeen)
if err != nil {
if err == sql.ErrNoRows {
@@ -1261,7 +1348,6 @@ func (d *Database) GetASDetailsContext(ctx context.Context, asn int) (*ASN, []Li
return nil, nil, fmt.Errorf("failed to query AS: %w", err)
}
asnInfo.ID, _ = uuid.Parse(idStr)
asnInfo.Handle = handle.String
asnInfo.Description = description.String

View File

@@ -27,6 +27,7 @@ type Store interface {
// Prefix operations
GetOrCreatePrefix(prefix string, timestamp time.Time) (*Prefix, error)
UpdatePrefixesBatch(prefixes map[string]time.Time) error
// Announcement operations
RecordAnnouncement(announcement *Announcement) error

View File

@@ -8,8 +8,7 @@ import (
// ASN represents an Autonomous System Number
type ASN struct {
ID uuid.UUID `json:"id"`
Number int `json:"number"`
ASN int `json:"asn"`
Handle string `json:"handle"`
Description string `json:"description"`
FirstSeen time.Time `json:"first_seen"`
@@ -29,8 +28,8 @@ type Prefix struct {
type Announcement struct {
ID uuid.UUID `json:"id"`
PrefixID uuid.UUID `json:"prefix_id"`
ASNID uuid.UUID `json:"asn_id"`
OriginASNID uuid.UUID `json:"origin_asn_id"`
PeerASN int `json:"peer_asn"`
OriginASN int `json:"origin_asn"`
Path string `json:"path"` // JSON-encoded AS path
NextHop string `json:"next_hop"`
Timestamp time.Time `json:"timestamp"`
@@ -40,8 +39,8 @@ type Announcement struct {
// ASNPeering represents a peering relationship between two ASNs
type ASNPeering struct {
ID uuid.UUID `json:"id"`
FromASNID uuid.UUID `json:"from_asn_id"`
ToASNID uuid.UUID `json:"to_asn_id"`
ASA int `json:"as_a"`
ASB int `json:"as_b"`
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
}

View File

@@ -3,8 +3,7 @@
-- DO NOT make schema changes anywhere else in the codebase.
CREATE TABLE IF NOT EXISTS asns (
id TEXT PRIMARY KEY,
number INTEGER UNIQUE NOT NULL,
asn INTEGER PRIMARY KEY,
handle TEXT,
description TEXT,
first_seen DATETIME NOT NULL,
@@ -30,15 +29,14 @@ CREATE TABLE IF NOT EXISTS prefixes_v6 (
CREATE TABLE IF NOT EXISTS announcements (
id TEXT PRIMARY KEY,
prefix_id TEXT NOT NULL,
asn_id TEXT NOT NULL,
origin_asn_id TEXT NOT NULL,
peer_asn INTEGER NOT NULL,
origin_asn INTEGER NOT NULL,
path TEXT NOT NULL,
next_hop TEXT,
timestamp DATETIME NOT NULL,
is_withdrawal BOOLEAN NOT NULL DEFAULT 0,
FOREIGN KEY (prefix_id) REFERENCES prefixes(id),
FOREIGN KEY (asn_id) REFERENCES asns(id),
FOREIGN KEY (origin_asn_id) REFERENCES asns(id)
FOREIGN KEY (peer_asn) REFERENCES asns(asn),
FOREIGN KEY (origin_asn) REFERENCES asns(asn)
);
CREATE TABLE IF NOT EXISTS peerings (
@@ -67,13 +65,14 @@ CREATE INDEX IF NOT EXISTS idx_prefixes_v4_prefix ON prefixes_v4(prefix);
CREATE INDEX IF NOT EXISTS idx_prefixes_v6_prefix ON prefixes_v6(prefix);
CREATE INDEX IF NOT EXISTS idx_announcements_timestamp ON announcements(timestamp);
CREATE INDEX IF NOT EXISTS idx_announcements_prefix_id ON announcements(prefix_id);
CREATE INDEX IF NOT EXISTS idx_announcements_asn_id ON announcements(asn_id);
CREATE INDEX IF NOT EXISTS idx_announcements_peer_asn ON announcements(peer_asn);
CREATE INDEX IF NOT EXISTS idx_announcements_origin_asn ON announcements(origin_asn);
CREATE INDEX IF NOT EXISTS idx_peerings_as_a ON peerings(as_a);
CREATE INDEX IF NOT EXISTS idx_peerings_as_b ON peerings(as_b);
CREATE INDEX IF NOT EXISTS idx_peerings_lookup ON peerings(as_a, as_b);
-- Indexes for asns table
CREATE INDEX IF NOT EXISTS idx_asns_number ON asns(number);
CREATE INDEX IF NOT EXISTS idx_asns_asn ON asns(asn);
-- Indexes for bgp_peers table
CREATE INDEX IF NOT EXISTS idx_bgp_peers_asn ON bgp_peers(peer_asn);