checkpoint before major refactoring

This commit is contained in:
Jeffrey Paul 2019-11-03 05:17:00 -08:00
parent 5f00c3441b
commit 58954151b5
5 changed files with 129 additions and 69 deletions

View File

@ -48,19 +48,19 @@ func (a *TootArchiverAPIServer) getRouter() *gin.Engine {
}) })
r.GET("/", func(c *gin.Context) { r.GET("/", func(c *gin.Context) {
ir := a.archiver.locator.instanceReport()
c.JSON(200, gin.H{ c.JSON(200, gin.H{
// FIXME(sneak) add more stuff here // FIXME(sneak) add more stuff here
"status": "ok", "status": "ok",
"now": time.Now().UTC().Format(time.RFC3339), "now": time.Now().UTC().Format(time.RFC3339),
"uptime": a.archiver.Uptime().String(), "uptime": a.archiver.Uptime().String(),
"instances": gin.H{ "instanceSummary": gin.H{
"total": a.archiver.locator.instanceReport().total, "total": ir.total,
"up": a.archiver.locator.instanceReport().up, "up": ir.up,
"identified": a.archiver.locator.instanceReport().identified, "identified": ir.identified,
}, },
}) })
}) })
// FIXME(sneak) add more status routes here
return r return r
} }

View File

