diff --git a/internal/routewatch/asnfetcher.go b/internal/routewatch/asnfetcher.go index f883a56..6117f11 100644 --- a/internal/routewatch/asnfetcher.go +++ b/internal/routewatch/asnfetcher.go @@ -13,8 +13,17 @@ import ( // ASN fetcher configuration constants. const ( - // backgroundFetchInterval is how often the background fetcher runs. - backgroundFetchInterval = time.Minute + // baseInterval is the starting interval between fetch attempts. + baseInterval = 15 * time.Second + + // minInterval is the minimum interval after successes (rate limit). + minInterval = 10 * time.Second + + // maxInterval is the maximum interval after failures (backoff cap). + maxInterval = 5 * time.Minute + + // backoffMultiplier is how much to multiply interval on failure. + backoffMultiplier = 2 // whoisStaleThreshold is how old WHOIS data can be before refresh. whoisStaleThreshold = 30 * 24 * time.Hour // 30 days @@ -31,16 +40,25 @@ type ASNFetcher struct { immediateQueue chan int stopCh chan struct{} wg sync.WaitGroup + + // fetchMu ensures only one fetch runs at a time + fetchMu sync.Mutex + + // interval tracking with mutex protection + intervalMu sync.Mutex + currentInterval time.Duration + consecutiveFails int } // NewASNFetcher creates a new ASN fetcher. func NewASNFetcher(db database.Store, logger *slog.Logger) *ASNFetcher { return &ASNFetcher{ - db: db, - whoisClient: whois.NewClient(), - logger: logger.With("component", "asn_fetcher"), - immediateQueue: make(chan int, immediateQueueSize), - stopCh: make(chan struct{}), + db: db, + whoisClient: whois.NewClient(), + logger: logger.With("component", "asn_fetcher"), + immediateQueue: make(chan int, immediateQueueSize), + stopCh: make(chan struct{}), + currentInterval: baseInterval, } } @@ -48,7 +66,11 @@ func NewASNFetcher(db database.Store, logger *slog.Logger) *ASNFetcher { func (f *ASNFetcher) Start() { f.wg.Add(1) go f.run() - f.logger.Info("ASN fetcher started", "interval", backgroundFetchInterval) + f.logger.Info("ASN fetcher started", + "base_interval", baseInterval, + "min_interval", minInterval, + "max_interval", maxInterval, + ) } // Stop gracefully shuts down the fetcher. @@ -69,12 +91,65 @@ func (f *ASNFetcher) QueueImmediate(asn int) { } } +// getInterval returns the current fetch interval. +func (f *ASNFetcher) getInterval() time.Duration { + f.intervalMu.Lock() + defer f.intervalMu.Unlock() + + return f.currentInterval +} + +// recordSuccess decreases the interval on successful fetch. +func (f *ASNFetcher) recordSuccess() { + f.intervalMu.Lock() + defer f.intervalMu.Unlock() + + f.consecutiveFails = 0 + + // Decrease interval by half, but not below minimum + newInterval := f.currentInterval / backoffMultiplier + if newInterval < minInterval { + newInterval = minInterval + } + + if newInterval != f.currentInterval { + f.logger.Debug("Decreased fetch interval", + "old_interval", f.currentInterval, + "new_interval", newInterval, + ) + f.currentInterval = newInterval + } +} + +// recordFailure increases the interval on failed fetch using exponential backoff. +func (f *ASNFetcher) recordFailure() { + f.intervalMu.Lock() + defer f.intervalMu.Unlock() + + f.consecutiveFails++ + + // Exponential backoff: multiply by 2, capped at max + newInterval := f.currentInterval * backoffMultiplier + if newInterval > maxInterval { + newInterval = maxInterval + } + + if newInterval != f.currentInterval { + f.logger.Debug("Increased fetch interval due to failure", + "old_interval", f.currentInterval, + "new_interval", newInterval, + "consecutive_failures", f.consecutiveFails, + ) + f.currentInterval = newInterval + } +} + // run is the main background loop. func (f *ASNFetcher) run() { defer f.wg.Done() - ticker := time.NewTicker(backgroundFetchInterval) - defer ticker.Stop() + timer := time.NewTimer(f.getInterval()) + defer timer.Stop() for { select { @@ -82,18 +157,44 @@ func (f *ASNFetcher) run() { return case asn := <-f.immediateQueue: - // Process immediate request - f.fetchAndUpdate(asn) + // Process immediate request (respects lock) + f.tryFetch(asn) + // Reset timer after immediate fetch + timer.Reset(f.getInterval()) - case <-ticker.C: + case <-timer.C: // Background fetch of stale/missing ASN f.fetchNextStale() + // Reset timer with potentially updated interval + timer.Reset(f.getInterval()) } } } +// tryFetch attempts to fetch and update an ASN, respecting the fetch lock. +// Returns true if fetch was successful. +func (f *ASNFetcher) tryFetch(asn int) bool { + // Try to acquire lock, skip if another fetch is running + if !f.fetchMu.TryLock() { + f.logger.Debug("Skipping fetch, another fetch in progress", "asn", asn) + + return false + } + defer f.fetchMu.Unlock() + + return f.fetchAndUpdate(asn) +} + // fetchNextStale finds and fetches the next ASN needing WHOIS data. func (f *ASNFetcher) fetchNextStale() { + // Try to acquire lock, skip if another fetch is running + if !f.fetchMu.TryLock() { + f.logger.Debug("Skipping stale fetch, another fetch in progress") + + return + } + defer f.fetchMu.Unlock() + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() @@ -101,7 +202,9 @@ func (f *ASNFetcher) fetchNextStale() { if err != nil { if err != database.ErrNoStaleASN { f.logger.Error("Failed to get stale ASN", "error", err) + f.recordFailure() } + // No stale ASN is not a failure, just nothing to do return } @@ -110,7 +213,8 @@ func (f *ASNFetcher) fetchNextStale() { } // fetchAndUpdate performs a WHOIS lookup and updates the database. -func (f *ASNFetcher) fetchAndUpdate(asn int) { +// Returns true if successful. +func (f *ASNFetcher) fetchAndUpdate(asn int) bool { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() @@ -119,8 +223,9 @@ func (f *ASNFetcher) fetchAndUpdate(asn int) { info, err := f.whoisClient.LookupASN(ctx, asn) if err != nil { f.logger.Error("WHOIS lookup failed", "asn", asn, "error", err) + f.recordFailure() - return + return false } // Update database with WHOIS data @@ -142,14 +247,19 @@ func (f *ASNFetcher) fetchAndUpdate(asn int) { }) if err != nil { f.logger.Error("Failed to update ASN WHOIS data", "asn", asn, "error", err) + f.recordFailure() - return + return false } + f.recordSuccess() f.logger.Info("Updated ASN WHOIS data", "asn", asn, "org_name", info.OrgName, "country", info.CountryCode, "rir", info.RIR, + "next_interval", f.getInterval(), ) + + return true }