diff --git a/.gitignore b/.gitignore index 0cee278..1b38502 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ feta output/ +feta.sqlite diff --git a/apihandlers.go b/apihandlers.go index 32c66df..3f9120f 100644 --- a/apihandlers.go +++ b/apihandlers.go @@ -4,6 +4,8 @@ import "time" import "net/http" import "encoding/json" import "runtime" +import "fmt" +import "strings" import "github.com/gin-gonic/gin" @@ -22,7 +24,7 @@ func (a *FetaAPIServer) instances() []hash { i["successCount"] = v.successCount i["errorCount"] = v.errorCount i["identified"] = v.identified - i["status"] = v.status + i["status"] = v.Status() i["software"] = "unknown" i["version"] = "unknown" if v.identified { @@ -35,11 +37,42 @@ func (a *FetaAPIServer) instances() []hash { return resp } +func (a *FetaAPIServer) instanceSummary() map[string]int { + resp := make(map[string]int) + for _, v := range a.feta.manager.listInstances() { + v.Lock() + resp[fmt.Sprintf("STATUS_%s", v.Status())]++ + if v.serverImplementationString != "" { + //FIXME(sneak) sanitize this to a-z0-9, it is server-provided + resp[fmt.Sprintf("SOFTWARE_%s", strings.ToUpper(v.serverImplementationString))]++ + } + v.Unlock() + } + return resp +} + +func (a *FetaAPIServer) getInstanceListHandler() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + + result := &gin.H{ + "instances": a.instances(), + } + + json, err := json.Marshal(result) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.Write(json) + } +} + func (a *FetaAPIServer) getIndexHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { index := &gin.H{ - "instances": a.instances(), "server": &gin.H{ "now": time.Now().UTC().Format(time.RFC3339), "uptime": a.feta.Uptime().String(), @@ -50,6 +83,7 @@ func (a *FetaAPIServer) getIndexHandler() http.HandlerFunc { "buildarch": a.feta.buildarch, "builduser": a.feta.builduser, }, + "instanceSummary": a.instanceSummary(), } json, err := json.Marshal(index) diff --git a/apiserver.go b/apiserver.go index b833f9e..0013f27 100644 --- a/apiserver.go +++ b/apiserver.go @@ -44,7 +44,9 @@ func (a *FetaAPIServer) getRouter() *gin.Engine { r.Use(ginzerolog.Logger("gin")) r.GET("/.well-known/healthcheck.json", gin.WrapF(a.getHealthCheckHandler())) - r.GET("/", gin.WrapF(a.getIndexHandler())) + r.GET("/", func(c *gin.Context) { c.Redirect(http.StatusMovedPermanently, "/feta") }) + r.GET("/feta", gin.WrapF(a.getIndexHandler())) + r.GET("/feta/list/instances", gin.WrapF(a.getInstanceListHandler())) return r } diff --git a/dbmodel.go b/dbmodel.go new file mode 100644 index 0000000..7c50859 --- /dev/null +++ b/dbmodel.go @@ -0,0 +1,14 @@ +package feta + +import "github.com/jinzhu/gorm" +import _ "github.com/jinzhu/gorm/dialects/sqlite" + +type SavedInstance struct { + gorm.Model + hostname string + software string +} + +func (f *FetaProcess) databaseMigrations() { + f.db.AutoMigrate(&SavedInstance{}) +} diff --git a/feta.go b/feta.go index 9579a90..f52749c 100644 --- a/feta.go +++ b/feta.go @@ -32,7 +32,7 @@ type FetaProcess struct { manager *InstanceManager api *FetaAPIServer db *gorm.DB - startup *time.Time + startup time.Time } func (f *FetaProcess) identify() { @@ -72,26 +72,31 @@ func (f *FetaProcess) setupLogging() { } func (f *FetaProcess) Uptime() time.Duration { - return time.Since(*f.startup) + return time.Since(f.startup) +} + +func (f *FetaProcess) setupDatabase() { + var err error + f.db, err = gorm.Open("sqlite3", "feta.sqlite") + + if err != nil { + panic(err) + } + + f.databaseMigrations() } func (f *FetaProcess) RunForever() int { - t := time.Now() - f.startup = &t + f.startup = time.Now() - var err error - f.db, err = gorm.Open("sqlite3", "/feta.sqlite") - defer f.db.Close() + f.setupDatabase() - if err != nil { - log.Panic().Err(err) - } newInstanceHostnameNotifications := make(chan InstanceHostname) f.locator = NewInstanceLocator() f.manager = NewInstanceManager() f.api = new(FetaAPIServer) - f.api.feta = f // api needs to get to us + f.api.feta = f // api needs to get to us to access data f.locator.AddInstanceNotificationChannel(newInstanceHostnameNotifications) f.manager.AddInstanceNotificationChannel(newInstanceHostnameNotifications) diff --git a/instance.go b/instance.go index 286ef7c..d147221 100644 --- a/instance.go +++ b/instance.go @@ -9,38 +9,31 @@ import "sync" import "time" import "errors" -import "github.com/gin-gonic/gin" +//import "github.com/gin-gonic/gin" +import "github.com/looplab/fsm" import "github.com/rs/zerolog/log" const NodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0" -const INSTANCE_NODEINFO_TIMEOUT = time.Second * 5 +const INSTANCE_NODEINFO_TIMEOUT = time.Second * 50 -const INSTANCE_HTTP_TIMEOUT = time.Second * 60 +const INSTANCE_HTTP_TIMEOUT = time.Second * 50 const INSTANCE_SPIDER_INTERVAL = time.Second * 60 const INSTANCE_ERROR_INTERVAL = time.Second * 60 * 30 -type InstanceImplementation int +type instanceImplementation int const ( - Unknown InstanceImplementation = iota + Unknown instanceImplementation = iota Mastodon Pleroma ) -type InstanceStatus int +type instanceStatus int -const ( - InstanceStatusNone InstanceStatus = iota - InstanceStatusUnknown - InstanceStatusAlive - InstanceStatusIdentified - InstanceStatusFailure -) - -type Instance struct { +type instance struct { structLock sync.Mutex errorCount uint successCount uint @@ -48,114 +41,162 @@ type Instance struct { hostname string identified bool fetching bool - implementation InstanceImplementation + implementation instanceImplementation backend *InstanceBackend - status InstanceStatus + status instanceStatus nextFetch time.Time nodeInfoUrl string serverVersionString string serverImplementationString string fetchingLock sync.Mutex + fsm *fsm.FSM + fsmLock sync.Mutex } -func NewInstance(hostname InstanceHostname) *Instance { - self := new(Instance) - self.hostname = string(hostname) - self.status = InstanceStatusUnknown - self.setNextFetchAfter(86400 * time.Second) - return self +func NewInstance(options ...func(i *instance)) *instance { + i := new(instance) + i.setNextFetchAfter(1 * time.Second) + + i.fsm = fsm.NewFSM( + "STATUS_UNKNOWN", + fsm.Events{ + {Name: "BEGIN_NODEINFO_URL_FETCH", Src: []string{"STATUS_UNKNOWN"}, Dst: "FETCHING_NODEINFO_URL"}, + {Name: "GOT_NODEINFO_URL", Src: []string{"FETCHING_NODEINFO_URL"}, Dst: "PRE_NODEINFO_FETCH"}, + {Name: "BEGIN_NODEINFO_FETCH", Src: []string{"PRE_NODEINFO_FETCH"}, Dst: "FETCHING_NODEINFO"}, + {Name: "GOT_NODEINFO", Src: []string{"FETCHING_NODEINFO"}, Dst: "READY_FOR_TOOTFETCH"}, + {Name: "FETCH_TIME_REACHED", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "READY_AND_DUE_FETCH"}, + {Name: "WEIRD_NODE_RESPONSE", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "WEIRD_NODE"}, + {Name: "EARLY_FETCH_ERROR", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "EARLY_ERROR"}, + {Name: "TOOT_FETCH_ERROR", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "TOOT_FETCH_ERROR"}, + }, + fsm.Callbacks{ + "enter_state": func(e *fsm.Event) { i.fsmEnterState(e) }, + }, + ) + + for _, opt := range options { + opt(i) + } + return i } -func (self *Instance) Lock() { - self.structLock.Lock() +func (i *instance) Status() string { + i.fsmLock.Lock() + defer i.fsmLock.Unlock() + return i.fsm.Current() } -func (self *Instance) Unlock() { - self.structLock.Unlock() +func (i *instance) Event(eventname string) { + i.fsmLock.Lock() + defer i.fsmLock.Unlock() + i.fsm.Event(eventname) } -func (self *Instance) bumpFetch() { - self.Lock() - defer self.Unlock() - self.nextFetch = time.Now().Add(100 * time.Second) +func (i *instance) fsmEnterState(e *fsm.Event) { + log.Debug(). + Str("hostname", i.hostname). + Str("state", e.Dst). + Msg("instance changed state") } -func (self *Instance) setNextFetchAfter(d time.Duration) { - self.Lock() - defer self.Unlock() - self.nextFetch = time.Now().Add(d) +func (i *instance) Lock() { + i.structLock.Lock() } -func (self *Instance) Fetch() { - self.fetchingLock.Lock() - defer self.fetchingLock.Unlock() +func (i *instance) Unlock() { + i.structLock.Unlock() +} - self.setNextFetchAfter(INSTANCE_ERROR_INTERVAL) +func (i *instance) bumpFetch() { + i.Lock() + defer i.Unlock() + i.nextFetch = time.Now().Add(100 * time.Second) +} - err := self.detectNodeTypeIfNecessary() +func (i *instance) setNextFetchAfter(d time.Duration) { + i.Lock() + defer i.Unlock() + i.nextFetch = time.Now().Add(d) +} + +func (i *instance) Fetch() { + i.fetchingLock.Lock() + defer i.fetchingLock.Unlock() + + i.setNextFetchAfter(INSTANCE_ERROR_INTERVAL) + + err := i.detectNodeTypeIfNecessary() if err != nil { log.Debug(). - Str("hostname", self.hostname). + Str("hostname", i.hostname). Err(err). Msg("unable to fetch instance metadata") return } - self.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL) - log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", self.hostname) + i.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL) + log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", i.hostname) } -func (self *Instance) dueForFetch() bool { - self.Lock() - defer self.Unlock() - if !self.identified { - return false - } - nf := self.nextFetch - return nf.Before(time.Now()) -} - -func (self *Instance) nodeIdentified() bool { - self.Lock() - defer self.Unlock() - if self.implementation > Unknown { +func (i *instance) dueForFetch() bool { + // this just checks FSM state, the ticker must update it and do time + // calcs + if i.Status() == "READY_AND_DUE_FETCH" { return true } return false } -func (self *Instance) detectNodeTypeIfNecessary() error { - if !self.nodeIdentified() { - return self.fetchNodeInfo() +func (i *instance) isNowPastFetchTime() bool { + return time.Now().After(i.nextFetch) +} + +func (i *instance) Tick() { + if i.Status() == "READY_FOR_TOOTFETCH" { + if i.isNowPastFetchTime() { + i.Event("FETCH_TIME_REACHED") + } + } else if i.Status() == "STATUS_UNKNOWN" { + i.Fetch() + } +} + +func (i *instance) nodeIdentified() bool { + i.Lock() + defer i.Unlock() + if i.implementation > Unknown { + return true + } + return false +} + +func (i *instance) detectNodeTypeIfNecessary() error { + if !i.nodeIdentified() { + return i.fetchNodeInfo() } else { return nil } } -func (self *Instance) registerError() { - self.Lock() - defer self.Unlock() - self.errorCount++ +func (i *instance) registerError() { + i.Lock() + defer i.Unlock() + i.errorCount++ } -func (self *Instance) registerSuccess() { - self.Lock() - defer self.Unlock() - self.successCount++ +func (i *instance) registerSuccess() { + i.Lock() + defer i.Unlock() + i.successCount++ } -func (self *Instance) APIReport() *gin.H { - r := gin.H{} - return &r -} - -func (i *Instance) Up() bool { +func (i *instance) Up() bool { i.Lock() defer i.Unlock() return i.successCount > 0 } -func (i *Instance) fetchNodeInfoURL() error { +func (i *instance) fetchNodeInfoURL() error { url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.hostname) var c = &http.Client{ Timeout: INSTANCE_NODEINFO_TIMEOUT, @@ -166,6 +207,7 @@ func (i *Instance) fetchNodeInfoURL() error { Str("hostname", i.hostname). Msg("fetching nodeinfo reference URL") + i.Event("BEGIN_NODEINFO_URL_FETCH") resp, err := c.Get(url) if err != nil { log.Debug(). @@ -173,6 +215,7 @@ func (i *Instance) fetchNodeInfoURL() error { Err(err). Msg("unable to fetch nodeinfo, node is down?") i.registerError() + i.Event("EARLY_FETCH_ERROR") return err } @@ -185,6 +228,7 @@ func (i *Instance) fetchNodeInfoURL() error { Err(err). Msg("unable to read nodeinfo") i.registerError() + i.Event("EARLY_FETCH_ERROR") return err } @@ -196,6 +240,7 @@ func (i *Instance) fetchNodeInfoURL() error { Err(err). Msg("unable to parse nodeinfo, node is weird") i.registerError() + i.Event("WEIRD_NODE_RESPONSE") return err } @@ -210,23 +255,25 @@ func (i *Instance) fetchNodeInfoURL() error { i.nodeInfoUrl = item.Href i.Unlock() i.registerSuccess() + i.Event("GOT_NODEINFO_URL") return nil } log.Debug(). Str("hostname", i.hostname). Str("item-rel", item.Rel). Str("item-href", item.Href). - Msg("found key in nodeinfo") + Msg("nodeinfo entry") } log.Error(). Str("hostname", i.hostname). Msg("incomplete nodeinfo") i.registerError() + i.Event("WEIRD_NODE_RESPONSE") return errors.New("incomplete nodeinfo") } -func (i *Instance) fetchNodeInfo() error { +func (i *instance) fetchNodeInfo() error { err := i.fetchNodeInfoURL() if err != nil { @@ -243,6 +290,7 @@ func (i *Instance) fetchNodeInfo() error { url := i.nodeInfoUrl i.Unlock() + i.Event("BEGIN_NODEINFO_FETCH") resp, err := c.Get(url) if err != nil { @@ -251,6 +299,7 @@ func (i *Instance) fetchNodeInfo() error { Err(err). Msgf("unable to fetch nodeinfo data") i.registerError() + i.Event("EARLY_FETCH_ERROR") return err } @@ -263,6 +312,7 @@ func (i *Instance) fetchNodeInfo() error { Err(err). Msgf("unable to read nodeinfo data") i.registerError() + i.Event("EARLY_FETCH_ERROR") return err } @@ -274,6 +324,7 @@ func (i *Instance) fetchNodeInfo() error { Err(err). Msgf("unable to parse nodeinfo") i.registerError() + i.Event("WEIRD_NODE_RESPONSE") return err } @@ -296,9 +347,9 @@ func (i *Instance) fetchNodeInfo() error { Msg("detected server software") i.identified = true i.implementation = Pleroma - i.status = InstanceStatusIdentified i.Unlock() i.registerSuccess() + i.Event("GOT_NODEINFO") return nil } else if ni.Software.Name == "mastodon" { log.Debug(). @@ -307,9 +358,9 @@ func (i *Instance) fetchNodeInfo() error { Msg("detected server software") i.identified = true i.implementation = Mastodon - i.status = InstanceStatusIdentified i.Unlock() i.registerSuccess() + i.Event("GOT_NODEINFO") return nil } else { log.Error(). @@ -318,7 +369,8 @@ func (i *Instance) fetchNodeInfo() error { Msg("FIXME unknown server implementation") i.Unlock() i.registerError() - return errors.New("FIXME unknown server implementation") + i.Event("WEIRD_NODE_RESPONSE") + return errors.New("unknown server implementation") } } @@ -339,14 +391,14 @@ func (i *Instance) fetchRecentToots() ([]byte, error) { */ /* -func (self *PleromaBackend) fetchRecentToots() ([]byte, error) { +func (i *PleromaBackend) fetchRecentToots() ([]byte, error) { //url := //fmt.Sprintf("https://%s/api/statuses/public_and_external_timeline.json?count=100", //i.hostname) return nil, nil } -func (self *MastodonBackend) fetchRecentTootsJsonFromMastodon() ([]byte, error) { +func (i *MastodonBackend) fetchRecentTootsJsonFromMastodon() ([]byte, error) { //url := //fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true", //i.hostname) diff --git a/locator.go b/locator.go index e87a27c..809d4f3 100644 --- a/locator.go +++ b/locator.go @@ -23,7 +23,7 @@ var INDEX_ERROR_INTERVAL = time.Second * 60 * 10 // LOG_REPORT_INTERVAL defines how long between logging internal // stats/reporting for user supervision -var LOG_REPORT_INTERVAL = time.Second * 60 +var LOG_REPORT_INTERVAL = time.Second * 10 const mastodonIndexUrl = "https://instances.social/list.json?q%5Busers%5D=&q%5Bsearch%5D=&strict=false" const pleromaIndexUrl = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api.cgi" @@ -55,6 +55,22 @@ func (self *InstanceLocator) addInstance(hostname InstanceHostname) { self.reportInstanceVia <- hostname } +func (self *InstanceLocator) mastodonIndexRefreshDue() bool { + return self.mastodonIndexNextRefresh.Before(time.Now()) +} + +func (self *InstanceLocator) durationUntilNextMastodonIndexRefresh() time.Duration { + return (time.Duration(-1) * time.Now().Sub(*self.mastodonIndexNextRefresh)) +} + +func (self *InstanceLocator) pleromaIndexRefreshDue() bool { + return self.pleromaIndexNextRefresh.Before(time.Now()) +} + +func (self *InstanceLocator) durationUntilNextPleromaIndexRefresh() time.Duration { + return (time.Duration(-1) * time.Now().Sub(*self.pleromaIndexNextRefresh)) +} + func (self *InstanceLocator) Locate() { log.Info().Msg("InstanceLocator starting") x := time.Now() @@ -66,7 +82,7 @@ func (self *InstanceLocator) Locate() { log.Info().Msg("InstanceLocator tick") go func() { - if self.pleromaIndexNextRefresh.Before(time.Now()) { + if self.pleromaIndexRefreshDue() { if !pleromaSemaphore.TryAcquire(1) { return } @@ -76,7 +92,7 @@ func (self *InstanceLocator) Locate() { }() go func() { - if self.mastodonIndexNextRefresh.Before(time.Now()) { + if self.mastodonIndexRefreshDue() { if !mastodonSemaphore.TryAcquire(1) { return } @@ -90,11 +106,11 @@ func (self *InstanceLocator) Locate() { if time.Now().After(x.Add(LOG_REPORT_INTERVAL)) { x = time.Now() log.Debug(). - Str("nextMastodonIndexFetch", time.Now().Sub(*self.mastodonIndexNextRefresh).String()). - Send() + Str("nextMastodonIndexRefresh", self.durationUntilNextMastodonIndexRefresh().String()). + Msg("refresh countdown") log.Debug(). - Str("nextMastodonIndexFetch", time.Now().Sub(*self.pleromaIndexNextRefresh).String()). - Send() + Str("nextPleromaIndexRefresh", self.durationUntilNextPleromaIndexRefresh().String()). + Msg("refresh countdown") } } diff --git a/manager.go b/manager.go index 4770dcc..fe325d8 100644 --- a/manager.go +++ b/manager.go @@ -2,13 +2,12 @@ package feta import "sync" import "time" -import "fmt" import "runtime" //import "github.com/gin-gonic/gin" import "github.com/rs/zerolog/log" -const HOST_DISCOVERY_PARALLELISM = 10 +const HOST_DISCOVERY_PARALLELISM = 1 type InstanceBackend interface { //FIXME @@ -16,7 +15,7 @@ type InstanceBackend interface { type InstanceManager struct { mu sync.Mutex - instances map[InstanceHostname]*Instance + instances map[InstanceHostname]*instance newInstanceNotifications chan InstanceHostname startup time.Time hostAdderSemaphore chan bool @@ -25,7 +24,7 @@ type InstanceManager struct { func NewInstanceManager() *InstanceManager { i := new(InstanceManager) i.hostAdderSemaphore = make(chan bool, HOST_DISCOVERY_PARALLELISM) - i.instances = make(map[InstanceHostname]*Instance) + i.instances = make(map[InstanceHostname]*instance) return i } @@ -76,29 +75,31 @@ func (self *InstanceManager) Manage() { self.receiveNewInstanceHostnames() }() self.startup = time.Now() + x := self.startup for { log.Info().Msg("InstanceManager tick") self.managerLoop() time.Sleep(1 * time.Second) + if time.Now().After(x.Add(LOG_REPORT_INTERVAL)) { + x = time.Now() + self.logInstanceReport() + } } } func (self *InstanceManager) managerLoop() { self.Lock() - il := make([]*Instance, 0) + il := make([]*instance, 0) for _, v := range self.instances { il = append(il, v) } self.Unlock() for _, v := range il { - if v.dueForFetch() { - go func(i *Instance) { - i.Fetch() - }(v) - } + go func(i *instance) { + i.Tick() + }(v) } - } func (self *InstanceManager) hostnameExists(newhn InstanceHostname) bool { @@ -120,7 +121,9 @@ func (self *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { // this blocks on the channel size, limiting concurrency self.hostAdderSemaphore <- true - i := NewInstance(newhn) + i := NewInstance(func(x *instance) { + x.hostname = string(newhn) + }) // we do node detection under the addLock to avoid thundering // on startup i.detectNodeTypeIfNecessary() @@ -147,37 +150,19 @@ func (self *InstanceManager) receiveNewInstanceHostnames() { func (self *InstanceManager) logInstanceReport() { r := self.instanceSummaryReport() - log.Info(). - Uint("up", r.up). - Uint("total", r.total). - Uint("identified", r.identified). + + sublogger := log.With().Logger() + + for k, v := range r { + sublogger = sublogger.With().Uint(k, v).Logger() + } + + sublogger.Info(). Msg("instance report") } -type InstanceSummaryReport struct { - up uint - identified uint - total uint -} - -func (r *InstanceSummaryReport) String() string { - return fmt.Sprintf("up=%d identified=%d total=%d", r.up, r.identified, r.total) -} - -func (self *InstanceManager) NumInstances() uint { - return self.instanceSummaryReport().total -} - -type InstanceListReport []*InstanceDetail - -type InstanceDetail struct { - hostname string - up bool - nextFetch string -} - -func (self *InstanceManager) listInstances() []*Instance { - var out []*Instance +func (self *InstanceManager) listInstances() []*instance { + var out []*instance self.Lock() defer self.Unlock() for _, v := range self.instances { @@ -186,22 +171,12 @@ func (self *InstanceManager) listInstances() []*Instance { return out } -func (self *InstanceManager) instanceSummaryReport() *InstanceSummaryReport { - self.Lock() - defer self.Unlock() - r := new(InstanceSummaryReport) - - r.total = uint(len(self.instances)) - - for _, elem := range self.instances { - if elem.identified == true { - r.identified = r.identified + 1 - } - - if elem.status == InstanceStatusAlive { - r.up = r.up + 1 - } +func (im *InstanceManager) instanceSummaryReport() map[string]uint { + r := make(map[string]uint) + for _, v := range im.listInstances() { + v.Lock() + r[v.Status()]++ + v.Unlock() } - return r }