@ -2,12 +2,12 @@ package main
import "time" import "time"
type InstanceHostName string type InstanceHostname string
type TootArchiver struct { type TootArchiver struct {
locator *InstanceLocator locator *InstanceLocator
instances map[InstanceHostName]*Instance manager *InstanceManager
startup *time.Time startup *time.Time
} }
func NewTootArchiver() *TootArchiver { func NewTootArchiver() *TootArchiver {
@ -19,15 +19,28 @@ func (a *TootArchiver) Uptime() time.Duration {
return time.Since(*a.startup) return time.Since(*a.startup)
} }
func (a *TootArchiver) RunForever() int { func (a *TootArchiver) RunForever() {
t := time.Now() t := time.Now()
a.startup = &t a.startup = &t
newInstanceHostnameNotifications := make(chan InstanceHostname)
a.locator = NewInstanceLocator() a.locator = NewInstanceLocator()
a.manager = NewInstanceManager()
a.locator.AddInstanceNotificationChannel(newInstanceHostnameNotifications)
a.manager.AddInstanceNotificationChannel(newInstanceHostnameNotifications)
// locator goroutine:
go a.locator.Locate() go a.locator.Locate()
// manager goroutine:
go a.manager.Manage()
// this goroutine (main) does nothing until we handle signals
// FIXME(sneak)
for { for {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }
return 0
} }

View File

@ -43,6 +43,7 @@ type Instance struct {
hostName string hostName string
identified bool identified bool
impl InstanceImplementation impl InstanceImplementation
backend *InstanceBackend
status InstanceStatus status InstanceStatus
nextCheck *time.Time nextCheck *time.Time
nodeInfoUrl string nodeInfoUrl string
@ -264,6 +265,7 @@ func (i *Instance) fetchNodeInfo() {
return return
} }
/*
func (i *Instance) fetchRecentToots() ([]byte, error) { func (i *Instance) fetchRecentToots() ([]byte, error) {
i.Lock() i.Lock()
impl := i.impl impl := i.impl
@ -277,13 +279,16 @@ func (i *Instance) fetchRecentToots() ([]byte, error) {
panic("unimplemented") panic("unimplemented")
} }
} }
*/
func (i *Instance) fetchRecentTootsJsonFromPleroma() ([]byte, error) { /*
func (self *PleromaBackend) fetchRecentToots() ([]byte, error) {
//url := fmt.Sprintf("https://%s/api/statuses/public_and_external_timeline.json?count=100", i.hostName) //url := fmt.Sprintf("https://%s/api/statuses/public_and_external_timeline.json?count=100", i.hostName)
return nil, nil return nil, nil
} }
func (i *Instance) fetchRecentTootsJsonFromMastodon() ([]byte, error) { func (self *MastodonBackend) fetchRecentTootsJsonFromMastodon() ([]byte, error) {
//url := fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true", i.hostName) //url := fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true", i.hostName)
return nil, nil return nil, nil
} }
*/

View File

@ -4,8 +4,8 @@ import "encoding/json"
import "fmt" import "fmt"
import "io/ioutil" import "io/ioutil"
import "net/http" import "net/http"
import "sync"
import "time" import "time"
import "sync"
import "github.com/rs/zerolog/log" import "github.com/rs/zerolog/log"
@ -17,12 +17,15 @@ var INDEX_CHECK_INTERVAL = time.Second * 60 * 60
// check with indices after 10 mins if they failed // check with indices after 10 mins if they failed
var INDEX_ERROR_INTERVAL = time.Second * 60 * 10 var INDEX_ERROR_INTERVAL = time.Second * 60 * 10
var LOG_REPORT_INTERVAL = time.Second * 60
const mastodonIndexUrl = "https://instances.social/list.json?q%5Busers%5D=&q%5Bsearch%5D=&strict=false" const mastodonIndexUrl = "https://instances.social/list.json?q%5Busers%5D=&q%5Bsearch%5D=&strict=false"
const pleromaIndexUrl = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api.cgi" const pleromaIndexUrl = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api.cgi"
type InstanceLocator struct { type InstanceLocator struct {
pleromaIndexNextRefresh *time.Time pleromaIndexNextRefresh *time.Time
mastodonIndexNextRefresh *time.Time mastodonIndexNextRefresh *time.Time
reportInstanceVia chan InstanceHostname
instances map[string]*Instance instances map[string]*Instance
sync.Mutex sync.Mutex
} }
@ -36,44 +39,49 @@ func NewInstanceLocator() *InstanceLocator {
return i return i
} }
func (i *InstanceLocator) addInstance(hostname string) { func (self *InstanceLocator) AddInstanceNotificationChannel(via chan InstanceHostname) {
i.Lock() self.Lock()
defer i.Unlock() defer self.Unlock()
self.reportInstanceVia = via
}
func (self *InstanceLocator) addInstance(hostname string) {
self.Lock()
defer self.Unlock()
// only add it if we haven't seen the hostname before // only add it if we haven't seen the hostname before
if i.instances[hostname] == nil { if self.instances[hostname] == nil {
log.Info().Str("hostname", hostname).Msgf("adding discovered instance") log.Info().Str("hostname", hostname).Msgf("adding discovered instance")
i.instances[hostname] = NewInstance(hostname) self.instances[hostname] = NewInstance(hostname)
} }
} }
func (i *InstanceLocator) Locate() { func (self *InstanceLocator) Locate() {
x := 0 x := time.Now()
for { for {
if i.pleromaIndexNextRefresh.Before(time.Now()) { if self.pleromaIndexNextRefresh.Before(time.Now()) {
i.locatePleroma() self.locatePleroma()
} }
if i.mastodonIndexNextRefresh.Before(time.Now()) { if self.mastodonIndexNextRefresh.Before(time.Now()) {
i.locateMastodon() self.locateMastodon()
} }
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
x++ if time.Now().After(x.Add(LOG_REPORT_INTERVAL)) {
if x == 60 { x = time.Now()
x = 0
log.Debug(). log.Debug().
Str("nextmastodonupdate", i.mastodonIndexNextRefresh.Format(time.RFC3339)). Str("nextMastodonIndexFetch", self.mastodonIndexNextRefresh.Format(time.RFC3339)).
Send() Send()
log.Debug(). log.Debug().
Str("nextpleromaupdate", i.pleromaIndexNextRefresh.Format(time.RFC3339)). Str("nextPleromaIndexFetch", self.pleromaIndexNextRefresh.Format(time.RFC3339)).
Send() Send()
i.logInstanceReport() self.logInstanceReport()
} }
} }
} }
func (i *InstanceLocator) logInstanceReport() { func (self *InstanceLocator) logInstanceReport() {
r := i.instanceReport() r := self.instanceReport()
log.Info(). log.Info().
Uint("up", r.up). Uint("up", r.up).
Uint("total", r.total). Uint("total", r.total).
@ -91,18 +99,18 @@ func (r *InstanceLocatorReport) String() string {
return fmt.Sprintf("up=%d identified=%d total=%d", r.up, r.identified, r.total) return fmt.Sprintf("up=%d identified=%d total=%d", r.up, r.identified, r.total)
} }
func (i *InstanceLocator) NumInstances() uint { func (self *InstanceLocator) NumInstances() uint {
return i.instanceReport().total return self.instanceReport().total
} }
func (i *InstanceLocator) instanceReport() *InstanceLocatorReport { func (self *InstanceLocator) instanceReport() *InstanceLocatorReport {
i.Lock() self.Lock()
defer i.Unlock() defer self.Unlock()
r := new(InstanceLocatorReport) r := new(InstanceLocatorReport)
r.total = uint(len(i.instances)) r.total = uint(len(self.instances))
for _, elem := range i.instances { for _, elem := range self.instances {
if elem.identified == true { if elem.identified == true {
r.identified = r.identified + 1 r.identified = r.identified + 1
} }
@ -115,7 +123,7 @@ func (i *InstanceLocator) instanceReport() *InstanceLocatorReport {
return r return r
} }
func (i *InstanceLocator) locateMastodon() { func (self *InstanceLocator) locateMastodon() {
var c = &http.Client{ var c = &http.Client{
Timeout: INDEX_API_TIMEOUT, Timeout: INDEX_API_TIMEOUT,
} }
@ -124,9 +132,9 @@ func (i *InstanceLocator) locateMastodon() {
if err != nil { if err != nil {
log.Error().Msgf("unable to fetch mastodon instance list: %s", err) log.Error().Msgf("unable to fetch mastodon instance list: %s", err)
t := time.Now().Add(INDEX_ERROR_INTERVAL) t := time.Now().Add(INDEX_ERROR_INTERVAL)
i.Lock() self.Lock()
i.mastodonIndexNextRefresh = &t self.mastodonIndexNextRefresh = &t
i.Unlock() self.Unlock()
return return
} }
@ -136,9 +144,9 @@ func (i *InstanceLocator) locateMastodon() {
if err != nil { if err != nil {
log.Error().Msgf("unable to fetch mastodon instance list: %s", err) log.Error().Msgf("unable to fetch mastodon instance list: %s", err)
t := time.Now().Add(INDEX_ERROR_INTERVAL) t := time.Now().Add(INDEX_ERROR_INTERVAL)
i.Lock() self.Lock()
i.mastodonIndexNextRefresh = &t self.mastodonIndexNextRefresh = &t
i.Unlock() self.Unlock()
return return
} }
@ -147,23 +155,23 @@ func (i *InstanceLocator) locateMastodon() {
if err != nil { if err != nil {
log.Error().Msgf("unable to parse mastodon instance list: %s", err) log.Error().Msgf("unable to parse mastodon instance list: %s", err)
t := time.Now().Add(INDEX_ERROR_INTERVAL) t := time.Now().Add(INDEX_ERROR_INTERVAL)
i.Lock() self.Lock()
i.mastodonIndexNextRefresh = &t self.mastodonIndexNextRefresh = &t
i.Unlock() self.Unlock()
return return
} }
for _, instance := range mi.Instances { for _, instance := range mi.Instances {
i.addInstance(instance.Name) self.addInstance(instance.Name)
} }
t := time.Now().Add(INDEX_CHECK_INTERVAL) t := time.Now().Add(INDEX_CHECK_INTERVAL)
i.Lock() self.Lock()
i.mastodonIndexNextRefresh = &t self.mastodonIndexNextRefresh = &t
i.Unlock() self.Unlock()
} }
func (i *InstanceLocator) locatePleroma() { func (self *InstanceLocator) locatePleroma() {
var c = &http.Client{ var c = &http.Client{
Timeout: INDEX_API_TIMEOUT, Timeout: INDEX_API_TIMEOUT,
} }
@ -172,9 +180,9 @@ func (i *InstanceLocator) locatePleroma() {
if err != nil { if err != nil {
log.Error().Msgf("unable to fetch pleroma instance list: %s", err) log.Error().Msgf("unable to fetch pleroma instance list: %s", err)
t := time.Now().Add(INDEX_ERROR_INTERVAL) t := time.Now().Add(INDEX_ERROR_INTERVAL)
i.Lock() self.Lock()
i.pleromaIndexNextRefresh = &t self.pleromaIndexNextRefresh = &t
i.Unlock() self.Unlock()
return return
} }
@ -184,9 +192,9 @@ func (i *InstanceLocator) locatePleroma() {
if err != nil { if err != nil {
log.Error().Msgf("unable to fetch pleroma instance list: %s", err) log.Error().Msgf("unable to fetch pleroma instance list: %s", err)
t := time.Now().Add(INDEX_ERROR_INTERVAL) t := time.Now().Add(INDEX_ERROR_INTERVAL)
i.Lock() self.Lock()
i.pleromaIndexNextRefresh = &t self.pleromaIndexNextRefresh = &t
i.Unlock() self.Unlock()
return return
} }
@ -196,17 +204,17 @@ func (i *InstanceLocator) locatePleroma() {
if err != nil { if err != nil {
log.Warn().Msgf("unable to parse pleroma instance list: %s", err) log.Warn().Msgf("unable to parse pleroma instance list: %s", err)
t := time.Now().Add(INDEX_ERROR_INTERVAL) t := time.Now().Add(INDEX_ERROR_INTERVAL)
i.Lock() self.Lock()
i.pleromaIndexNextRefresh = &t self.pleromaIndexNextRefresh = &t
i.Unlock() self.Unlock()
return return
} }
for _, instance := range *pi { for _, instance := range *pi {
i.addInstance(instance.Domain) self.addInstance(instance.Domain)
} }
t := time.Now().Add(INDEX_CHECK_INTERVAL) t := time.Now().Add(INDEX_CHECK_INTERVAL)
i.Lock() self.Lock()
i.pleromaIndexNextRefresh = &t self.pleromaIndexNextRefresh = &t
i.Unlock() self.Unlock()
} }

34
instancemanager.go Normal file
View File

@ -0,0 +1,34 @@
package main
import "sync"
import "time"
//import "github.com/rs/zerolog/log"
type InstanceBackend interface {
//FIXME
}
type InstanceManager struct {
sync.Mutex
instances map[InstanceHostname]*Instance
newInstanceNotifications chan InstanceHostname
}
func NewInstanceManager() *InstanceManager {
i := new(InstanceManager)
return i
}
func (self *InstanceManager) AddInstanceNotificationChannel(via chan InstanceHostname) {
self.Lock()
defer self.Unlock()
self.newInstanceNotifications = via
}
func (self *InstanceManager) Manage() {
for {
// FIXME(sneak)
time.Sleep(1 * time.Second)
}
}