Optimize database write performance
- Use SQLite UPSERT for UpdateLiveRoute to eliminate SELECT+UPDATE/INSERT pattern
- Add connection string optimizations (synchronous=NORMAL, cache_size)
- Add WAL checkpoint configuration for better write performance
- Add index on live_routes(id) for UPDATE operations
- Set WAL autocheckpoint to 1000 pages

These changes should reduce write amplification and improve overall throughput by:
1. Reducing from 2 queries to 1 for route updates
2. Better WAL checkpoint management
3. More efficient UPDATE operations with dedicated index
This commit is contained in:
parent
397ccd21fe
commit
1f8ececedf
@ -95,7 +95,8 @@ func NewWithConfig(config Config, logger *slog.Logger) (*Database, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Add connection parameters for modernc.org/sqlite
|
// Add connection parameters for modernc.org/sqlite
|
||||||
dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL", config.Path)
|
// Enable WAL mode and other performance optimizations
|
||||||
|
dsn := fmt.Sprintf("file:%s?_busy_timeout=5000&_journal_mode=WAL&_synchronous=NORMAL&_cache_size=-512000", config.Path)
|
||||||
db, err := sql.Open("sqlite", dsn)
|
db, err := sql.Open("sqlite", dsn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to open database: %w", err)
|
return nil, fmt.Errorf("failed to open database: %w", err)
|
||||||
@ -123,12 +124,14 @@ func NewWithConfig(config Config, logger *slog.Logger) (*Database, error) {
|
|||||||
func (d *Database) Initialize() error {
|
func (d *Database) Initialize() error {
|
||||||
// Set SQLite pragmas for better performance
|
// Set SQLite pragmas for better performance
|
||||||
pragmas := []string{
|
pragmas := []string{
|
||||||
"PRAGMA journal_mode=WAL", // Already set in connection string
|
"PRAGMA journal_mode=WAL", // Already set in connection string
|
||||||
"PRAGMA synchronous=NORMAL", // Faster than FULL, still safe
|
"PRAGMA synchronous=NORMAL", // Faster than FULL, still safe
|
||||||
"PRAGMA cache_size=-524288", // 512MB cache
|
"PRAGMA cache_size=-524288", // 512MB cache
|
||||||
"PRAGMA temp_store=MEMORY", // Use memory for temp tables
|
"PRAGMA temp_store=MEMORY", // Use memory for temp tables
|
||||||
"PRAGMA mmap_size=268435456", // 256MB memory-mapped I/O
|
"PRAGMA mmap_size=268435456", // 256MB memory-mapped I/O
|
||||||
"PRAGMA optimize", // Run optimizer
|
"PRAGMA wal_autocheckpoint=1000", // Checkpoint every 1000 pages
|
||||||
|
"PRAGMA wal_checkpoint(PASSIVE)", // Checkpoint now
|
||||||
|
"PRAGMA optimize", // Run optimizer
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, pragma := range pragmas {
|
for _, pragma := range pragmas {
|
||||||
@ -344,33 +347,18 @@ func (d *Database) UpdateLiveRoute(
|
|||||||
nextHop string,
|
nextHop string,
|
||||||
timestamp time.Time,
|
timestamp time.Time,
|
||||||
) error {
|
) error {
|
||||||
// Check if route already exists
|
// Use SQLite's UPSERT capability to avoid the SELECT+UPDATE/INSERT pattern
|
||||||
var routeID sql.NullString
|
// This reduces the number of queries and improves performance
|
||||||
err := d.queryRow(`
|
err := d.exec(`
|
||||||
SELECT id FROM live_routes
|
INSERT INTO live_routes (id, prefix_id, origin_asn_id, peer_asn, next_hop, announced_at, withdrawn_at)
|
||||||
WHERE prefix_id = ? AND origin_asn_id = ? AND peer_asn = ? AND withdrawn_at IS NULL`,
|
VALUES (?, ?, ?, ?, ?, ?, NULL)
|
||||||
prefixID.String(), originASNID.String(), peerASN).Scan(&routeID)
|
ON CONFLICT(prefix_id, origin_asn_id, peer_asn) DO UPDATE SET
|
||||||
|
next_hop = excluded.next_hop,
|
||||||
if err != nil && err != sql.ErrNoRows {
|
announced_at = excluded.announced_at,
|
||||||
return err
|
withdrawn_at = NULL
|
||||||
}
|
WHERE withdrawn_at IS NULL`,
|
||||||
|
generateUUID().String(), prefixID.String(), originASNID.String(),
|
||||||
if routeID.Valid {
|
peerASN, nextHop, timestamp)
|
||||||
// Route exists and is active, update it
|
|
||||||
err = d.exec(`
|
|
||||||
UPDATE live_routes
|
|
||||||
SET next_hop = ?, announced_at = ?
|
|
||||||
WHERE id = ?`,
|
|
||||||
nextHop, timestamp, routeID.String)
|
|
||||||
} else {
|
|
||||||
// Either new route or re-announcement of withdrawn route
|
|
||||||
err = d.exec(`
|
|
||||||
INSERT OR REPLACE INTO live_routes
|
|
||||||
(id, prefix_id, origin_asn_id, peer_asn, next_hop, announced_at, withdrawn_at)
|
|
||||||
VALUES (?, ?, ?, ?, ?, ?, NULL)`,
|
|
||||||
generateUUID().String(), prefixID.String(), originASNID.String(),
|
|
||||||
peerASN, nextHop, timestamp)
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -99,6 +99,10 @@ CREATE INDEX IF NOT EXISTS idx_live_routes_covering
|
|||||||
ON live_routes(prefix_id, origin_asn_id, peer_asn, id)
|
ON live_routes(prefix_id, origin_asn_id, peer_asn, id)
|
||||||
WHERE withdrawn_at IS NULL;
|
WHERE withdrawn_at IS NULL;
|
||||||
|
|
||||||
|
-- Index for UPDATE by id operations
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_live_routes_id
|
||||||
|
ON live_routes(id);
|
||||||
|
|
||||||
-- Index for stats queries
|
-- Index for stats queries
|
||||||
CREATE INDEX IF NOT EXISTS idx_live_routes_stats
|
CREATE INDEX IF NOT EXISTS idx_live_routes_stats
|
||||||
ON live_routes(withdrawn_at)
|
ON live_routes(withdrawn_at)
|
||||||
|
Loading…
Reference in New Issue
Block a user