// Package database provides SQLite storage for BGP routing data including ASNs, prefixes, announcements and peerings. package database import ( "database/sql" _ "embed" "fmt" "log/slog" "os" "path/filepath" "runtime" "time" "git.eeqj.de/sneak/routewatch/pkg/asinfo" "github.com/google/uuid" _ "github.com/mattn/go-sqlite3" // CGO SQLite driver ) //go:embed schema.sql var dbSchema string const dirPermissions = 0750 // rwxr-x--- // Database manages the SQLite database connection and operations. type Database struct { db *sql.DB logger *slog.Logger path string } // Config holds database configuration type Config struct { Path string } // getDefaultDatabasePath returns the appropriate database path for the OS func getDefaultDatabasePath() string { const dbFilename = "db.sqlite" switch runtime.GOOS { case "darwin": // macOS: ~/Library/Application Support/berlin.sneak.app.routewatch/db.sqlite home, err := os.UserHomeDir() if err != nil { return dbFilename } appSupport := filepath.Join(home, "Library", "Application Support", "berlin.sneak.app.routewatch") if err := os.MkdirAll(appSupport, dirPermissions); err != nil { return dbFilename } return filepath.Join(appSupport, dbFilename) default: // Linux and others: /var/lib/routewatch/db.sqlite dbDir := "/var/lib/routewatch" if err := os.MkdirAll(dbDir, dirPermissions); err != nil { // Fall back to user's home directory if can't create system directory home, err := os.UserHomeDir() if err != nil { return dbFilename } userDir := filepath.Join(home, ".local", "share", "routewatch") if err := os.MkdirAll(userDir, dirPermissions); err != nil { return dbFilename } return filepath.Join(userDir, dbFilename) } return filepath.Join(dbDir, dbFilename) } } // NewConfig provides default database configuration func NewConfig() Config { return Config{ Path: getDefaultDatabasePath(), } } // New creates a new database connection and initializes the schema. func New(logger *slog.Logger) (*Database, error) { config := NewConfig() return NewWithConfig(config, logger) } // NewWithConfig creates a new database connection with custom configuration func NewWithConfig(config Config, logger *slog.Logger) (*Database, error) { // Log database path logger.Info("Opening database", "path", config.Path) // Ensure directory exists dir := filepath.Dir(config.Path) if err := os.MkdirAll(dir, dirPermissions); err != nil { return nil, fmt.Errorf("failed to create database directory: %w", err) } // Add connection parameters for go-sqlite3 // Enable WAL mode and other performance optimizations dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=NORMAL&cache=shared", config.Path) db, err := sql.Open("sqlite3", dsn) if err != nil { return nil, fmt.Errorf("failed to open database: %w", err) } if err := db.Ping(); err != nil { return nil, fmt.Errorf("failed to ping database: %w", err) } // Set connection pool parameters db.SetMaxOpenConns(1) // Force serialization since SQLite doesn't handle true concurrency well db.SetMaxIdleConns(1) db.SetConnMaxLifetime(0) database := &Database{db: db, logger: logger, path: config.Path} if err := database.Initialize(); err != nil { return nil, fmt.Errorf("failed to initialize database: %w", err) } return database, nil } // Initialize creates the database schema if it doesn't exist. func (d *Database) Initialize() error { // Set SQLite pragmas for better performance pragmas := []string{ "PRAGMA journal_mode=WAL", // Already set in connection string "PRAGMA synchronous=NORMAL", // Faster than FULL, still safe "PRAGMA cache_size=-524288", // 512MB cache (negative = KB) "PRAGMA temp_store=MEMORY", // Use memory for temp tables "PRAGMA mmap_size=268435456", // 256MB memory-mapped I/O "PRAGMA wal_autocheckpoint=1000", // Checkpoint every 1000 pages "PRAGMA wal_checkpoint(PASSIVE)", // Checkpoint now "PRAGMA optimize", // Run optimizer } for _, pragma := range pragmas { if err := d.exec(pragma); err != nil { d.logger.Warn("Failed to set pragma", "pragma", pragma, "error", err) } } err := d.exec(dbSchema) return err } // Close closes the database connection. func (d *Database) Close() error { return d.db.Close() } // beginTx starts a new transaction with logging func (d *Database) beginTx() (*loggingTx, error) { tx, err := d.db.Begin() if err != nil { return nil, err } return &loggingTx{Tx: tx, logger: d.logger}, nil } // GetOrCreateASN retrieves an existing ASN or creates a new one if it doesn't exist. func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error) { tx, err := d.beginTx() if err != nil { return nil, err } defer func() { if err := tx.Rollback(); err != nil && err != sql.ErrTxDone { d.logger.Error("Failed to rollback transaction", "error", err) } }() var asn ASN var idStr string var handle, description sql.NullString err = tx.QueryRow("SELECT id, number, handle, description, first_seen, last_seen FROM asns WHERE number = ?", number). Scan(&idStr, &asn.Number, &handle, &description, &asn.FirstSeen, &asn.LastSeen) if err == nil { // ASN exists, update last_seen asn.ID, _ = uuid.Parse(idStr) asn.Handle = handle.String asn.Description = description.String _, err = tx.Exec("UPDATE asns SET last_seen = ? WHERE id = ?", timestamp, asn.ID.String()) if err != nil { return nil, err } asn.LastSeen = timestamp if err = tx.Commit(); err != nil { d.logger.Error("Failed to commit transaction for ASN update", "asn", number, "error", err) return nil, err } return &asn, nil } if err != sql.ErrNoRows { return nil, err } // ASN doesn't exist, create it with ASN info lookup asn = ASN{ ID: generateUUID(), Number: number, FirstSeen: timestamp, LastSeen: timestamp, } // Look up ASN info if info, ok := asinfo.Get(number); ok { asn.Handle = info.Handle asn.Description = info.Description } _, err = tx.Exec("INSERT INTO asns (id, number, handle, description, first_seen, last_seen) VALUES (?, ?, ?, ?, ?, ?)", asn.ID.String(), asn.Number, asn.Handle, asn.Description, asn.FirstSeen, asn.LastSeen) if err != nil { return nil, err } if err = tx.Commit(); err != nil { d.logger.Error("Failed to commit transaction for ASN creation", "asn", number, "error", err) return nil, err } return &asn, nil } // GetOrCreatePrefix retrieves an existing prefix or creates a new one if it doesn't exist. func (d *Database) GetOrCreatePrefix(prefix string, timestamp time.Time) (*Prefix, error) { tx, err := d.beginTx() if err != nil { return nil, err } defer func() { if err := tx.Rollback(); err != nil && err != sql.ErrTxDone { d.logger.Error("Failed to rollback transaction", "error", err) } }() var p Prefix var idStr string err = tx.QueryRow("SELECT id, prefix, ip_version, first_seen, last_seen FROM prefixes WHERE prefix = ?", prefix). Scan(&idStr, &p.Prefix, &p.IPVersion, &p.FirstSeen, &p.LastSeen) if err == nil { // Prefix exists, update last_seen p.ID, _ = uuid.Parse(idStr) _, err = tx.Exec("UPDATE prefixes SET last_seen = ? WHERE id = ?", timestamp, p.ID.String()) if err != nil { return nil, err } p.LastSeen = timestamp if err = tx.Commit(); err != nil { d.logger.Error("Failed to commit transaction for prefix update", "prefix", prefix, "error", err) return nil, err } return &p, nil } if err != sql.ErrNoRows { return nil, err } // Prefix doesn't exist, create it p = Prefix{ ID: generateUUID(), Prefix: prefix, IPVersion: detectIPVersion(prefix), FirstSeen: timestamp, LastSeen: timestamp, } _, err = tx.Exec("INSERT INTO prefixes (id, prefix, ip_version, first_seen, last_seen) VALUES (?, ?, ?, ?, ?)", p.ID.String(), p.Prefix, p.IPVersion, p.FirstSeen, p.LastSeen) if err != nil { return nil, err } if err = tx.Commit(); err != nil { d.logger.Error("Failed to commit transaction for prefix creation", "prefix", prefix, "error", err) return nil, err } return &p, nil } // RecordAnnouncement inserts a new BGP announcement or withdrawal into the database. func (d *Database) RecordAnnouncement(announcement *Announcement) error { err := d.exec(` INSERT INTO announcements (id, prefix_id, asn_id, origin_asn_id, path, next_hop, timestamp, is_withdrawal) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, announcement.ID.String(), announcement.PrefixID.String(), announcement.ASNID.String(), announcement.OriginASNID.String(), announcement.Path, announcement.NextHop, announcement.Timestamp, announcement.IsWithdrawal) return err } // RecordPeering records a peering relationship between two ASNs. func (d *Database) RecordPeering(fromASNID, toASNID string, timestamp time.Time) error { tx, err := d.beginTx() if err != nil { return err } defer func() { if err := tx.Rollback(); err != nil && err != sql.ErrTxDone { d.logger.Error("Failed to rollback transaction", "error", err) } }() var exists bool err = tx.QueryRow("SELECT EXISTS(SELECT 1 FROM asn_peerings WHERE from_asn_id = ? AND to_asn_id = ?)", fromASNID, toASNID).Scan(&exists) if err != nil { return err } if exists { _, err = tx.Exec("UPDATE asn_peerings SET last_seen = ? WHERE from_asn_id = ? AND to_asn_id = ?", timestamp, fromASNID, toASNID) } else { _, err = tx.Exec(` INSERT INTO asn_peerings (id, from_asn_id, to_asn_id, first_seen, last_seen) VALUES (?, ?, ?, ?, ?)`, generateUUID().String(), fromASNID, toASNID, timestamp, timestamp) } if err != nil { return err } if err = tx.Commit(); err != nil { d.logger.Error("Failed to commit transaction for peering", "from_asn_id", fromASNID, "to_asn_id", toASNID, "error", err, ) return err } return nil } // UpdatePeer updates or creates a BGP peer record func (d *Database) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error { tx, err := d.beginTx() if err != nil { return err } defer func() { if err := tx.Rollback(); err != nil && err != sql.ErrTxDone { d.logger.Error("Failed to rollback transaction", "error", err) } }() var exists bool err = tx.QueryRow("SELECT EXISTS(SELECT 1 FROM bgp_peers WHERE peer_ip = ?)", peerIP).Scan(&exists) if err != nil { return err } if exists { _, err = tx.Exec( "UPDATE bgp_peers SET peer_asn = ?, last_seen = ?, last_message_type = ? WHERE peer_ip = ?", peerASN, timestamp, messageType, peerIP, ) } else { _, err = tx.Exec( "INSERT INTO bgp_peers (id, peer_ip, peer_asn, first_seen, last_seen, last_message_type) VALUES (?, ?, ?, ?, ?, ?)", generateUUID().String(), peerIP, peerASN, timestamp, timestamp, messageType, ) } if err != nil { return err } if err = tx.Commit(); err != nil { d.logger.Error("Failed to commit transaction for peer update", "peer_ip", peerIP, "peer_asn", peerASN, "error", err, ) return err } return nil } // GetStats returns database statistics func (d *Database) GetStats() (Stats, error) { var stats Stats // Count ASNs d.logger.Info("Counting ASNs") err := d.queryRow("SELECT COUNT(*) FROM asns").Scan(&stats.ASNs) if err != nil { return stats, err } // Count prefixes d.logger.Info("Counting prefixes") err = d.queryRow("SELECT COUNT(*) FROM prefixes").Scan(&stats.Prefixes) if err != nil { return stats, err } // Count IPv4 and IPv6 prefixes d.logger.Info("Counting IPv4 prefixes") const ipVersionV4 = 4 err = d.queryRow("SELECT COUNT(*) FROM prefixes WHERE ip_version = ?", ipVersionV4).Scan(&stats.IPv4Prefixes) if err != nil { return stats, err } d.logger.Info("Counting IPv6 prefixes") const ipVersionV6 = 6 err = d.queryRow("SELECT COUNT(*) FROM prefixes WHERE ip_version = ?", ipVersionV6).Scan(&stats.IPv6Prefixes) if err != nil { return stats, err } // Count peerings d.logger.Info("Counting peerings") err = d.queryRow("SELECT COUNT(*) FROM asn_peerings").Scan(&stats.Peerings) if err != nil { return stats, err } // Get database file size d.logger.Info("Getting database file size") fileInfo, err := os.Stat(d.path) if err != nil { d.logger.Warn("Failed to get database file size", "error", err) stats.FileSizeBytes = 0 } else { stats.FileSizeBytes = fileInfo.Size() } d.logger.Info("Stats collection complete") return stats, nil }