Compare commits
5 Commits
40d7f0185b
...
optimize-s
| Author | SHA1 | Date | |
|---|---|---|---|
| a78e5c6e92 | |||
| 9ef2a22db3 | |||
| 05805b8847 | |||
| ddb3cfa4f0 | |||
| 3ef60459b2 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -35,3 +35,4 @@ pkg/asinfo/asdata.json
|
||||
|
||||
# Debug output files
|
||||
out
|
||||
log.txt
|
||||
@@ -10,6 +10,8 @@ import (
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.eeqj.de/sneak/routewatch/internal/config"
|
||||
@@ -42,9 +44,12 @@ var (
|
||||
|
||||
// Database manages the SQLite database connection and operations.
|
||||
type Database struct {
|
||||
db *sql.DB
|
||||
logger *logger.Logger
|
||||
path string
|
||||
db *sql.DB
|
||||
logger *logger.Logger
|
||||
path string
|
||||
mu sync.Mutex
|
||||
lockedAt time.Time
|
||||
lockedBy string
|
||||
}
|
||||
|
||||
// New creates a new database connection and initializes the schema.
|
||||
@@ -61,8 +66,11 @@ func New(cfg *config.Config, logger *logger.Logger) (*Database, error) {
|
||||
}
|
||||
|
||||
// Add connection parameters for go-sqlite3
|
||||
// Enable WAL mode and other performance optimizations
|
||||
dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=OFF&cache=shared", dbPath)
|
||||
// Configure SQLite connection parameters
|
||||
dsn := fmt.Sprintf(
|
||||
"file:%s",
|
||||
dbPath,
|
||||
)
|
||||
db, err := sql.Open("sqlite3", dsn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open database: %w", err)
|
||||
@@ -73,7 +81,7 @@ func New(cfg *config.Config, logger *logger.Logger) (*Database, error) {
|
||||
}
|
||||
|
||||
// Set connection pool parameters
|
||||
// Multiple connections for better concurrency
|
||||
// Multiple connections allow concurrent reads while writes are serialized
|
||||
const maxConns = 10
|
||||
db.SetMaxOpenConns(maxConns)
|
||||
db.SetMaxIdleConns(maxConns)
|
||||
@@ -90,22 +98,15 @@ func New(cfg *config.Config, logger *logger.Logger) (*Database, error) {
|
||||
|
||||
// Initialize creates the database schema if it doesn't exist.
|
||||
func (d *Database) Initialize() error {
|
||||
// Set SQLite pragmas for extreme performance - prioritize speed over durability
|
||||
// Set SQLite pragmas for performance
|
||||
pragmas := []string{
|
||||
"PRAGMA journal_mode=WAL", // Write-Ahead Logging
|
||||
"PRAGMA synchronous=OFF", // Don't wait for disk writes
|
||||
"PRAGMA cache_size=-8388608", // 8GB cache (negative = KB)
|
||||
"PRAGMA temp_store=MEMORY", // Use memory for temp tables
|
||||
"PRAGMA mmap_size=10737418240", // 10GB memory-mapped I/O
|
||||
"PRAGMA page_size=8192", // 8KB pages for better performance
|
||||
"PRAGMA wal_autocheckpoint=100000", // Checkpoint every 100k pages (800MB)
|
||||
"PRAGMA wal_checkpoint(TRUNCATE)", // Checkpoint and truncate WAL now
|
||||
"PRAGMA busy_timeout=5000", // 5 second busy timeout
|
||||
"PRAGMA locking_mode=NORMAL", // Normal locking for multiple connections
|
||||
"PRAGMA read_uncommitted=true", // Allow dirty reads
|
||||
"PRAGMA analysis_limit=0", // Disable automatic ANALYZE
|
||||
"PRAGMA threads=4", // Use multiple threads for sorting
|
||||
"PRAGMA cache_spill=false", // Keep cache in memory, don't spill to disk
|
||||
"PRAGMA journal_mode=WAL", // Write-Ahead Logging
|
||||
"PRAGMA synchronous=OFF", // Don't wait for disk writes
|
||||
"PRAGMA cache_size=-3145728", // 3GB cache (upper limit for 2.4GB DB)
|
||||
"PRAGMA temp_store=MEMORY", // Use memory for temp tables
|
||||
"PRAGMA wal_checkpoint(TRUNCATE)", // Checkpoint and truncate WAL now
|
||||
"PRAGMA busy_timeout=5000", // 5 second busy timeout
|
||||
"PRAGMA analysis_limit=0", // Disable automatic ANALYZE
|
||||
}
|
||||
|
||||
for _, pragma := range pragmas {
|
||||
@@ -133,6 +134,33 @@ func (d *Database) Close() error {
|
||||
return d.db.Close()
|
||||
}
|
||||
|
||||
// lock acquires the database mutex and logs debug information
|
||||
func (d *Database) lock(operation string) {
|
||||
// Get caller information
|
||||
_, file, line, _ := runtime.Caller(1)
|
||||
caller := fmt.Sprintf("%s:%d", filepath.Base(file), line)
|
||||
|
||||
d.logger.Debug("Acquiring database lock", "operation", operation, "caller", caller)
|
||||
|
||||
d.mu.Lock()
|
||||
d.lockedAt = time.Now()
|
||||
d.lockedBy = fmt.Sprintf("%s (%s)", operation, caller)
|
||||
|
||||
d.logger.Debug("Database lock acquired", "operation", operation, "caller", caller)
|
||||
}
|
||||
|
||||
// unlock releases the database mutex and logs debug information including hold duration
|
||||
func (d *Database) unlock() {
|
||||
holdDuration := time.Since(d.lockedAt)
|
||||
lockedBy := d.lockedBy
|
||||
|
||||
d.lockedAt = time.Time{}
|
||||
d.lockedBy = ""
|
||||
d.mu.Unlock()
|
||||
|
||||
d.logger.Debug("Database lock released", "held_by", lockedBy, "duration_ms", holdDuration.Milliseconds())
|
||||
}
|
||||
|
||||
// beginTx starts a new transaction with logging
|
||||
func (d *Database) beginTx() (*loggingTx, error) {
|
||||
tx, err := d.db.Begin()
|
||||
@@ -149,6 +177,9 @@ func (d *Database) UpsertLiveRouteBatch(routes []*LiveRoute) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
d.lock("UpsertLiveRouteBatch")
|
||||
defer d.unlock()
|
||||
|
||||
tx, err := d.beginTx()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
@@ -227,6 +258,9 @@ func (d *Database) DeleteLiveRouteBatch(deletions []LiveRouteDeletion) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
d.lock("DeleteLiveRouteBatch")
|
||||
defer d.unlock()
|
||||
|
||||
tx, err := d.beginTx()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
@@ -294,6 +328,9 @@ func (d *Database) GetOrCreateASNBatch(asns map[int]time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
d.lock("GetOrCreateASNBatch")
|
||||
defer d.unlock()
|
||||
|
||||
tx, err := d.beginTx()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
@@ -380,6 +417,9 @@ func (d *Database) GetOrCreateASNBatch(asns map[int]time.Time) error {
|
||||
|
||||
// GetOrCreateASN retrieves an existing ASN or creates a new one if it doesn't exist.
|
||||
func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error) {
|
||||
d.lock("GetOrCreateASN")
|
||||
defer d.unlock()
|
||||
|
||||
tx, err := d.beginTx()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -451,6 +491,9 @@ func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error)
|
||||
|
||||
// GetOrCreatePrefix retrieves an existing prefix or creates a new one if it doesn't exist.
|
||||
func (d *Database) GetOrCreatePrefix(prefix string, timestamp time.Time) (*Prefix, error) {
|
||||
d.lock("GetOrCreatePrefix")
|
||||
defer d.unlock()
|
||||
|
||||
tx, err := d.beginTx()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -513,6 +556,9 @@ func (d *Database) GetOrCreatePrefix(prefix string, timestamp time.Time) (*Prefi
|
||||
|
||||
// RecordAnnouncement inserts a new BGP announcement or withdrawal into the database.
|
||||
func (d *Database) RecordAnnouncement(announcement *Announcement) error {
|
||||
d.lock("RecordAnnouncement")
|
||||
defer d.unlock()
|
||||
|
||||
err := d.exec(`
|
||||
INSERT INTO announcements (id, prefix_id, asn_id, origin_asn_id, path, next_hop, timestamp, is_withdrawal)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
@@ -538,6 +584,9 @@ func (d *Database) RecordPeering(asA, asB int, timestamp time.Time) error {
|
||||
asA, asB = asB, asA
|
||||
}
|
||||
|
||||
d.lock("RecordPeering")
|
||||
defer d.unlock()
|
||||
|
||||
tx, err := d.beginTx()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -587,6 +636,9 @@ func (d *Database) UpdatePeerBatch(peers map[string]PeerUpdate) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
d.lock("UpdatePeerBatch")
|
||||
defer d.unlock()
|
||||
|
||||
tx, err := d.beginTx()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
@@ -646,6 +698,9 @@ func (d *Database) UpdatePeerBatch(peers map[string]PeerUpdate) error {
|
||||
|
||||
// UpdatePeer updates or creates a BGP peer record
|
||||
func (d *Database) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error {
|
||||
d.lock("UpdatePeer")
|
||||
defer d.unlock()
|
||||
|
||||
tx, err := d.beginTx()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -758,6 +813,9 @@ func (d *Database) GetStats() (Stats, error) {
|
||||
|
||||
// UpsertLiveRoute inserts or updates a live route
|
||||
func (d *Database) UpsertLiveRoute(route *LiveRoute) error {
|
||||
d.lock("UpsertLiveRoute")
|
||||
defer d.unlock()
|
||||
|
||||
query := `
|
||||
INSERT INTO live_routes (id, prefix, mask_length, ip_version, origin_asn, peer_ip, as_path, next_hop,
|
||||
last_updated, v4_ip_start, v4_ip_end)
|
||||
@@ -807,6 +865,9 @@ func (d *Database) UpsertLiveRoute(route *LiveRoute) error {
|
||||
// DeleteLiveRoute deletes a live route
|
||||
// If originASN is 0, deletes all routes for the prefix/peer combination
|
||||
func (d *Database) DeleteLiveRoute(prefix string, originASN int, peerIP string) error {
|
||||
d.lock("DeleteLiveRoute")
|
||||
defer d.unlock()
|
||||
|
||||
var query string
|
||||
var err error
|
||||
|
||||
|
||||
Reference in New Issue
Block a user