Add mutex to serialize database access
- Add internal mutex to Database struct with lock/unlock wrappers - Add debug logging for lock acquisition and release with timing - Wrap all write operations with database mutex - Use _txlock=immediate in SQLite connection string This works around apparent issues with SQLite's internal locking not properly respecting busy_timeout in production environment.
This commit is contained in:
@@ -10,6 +10,8 @@ import (
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.eeqj.de/sneak/routewatch/internal/config"
|
||||
@@ -42,9 +44,12 @@ var (
|
||||
|
||||
// Database manages the SQLite database connection and operations.
|
||||
type Database struct {
|
||||
db *sql.DB
|
||||
logger *logger.Logger
|
||||
path string
|
||||
db *sql.DB
|
||||
logger *logger.Logger
|
||||
path string
|
||||
mu sync.Mutex
|
||||
lockedAt time.Time
|
||||
lockedBy string
|
||||
}
|
||||
|
||||
// New creates a new database connection and initializes the schema.
|
||||
@@ -62,9 +67,10 @@ func New(cfg *config.Config, logger *logger.Logger) (*Database, error) {
|
||||
|
||||
// Add connection parameters for go-sqlite3
|
||||
// Enable WAL mode and other performance optimizations
|
||||
// Use immediate transactions to prevent deadlocks when multiple writers compete
|
||||
// Configure SQLite connection parameters
|
||||
// Use _txlock=immediate to acquire locks upfront and respect busy_timeout
|
||||
dsn := fmt.Sprintf(
|
||||
"file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=OFF&cache=shared&_txlock=immediate",
|
||||
"file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=OFF&_txlock=immediate",
|
||||
dbPath,
|
||||
)
|
||||
db, err := sql.Open("sqlite3", dsn)
|
||||
@@ -78,6 +84,7 @@ func New(cfg *config.Config, logger *logger.Logger) (*Database, error) {
|
||||
|
||||
// Set connection pool parameters
|
||||
// Multiple connections for better concurrency
|
||||
// Use multiple connections for read concurrency
|
||||
const maxConns = 10
|
||||
db.SetMaxOpenConns(maxConns)
|
||||
db.SetMaxIdleConns(maxConns)
|
||||
@@ -137,6 +144,33 @@ func (d *Database) Close() error {
|
||||
return d.db.Close()
|
||||
}
|
||||
|
||||
// lock acquires the database mutex and logs debug information
|
||||
func (d *Database) lock(operation string) {
|
||||
// Get caller information
|
||||
_, file, line, _ := runtime.Caller(1)
|
||||
caller := fmt.Sprintf("%s:%d", filepath.Base(file), line)
|
||||
|
||||
d.logger.Debug("Acquiring database lock", "operation", operation, "caller", caller)
|
||||
|
||||
d.mu.Lock()
|
||||
d.lockedAt = time.Now()
|
||||
d.lockedBy = fmt.Sprintf("%s (%s)", operation, caller)
|
||||
|
||||
d.logger.Debug("Database lock acquired", "operation", operation, "caller", caller)
|
||||
}
|
||||
|
||||
// unlock releases the database mutex and logs debug information including hold duration
|
||||
func (d *Database) unlock() {
|
||||
holdDuration := time.Since(d.lockedAt)
|
||||
lockedBy := d.lockedBy
|
||||
|
||||
d.lockedAt = time.Time{}
|
||||
d.lockedBy = ""
|
||||
d.mu.Unlock()
|
||||
|
||||
d.logger.Debug("Database lock released", "held_by", lockedBy, "duration_ms", holdDuration.Milliseconds())
|
||||
}
|
||||
|
||||
// beginTx starts a new transaction with logging
|
||||
func (d *Database) beginTx() (*loggingTx, error) {
|
||||
tx, err := d.db.Begin()
|
||||
@@ -153,6 +187,9 @@ func (d *Database) UpsertLiveRouteBatch(routes []*LiveRoute) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
d.lock("UpsertLiveRouteBatch")
|
||||
defer d.unlock()
|
||||
|
||||
tx, err := d.beginTx()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
@@ -231,6 +268,9 @@ func (d *Database) DeleteLiveRouteBatch(deletions []LiveRouteDeletion) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
d.lock("DeleteLiveRouteBatch")
|
||||
defer d.unlock()
|
||||
|
||||
tx, err := d.beginTx()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
@@ -298,6 +338,9 @@ func (d *Database) GetOrCreateASNBatch(asns map[int]time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
d.lock("GetOrCreateASNBatch")
|
||||
defer d.unlock()
|
||||
|
||||
tx, err := d.beginTx()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
@@ -384,6 +427,9 @@ func (d *Database) GetOrCreateASNBatch(asns map[int]time.Time) error {
|
||||
|
||||
// GetOrCreateASN retrieves an existing ASN or creates a new one if it doesn't exist.
|
||||
func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error) {
|
||||
d.lock("GetOrCreateASN")
|
||||
defer d.unlock()
|
||||
|
||||
tx, err := d.beginTx()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -455,6 +501,9 @@ func (d *Database) GetOrCreateASN(number int, timestamp time.Time) (*ASN, error)
|
||||
|
||||
// GetOrCreatePrefix retrieves an existing prefix or creates a new one if it doesn't exist.
|
||||
func (d *Database) GetOrCreatePrefix(prefix string, timestamp time.Time) (*Prefix, error) {
|
||||
d.lock("GetOrCreatePrefix")
|
||||
defer d.unlock()
|
||||
|
||||
tx, err := d.beginTx()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -517,6 +566,9 @@ func (d *Database) GetOrCreatePrefix(prefix string, timestamp time.Time) (*Prefi
|
||||
|
||||
// RecordAnnouncement inserts a new BGP announcement or withdrawal into the database.
|
||||
func (d *Database) RecordAnnouncement(announcement *Announcement) error {
|
||||
d.lock("RecordAnnouncement")
|
||||
defer d.unlock()
|
||||
|
||||
err := d.exec(`
|
||||
INSERT INTO announcements (id, prefix_id, asn_id, origin_asn_id, path, next_hop, timestamp, is_withdrawal)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
@@ -542,6 +594,9 @@ func (d *Database) RecordPeering(asA, asB int, timestamp time.Time) error {
|
||||
asA, asB = asB, asA
|
||||
}
|
||||
|
||||
d.lock("RecordPeering")
|
||||
defer d.unlock()
|
||||
|
||||
tx, err := d.beginTx()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -591,6 +646,9 @@ func (d *Database) UpdatePeerBatch(peers map[string]PeerUpdate) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
d.lock("UpdatePeerBatch")
|
||||
defer d.unlock()
|
||||
|
||||
tx, err := d.beginTx()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
@@ -650,6 +708,9 @@ func (d *Database) UpdatePeerBatch(peers map[string]PeerUpdate) error {
|
||||
|
||||
// UpdatePeer updates or creates a BGP peer record
|
||||
func (d *Database) UpdatePeer(peerIP string, peerASN int, messageType string, timestamp time.Time) error {
|
||||
d.lock("UpdatePeer")
|
||||
defer d.unlock()
|
||||
|
||||
tx, err := d.beginTx()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -762,6 +823,9 @@ func (d *Database) GetStats() (Stats, error) {
|
||||
|
||||
// UpsertLiveRoute inserts or updates a live route
|
||||
func (d *Database) UpsertLiveRoute(route *LiveRoute) error {
|
||||
d.lock("UpsertLiveRoute")
|
||||
defer d.unlock()
|
||||
|
||||
query := `
|
||||
INSERT INTO live_routes (id, prefix, mask_length, ip_version, origin_asn, peer_ip, as_path, next_hop,
|
||||
last_updated, v4_ip_start, v4_ip_end)
|
||||
@@ -811,6 +875,9 @@ func (d *Database) UpsertLiveRoute(route *LiveRoute) error {
|
||||
// DeleteLiveRoute deletes a live route
|
||||
// If originASN is 0, deletes all routes for the prefix/peer combination
|
||||
func (d *Database) DeleteLiveRoute(prefix string, originASN int, peerIP string) error {
|
||||
d.lock("DeleteLiveRoute")
|
||||
defer d.unlock()
|
||||
|
||||
var query string
|
||||
var err error
|
||||
|
||||
|
||||
Reference in New Issue
Block a user