adding more sanity

This commit is contained in:
Jeffrey Paul 2019-11-05 16:46:52 -08:00
parent 1c7e2f11e0
commit a25851c25c
4 changed files with 94 additions and 42 deletions

View File

@ -23,7 +23,7 @@ func (a *TootArchiver) RunForever() {
t := time.Now() t := time.Now()
a.startup = &t a.startup = &t
newInstanceHostnameNotifications := make(chan InstanceHostname, 10000) newInstanceHostnameNotifications := make(chan InstanceHostname)
a.locator = NewInstanceLocator() a.locator = NewInstanceLocator()
a.manager = NewInstanceManager() a.manager = NewInstanceManager()

View File

@ -41,7 +41,7 @@ const (
) )
type Instance struct { type Instance struct {
sync.RWMutex structLock sync.Mutex
errorCount uint errorCount uint
successCount uint successCount uint
highestId int highestId int
@ -62,14 +62,22 @@ func NewInstance(hostname InstanceHostname) *Instance {
self := new(Instance) self := new(Instance)
self.hostname = string(hostname) self.hostname = string(hostname)
self.status = InstanceStatusUnknown self.status = InstanceStatusUnknown
self.setNextFetchAfter(1 * time.Second) self.setNextFetchAfter(86400 * time.Second)
return self return self
} }
func (self *Instance) Lock() {
self.structLock.Lock()
}
func (self *Instance) Unlock() {
self.structLock.Unlock()
}
func (self *Instance) bumpFetch() { func (self *Instance) bumpFetch() {
self.Lock() self.Lock()
defer self.Unlock() defer self.Unlock()
self.nextFetch = time.Now().Add(10 * time.Second) self.nextFetch = time.Now().Add(100 * time.Second)
} }
func (self *Instance) setNextFetchAfter(d time.Duration) { func (self *Instance) setNextFetchAfter(d time.Duration) {
@ -82,9 +90,10 @@ func (self *Instance) Fetch() {
self.fetchingLock.Lock() self.fetchingLock.Lock()
defer self.fetchingLock.Unlock() defer self.fetchingLock.Unlock()
self.setNextFetchAfter(INSTANCE_ERROR_INTERVAL)
err := self.detectNodeTypeIfNecessary() err := self.detectNodeTypeIfNecessary()
if err != nil { if err != nil {
self.setNextFetchAfter(INSTANCE_ERROR_INTERVAL)
log.Debug(). log.Debug().
Str("hostname", self.hostname). Str("hostname", self.hostname).
Err(err). Err(err).
@ -93,19 +102,22 @@ func (self *Instance) Fetch() {
} }
self.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL) self.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL)
//log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", self.hostname) log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", self.hostname)
} }
func (self *Instance) dueForFetch() bool { func (self *Instance) dueForFetch() bool {
self.Lock() self.Lock()
defer self.Unlock() defer self.Unlock()
if !self.identified {
return false
}
nf := self.nextFetch nf := self.nextFetch
return nf.Before(time.Now()) return nf.Before(time.Now())
} }
func (self *Instance) nodeIdentified() bool { func (self *Instance) nodeIdentified() bool {
self.RLock() self.Lock()
defer self.RUnlock() defer self.Unlock()
if self.implementation > Unknown { if self.implementation > Unknown {
return true return true
} }
@ -121,14 +133,12 @@ func (self *Instance) detectNodeTypeIfNecessary() error {
} }
func (self *Instance) registerError() { func (self *Instance) registerError() {
self.setNextFetchAfter(INSTANCE_ERROR_INTERVAL)
self.Lock() self.Lock()
defer self.Unlock() defer self.Unlock()
self.errorCount++ self.errorCount++
} }
func (self *Instance) registerSuccess() { func (self *Instance) registerSuccess() {
self.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL)
self.Lock() self.Lock()
defer self.Unlock() defer self.Unlock()
self.successCount++ self.successCount++
@ -229,9 +239,9 @@ func (i *Instance) fetchNodeInfo() error {
//FIXME make sure the nodeinfourl is on the same domain as the instance //FIXME make sure the nodeinfourl is on the same domain as the instance
//hostname //hostname
i.RLock() i.Lock()
url := i.nodeInfoUrl url := i.nodeInfoUrl
i.RUnlock() i.Unlock()
resp, err := c.Get(url) resp, err := c.Get(url)
@ -277,7 +287,6 @@ func (i *Instance) fetchNodeInfo() error {
i.Lock() i.Lock()
i.serverVersionString = ni.Software.Version i.serverVersionString = ni.Software.Version
i.serverImplementationString = ni.Software.Name i.serverImplementationString = ni.Software.Name
ni.Software.Name = strings.ToLower(ni.Software.Name) ni.Software.Name = strings.ToLower(ni.Software.Name)
if ni.Software.Name == "pleroma" { if ni.Software.Name == "pleroma" {
@ -292,7 +301,10 @@ func (i *Instance) fetchNodeInfo() error {
i.registerSuccess() i.registerSuccess()
return nil return nil
} else if ni.Software.Name == "mastodon" { } else if ni.Software.Name == "mastodon" {
i.registerSuccess() log.Debug().
Str("hostname", i.hostname).
Str("software", ni.Software.Name).
Msg("detected server software")
i.identified = true i.identified = true
i.implementation = Mastodon i.implementation = Mastodon
i.status = InstanceStatusIdentified i.status = InstanceStatusIdentified

View File

@ -7,6 +7,7 @@ import "time"
import "sync" import "sync"
import "github.com/rs/zerolog/log" import "github.com/rs/zerolog/log"
import "golang.org/x/sync/semaphore"
const INDEX_API_TIMEOUT = time.Second * 60 const INDEX_API_TIMEOUT = time.Second * 60
@ -30,8 +31,6 @@ const pleromaIndexUrl = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api
type InstanceLocator struct { type InstanceLocator struct {
pleromaIndexNextRefresh *time.Time pleromaIndexNextRefresh *time.Time
mastodonIndexNextRefresh *time.Time mastodonIndexNextRefresh *time.Time
mastodonIndexFetchLock sync.Mutex
pleromaIndexFetchLock sync.Mutex
reportInstanceVia chan InstanceHostname reportInstanceVia chan InstanceHostname
sync.Mutex sync.Mutex
} }
@ -52,37 +51,49 @@ func (self *InstanceLocator) AddInstanceNotificationChannel(via chan InstanceHos
func (self *InstanceLocator) addInstance(hostname InstanceHostname) { func (self *InstanceLocator) addInstance(hostname InstanceHostname) {
// receiver (InstanceManager) is responsible for de-duping against its // receiver (InstanceManager) is responsible for de-duping against its
// map // map, we just spray
self.reportInstanceVia <- hostname self.reportInstanceVia <- hostname
} }
func (self *InstanceLocator) Locate() { func (self *InstanceLocator) Locate() {
log.Info().Msg("InstanceLocator starting") log.Info().Msg("InstanceLocator starting")
x := time.Now() x := time.Now()
var pleromaSemaphore = semaphore.NewWeighted(1)
var mastodonSemaphore = semaphore.NewWeighted(1)
for { for {
log.Info().Msg("InstanceLocator tick") log.Info().Msg("InstanceLocator tick")
go func() { go func() {
self.pleromaIndexFetchLock.Lock()
if self.pleromaIndexNextRefresh.Before(time.Now()) { if self.pleromaIndexNextRefresh.Before(time.Now()) {
if !pleromaSemaphore.TryAcquire(1) {
return
}
self.locatePleroma() self.locatePleroma()
pleromaSemaphore.Release(1)
} }
self.pleromaIndexFetchLock.Unlock()
}() }()
go func() { go func() {
self.mastodonIndexFetchLock.Lock()
if self.mastodonIndexNextRefresh.Before(time.Now()) { if self.mastodonIndexNextRefresh.Before(time.Now()) {
if !mastodonSemaphore.TryAcquire(1) {
return
}
self.locateMastodon() self.locateMastodon()
mastodonSemaphore.Release(1)
} }
self.mastodonIndexFetchLock.Unlock()
}() }()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
if time.Now().After(x.Add(LOG_REPORT_INTERVAL)) { if time.Now().After(x.Add(LOG_REPORT_INTERVAL)) {
x = time.Now() x = time.Now()
log.Debug(). log.Debug().
Str("nextMastodonIndexFetch", self.mastodonIndexNextRefresh.Format(time.RFC3339)). Str("nextMastodonIndexFetch", time.Now().Sub(*self.mastodonIndexNextRefresh).String()).
Send() Send()
log.Debug(). log.Debug().
Str("nextPleromaIndexFetch", self.pleromaIndexNextRefresh.Format(time.RFC3339)). Str("nextMastodonIndexFetch", time.Now().Sub(*self.pleromaIndexNextRefresh).String()).
Send() Send()
} }
} }
@ -126,6 +137,11 @@ func (self *InstanceLocator) locateMastodon() {
return return
} }
t := time.Now().Add(INDEX_CHECK_INTERVAL)
self.Lock()
self.mastodonIndexNextRefresh = &t
self.Unlock()
mi := new(MastodonIndexResponse) mi := new(MastodonIndexResponse)
err = json.Unmarshal(body, &mi) err = json.Unmarshal(body, &mi)
if err != nil { if err != nil {
@ -137,14 +153,20 @@ func (self *InstanceLocator) locateMastodon() {
return return
} }
hosts := make(map[string]bool)
x := 0
for _, instance := range mi.Instances { for _, instance := range mi.Instances {
self.addInstance(InstanceHostname(instance.Name)) hosts[instance.Name] = true
x++
}
log.Info().
Int("count", x).
Msg("received hosts from mastodon index")
for k, _ := range hosts {
self.addInstance(InstanceHostname(k))
} }
t := time.Now().Add(INDEX_CHECK_INTERVAL)
self.Lock()
self.mastodonIndexNextRefresh = &t
self.Unlock()
} }
func (self *InstanceLocator) locatePleroma() { func (self *InstanceLocator) locatePleroma() {
@ -182,6 +204,12 @@ func (self *InstanceLocator) locatePleroma() {
} }
// fetch worked // fetch worked
t := time.Now().Add(INDEX_CHECK_INTERVAL)
self.Lock()
self.pleromaIndexNextRefresh = &t
self.Unlock()
pi := new(PleromaIndexResponse) pi := new(PleromaIndexResponse)
err = json.Unmarshal(body, &pi) err = json.Unmarshal(body, &pi)
if err != nil { if err != nil {
@ -193,11 +221,18 @@ func (self *InstanceLocator) locatePleroma() {
return return
} }
hosts := make(map[string]bool)
x := 0
for _, instance := range *pi { for _, instance := range *pi {
self.addInstance(InstanceHostname(instance.Domain)) hosts[instance.Domain] = true
x++
} }
t := time.Now().Add(INDEX_CHECK_INTERVAL) log.Info().
self.Lock() Int("count", x).
self.pleromaIndexNextRefresh = &t Msg("received hosts from pleroma index")
self.Unlock()
for k, _ := range hosts {
self.addInstance(InstanceHostname(k))
}
} }

View File

@ -8,6 +8,8 @@ import "runtime"
//import "github.com/gin-gonic/gin" //import "github.com/gin-gonic/gin"
import "github.com/rs/zerolog/log" import "github.com/rs/zerolog/log"
const HOST_DISCOVERY_PARALLELISM = 10
type InstanceBackend interface { type InstanceBackend interface {
//FIXME //FIXME
} }
@ -17,11 +19,12 @@ type InstanceManager struct {
instances map[InstanceHostname]*Instance instances map[InstanceHostname]*Instance
newInstanceNotifications chan InstanceHostname newInstanceNotifications chan InstanceHostname
startup time.Time startup time.Time
addLock sync.Mutex hostAdderSemaphore chan bool
} }
func NewInstanceManager() *InstanceManager { func NewInstanceManager() *InstanceManager {
i := new(InstanceManager) i := new(InstanceManager)
i.hostAdderSemaphore = make(chan bool, HOST_DISCOVERY_PARALLELISM)
i.instances = make(map[InstanceHostname]*Instance) i.instances = make(map[InstanceHostname]*Instance)
return i return i
} }
@ -110,22 +113,26 @@ func (self *InstanceManager) hostnameExists(newhn InstanceHostname) bool {
} }
func (self *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { func (self *InstanceManager) addInstanceByHostname(newhn InstanceHostname) {
// we do these one at a time
self.addLock.Lock()
defer self.addLock.Unlock()
if self.hostnameExists(newhn) { if self.hostnameExists(newhn) {
return return
} }
// this blocks on the channel size, limiting concurrency
self.hostAdderSemaphore <- true
i := NewInstance(newhn) i := NewInstance(newhn)
// we do node detection under the addLock to avoid thundering // we do node detection under the addLock to avoid thundering
// on startup // on startup
i.detectNodeTypeIfNecessary() i.detectNodeTypeIfNecessary()
// pop an item from the buffered channel
<-self.hostAdderSemaphore
// lock the map to insert
self.Lock() self.Lock()
defer self.Unlock()
self.instances[newhn] = i self.instances[newhn] = i
self.Unlock()
} }
func (self *InstanceManager) receiveNewInstanceHostnames() { func (self *InstanceManager) receiveNewInstanceHostnames() {
@ -134,9 +141,7 @@ func (self *InstanceManager) receiveNewInstanceHostnames() {
newhn = <-self.newInstanceNotifications newhn = <-self.newInstanceNotifications
// receive them fast out of the channel, let the adding function lock to add // receive them fast out of the channel, let the adding function lock to add
// them one at a time // them one at a time
go func() { go self.addInstanceByHostname(newhn)
self.addInstanceByHostname(newhn)
}()
} }
} }