This commit is contained in:
Jeffrey Paul 2019-11-04 09:07:04 -08:00
parent 3b543fe5a5
commit 66318d85f2
5 changed files with 92 additions and 70 deletions

3
api.go
View File

@ -48,11 +48,10 @@ func (a *TootArchiverAPIServer) getRouter() *gin.Engine {
}) })
r.GET("/", func(c *gin.Context) { r.GET("/", func(c *gin.Context) {
ir := a.archiver.manager.instanceReport() ir := a.archiver.manager.instanceSummaryReport()
il := a.archiver.manager.instanceListForApi() il := a.archiver.manager.instanceListForApi()
c.JSON(200, gin.H{ c.JSON(200, gin.H{
// FIXME(sneak) add more stuff here
"status": "ok", "status": "ok",
"now": time.Now().UTC().Format(time.RFC3339), "now": time.Now().UTC().Format(time.RFC3339),
"uptime": a.archiver.Uptime().String(), "uptime": a.archiver.Uptime().String(),

View File

@ -23,7 +23,7 @@ func (a *TootArchiver) RunForever() {
t := time.Now() t := time.Now()
a.startup = &t a.startup = &t
newInstanceHostnameNotifications := make(chan InstanceHostname) newInstanceHostnameNotifications := make(chan InstanceHostname, 10000)
a.locator = NewInstanceLocator() a.locator = NewInstanceLocator()

View File

@ -9,6 +9,7 @@ import "sync"
import "time" import "time"
import "errors" import "errors"
import "github.com/gin-gonic/gin"
import "github.com/rs/zerolog/log" import "github.com/rs/zerolog/log"
const NodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0" const NodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0"
@ -54,30 +55,26 @@ type Instance struct {
} }
func NewInstance(hostname InstanceHostname) *Instance { func NewInstance(hostname InstanceHostname) *Instance {
i := new(Instance) self := new(Instance)
i.hostname = string(hostname) self.hostname = string(hostname)
i.status = InstanceStatusUnknown self.status = InstanceStatusUnknown
i.nextFetch = time.Now().Add(-1 * time.Second) self.nextFetch = time.Now().Add(-1 * time.Second)
// FIXME make checks detect the node type instead of in the constructor return self
return i
} }
func (i *Instance) setNextFetchAfter(d time.Duration) { func (self *Instance) bumpFetch() {
i.Lock()
defer i.Unlock()
i.nextFetch = time.Now().Add(d)
}
func (self *Instance) setFetching(f bool) {
self.Lock() self.Lock()
defer self.Unlock() defer self.Unlock()
self.fetching = f self.nextFetch = time.Now().Add(10 * time.Second)
}
func (self *Instance) setNextFetchAfter(d time.Duration) {
self.Lock()
defer self.Unlock()
self.nextFetch = time.Now().Add(d)
} }
func (self *Instance) Fetch() { func (self *Instance) Fetch() {
self.setFetching(true)
defer self.setFetching(false)
err := self.detectNodeTypeIfNecessary() err := self.detectNodeTypeIfNecessary()
if err != nil { if err != nil {
self.setNextFetchAfter(INSTANCE_ERROR_INTERVAL) self.setNextFetchAfter(INSTANCE_ERROR_INTERVAL)
@ -93,36 +90,46 @@ func (self *Instance) Fetch() {
} }
func (self *Instance) dueForFetch() bool { func (self *Instance) dueForFetch() bool {
self.Lock()
defer self.Unlock()
nf := self.nextFetch
return nf.Before(time.Now())
}
func (self *Instance) nodeIdentified() bool {
self.RLock() self.RLock()
defer self.RUnlock() defer self.RUnlock()
if self.fetching { if self.impl > Unknown {
return false return true
} }
return self.nextFetch.Before(time.Now()) return false
} }
func (i *Instance) detectNodeTypeIfNecessary() error { func (self *Instance) detectNodeTypeIfNecessary() error {
i.RLock() if !self.nodeIdentified() {
if i.impl > Unknown { return self.fetchNodeInfo()
i.RUnlock() } else {
return nil return nil
} }
i.RUnlock()
return i.fetchNodeInfo()
} }
func (i *Instance) registerError() { func (self *Instance) registerError() {
i.setNextFetchAfter(INSTANCE_ERROR_INTERVAL) self.setNextFetchAfter(INSTANCE_ERROR_INTERVAL)
i.Lock() self.Lock()
defer i.Unlock() defer self.Unlock()
i.errorCount++ self.errorCount++
} }
func (i *Instance) registerSuccess() { func (self *Instance) registerSuccess() {
i.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL) self.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL)
i.Lock() self.Lock()
defer i.Unlock() defer self.Unlock()
i.successCount++ self.successCount++
}
func (self *Instance) ApiReport() *gin.H {
r := gin.H{}
return &r
} }
func (i *Instance) Up() bool { func (i *Instance) Up() bool {

View File

@ -5,6 +5,7 @@ import "time"
import "fmt" import "fmt"
import "runtime" import "runtime"
import "github.com/gin-gonic/gin"
import "github.com/rs/zerolog/log" import "github.com/rs/zerolog/log"
type InstanceBackend interface { type InstanceBackend interface {
@ -66,48 +67,54 @@ func (self *InstanceManager) AddInstanceNotificationChannel(via chan InstanceHos
} }
func (self *InstanceManager) Manage() { func (self *InstanceManager) Manage() {
self.managerLoop() self.managerInfiniteLoop()
} }
func (self *InstanceManager) managerLoop() { func (self *InstanceManager) managerInfiniteLoop() {
log.Info().Msg("InstanceManager starting") log.Info().Msg("InstanceManager starting")
go self.receiveNewInstanceHostnames() go self.receiveNewInstanceHostnames()
self.startup = time.Now() self.startup = time.Now()
for { for {
log.Info().Msg("InstanceManager tick") log.Info().Msg("InstanceManager tick")
self.managerLoop()
time.Sleep(1 * time.Second)
}
}
func (self *InstanceManager) managerLoop() {
self.Lock() self.Lock()
defer self.Unlock()
for _, v := range self.instances { for _, v := range self.instances {
// wrap in a new goroutine because this needs to iterate
// fast and unlock fast
go func() { go func() {
if v.dueForFetch() { if v.dueForFetch() {
v.Fetch() go v.Fetch()
} }
}() }()
} }
self.Unlock()
time.Sleep(1 * time.Second)
}
} }
func (self *InstanceManager) hostnameExists(newhn InstanceHostname) bool { func (self *InstanceManager) hostnameExists(newhn InstanceHostname) bool {
self.Lock() self.Lock()
defer self.Unlock()
for k, _ := range self.instances { for k, _ := range self.instances {
if newhn == k { if newhn == k {
self.Unlock()
return true return true
} }
} }
self.Unlock()
return false return false
} }
func (self *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { func (self *InstanceManager) addInstanceByHostname(newhn InstanceHostname) {
// only add it if we haven't seen the hostname before // only add it if we haven't seen the hostname before
if !self.hostnameExists(newhn) { if self.hostnameExists(newhn) {
return
}
i := NewInstance(newhn) i := NewInstance(newhn)
self.Lock() self.Lock()
defer self.Unlock()
self.instances[newhn] = i self.instances[newhn] = i
self.Unlock()
}
} }
func (self *InstanceManager) receiveNewInstanceHostnames() { func (self *InstanceManager) receiveNewInstanceHostnames() {
@ -119,7 +126,7 @@ func (self *InstanceManager) receiveNewInstanceHostnames() {
} }
func (self *InstanceManager) logInstanceReport() { func (self *InstanceManager) logInstanceReport() {
r := self.instanceReport() r := self.instanceSummaryReport()
log.Info(). log.Info().
Uint("up", r.up). Uint("up", r.up).
Uint("total", r.total). Uint("total", r.total).
@ -127,18 +134,18 @@ func (self *InstanceManager) logInstanceReport() {
Msg("instance report") Msg("instance report")
} }
type InstanceReport struct { type InstanceSummaryReport struct {
up uint up uint
identified uint identified uint
total uint total uint
} }
func (r *InstanceReport) String() string { func (r *InstanceSummaryReport) String() string {
return fmt.Sprintf("up=%d identified=%d total=%d", r.up, r.identified, r.total) return fmt.Sprintf("up=%d identified=%d total=%d", r.up, r.identified, r.total)
} }
func (self *InstanceManager) NumInstances() uint { func (self *InstanceManager) NumInstances() uint {
return self.instanceReport().total return self.instanceSummaryReport().total
} }
type InstanceListReport []*InstanceDetail type InstanceListReport []*InstanceDetail
@ -149,26 +156,35 @@ type InstanceDetail struct {
nextFetch string nextFetch string
} }
func (self *InstanceManager) instanceListForApi() InstanceListReport { func (self *InstanceManager) listInstances() []*Instance {
var output InstanceListReport var out []*Instance
self.Lock() self.Lock()
defer self.Unlock() defer self.Unlock()
for _, v := range self.instances { for _, v := range self.instances {
id := &InstanceDetail{ out = append(out, v)
hostname: v.hostname, }
return out
}
func (self *InstanceManager) instanceListForApi() []*gin.H {
var output []*gin.H
l := self.listInstances()
for _, v := range l {
id := &gin.H{
"hostname": v.hostname,
"up": v.Up(),
"nextFetch": string(time.Now().Sub(v.nextFetch)),
} }
id.up = v.Up()
id.nextFetch = string(time.Now().Sub(v.nextFetch))
output = append(output, id) output = append(output, id)
fmt.Printf("%s", output)
} }
return output return output
} }
func (self *InstanceManager) instanceReport() *InstanceReport { func (self *InstanceManager) instanceSummaryReport() *InstanceSummaryReport {
self.Lock() self.Lock()
defer self.Unlock() defer self.Unlock()
r := new(InstanceReport) r := new(InstanceSummaryReport)
r.total = uint(len(self.instances)) r.total = uint(len(self.instances))