5 Commits

Author SHA1 Message Date
a78e5c6e92 Add log.txt to .gitignore 2025-07-28 18:13:07 +02:00
9ef2a22db3 Remove SQLite pragmas that set default values
- Remove page_size, wal_autocheckpoint, locking_mode, mmap_size
- Keep only pragmas that change behavior from defaults
- Increase cache size to 3GB (upper limit for 2.4GB database)
2025-07-28 18:12:25 +02:00
05805b8847 Optimize SQLite settings for better balance
- Reduce cache size from 8GB to 512MB (still plenty for 2.4GB DB)
- Reduce mmap_size from 10GB to 256MB (reasonable default)
- Use default page size (4KB) instead of 8KB
- Use default WAL checkpoint interval (1000 pages)
- Remove redundant pragmas (threads, cache_spill, read_uncommitted)
- Clean up connection string to only use _txlock parameter
- Keep synchronous=OFF for performance (since we have mutex protection)
2025-07-28 18:06:31 +02:00
ddb3cfa4f0 Add mutex to serialize database access
- Add internal mutex to Database struct with lock/unlock wrappers
- Add debug logging for lock acquisition and release with timing
- Wrap all write operations with database mutex
- Use _txlock=immediate in SQLite connection string

This works around apparent issues with SQLite's internal locking
not properly respecting busy_timeout in production environment.
2025-07-28 17:56:26 +02:00
3ef60459b2 Fix SQLite transaction deadlocks with immediate mode
- Add _txlock=immediate to SQLite connection string
- This prevents deadlocks by acquiring write locks at transaction start
- Multiple concurrent writers now queue properly instead of failing instantly
- Resolves 'database table is locked' errors in production
2025-07-28 17:26:42 +02:00
3 changed files with 271017 additions and 125 deletions

1
.gitignore vendored
View File

@@ -35,3 +35,4 @@ pkg/asinfo/asdata.json
# Debug output files
out
log.txt

View File

