From a25851c25ca1c78a607a481d647eae480b53b805 Mon Sep 17 00:00:00 2001 From: Jeffrey Paul Date: Tue, 5 Nov 2019 16:46:52 -0800 Subject: [PATCH] adding more sanity --- archiver.go | 2 +- instance.go | 38 ++++++++++++++++++---------- locator.go | 73 +++++++++++++++++++++++++++++++++++++++-------------- manager.go | 23 ++++++++++------- 4 files changed, 94 insertions(+), 42 deletions(-) diff --git a/archiver.go b/archiver.go index 11ca0b9..5444a33 100644 --- a/archiver.go +++ b/archiver.go @@ -23,7 +23,7 @@ func (a *TootArchiver) RunForever() { t := time.Now() a.startup = &t - newInstanceHostnameNotifications := make(chan InstanceHostname, 10000) + newInstanceHostnameNotifications := make(chan InstanceHostname) a.locator = NewInstanceLocator() a.manager = NewInstanceManager() diff --git a/instance.go b/instance.go index 5e96bd3..286ef7c 100644 --- a/instance.go +++ b/instance.go @@ -41,7 +41,7 @@ const ( ) type Instance struct { - sync.RWMutex + structLock sync.Mutex errorCount uint successCount uint highestId int @@ -62,14 +62,22 @@ func NewInstance(hostname InstanceHostname) *Instance { self := new(Instance) self.hostname = string(hostname) self.status = InstanceStatusUnknown - self.setNextFetchAfter(1 * time.Second) + self.setNextFetchAfter(86400 * time.Second) return self } +func (self *Instance) Lock() { + self.structLock.Lock() +} + +func (self *Instance) Unlock() { + self.structLock.Unlock() +} + func (self *Instance) bumpFetch() { self.Lock() defer self.Unlock() - self.nextFetch = time.Now().Add(10 * time.Second) + self.nextFetch = time.Now().Add(100 * time.Second) } func (self *Instance) setNextFetchAfter(d time.Duration) { @@ -82,9 +90,10 @@ func (self *Instance) Fetch() { self.fetchingLock.Lock() defer self.fetchingLock.Unlock() + self.setNextFetchAfter(INSTANCE_ERROR_INTERVAL) + err := self.detectNodeTypeIfNecessary() if err != nil { - self.setNextFetchAfter(INSTANCE_ERROR_INTERVAL) log.Debug(). Str("hostname", self.hostname). Err(err). @@ -93,19 +102,22 @@ func (self *Instance) Fetch() { } self.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL) - //log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", self.hostname) + log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", self.hostname) } func (self *Instance) dueForFetch() bool { self.Lock() defer self.Unlock() + if !self.identified { + return false + } nf := self.nextFetch return nf.Before(time.Now()) } func (self *Instance) nodeIdentified() bool { - self.RLock() - defer self.RUnlock() + self.Lock() + defer self.Unlock() if self.implementation > Unknown { return true } @@ -121,14 +133,12 @@ func (self *Instance) detectNodeTypeIfNecessary() error { } func (self *Instance) registerError() { - self.setNextFetchAfter(INSTANCE_ERROR_INTERVAL) self.Lock() defer self.Unlock() self.errorCount++ } func (self *Instance) registerSuccess() { - self.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL) self.Lock() defer self.Unlock() self.successCount++ @@ -229,9 +239,9 @@ func (i *Instance) fetchNodeInfo() error { //FIXME make sure the nodeinfourl is on the same domain as the instance //hostname - i.RLock() + i.Lock() url := i.nodeInfoUrl - i.RUnlock() + i.Unlock() resp, err := c.Get(url) @@ -277,7 +287,6 @@ func (i *Instance) fetchNodeInfo() error { i.Lock() i.serverVersionString = ni.Software.Version i.serverImplementationString = ni.Software.Name - ni.Software.Name = strings.ToLower(ni.Software.Name) if ni.Software.Name == "pleroma" { @@ -292,7 +301,10 @@ func (i *Instance) fetchNodeInfo() error { i.registerSuccess() return nil } else if ni.Software.Name == "mastodon" { - i.registerSuccess() + log.Debug(). + Str("hostname", i.hostname). + Str("software", ni.Software.Name). + Msg("detected server software") i.identified = true i.implementation = Mastodon i.status = InstanceStatusIdentified diff --git a/locator.go b/locator.go index 14c5389..e87a27c 100644 --- a/locator.go +++ b/locator.go @@ -7,6 +7,7 @@ import "time" import "sync" import "github.com/rs/zerolog/log" +import "golang.org/x/sync/semaphore" const INDEX_API_TIMEOUT = time.Second * 60 @@ -30,8 +31,6 @@ const pleromaIndexUrl = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api type InstanceLocator struct { pleromaIndexNextRefresh *time.Time mastodonIndexNextRefresh *time.Time - mastodonIndexFetchLock sync.Mutex - pleromaIndexFetchLock sync.Mutex reportInstanceVia chan InstanceHostname sync.Mutex } @@ -52,37 +51,49 @@ func (self *InstanceLocator) AddInstanceNotificationChannel(via chan InstanceHos func (self *InstanceLocator) addInstance(hostname InstanceHostname) { // receiver (InstanceManager) is responsible for de-duping against its - // map + // map, we just spray self.reportInstanceVia <- hostname } func (self *InstanceLocator) Locate() { log.Info().Msg("InstanceLocator starting") x := time.Now() + var pleromaSemaphore = semaphore.NewWeighted(1) + var mastodonSemaphore = semaphore.NewWeighted(1) + for { + log.Info().Msg("InstanceLocator tick") + go func() { - self.pleromaIndexFetchLock.Lock() if self.pleromaIndexNextRefresh.Before(time.Now()) { + if !pleromaSemaphore.TryAcquire(1) { + return + } self.locatePleroma() + pleromaSemaphore.Release(1) } - self.pleromaIndexFetchLock.Unlock() }() + go func() { - self.mastodonIndexFetchLock.Lock() if self.mastodonIndexNextRefresh.Before(time.Now()) { + if !mastodonSemaphore.TryAcquire(1) { + return + } self.locateMastodon() + mastodonSemaphore.Release(1) } - self.mastodonIndexFetchLock.Unlock() }() + time.Sleep(1 * time.Second) + if time.Now().After(x.Add(LOG_REPORT_INTERVAL)) { x = time.Now() log.Debug(). - Str("nextMastodonIndexFetch", self.mastodonIndexNextRefresh.Format(time.RFC3339)). + Str("nextMastodonIndexFetch", time.Now().Sub(*self.mastodonIndexNextRefresh).String()). Send() log.Debug(). - Str("nextPleromaIndexFetch", self.pleromaIndexNextRefresh.Format(time.RFC3339)). + Str("nextMastodonIndexFetch", time.Now().Sub(*self.pleromaIndexNextRefresh).String()). Send() } } @@ -126,6 +137,11 @@ func (self *InstanceLocator) locateMastodon() { return } + t := time.Now().Add(INDEX_CHECK_INTERVAL) + self.Lock() + self.mastodonIndexNextRefresh = &t + self.Unlock() + mi := new(MastodonIndexResponse) err = json.Unmarshal(body, &mi) if err != nil { @@ -137,14 +153,20 @@ func (self *InstanceLocator) locateMastodon() { return } + hosts := make(map[string]bool) + x := 0 for _, instance := range mi.Instances { - self.addInstance(InstanceHostname(instance.Name)) + hosts[instance.Name] = true + x++ + } + log.Info(). + Int("count", x). + Msg("received hosts from mastodon index") + + for k, _ := range hosts { + self.addInstance(InstanceHostname(k)) } - t := time.Now().Add(INDEX_CHECK_INTERVAL) - self.Lock() - self.mastodonIndexNextRefresh = &t - self.Unlock() } func (self *InstanceLocator) locatePleroma() { @@ -182,6 +204,12 @@ func (self *InstanceLocator) locatePleroma() { } // fetch worked + + t := time.Now().Add(INDEX_CHECK_INTERVAL) + self.Lock() + self.pleromaIndexNextRefresh = &t + self.Unlock() + pi := new(PleromaIndexResponse) err = json.Unmarshal(body, &pi) if err != nil { @@ -193,11 +221,18 @@ func (self *InstanceLocator) locatePleroma() { return } + hosts := make(map[string]bool) + x := 0 for _, instance := range *pi { - self.addInstance(InstanceHostname(instance.Domain)) + hosts[instance.Domain] = true + x++ } - t := time.Now().Add(INDEX_CHECK_INTERVAL) - self.Lock() - self.pleromaIndexNextRefresh = &t - self.Unlock() + log.Info(). + Int("count", x). + Msg("received hosts from pleroma index") + + for k, _ := range hosts { + self.addInstance(InstanceHostname(k)) + } + } diff --git a/manager.go b/manager.go index 615ac4d..4770dcc 100644 --- a/manager.go +++ b/manager.go @@ -8,6 +8,8 @@ import "runtime" //import "github.com/gin-gonic/gin" import "github.com/rs/zerolog/log" +const HOST_DISCOVERY_PARALLELISM = 10 + type InstanceBackend interface { //FIXME } @@ -17,11 +19,12 @@ type InstanceManager struct { instances map[InstanceHostname]*Instance newInstanceNotifications chan InstanceHostname startup time.Time - addLock sync.Mutex + hostAdderSemaphore chan bool } func NewInstanceManager() *InstanceManager { i := new(InstanceManager) + i.hostAdderSemaphore = make(chan bool, HOST_DISCOVERY_PARALLELISM) i.instances = make(map[InstanceHostname]*Instance) return i } @@ -110,22 +113,26 @@ func (self *InstanceManager) hostnameExists(newhn InstanceHostname) bool { } func (self *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { - // we do these one at a time - self.addLock.Lock() - defer self.addLock.Unlock() - if self.hostnameExists(newhn) { return } + // this blocks on the channel size, limiting concurrency + self.hostAdderSemaphore <- true + i := NewInstance(newhn) // we do node detection under the addLock to avoid thundering // on startup i.detectNodeTypeIfNecessary() + // pop an item from the buffered channel + <-self.hostAdderSemaphore + + // lock the map to insert self.Lock() - defer self.Unlock() self.instances[newhn] = i + self.Unlock() + } func (self *InstanceManager) receiveNewInstanceHostnames() { @@ -134,9 +141,7 @@ func (self *InstanceManager) receiveNewInstanceHostnames() { newhn = <-self.newInstanceNotifications // receive them fast out of the channel, let the adding function lock to add // them one at a time - go func() { - self.addInstanceByHostname(newhn) - }() + go self.addInstanceByHostname(newhn) } }