From 66318d85f24a462a0a40124ef9548da6e38e23a8 Mon Sep 17 00:00:00 2001 From: Jeffrey Paul Date: Mon, 4 Nov 2019 09:07:04 -0800 Subject: [PATCH] latest --- api.go | 3 +- archiver.go | 2 +- instance.go | 79 +++++++++++++++++--------------- instancelocator.go => locator.go | 0 instancemanager.go => manager.go | 78 ++++++++++++++++++------------- 5 files changed, 92 insertions(+), 70 deletions(-) rename instancelocator.go => locator.go (100%) rename instancemanager.go => manager.go (72%) diff --git a/api.go b/api.go index 66051ba..7fde215 100644 --- a/api.go +++ b/api.go @@ -48,11 +48,10 @@ func (a *TootArchiverAPIServer) getRouter() *gin.Engine { }) r.GET("/", func(c *gin.Context) { - ir := a.archiver.manager.instanceReport() + ir := a.archiver.manager.instanceSummaryReport() il := a.archiver.manager.instanceListForApi() c.JSON(200, gin.H{ - // FIXME(sneak) add more stuff here "status": "ok", "now": time.Now().UTC().Format(time.RFC3339), "uptime": a.archiver.Uptime().String(), diff --git a/archiver.go b/archiver.go index 11c4d5c..c91386f 100644 --- a/archiver.go +++ b/archiver.go @@ -23,7 +23,7 @@ func (a *TootArchiver) RunForever() { t := time.Now() a.startup = &t - newInstanceHostnameNotifications := make(chan InstanceHostname) + newInstanceHostnameNotifications := make(chan InstanceHostname, 10000) a.locator = NewInstanceLocator() diff --git a/instance.go b/instance.go index 3aee79e..f2004c7 100644 --- a/instance.go +++ b/instance.go @@ -9,6 +9,7 @@ import "sync" import "time" import "errors" +import "github.com/gin-gonic/gin" import "github.com/rs/zerolog/log" const NodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0" @@ -54,30 +55,26 @@ type Instance struct { } func NewInstance(hostname InstanceHostname) *Instance { - i := new(Instance) - i.hostname = string(hostname) - i.status = InstanceStatusUnknown - i.nextFetch = time.Now().Add(-1 * time.Second) - // FIXME make checks detect the node type instead of in the constructor - return i + self := new(Instance) + self.hostname = string(hostname) + self.status = InstanceStatusUnknown + self.nextFetch = time.Now().Add(-1 * time.Second) + return self } -func (i *Instance) setNextFetchAfter(d time.Duration) { - i.Lock() - defer i.Unlock() - i.nextFetch = time.Now().Add(d) -} - -func (self *Instance) setFetching(f bool) { +func (self *Instance) bumpFetch() { self.Lock() defer self.Unlock() - self.fetching = f + self.nextFetch = time.Now().Add(10 * time.Second) +} + +func (self *Instance) setNextFetchAfter(d time.Duration) { + self.Lock() + defer self.Unlock() + self.nextFetch = time.Now().Add(d) } func (self *Instance) Fetch() { - self.setFetching(true) - defer self.setFetching(false) - err := self.detectNodeTypeIfNecessary() if err != nil { self.setNextFetchAfter(INSTANCE_ERROR_INTERVAL) @@ -93,36 +90,46 @@ func (self *Instance) Fetch() { } func (self *Instance) dueForFetch() bool { + self.Lock() + defer self.Unlock() + nf := self.nextFetch + return nf.Before(time.Now()) +} + +func (self *Instance) nodeIdentified() bool { self.RLock() defer self.RUnlock() - if self.fetching { - return false + if self.impl > Unknown { + return true } - return self.nextFetch.Before(time.Now()) + return false } -func (i *Instance) detectNodeTypeIfNecessary() error { - i.RLock() - if i.impl > Unknown { - i.RUnlock() +func (self *Instance) detectNodeTypeIfNecessary() error { + if !self.nodeIdentified() { + return self.fetchNodeInfo() + } else { return nil } - i.RUnlock() - return i.fetchNodeInfo() } -func (i *Instance) registerError() { - i.setNextFetchAfter(INSTANCE_ERROR_INTERVAL) - i.Lock() - defer i.Unlock() - i.errorCount++ +func (self *Instance) registerError() { + self.setNextFetchAfter(INSTANCE_ERROR_INTERVAL) + self.Lock() + defer self.Unlock() + self.errorCount++ } -func (i *Instance) registerSuccess() { - i.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL) - i.Lock() - defer i.Unlock() - i.successCount++ +func (self *Instance) registerSuccess() { + self.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL) + self.Lock() + defer self.Unlock() + self.successCount++ +} + +func (self *Instance) ApiReport() *gin.H { + r := gin.H{} + return &r } func (i *Instance) Up() bool { diff --git a/instancelocator.go b/locator.go similarity index 100% rename from instancelocator.go rename to locator.go diff --git a/instancemanager.go b/manager.go similarity index 72% rename from instancemanager.go rename to manager.go index 5bb1bf3..ee24432 100644 --- a/instancemanager.go +++ b/manager.go @@ -5,6 +5,7 @@ import "time" import "fmt" import "runtime" +import "github.com/gin-gonic/gin" import "github.com/rs/zerolog/log" type InstanceBackend interface { @@ -66,48 +67,54 @@ func (self *InstanceManager) AddInstanceNotificationChannel(via chan InstanceHos } func (self *InstanceManager) Manage() { - self.managerLoop() + self.managerInfiniteLoop() } -func (self *InstanceManager) managerLoop() { +func (self *InstanceManager) managerInfiniteLoop() { log.Info().Msg("InstanceManager starting") go self.receiveNewInstanceHostnames() self.startup = time.Now() for { log.Info().Msg("InstanceManager tick") - self.Lock() - for _, v := range self.instances { - go func() { - if v.dueForFetch() { - v.Fetch() - } - }() - } - self.Unlock() + self.managerLoop() time.Sleep(1 * time.Second) } } +func (self *InstanceManager) managerLoop() { + self.Lock() + defer self.Unlock() + for _, v := range self.instances { + // wrap in a new goroutine because this needs to iterate + // fast and unlock fast + go func() { + if v.dueForFetch() { + go v.Fetch() + } + }() + } +} + func (self *InstanceManager) hostnameExists(newhn InstanceHostname) bool { self.Lock() + defer self.Unlock() for k, _ := range self.instances { if newhn == k { - self.Unlock() return true } } - self.Unlock() return false } func (self *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { // only add it if we haven't seen the hostname before - if !self.hostnameExists(newhn) { - i := NewInstance(newhn) - self.Lock() - self.instances[newhn] = i - self.Unlock() + if self.hostnameExists(newhn) { + return } + i := NewInstance(newhn) + self.Lock() + defer self.Unlock() + self.instances[newhn] = i } func (self *InstanceManager) receiveNewInstanceHostnames() { @@ -119,7 +126,7 @@ func (self *InstanceManager) receiveNewInstanceHostnames() { } func (self *InstanceManager) logInstanceReport() { - r := self.instanceReport() + r := self.instanceSummaryReport() log.Info(). Uint("up", r.up). Uint("total", r.total). @@ -127,18 +134,18 @@ func (self *InstanceManager) logInstanceReport() { Msg("instance report") } -type InstanceReport struct { +type InstanceSummaryReport struct { up uint identified uint total uint } -func (r *InstanceReport) String() string { +func (r *InstanceSummaryReport) String() string { return fmt.Sprintf("up=%d identified=%d total=%d", r.up, r.identified, r.total) } func (self *InstanceManager) NumInstances() uint { - return self.instanceReport().total + return self.instanceSummaryReport().total } type InstanceListReport []*InstanceDetail @@ -149,26 +156,35 @@ type InstanceDetail struct { nextFetch string } -func (self *InstanceManager) instanceListForApi() InstanceListReport { - var output InstanceListReport +func (self *InstanceManager) listInstances() []*Instance { + var out []*Instance self.Lock() defer self.Unlock() for _, v := range self.instances { - id := &InstanceDetail{ - hostname: v.hostname, + out = append(out, v) + } + return out +} + +func (self *InstanceManager) instanceListForApi() []*gin.H { + var output []*gin.H + + l := self.listInstances() + for _, v := range l { + id := &gin.H{ + "hostname": v.hostname, + "up": v.Up(), + "nextFetch": string(time.Now().Sub(v.nextFetch)), } - id.up = v.Up() - id.nextFetch = string(time.Now().Sub(v.nextFetch)) output = append(output, id) - fmt.Printf("%s", output) } return output } -func (self *InstanceManager) instanceReport() *InstanceReport { +func (self *InstanceManager) instanceSummaryReport() *InstanceSummaryReport { self.Lock() defer self.Unlock() - r := new(InstanceReport) + r := new(InstanceSummaryReport) r.total = uint(len(self.instances))