Add adaptive rate limiting to ASN WHOIS fetcher

- Reduce base interval from 60s to 15s for faster initial fetching
- Add exponential backoff on failure (up to 5 minute max interval)
- Decrease interval on success (down to 10 second minimum)
- Add mutex to prevent concurrent WHOIS fetches
- Track consecutive failures for backoff calculation
This commit is contained in:
Jeffrey Paul 2025-12-27 15:51:06 +07:00
parent 3b159454eb
commit 8eaf4e5f4b

View File

@ -13,8 +13,17 @@ import (
// ASN fetcher configuration constants.
const (
// backgroundFetchInterval is how often the background fetcher runs.
backgroundFetchInterval = time.Minute
// baseInterval is the starting interval between fetch attempts.
baseInterval = 15 * time.Second
// minInterval is the minimum interval after successes (rate limit).
minInterval = 10 * time.Second
// maxInterval is the maximum interval after failures (backoff cap).
maxInterval = 5 * time.Minute
// backoffMultiplier is how much to multiply interval on failure.
backoffMultiplier = 2
// whoisStaleThreshold is how old WHOIS data can be before refresh.
whoisStaleThreshold = 30 * 24 * time.Hour // 30 days
@ -31,16 +40,25 @@ type ASNFetcher struct {
immediateQueue chan int
stopCh chan struct{}
wg sync.WaitGroup
// fetchMu ensures only one fetch runs at a time
fetchMu sync.Mutex
// interval tracking with mutex protection
intervalMu sync.Mutex
currentInterval time.Duration
consecutiveFails int
}
// NewASNFetcher creates a new ASN fetcher.
func NewASNFetcher(db database.Store, logger *slog.Logger) *ASNFetcher {
return &ASNFetcher{
db: db,
whoisClient: whois.NewClient(),
logger: logger.With("component", "asn_fetcher"),
immediateQueue: make(chan int, immediateQueueSize),
stopCh: make(chan struct{}),
db: db,
whoisClient: whois.NewClient(),
logger: logger.With("component", "asn_fetcher"),
immediateQueue: make(chan int, immediateQueueSize),
stopCh: make(chan struct{}),
currentInterval: baseInterval,
}
}
@ -48,7 +66,11 @@ func NewASNFetcher(db database.Store, logger *slog.Logger) *ASNFetcher {
func (f *ASNFetcher) Start() {
f.wg.Add(1)
go f.run()
f.logger.Info("ASN fetcher started", "interval", backgroundFetchInterval)
f.logger.Info("ASN fetcher started",
"base_interval", baseInterval,
"min_interval", minInterval,
"max_interval", maxInterval,
)
}
// Stop gracefully shuts down the fetcher.
@ -69,12 +91,65 @@ func (f *ASNFetcher) QueueImmediate(asn int) {
}
}
// getInterval returns the current fetch interval.
func (f *ASNFetcher) getInterval() time.Duration {
f.intervalMu.Lock()
defer f.intervalMu.Unlock()
return f.currentInterval
}
// recordSuccess decreases the interval on successful fetch.
func (f *ASNFetcher) recordSuccess() {
f.intervalMu.Lock()
defer f.intervalMu.Unlock()
f.consecutiveFails = 0
// Decrease interval by half, but not below minimum
newInterval := f.currentInterval / backoffMultiplier
if newInterval < minInterval {
newInterval = minInterval
}
if newInterval != f.currentInterval {
f.logger.Debug("Decreased fetch interval",
"old_interval", f.currentInterval,
"new_interval", newInterval,
)
f.currentInterval = newInterval
}
}
// recordFailure increases the interval on failed fetch using exponential backoff.
func (f *ASNFetcher) recordFailure() {
f.intervalMu.Lock()
defer f.intervalMu.Unlock()
f.consecutiveFails++
// Exponential backoff: multiply by 2, capped at max
newInterval := f.currentInterval * backoffMultiplier
if newInterval > maxInterval {
newInterval = maxInterval
}
if newInterval != f.currentInterval {
f.logger.Debug("Increased fetch interval due to failure",
"old_interval", f.currentInterval,
"new_interval", newInterval,
"consecutive_failures", f.consecutiveFails,
)
f.currentInterval = newInterval
}
}
// run is the main background loop.
func (f *ASNFetcher) run() {
defer f.wg.Done()
ticker := time.NewTicker(backgroundFetchInterval)
defer ticker.Stop()
timer := time.NewTimer(f.getInterval())
defer timer.Stop()
for {
select {
@ -82,18 +157,44 @@ func (f *ASNFetcher) run() {
return
case asn := <-f.immediateQueue:
// Process immediate request
f.fetchAndUpdate(asn)
// Process immediate request (respects lock)
f.tryFetch(asn)
// Reset timer after immediate fetch
timer.Reset(f.getInterval())
case <-ticker.C:
case <-timer.C:
// Background fetch of stale/missing ASN
f.fetchNextStale()
// Reset timer with potentially updated interval
timer.Reset(f.getInterval())
}
}
}
// tryFetch attempts to fetch and update an ASN, respecting the fetch lock.
// Returns true if fetch was successful.
func (f *ASNFetcher) tryFetch(asn int) bool {
// Try to acquire lock, skip if another fetch is running
if !f.fetchMu.TryLock() {
f.logger.Debug("Skipping fetch, another fetch in progress", "asn", asn)
return false
}
defer f.fetchMu.Unlock()
return f.fetchAndUpdate(asn)
}
// fetchNextStale finds and fetches the next ASN needing WHOIS data.
func (f *ASNFetcher) fetchNextStale() {
// Try to acquire lock, skip if another fetch is running
if !f.fetchMu.TryLock() {
f.logger.Debug("Skipping stale fetch, another fetch in progress")
return
}
defer f.fetchMu.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
@ -101,7 +202,9 @@ func (f *ASNFetcher) fetchNextStale() {
if err != nil {
if err != database.ErrNoStaleASN {
f.logger.Error("Failed to get stale ASN", "error", err)
f.recordFailure()
}
// No stale ASN is not a failure, just nothing to do
return
}
@ -110,7 +213,8 @@ func (f *ASNFetcher) fetchNextStale() {
}
// fetchAndUpdate performs a WHOIS lookup and updates the database.
func (f *ASNFetcher) fetchAndUpdate(asn int) {
// Returns true if successful.
func (f *ASNFetcher) fetchAndUpdate(asn int) bool {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
@ -119,8 +223,9 @@ func (f *ASNFetcher) fetchAndUpdate(asn int) {
info, err := f.whoisClient.LookupASN(ctx, asn)
if err != nil {
f.logger.Error("WHOIS lookup failed", "asn", asn, "error", err)
f.recordFailure()
return
return false
}
// Update database with WHOIS data
@ -142,14 +247,19 @@ func (f *ASNFetcher) fetchAndUpdate(asn int) {
})
if err != nil {
f.logger.Error("Failed to update ASN WHOIS data", "asn", asn, "error", err)
f.recordFailure()
return
return false
}
f.recordSuccess()
f.logger.Info("Updated ASN WHOIS data",
"asn", asn,
"org_name", info.OrgName,
"country", info.CountryCode,
"rir", info.RIR,
"next_interval", f.getInterval(),
)
return true
}