@@ -10,6 +10,8 @@ import (
"net"
"os"
"path/filepath"
"runtime"
"sync"
"time"
"git.eeqj.de/sneak/routewatch/internal/config"
@@ -45,6 +47,9 @@ type Database struct {
db *sql.DB
logger *logger.Logger
path string
mu sync.Mutex
lockedAt time.Time
lockedBy string
}
// New creates a new database connection and initializes the schema.
@@ -61,8 +66,11 @@ func New(cfg *config.Config, logger *logger.Logger) (*Database, error) {
}
// Add connection parameters for go-sqlite3
// Enable WAL mode and other performance optimizations
dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=OFF&cache=shared", dbPath)
// Configure SQLite connection parameters
dsn := fmt.Sprintf(
"file:%s",
dbPath,
)
db, err := sql.Open("sqlite3", dsn)
if err != nil {
return nil, fmt.Errorf("failed to open database: %w", err)
@@ -73,7 +81,7 @@ func New(cfg *config.Config, logger *logger.Logger) (*Database, error) {
}
// Set connection pool parameters
// Multiple connections for better concurrency
// Multiple connections allow concurrent reads while writes are serialized
const maxConns = 10
db.SetMaxOpenConns(maxConns)
db.SetMaxIdleConns(maxConns)
@@ -90,22 +98,15 @@ func New(cfg *config.Config, logger *logger.Logger) (*Database, error) {
// Initialize creates the database schema if it doesn't exist.
func (d *Database) Initialize() error {
// Set SQLite pragmas for extreme performance - prioritize speed over durability
// Set SQLite pragmas for performance
pragmas := []string{
"PRAGMA journal_mode=WAL", // Write-Ahead Logging
"PRAGMA synchronous=OFF", // Don't wait for disk writes
"PRAGMA cache_size=-8388608", // 8GB cache (negative = KB)
"PRAGMA cache_size=-3145728", // 3GB cache (upper limit for 2.4GB DB)
"PRAGMA temp_store=MEMORY", // Use memory for temp tables
"PRAGMA mmap_size=10737418240", // 10GB memory-mapped I/O
"PRAGMA page_size=8192", // 8KB pages for better performance
"PRAGMA wal_autocheckpoint=100000", // Checkpoint every 100k pages (800MB)
"PRAGMA wal_checkpoint(TRUNCATE)", // Checkpoint and truncate WAL now
"PRAGMA busy_timeout=5000", // 5 second busy timeout
"PRAGMA locking_mode=NORMAL", // Normal locking for multiple connections
"PRAGMA read_uncommitted=true", // Allow dirty reads
"PRAGMA analysis_limit=0", // Disable automatic ANALYZE
"PRAGMA threads=4", // Use multiple threads for sorting
"PRAGMA cache_spill=false", // Keep cache in memory, don't spill to disk
}
for _, pragma := range pragmas {
@@ -133,6 +134,33 @@ func (d *Database) Close() error {
return d.db.Close()
}
// lock acquires the database mutex and logs debug information
func (d *Database) lock(operation string) {
// Get caller information
_, file, line, _ := runtime.Caller(1)
caller := fmt.Sprintf("%s:%d", filepath.Base(file), line)
d.logger.Debug("Acquiring database lock", "operation", operation, "caller", caller)
d.mu.Lock()
d.lockedAt = time.Now()
d.lockedBy = fmt.Sprintf("%s (%s)", operation, caller)
d.logger.Debug("Database lock acquired", "operation", operation, "caller", caller)
}
// unlock releases the database mutex and logs debug information including hold duration
func (d *Database) unlock() {
holdDuration := time.Since(d.lockedAt)
lockedBy := d.lockedBy
d.lockedAt = time.Time{}
d.lockedBy = ""
d.mu.Unlock()
d.logger.Debug("Database lock released", "held_by", lockedBy, "duration_ms", holdDuration.Milliseconds())
}
// beginTx starts a new transaction with logging
func (d *Database) beginTx() (*loggingTx, error) {
tx, err := d.db.Begin()
@@ -149,6 +177,9 @@ func (d *Database) UpsertLiveRouteBatch(routes []*LiveRoute) error {
return nil
}
d.lock("UpsertLiveRouteBatch")
defer d.unlock()
tx, err := d.beginTx()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
@@ -227,6 +258,9 @@ func (d *Database) DeleteLiveRouteBatch(deletions []LiveRouteDeletion) error {
return nil
}
d.lock("DeleteLiveRouteBatch")
defer d.unlock()
tx, err := d.beginTx()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
@@ -294,6 +328,9 @@ func (d *Database) GetOrCreateASNBatch(asns map[int]time.Time) error {
return nil
}
d.lock("GetOrCreateASNBatch")
defer d.unlock()
tx, err := d.beginTx()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
@@ -380,6 +417,9 @@ func (d *Database) GetOrCreateASNBatch(asns map[int]time.Time) error {
// GetOrCreateASN retrieves an existing ASN or creates a new one if it doesn't exist.
func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error) {
d.lock("GetOrCreateASN")
defer d.unlock()
tx, err := d.beginTx()
if err != nil {
return nil, err
@@ -451,6 +491,9 @@ func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error)
// GetOrCreatePrefix retrieves an existing prefix or creates a new one if it doesn't exist.
func (d *Database) GetOrCreatePrefix(prefix string, timestamp time.Time) (*Prefix, error) {
d.lock("GetOrCreatePrefix")
defer d.unlock()
tx, err := d.beginTx()
if err != nil {
return nil, err
@@ -513,6 +556,9 @@ func (d *Database) GetOrCreatePrefix(prefix string, timestamp time.Time) (*Prefi
// RecordAnnouncement inserts a new BGP announcement or withdrawal into the database.
func (d *Database) RecordAnnouncement(announcement *Announcement) error {
d.lock("RecordAnnouncement")
defer d.unlock()
err := d.exec(`
INSERT INTO announcements (id, prefix_id, asn_id, origin_asn_id, path, next_hop, timestamp, is_withdrawal)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
@@ -538,6 +584,9 @@ func (d *Database) RecordPeering(asA, asB int, timestamp time.Time) error {
asA, asB = asB, asA
}
d.lock("RecordPeering")
defer d.unlock()
tx, err := d.beginTx()
if err != nil {
return err
@@ -587,6 +636,9 @@ func (d *Database) UpdatePeerBatch(peers map[string]PeerUpdate) error {
return nil
}
d.lock("UpdatePeerBatch")
defer d.unlock()
tx, err := d.beginTx()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
@@ -646,6 +698,9 @@ func (d *Database) UpdatePeerBatch(peers map[string]PeerUpdate) error {
// UpdatePeer updates or creates a BGP peer record
func (d *Database) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error {
d.lock("UpdatePeer")
defer d.unlock()
tx, err := d.beginTx()
if err != nil {
return err
@@ -758,6 +813,9 @@ func (d *Database) GetStats() (Stats, error) {
// UpsertLiveRoute inserts or updates a live route
func (d *Database) UpsertLiveRoute(route *LiveRoute) error {
d.lock("UpsertLiveRoute")
defer d.unlock()
query := `
INSERT INTO live_routes (id, prefix, mask_length, ip_version, origin_asn, peer_ip, as_path, next_hop,
last_updated, v4_ip_start, v4_ip_end)
@@ -807,6 +865,9 @@ func (d *Database) UpsertLiveRoute(route *LiveRoute) error {
// DeleteLiveRoute deletes a live route
// If originASN is 0, deletes all routes for the prefix/peer combination
func (d *Database) DeleteLiveRoute(prefix string, originASN int, peerIP string) error {
d.lock("DeleteLiveRoute")
defer d.unlock()
var query string
var err error

271036
log.txt

File diff suppressed because it is too large Load Diff