Queries in the 50-70ms range are acceptable for now, given SQLite's write serialization constraints. Setting the threshold to 100ms keeps the focus on truly problematic queries.
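For context, the d.exec, d.query, and d.queryRow helpers used throughout this file are defined elsewhere in the package; the snippet below is only a minimal sketch of how a 100ms threshold could be applied in such a helper. The constant name, signature, and log message are assumptions for illustration, not the package's actual implementation.

// Sketch only: assumes the Database struct, its logger, and the "time" and
// "database/sql" imports from this package. Not the real helper.
const slowQueryThreshold = 100 * time.Millisecond

// exec runs a write statement and warns when it exceeds the threshold.
func (d *Database) exec(query string, args ...any) error {
	start := time.Now()
	_, err := d.db.Exec(query, args...)
	if elapsed := time.Since(start); elapsed > slowQueryThreshold {
		d.logger.Warn("Slow query", "duration", elapsed, "query", query)
	}

	return err
}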
// Package database provides SQLite storage for BGP routing data including ASNs, prefixes, announcements and peerings.
package database

import (
	"database/sql"
	_ "embed"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"runtime"
	"time"

	"github.com/google/uuid"
	_ "modernc.org/sqlite" // Pure Go SQLite driver
)

//go:embed schema.sql
var dbSchema string

const dirPermissions = 0750 // rwxr-x---

// Database manages the SQLite database connection and operations.
type Database struct {
	db     *sql.DB
	logger *slog.Logger
}

// Config holds database configuration
type Config struct {
	Path string
}

// getDefaultDatabasePath returns the appropriate database path for the OS
func getDefaultDatabasePath() string {
	const dbFilename = "db.sqlite"

	switch runtime.GOOS {
	case "darwin":
		// macOS: ~/Library/Application Support/berlin.sneak.app.routewatch/db.sqlite
		home, err := os.UserHomeDir()
		if err != nil {
			return dbFilename
		}
		appSupport := filepath.Join(home, "Library", "Application Support", "berlin.sneak.app.routewatch")
		if err := os.MkdirAll(appSupport, dirPermissions); err != nil {
			return dbFilename
		}

		return filepath.Join(appSupport, dbFilename)
	default:
		// Linux and others: /var/lib/routewatch/db.sqlite
		dbDir := "/var/lib/routewatch"
		if err := os.MkdirAll(dbDir, dirPermissions); err != nil {
			// Fall back to user's home directory if can't create system directory
			home, err := os.UserHomeDir()
			if err != nil {
				return dbFilename
			}
			userDir := filepath.Join(home, ".local", "share", "routewatch")
			if err := os.MkdirAll(userDir, dirPermissions); err != nil {
				return dbFilename
			}

			return filepath.Join(userDir, dbFilename)
		}

		return filepath.Join(dbDir, dbFilename)
	}
}

// NewConfig provides default database configuration
func NewConfig() Config {
	return Config{
		Path: getDefaultDatabasePath(),
	}
}

// New creates a new database connection and initializes the schema.
func New(logger *slog.Logger) (*Database, error) {
	config := NewConfig()

	return NewWithConfig(config, logger)
}

// NewWithConfig creates a new database connection with custom configuration
func NewWithConfig(config Config, logger *slog.Logger) (*Database, error) {
	// Log database path
	logger.Info("Opening database", "path", config.Path)

	// Ensure directory exists
	dir := filepath.Dir(config.Path)
	if err := os.MkdirAll(dir, dirPermissions); err != nil {
		return nil, fmt.Errorf("failed to create database directory: %w", err)
	}

	// Add connection parameters for modernc.org/sqlite
	// Enable WAL mode and other performance optimizations
	dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=NORMAL&_cache_size=-512000", config.Path)
	db, err := sql.Open("sqlite", dsn)
	if err != nil {
		return nil, fmt.Errorf("failed to open database: %w", err)
	}

	if err := db.Ping(); err != nil {
		return nil, fmt.Errorf("failed to ping database: %w", err)
	}

	// Set connection pool parameters
	db.SetMaxOpenConns(1) // Force serialization since SQLite doesn't handle true concurrency well
	db.SetMaxIdleConns(1)
	db.SetConnMaxLifetime(0)

	database := &Database{db: db, logger: logger}

	if err := database.Initialize(); err != nil {
		return nil, fmt.Errorf("failed to initialize database: %w", err)
	}

	return database, nil
}

// Initialize creates the database schema if it doesn't exist.
func (d *Database) Initialize() error {
	// Set SQLite pragmas for better performance
	pragmas := []string{
		"PRAGMA journal_mode=WAL",        // Already set in connection string
		"PRAGMA synchronous=NORMAL",      // Faster than FULL, still safe
		"PRAGMA cache_size=-524288",      // 512MB cache
		"PRAGMA temp_store=MEMORY",       // Use memory for temp tables
		"PRAGMA mmap_size=268435456",     // 256MB memory-mapped I/O
		"PRAGMA wal_autocheckpoint=1000", // Checkpoint every 1000 pages
		"PRAGMA wal_checkpoint(PASSIVE)", // Checkpoint now
		"PRAGMA optimize",                // Run optimizer
	}

	for _, pragma := range pragmas {
		if err := d.exec(pragma); err != nil {
			d.logger.Warn("Failed to set pragma", "pragma", pragma, "error", err)
		}
	}

	err := d.exec(dbSchema)

	return err
}

// Close closes the database connection.
func (d *Database) Close() error {
	return d.db.Close()
}

// beginTx starts a new transaction with logging
func (d *Database) beginTx() (*loggingTx, error) {
	tx, err := d.db.Begin()
	if err != nil {
		return nil, err
	}

	return &loggingTx{Tx: tx, logger: d.logger}, nil
}

// GetOrCreateASN retrieves an existing ASN or creates a new one if it doesn't exist.
func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error) {
	tx, err := d.beginTx()
	if err != nil {
		return nil, err
	}
	defer func() {
		if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
			d.logger.Error("Failed to rollback transaction", "error", err)
		}
	}()

	var asn ASN
	var idStr string
	err = tx.QueryRow("SELECT id, number, first_seen, last_seen FROM asns WHERE number = ?", number).
		Scan(&idStr, &asn.Number, &asn.FirstSeen, &asn.LastSeen)

	if err == nil {
		// ASN exists, update last_seen
		asn.ID, _ = uuid.Parse(idStr)
		_, err = tx.Exec("UPDATE asns SET last_seen = ? WHERE id = ?", timestamp, asn.ID.String())
		if err != nil {
			return nil, err
		}
		asn.LastSeen = timestamp

		if err = tx.Commit(); err != nil {
			d.logger.Error("Failed to commit transaction for ASN update", "asn", number, "error", err)

			return nil, err
		}

		return &asn, nil
	}

	if err != sql.ErrNoRows {
		return nil, err
	}

	// ASN doesn't exist, create it
	asn = ASN{
		ID:        generateUUID(),
		Number:    number,
		FirstSeen: timestamp,
		LastSeen:  timestamp,
	}
	_, err = tx.Exec("INSERT INTO asns (id, number, first_seen, last_seen) VALUES (?, ?, ?, ?)",
		asn.ID.String(), asn.Number, asn.FirstSeen, asn.LastSeen)
	if err != nil {
		return nil, err
	}

	if err = tx.Commit(); err != nil {
		d.logger.Error("Failed to commit transaction for ASN creation", "asn", number, "error", err)

		return nil, err
	}

	return &asn, nil
}

// GetOrCreatePrefix retrieves an existing prefix or creates a new one if it doesn't exist.
func (d *Database) GetOrCreatePrefix(prefix string, timestamp time.Time) (*Prefix, error) {
	tx, err := d.beginTx()
	if err != nil {
		return nil, err
	}
	defer func() {
		if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
			d.logger.Error("Failed to rollback transaction", "error", err)
		}
	}()

	var p Prefix
	var idStr string
	err = tx.QueryRow("SELECT id, prefix, ip_version, first_seen, last_seen FROM prefixes WHERE prefix = ?", prefix).
		Scan(&idStr, &p.Prefix, &p.IPVersion, &p.FirstSeen, &p.LastSeen)

	if err == nil {
		// Prefix exists, update last_seen
		p.ID, _ = uuid.Parse(idStr)
		_, err = tx.Exec("UPDATE prefixes SET last_seen = ? WHERE id = ?", timestamp, p.ID.String())
		if err != nil {
			return nil, err
		}
		p.LastSeen = timestamp

		if err = tx.Commit(); err != nil {
			d.logger.Error("Failed to commit transaction for prefix update", "prefix", prefix, "error", err)

			return nil, err
		}

		return &p, nil
	}

	if err != sql.ErrNoRows {
		return nil, err
	}

	// Prefix doesn't exist, create it
	p = Prefix{
		ID:        generateUUID(),
		Prefix:    prefix,
		IPVersion: detectIPVersion(prefix),
		FirstSeen: timestamp,
		LastSeen:  timestamp,
	}
	_, err = tx.Exec("INSERT INTO prefixes (id, prefix, ip_version, first_seen, last_seen) VALUES (?, ?, ?, ?, ?)",
		p.ID.String(), p.Prefix, p.IPVersion, p.FirstSeen, p.LastSeen)
	if err != nil {
		return nil, err
	}

	if err = tx.Commit(); err != nil {
		d.logger.Error("Failed to commit transaction for prefix creation", "prefix", prefix, "error", err)

		return nil, err
	}

	return &p, nil
}

// RecordAnnouncement inserts a new BGP announcement or withdrawal into the database.
func (d *Database) RecordAnnouncement(announcement *Announcement) error {
	err := d.exec(`
		INSERT INTO announcements (id, prefix_id, asn_id, origin_asn_id, path, next_hop, timestamp, is_withdrawal)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
		announcement.ID.String(), announcement.PrefixID.String(),
		announcement.ASNID.String(), announcement.OriginASNID.String(),
		announcement.Path, announcement.NextHop, announcement.Timestamp, announcement.IsWithdrawal)

	return err
}

// RecordPeering records a peering relationship between two ASNs.
func (d *Database) RecordPeering(fromASNID, toASNID string, timestamp time.Time) error {
	tx, err := d.beginTx()
	if err != nil {
		return err
	}
	defer func() {
		if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
			d.logger.Error("Failed to rollback transaction", "error", err)
		}
	}()

	var exists bool
	err = tx.QueryRow("SELECT EXISTS(SELECT 1 FROM asn_peerings WHERE from_asn_id = ? AND to_asn_id = ?)",
		fromASNID, toASNID).Scan(&exists)
	if err != nil {
		return err
	}

	if exists {
		_, err = tx.Exec("UPDATE asn_peerings SET last_seen = ? WHERE from_asn_id = ? AND to_asn_id = ?",
			timestamp, fromASNID, toASNID)
	} else {
		_, err = tx.Exec(`
			INSERT INTO asn_peerings (id, from_asn_id, to_asn_id, first_seen, last_seen)
			VALUES (?, ?, ?, ?, ?)`,
			generateUUID().String(), fromASNID, toASNID, timestamp, timestamp)
	}
	if err != nil {
		return err
	}

	if err = tx.Commit(); err != nil {
		d.logger.Error("Failed to commit transaction for peering",
			"from_asn_id", fromASNID,
			"to_asn_id", toASNID,
			"error", err,
		)

		return err
	}

	return nil
}

// UpdateLiveRoute updates the live routing table for an announcement
func (d *Database) UpdateLiveRoute(
	prefixID, originASNID uuid.UUID,
	peerASN int,
	nextHop string,
	timestamp time.Time,
) error {
	// Use SQLite's UPSERT capability to avoid the SELECT+UPDATE/INSERT pattern
	// This reduces the number of queries and improves performance
	// Note: We removed the WHERE clause from ON CONFLICT UPDATE because
	// if we're updating, we want to update regardless of withdrawn_at status
	err := d.exec(`
		INSERT INTO live_routes (id, prefix_id, origin_asn_id, peer_asn, next_hop, announced_at, withdrawn_at)
		VALUES (?, ?, ?, ?, ?, ?, NULL)
		ON CONFLICT(prefix_id, origin_asn_id, peer_asn) DO UPDATE SET
			next_hop = excluded.next_hop,
			announced_at = excluded.announced_at,
			withdrawn_at = NULL`,
		generateUUID().String(), prefixID.String(), originASNID.String(),
		peerASN, nextHop, timestamp)

	return err
}

// WithdrawLiveRoute marks a route as withdrawn in the live routing table
func (d *Database) WithdrawLiveRoute(prefixID uuid.UUID, peerASN int, timestamp time.Time) error {
	err := d.exec(`
		UPDATE live_routes
		SET withdrawn_at = ?
		WHERE prefix_id = ? AND peer_asn = ? AND withdrawn_at IS NULL`,
		timestamp, prefixID.String(), peerASN)

	return err
}

// GetActiveLiveRoutes returns all currently active routes (not withdrawn)
func (d *Database) GetActiveLiveRoutes() ([]LiveRoute, error) {
	rows, err := d.query(`
		SELECT id, prefix_id, origin_asn_id, peer_asn, next_hop, announced_at
		FROM live_routes
		WHERE withdrawn_at IS NULL
		ORDER BY announced_at DESC`)
	if err != nil {
		return nil, err
	}
	defer func() {
		_ = rows.Close()
	}()

	var routes []LiveRoute
	for rows.Next() {
		var route LiveRoute
		var idStr, prefixIDStr, originASNIDStr string
		err := rows.Scan(&idStr, &prefixIDStr, &originASNIDStr,
			&route.PeerASN, &route.NextHop, &route.AnnouncedAt)
		if err != nil {
			return nil, err
		}

		route.ID, _ = uuid.Parse(idStr)
		route.PrefixID, _ = uuid.Parse(prefixIDStr)
		route.OriginASNID, _ = uuid.Parse(originASNIDStr)

		routes = append(routes, route)
	}

	return routes, rows.Err()
}

// UpdatePeer updates or creates a BGP peer record
func (d *Database) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error {
	tx, err := d.beginTx()
	if err != nil {
		return err
	}
	defer func() {
		if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
			d.logger.Error("Failed to rollback transaction", "error", err)
		}
	}()

	var exists bool
	err = tx.QueryRow("SELECT EXISTS(SELECT 1 FROM bgp_peers WHERE peer_ip = ?)", peerIP).Scan(&exists)
	if err != nil {
		return err
	}

	if exists {
		_, err = tx.Exec(
			"UPDATE bgp_peers SET peer_asn = ?, last_seen = ?, last_message_type = ? WHERE peer_ip = ?",
			peerASN, timestamp, messageType, peerIP,
		)
	} else {
		_, err = tx.Exec(
			"INSERT INTO bgp_peers (id, peer_ip, peer_asn, first_seen, last_seen, last_message_type) VALUES (?, ?, ?, ?, ?, ?)",
			generateUUID().String(), peerIP, peerASN, timestamp, timestamp, messageType,
		)
	}
	if err != nil {
		return err
	}

	if err = tx.Commit(); err != nil {
		d.logger.Error("Failed to commit transaction for peer update",
			"peer_ip", peerIP,
			"peer_asn", peerASN,
			"error", err,
		)

		return err
	}

	return nil
}

// GetStats returns database statistics
func (d *Database) GetStats() (Stats, error) {
	var stats Stats

	// Count ASNs
	d.logger.Info("Counting ASNs")
	err := d.queryRow("SELECT COUNT(*) FROM asns").Scan(&stats.ASNs)
	if err != nil {
		return stats, err
	}

	// Count prefixes
	d.logger.Info("Counting prefixes")
	err = d.queryRow("SELECT COUNT(*) FROM prefixes").Scan(&stats.Prefixes)
	if err != nil {
		return stats, err
	}

	// Count IPv4 and IPv6 prefixes
	d.logger.Info("Counting IPv4 prefixes")
	const ipVersionV4 = 4
	err = d.queryRow("SELECT COUNT(*) FROM prefixes WHERE ip_version = ?", ipVersionV4).Scan(&stats.IPv4Prefixes)
	if err != nil {
		return stats, err
	}

	d.logger.Info("Counting IPv6 prefixes")
	const ipVersionV6 = 6
	err = d.queryRow("SELECT COUNT(*) FROM prefixes WHERE ip_version = ?", ipVersionV6).Scan(&stats.IPv6Prefixes)
	if err != nil {
		return stats, err
	}

	// Count peerings
	d.logger.Info("Counting peerings")
	err = d.queryRow("SELECT COUNT(*) FROM asn_peerings").Scan(&stats.Peerings)
	if err != nil {
		return stats, err
	}

	// Count live routes
	d.logger.Info("Counting live routes")
	err = d.queryRow("SELECT COUNT(*) FROM live_routes WHERE withdrawn_at IS NULL").Scan(&stats.LiveRoutes)
	if err != nil {
		return stats, err
	}

	d.logger.Info("Stats collection complete")

	return stats, nil
}