works really great now

This commit is contained in:
Jeffrey Paul 2019-11-05 23:03:42 -08:00
parent 5023155190
commit a854a48b2b
8 changed files with 258 additions and 159 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
feta
output/
feta.sqlite

View File

@ -4,6 +4,8 @@ import "time"
import "net/http"
import "encoding/json"
import "runtime"
import "fmt"
import "strings"
import "github.com/gin-gonic/gin"
@ -22,7 +24,7 @@ func (a *FetaAPIServer) instances() []hash {
i["successCount"] = v.successCount
i["errorCount"] = v.errorCount
i["identified"] = v.identified
i["status"] = v.status
i["status"] = v.Status()
i["software"] = "unknown"
i["version"] = "unknown"
if v.identified {
@ -35,11 +37,42 @@ func (a *FetaAPIServer) instances() []hash {
return resp
}
func (a *FetaAPIServer) instanceSummary() map[string]int {
resp := make(map[string]int)
for _, v := range a.feta.manager.listInstances() {
v.Lock()
resp[fmt.Sprintf("STATUS_%s", v.Status())]++
if v.serverImplementationString != "" {
//FIXME(sneak) sanitize this to a-z0-9, it is server-provided
resp[fmt.Sprintf("SOFTWARE_%s", strings.ToUpper(v.serverImplementationString))]++
}
v.Unlock()
}
return resp
}
func (a *FetaAPIServer) getInstanceListHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
result := &gin.H{
"instances": a.instances(),
}
json, err := json.Marshal(result)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(json)
}
}
func (a *FetaAPIServer) getIndexHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
index := &gin.H{
"instances": a.instances(),
"server": &gin.H{
"now": time.Now().UTC().Format(time.RFC3339),
"uptime": a.feta.Uptime().String(),
@ -50,6 +83,7 @@ func (a *FetaAPIServer) getIndexHandler() http.HandlerFunc {
"buildarch": a.feta.buildarch,
"builduser": a.feta.builduser,
},
"instanceSummary": a.instanceSummary(),
}
json, err := json.Marshal(index)

View File

@ -44,7 +44,9 @@ func (a *FetaAPIServer) getRouter() *gin.Engine {
r.Use(ginzerolog.Logger("gin"))
r.GET("/.well-known/healthcheck.json", gin.WrapF(a.getHealthCheckHandler()))
r.GET("/", gin.WrapF(a.getIndexHandler()))
r.GET("/", func(c *gin.Context) { c.Redirect(http.StatusMovedPermanently, "/feta") })
r.GET("/feta", gin.WrapF(a.getIndexHandler()))
r.GET("/feta/list/instances", gin.WrapF(a.getInstanceListHandler()))
return r
}

14
dbmodel.go Normal file
View File

@ -0,0 +1,14 @@
package feta
import "github.com/jinzhu/gorm"
import _ "github.com/jinzhu/gorm/dialects/sqlite"
type SavedInstance struct {
gorm.Model
hostname string
software string
}
func (f *FetaProcess) databaseMigrations() {
f.db.AutoMigrate(&SavedInstance{})
}

27
feta.go
View File

@ -32,7 +32,7 @@ type FetaProcess struct {
manager *InstanceManager
api *FetaAPIServer
db *gorm.DB
startup *time.Time
startup time.Time
}
func (f *FetaProcess) identify() {
@ -72,26 +72,31 @@ func (f *FetaProcess) setupLogging() {
}
func (f *FetaProcess) Uptime() time.Duration {
return time.Since(*f.startup)
return time.Since(f.startup)
}
func (f *FetaProcess) setupDatabase() {
var err error
f.db, err = gorm.Open("sqlite3", "feta.sqlite")
if err != nil {
panic(err)
}
f.databaseMigrations()
}
func (f *FetaProcess) RunForever() int {
t := time.Now()
f.startup = &t
f.startup = time.Now()
var err error
f.db, err = gorm.Open("sqlite3", "/feta.sqlite")
defer f.db.Close()
f.setupDatabase()
if err != nil {
log.Panic().Err(err)
}
newInstanceHostnameNotifications := make(chan InstanceHostname)
f.locator = NewInstanceLocator()
f.manager = NewInstanceManager()
f.api = new(FetaAPIServer)
f.api.feta = f // api needs to get to us
f.api.feta = f // api needs to get to us to access data
f.locator.AddInstanceNotificationChannel(newInstanceHostnameNotifications)
f.manager.AddInstanceNotificationChannel(newInstanceHostnameNotifications)

View File

@ -9,38 +9,31 @@ import "sync"
import "time"
import "errors"
import "github.com/gin-gonic/gin"
//import "github.com/gin-gonic/gin"
import "github.com/looplab/fsm"
import "github.com/rs/zerolog/log"
const NodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0"
const INSTANCE_NODEINFO_TIMEOUT = time.Second * 5
const INSTANCE_NODEINFO_TIMEOUT = time.Second * 50
const INSTANCE_HTTP_TIMEOUT = time.Second * 60
const INSTANCE_HTTP_TIMEOUT = time.Second * 50
const INSTANCE_SPIDER_INTERVAL = time.Second * 60
const INSTANCE_ERROR_INTERVAL = time.Second * 60 * 30
type InstanceImplementation int
type instanceImplementation int
const (
Unknown InstanceImplementation = iota
Unknown instanceImplementation = iota
Mastodon
Pleroma
)
type InstanceStatus int
type instanceStatus int
const (
InstanceStatusNone InstanceStatus = iota
InstanceStatusUnknown
InstanceStatusAlive
InstanceStatusIdentified
InstanceStatusFailure
)
type Instance struct {
type instance struct {
structLock sync.Mutex
errorCount uint
successCount uint
@ -48,114 +41,162 @@ type Instance struct {
hostname string
identified bool
fetching bool
implementation InstanceImplementation
implementation instanceImplementation
backend *InstanceBackend
status InstanceStatus
status instanceStatus
nextFetch time.Time
nodeInfoUrl string
serverVersionString string
serverImplementationString string
fetchingLock sync.Mutex
fsm *fsm.FSM
fsmLock sync.Mutex
}
func NewInstance(hostname InstanceHostname) *Instance {
self := new(Instance)
self.hostname = string(hostname)
self.status = InstanceStatusUnknown
self.setNextFetchAfter(86400 * time.Second)
return self
func NewInstance(options ...func(i *instance)) *instance {
i := new(instance)
i.setNextFetchAfter(1 * time.Second)
i.fsm = fsm.NewFSM(
"STATUS_UNKNOWN",
fsm.Events{
{Name: "BEGIN_NODEINFO_URL_FETCH", Src: []string{"STATUS_UNKNOWN"}, Dst: "FETCHING_NODEINFO_URL"},
{Name: "GOT_NODEINFO_URL", Src: []string{"FETCHING_NODEINFO_URL"}, Dst: "PRE_NODEINFO_FETCH"},
{Name: "BEGIN_NODEINFO_FETCH", Src: []string{"PRE_NODEINFO_FETCH"}, Dst: "FETCHING_NODEINFO"},
{Name: "GOT_NODEINFO", Src: []string{"FETCHING_NODEINFO"}, Dst: "READY_FOR_TOOTFETCH"},
{Name: "FETCH_TIME_REACHED", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "READY_AND_DUE_FETCH"},
{Name: "WEIRD_NODE_RESPONSE", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "WEIRD_NODE"},
{Name: "EARLY_FETCH_ERROR", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "EARLY_ERROR"},
{Name: "TOOT_FETCH_ERROR", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "TOOT_FETCH_ERROR"},
},
fsm.Callbacks{
"enter_state": func(e *fsm.Event) { i.fsmEnterState(e) },
},
)
for _, opt := range options {
opt(i)
}
return i
}
func (self *Instance) Lock() {
self.structLock.Lock()
func (i *instance) Status() string {
i.fsmLock.Lock()
defer i.fsmLock.Unlock()
return i.fsm.Current()
}
func (self *Instance) Unlock() {
self.structLock.Unlock()
func (i *instance) Event(eventname string) {
i.fsmLock.Lock()
defer i.fsmLock.Unlock()
i.fsm.Event(eventname)
}
func (self *Instance) bumpFetch() {
self.Lock()
defer self.Unlock()
self.nextFetch = time.Now().Add(100 * time.Second)
func (i *instance) fsmEnterState(e *fsm.Event) {
log.Debug().
Str("hostname", i.hostname).
Str("state", e.Dst).
Msg("instance changed state")
}
func (self *Instance) setNextFetchAfter(d time.Duration) {
self.Lock()
defer self.Unlock()
self.nextFetch = time.Now().Add(d)
func (i *instance) Lock() {
i.structLock.Lock()
}
func (self *Instance) Fetch() {
self.fetchingLock.Lock()
defer self.fetchingLock.Unlock()
func (i *instance) Unlock() {
i.structLock.Unlock()
}
self.setNextFetchAfter(INSTANCE_ERROR_INTERVAL)
func (i *instance) bumpFetch() {
i.Lock()
defer i.Unlock()
i.nextFetch = time.Now().Add(100 * time.Second)
}
err := self.detectNodeTypeIfNecessary()
func (i *instance) setNextFetchAfter(d time.Duration) {
i.Lock()
defer i.Unlock()
i.nextFetch = time.Now().Add(d)
}
func (i *instance) Fetch() {
i.fetchingLock.Lock()
defer i.fetchingLock.Unlock()
i.setNextFetchAfter(INSTANCE_ERROR_INTERVAL)
err := i.detectNodeTypeIfNecessary()
if err != nil {
log.Debug().
Str("hostname", self.hostname).
Str("hostname", i.hostname).
Err(err).
Msg("unable to fetch instance metadata")
return
}
self.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL)
log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", self.hostname)
i.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL)
log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", i.hostname)
}
func (self *Instance) dueForFetch() bool {
self.Lock()
defer self.Unlock()
if !self.identified {
return false
}
nf := self.nextFetch
return nf.Before(time.Now())
}
func (self *Instance) nodeIdentified() bool {
self.Lock()
defer self.Unlock()
if self.implementation > Unknown {
func (i *instance) dueForFetch() bool {
// this just checks FSM state, the ticker must update it and do time
// calcs
if i.Status() == "READY_AND_DUE_FETCH" {
return true
}
return false
}
func (self *Instance) detectNodeTypeIfNecessary() error {
if !self.nodeIdentified() {
return self.fetchNodeInfo()
func (i *instance) isNowPastFetchTime() bool {
return time.Now().After(i.nextFetch)
}
func (i *instance) Tick() {
if i.Status() == "READY_FOR_TOOTFETCH" {
if i.isNowPastFetchTime() {
i.Event("FETCH_TIME_REACHED")
}
} else if i.Status() == "STATUS_UNKNOWN" {
i.Fetch()
}
}
func (i *instance) nodeIdentified() bool {
i.Lock()
defer i.Unlock()
if i.implementation > Unknown {
return true
}
return false
}
func (i *instance) detectNodeTypeIfNecessary() error {
if !i.nodeIdentified() {
return i.fetchNodeInfo()
} else {
return nil
}
}
func (self *Instance) registerError() {
self.Lock()
defer self.Unlock()
self.errorCount++
func (i *instance) registerError() {
i.Lock()
defer i.Unlock()
i.errorCount++
}
func (self *Instance) registerSuccess() {
self.Lock()
defer self.Unlock()
self.successCount++
func (i *instance) registerSuccess() {
i.Lock()
defer i.Unlock()
i.successCount++
}
func (self *Instance) APIReport() *gin.H {
r := gin.H{}
return &r
}
func (i *Instance) Up() bool {
func (i *instance) Up() bool {
i.Lock()
defer i.Unlock()
return i.successCount > 0
}
func (i *Instance) fetchNodeInfoURL() error {
func (i *instance) fetchNodeInfoURL() error {
url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.hostname)
var c = &http.Client{
Timeout: INSTANCE_NODEINFO_TIMEOUT,
@ -166,6 +207,7 @@ func (i *Instance) fetchNodeInfoURL() error {
Str("hostname", i.hostname).
Msg("fetching nodeinfo reference URL")
i.Event("BEGIN_NODEINFO_URL_FETCH")
resp, err := c.Get(url)
if err != nil {
log.Debug().
@ -173,6 +215,7 @@ func (i *Instance) fetchNodeInfoURL() error {
Err(err).
Msg("unable to fetch nodeinfo, node is down?")
i.registerError()
i.Event("EARLY_FETCH_ERROR")
return err
}
@ -185,6 +228,7 @@ func (i *Instance) fetchNodeInfoURL() error {
Err(err).
Msg("unable to read nodeinfo")
i.registerError()
i.Event("EARLY_FETCH_ERROR")
return err
}
@ -196,6 +240,7 @@ func (i *Instance) fetchNodeInfoURL() error {
Err(err).
Msg("unable to parse nodeinfo, node is weird")
i.registerError()
i.Event("WEIRD_NODE_RESPONSE")
return err
}
@ -210,23 +255,25 @@ func (i *Instance) fetchNodeInfoURL() error {
i.nodeInfoUrl = item.Href
i.Unlock()
i.registerSuccess()
i.Event("GOT_NODEINFO_URL")
return nil
}
log.Debug().
Str("hostname", i.hostname).
Str("item-rel", item.Rel).
Str("item-href", item.Href).
Msg("found key in nodeinfo")
Msg("nodeinfo entry")
}
log.Error().
Str("hostname", i.hostname).
Msg("incomplete nodeinfo")
i.registerError()
i.Event("WEIRD_NODE_RESPONSE")
return errors.New("incomplete nodeinfo")
}
func (i *Instance) fetchNodeInfo() error {
func (i *instance) fetchNodeInfo() error {
err := i.fetchNodeInfoURL()
if err != nil {
@ -243,6 +290,7 @@ func (i *Instance) fetchNodeInfo() error {
url := i.nodeInfoUrl
i.Unlock()
i.Event("BEGIN_NODEINFO_FETCH")
resp, err := c.Get(url)
if err != nil {
@ -251,6 +299,7 @@ func (i *Instance) fetchNodeInfo() error {
Err(err).
Msgf("unable to fetch nodeinfo data")
i.registerError()
i.Event("EARLY_FETCH_ERROR")
return err
}
@ -263,6 +312,7 @@ func (i *Instance) fetchNodeInfo() error {
Err(err).
Msgf("unable to read nodeinfo data")
i.registerError()
i.Event("EARLY_FETCH_ERROR")
return err
}
@ -274,6 +324,7 @@ func (i *Instance) fetchNodeInfo() error {
Err(err).
Msgf("unable to parse nodeinfo")
i.registerError()
i.Event("WEIRD_NODE_RESPONSE")
return err
}
@ -296,9 +347,9 @@ func (i *Instance) fetchNodeInfo() error {
Msg("detected server software")
i.identified = true
i.implementation = Pleroma
i.status = InstanceStatusIdentified
i.Unlock()
i.registerSuccess()
i.Event("GOT_NODEINFO")
return nil
} else if ni.Software.Name == "mastodon" {
log.Debug().
@ -307,9 +358,9 @@ func (i *Instance) fetchNodeInfo() error {
Msg("detected server software")
i.identified = true
i.implementation = Mastodon
i.status = InstanceStatusIdentified
i.Unlock()
i.registerSuccess()
i.Event("GOT_NODEINFO")
return nil
} else {
log.Error().
@ -318,7 +369,8 @@ func (i *Instance) fetchNodeInfo() error {
Msg("FIXME unknown server implementation")
i.Unlock()
i.registerError()
return errors.New("FIXME unknown server implementation")
i.Event("WEIRD_NODE_RESPONSE")
return errors.New("unknown server implementation")
}
}
@ -339,14 +391,14 @@ func (i *Instance) fetchRecentToots() ([]byte, error) {
*/
/*
func (self *PleromaBackend) fetchRecentToots() ([]byte, error) {
func (i *PleromaBackend) fetchRecentToots() ([]byte, error) {
//url :=
//fmt.Sprintf("https://%s/api/statuses/public_and_external_timeline.json?count=100",
//i.hostname)
return nil, nil
}
func (self *MastodonBackend) fetchRecentTootsJsonFromMastodon() ([]byte, error) {
func (i *MastodonBackend) fetchRecentTootsJsonFromMastodon() ([]byte, error) {
//url :=
//fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true",
//i.hostname)

View File

@ -23,7 +23,7 @@ var INDEX_ERROR_INTERVAL = time.Second * 60 * 10
// LOG_REPORT_INTERVAL defines how long between logging internal
// stats/reporting for user supervision
var LOG_REPORT_INTERVAL = time.Second * 60
var LOG_REPORT_INTERVAL = time.Second * 10
const mastodonIndexUrl = "https://instances.social/list.json?q%5Busers%5D=&q%5Bsearch%5D=&strict=false"
const pleromaIndexUrl = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api.cgi"
@ -55,6 +55,22 @@ func (self *InstanceLocator) addInstance(hostname InstanceHostname) {
self.reportInstanceVia <- hostname
}
func (self *InstanceLocator) mastodonIndexRefreshDue() bool {
return self.mastodonIndexNextRefresh.Before(time.Now())
}
func (self *InstanceLocator) durationUntilNextMastodonIndexRefresh() time.Duration {
return (time.Duration(-1) * time.Now().Sub(*self.mastodonIndexNextRefresh))
}
func (self *InstanceLocator) pleromaIndexRefreshDue() bool {
return self.pleromaIndexNextRefresh.Before(time.Now())
}
func (self *InstanceLocator) durationUntilNextPleromaIndexRefresh() time.Duration {
return (time.Duration(-1) * time.Now().Sub(*self.pleromaIndexNextRefresh))
}
func (self *InstanceLocator) Locate() {
log.Info().Msg("InstanceLocator starting")
x := time.Now()
@ -66,7 +82,7 @@ func (self *InstanceLocator) Locate() {
log.Info().Msg("InstanceLocator tick")
go func() {
if self.pleromaIndexNextRefresh.Before(time.Now()) {
if self.pleromaIndexRefreshDue() {
if !pleromaSemaphore.TryAcquire(1) {
return
}
@ -76,7 +92,7 @@ func (self *InstanceLocator) Locate() {
}()
go func() {
if self.mastodonIndexNextRefresh.Before(time.Now()) {
if self.mastodonIndexRefreshDue() {
if !mastodonSemaphore.TryAcquire(1) {
return
}
@ -90,11 +106,11 @@ func (self *InstanceLocator) Locate() {
if time.Now().After(x.Add(LOG_REPORT_INTERVAL)) {
x = time.Now()
log.Debug().
Str("nextMastodonIndexFetch", time.Now().Sub(*self.mastodonIndexNextRefresh).String()).
Send()
Str("nextMastodonIndexRefresh", self.durationUntilNextMastodonIndexRefresh().String()).
Msg("refresh countdown")
log.Debug().
Str("nextMastodonIndexFetch", time.Now().Sub(*self.pleromaIndexNextRefresh).String()).
Send()
Str("nextPleromaIndexRefresh", self.durationUntilNextPleromaIndexRefresh().String()).
Msg("refresh countdown")
}
}

View File

@ -2,13 +2,12 @@ package feta
import "sync"
import "time"
import "fmt"
import "runtime"
//import "github.com/gin-gonic/gin"
import "github.com/rs/zerolog/log"
const HOST_DISCOVERY_PARALLELISM = 10
const HOST_DISCOVERY_PARALLELISM = 1
type InstanceBackend interface {
//FIXME
@ -16,7 +15,7 @@ type InstanceBackend interface {
type InstanceManager struct {
mu sync.Mutex
instances map[InstanceHostname]*Instance
instances map[InstanceHostname]*instance
newInstanceNotifications chan InstanceHostname
startup time.Time
hostAdderSemaphore chan bool
@ -25,7 +24,7 @@ type InstanceManager struct {
func NewInstanceManager() *InstanceManager {
i := new(InstanceManager)
i.hostAdderSemaphore = make(chan bool, HOST_DISCOVERY_PARALLELISM)
i.instances = make(map[InstanceHostname]*Instance)
i.instances = make(map[InstanceHostname]*instance)
return i
}
@ -76,29 +75,31 @@ func (self *InstanceManager) Manage() {
self.receiveNewInstanceHostnames()
}()
self.startup = time.Now()
x := self.startup
for {
log.Info().Msg("InstanceManager tick")
self.managerLoop()
time.Sleep(1 * time.Second)
if time.Now().After(x.Add(LOG_REPORT_INTERVAL)) {
x = time.Now()
self.logInstanceReport()
}
}
}
func (self *InstanceManager) managerLoop() {
self.Lock()
il := make([]*Instance, 0)
il := make([]*instance, 0)
for _, v := range self.instances {
il = append(il, v)
}
self.Unlock()
for _, v := range il {
if v.dueForFetch() {
go func(i *Instance) {
i.Fetch()
}(v)
}
go func(i *instance) {
i.Tick()
}(v)
}
}
func (self *InstanceManager) hostnameExists(newhn InstanceHostname) bool {
@ -120,7 +121,9 @@ func (self *InstanceManager) addInstanceByHostname(newhn InstanceHostname) {
// this blocks on the channel size, limiting concurrency
self.hostAdderSemaphore <- true
i := NewInstance(newhn)
i := NewInstance(func(x *instance) {
x.hostname = string(newhn)
})
// we do node detection under the addLock to avoid thundering
// on startup
i.detectNodeTypeIfNecessary()
@ -147,37 +150,19 @@ func (self *InstanceManager) receiveNewInstanceHostnames() {
func (self *InstanceManager) logInstanceReport() {
r := self.instanceSummaryReport()
log.Info().
Uint("up", r.up).
Uint("total", r.total).
Uint("identified", r.identified).
sublogger := log.With().Logger()
for k, v := range r {
sublogger = sublogger.With().Uint(k, v).Logger()
}
sublogger.Info().
Msg("instance report")
}
type InstanceSummaryReport struct {
up uint
identified uint
total uint
}
func (r *InstanceSummaryReport) String() string {
return fmt.Sprintf("up=%d identified=%d total=%d", r.up, r.identified, r.total)
}
func (self *InstanceManager) NumInstances() uint {
return self.instanceSummaryReport().total
}
type InstanceListReport []*InstanceDetail
type InstanceDetail struct {
hostname string
up bool
nextFetch string
}
func (self *InstanceManager) listInstances() []*Instance {
var out []*Instance
func (self *InstanceManager) listInstances() []*instance {
var out []*instance
self.Lock()
defer self.Unlock()
for _, v := range self.instances {
@ -186,22 +171,12 @@ func (self *InstanceManager) listInstances() []*Instance {
return out
}
func (self *InstanceManager) instanceSummaryReport() *InstanceSummaryReport {
self.Lock()
defer self.Unlock()
r := new(InstanceSummaryReport)
r.total = uint(len(self.instances))
for _, elem := range self.instances {
if elem.identified == true {
r.identified = r.identified + 1
}
if elem.status == InstanceStatusAlive {
r.up = r.up + 1
}
func (im *InstanceManager) instanceSummaryReport() map[string]uint {
r := make(map[string]uint)
for _, v := range im.listInstances() {
v.Lock()
r[v.Status()]++
v.Unlock()
}
return r
}