From 3b543fe5a514fe46e2031c01cec2b1a3223f8b74 Mon Sep 17 00:00:00 2001 From: Jeffrey Paul Date: Sun, 3 Nov 2019 10:00:01 -0800 Subject: [PATCH] latest refactoring --- Makefile | 2 +- api.go | 5 +- instance.go | 181 ++++++++++++++++++++++++++------------------- instancelocator.go | 86 +++++++-------------- instancemanager.go | 158 ++++++++++++++++++++++++++++++++++++++- main.go | 6 ++ 6 files changed, 298 insertions(+), 140 deletions(-) diff --git a/Makefile b/Makefile index 47bb03a..1863756 100644 --- a/Makefile +++ b/Makefile @@ -25,7 +25,7 @@ ifneq ($(UNAME_S),Darwin) GOFLAGS = -ldflags "-linkmode external -extldflags -static $(GOLDFLAGS)" endif -default: rundebug +default: run rundebug: build DEBUG=1 ./$(FN) diff --git a/api.go b/api.go index 561d429..66051ba 100644 --- a/api.go +++ b/api.go @@ -48,7 +48,9 @@ func (a *TootArchiverAPIServer) getRouter() *gin.Engine { }) r.GET("/", func(c *gin.Context) { - ir := a.archiver.locator.instanceReport() + ir := a.archiver.manager.instanceReport() + + il := a.archiver.manager.instanceListForApi() c.JSON(200, gin.H{ // FIXME(sneak) add more stuff here "status": "ok", @@ -59,6 +61,7 @@ func (a *TootArchiverAPIServer) getRouter() *gin.Engine { "up": ir.up, "identified": ir.identified, }, + "instanceList": il, }) }) diff --git a/instance.go b/instance.go index e4503cd..3aee79e 100644 --- a/instance.go +++ b/instance.go @@ -7,6 +7,7 @@ import "net/http" import "strings" import "sync" import "time" +import "errors" import "github.com/rs/zerolog/log" @@ -32,90 +33,123 @@ const ( InstanceStatusNone InstanceStatus = iota InstanceStatusUnknown InstanceStatusAlive + InstanceStatusIdentified InstanceStatusFailure ) type Instance struct { - sync.Mutex + sync.RWMutex errorCount uint successCount uint highestId int - hostName string + hostname string identified bool + fetching bool impl InstanceImplementation backend *InstanceBackend status InstanceStatus - nextCheck *time.Time + nextFetch time.Time nodeInfoUrl string serverVersion string } -func NewInstance(hostname string) *Instance { +func NewInstance(hostname InstanceHostname) *Instance { i := new(Instance) - i.hostName = hostname + i.hostname = string(hostname) i.status = InstanceStatusUnknown - t := time.Now().Add(-1 * time.Second) - i.nextCheck = &t + i.nextFetch = time.Now().Add(-1 * time.Second) // FIXME make checks detect the node type instead of in the constructor return i } -func (i *Instance) setNextCheck(d time.Duration) { +func (i *Instance) setNextFetchAfter(d time.Duration) { i.Lock() defer i.Unlock() - then := time.Now().Add(d) - i.nextCheck = &then + i.nextFetch = time.Now().Add(d) } -func (i *Instance) dueForCheck() bool { - i.Lock() - defer i.Unlock() - return i.nextCheck.Before(time.Now()) +func (self *Instance) setFetching(f bool) { + self.Lock() + defer self.Unlock() + self.fetching = f } -func (i *Instance) detectNodeType() { - i.Lock() - if i.impl > Unknown { - i.Unlock() +func (self *Instance) Fetch() { + self.setFetching(true) + defer self.setFetching(false) + + err := self.detectNodeTypeIfNecessary() + if err != nil { + self.setNextFetchAfter(INSTANCE_ERROR_INTERVAL) + log.Debug(). + Str("hostname", self.hostname). + Err(err). + Msg("unable to fetch instance metadata") return } - i.Unlock() - i.fetchNodeInfo() + + //self.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL) + log.Info().Msgf("i (%s) should check for toots", self.hostname) +} + +func (self *Instance) dueForFetch() bool { + self.RLock() + defer self.RUnlock() + if self.fetching { + return false + } + return self.nextFetch.Before(time.Now()) +} + +func (i *Instance) detectNodeTypeIfNecessary() error { + i.RLock() + if i.impl > Unknown { + i.RUnlock() + return nil + } + i.RUnlock() + return i.fetchNodeInfo() } func (i *Instance) registerError() { - i.setNextCheck(INSTANCE_ERROR_INTERVAL) + i.setNextFetchAfter(INSTANCE_ERROR_INTERVAL) i.Lock() defer i.Unlock() i.errorCount++ } func (i *Instance) registerSuccess() { - i.setNextCheck(INSTANCE_SPIDER_INTERVAL) + i.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL) i.Lock() defer i.Unlock() i.successCount++ } -func (i *Instance) fetchNodeInfoURL() { - url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.hostName) +func (i *Instance) Up() bool { + i.Lock() + defer i.Unlock() + return i.successCount > 0 +} + +func (i *Instance) fetchNodeInfoURL() error { + url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.hostname) var c = &http.Client{ Timeout: INSTANCE_HTTP_TIMEOUT, } log.Debug(). Str("url", url). - Str("hostname", i.hostName). + Str("hostname", i.hostname). Msg("fetching nodeinfo reference URL") resp, err := c.Get(url) if err != nil { log.Debug(). - Str("hostname", i.hostName). + Str("hostname", i.hostname). Err(err). Msg("unable to fetch nodeinfo, node is down?") i.registerError() - return + return err } defer resp.Body.Close() @@ -123,28 +157,28 @@ func (i *Instance) fetchNodeInfoURL() { if err != nil { log.Debug(). - Str("hostname", i.hostName). + Str("hostname", i.hostname). Err(err). Msg("unable to read nodeinfo") i.registerError() - return + return err } nir := new(NodeInfoWellKnownResponse) err = json.Unmarshal(body, &nir) if err != nil { - log.Error(). - Str("hostname", i.hostName). + log.Debug(). + Str("hostname", i.hostname). Err(err). Msg("unable to parse nodeinfo, node is weird") i.registerError() - return + return err } for _, item := range nir.Links { if item.Rel == NodeInfoSchemaVersionTwoName { - log.Info(). - Str("hostname", i.hostName). + log.Debug(). + Str("hostname", i.hostname). Str("nodeinfourl", item.Href). Msg("success fetching url for nodeinfo") @@ -152,32 +186,27 @@ func (i *Instance) fetchNodeInfoURL() { i.nodeInfoUrl = item.Href i.Unlock() i.registerSuccess() - return + return nil } + log.Debug(). + Str("hostname", i.hostname). + Str("item-rel", item.Rel). + Str("item-href", item.Href). + Msg("found key in nodeinfo") } log.Error(). - Str("hostname", i.hostName). + Str("hostname", i.hostname). Msg("incomplete nodeinfo") i.registerError() - return + return errors.New("incomplete nodeinfo") } -func (i *Instance) fetchNodeInfo() { - i.fetchNodeInfoURL() +func (i *Instance) fetchNodeInfo() error { + err := i.fetchNodeInfoURL() - i.Lock() - failure := false - if i.nodeInfoUrl == "" { - log.Error(). - Str("hostname", i.hostName). - Msg("unable to fetch nodeinfo as nodeinfo URL cannot be determined") - failure = true - } - i.Unlock() - - if failure == true { - return + if err != nil { + return err } var c = &http.Client{ @@ -186,19 +215,19 @@ func (i *Instance) fetchNodeInfo() { //FIXME make sure the nodeinfourl is on the same domain as the instance //hostname - i.Lock() + i.RLock() url := i.nodeInfoUrl - i.Unlock() + i.RUnlock() resp, err := c.Get(url) if err != nil { - log.Error(). - Str("hostname", i.hostName). + log.Debug(). + Str("hostname", i.hostname). Err(err). Msgf("unable to fetch nodeinfo data") i.registerError() - return + return err } defer resp.Body.Close() @@ -206,28 +235,28 @@ func (i *Instance) fetchNodeInfo() { if err != nil { log.Error(). - Str("hostname", i.hostName). + Str("hostname", i.hostname). Err(err). Msgf("unable to read nodeinfo data") i.registerError() - return + return err } ni := new(NodeInfoVersionTwoSchema) err = json.Unmarshal(body, &ni) if err != nil { log.Error(). - Str("hostname", i.hostName). + Str("hostname", i.hostname). Err(err). Msgf("unable to parse nodeinfo") i.registerError() - return + return err } - log.Info(). + log.Debug(). Str("serverVersion", ni.Software.Version). Str("software", ni.Software.Name). - Str("hostName", i.hostName). + Str("hostname", i.hostname). Str("nodeInfoUrl", i.nodeInfoUrl). Msg("received nodeinfo from instance") @@ -238,31 +267,29 @@ func (i *Instance) fetchNodeInfo() { ni.Software.Name = strings.ToLower(ni.Software.Name) if ni.Software.Name == "pleroma" { - log.Info(). - Str("hostname", i.hostName). + log.Debug(). + Str("hostname", i.hostname). Str("software", ni.Software.Name). Msg("detected server software") i.registerSuccess() i.identified = true i.impl = Pleroma - i.status = InstanceStatusAlive + i.status = InstanceStatusIdentified + return nil } else if ni.Software.Name == "mastodon" { - log.Info(). - Str("hostname", i.hostName). - Str("software", ni.Software.Name). - Msg("detected server software") i.registerSuccess() i.identified = true i.impl = Mastodon - i.status = InstanceStatusAlive + i.status = InstanceStatusIdentified + return nil } else { log.Error(). - Str("hostname", i.hostName). + Str("hostname", i.hostname). Str("software", ni.Software.Name). - Msg("unknown implementation on server") + Msg("FIXME unknown server implementation") i.registerError() + return errors.New("FIXME unknown server implementation") } - return } /* @@ -283,12 +310,16 @@ func (i *Instance) fetchRecentToots() ([]byte, error) { /* func (self *PleromaBackend) fetchRecentToots() ([]byte, error) { - //url := fmt.Sprintf("https://%s/api/statuses/public_and_external_timeline.json?count=100", i.hostName) + //url := + //fmt.Sprintf("https://%s/api/statuses/public_and_external_timeline.json?count=100", + //i.hostname) return nil, nil } func (self *MastodonBackend) fetchRecentTootsJsonFromMastodon() ([]byte, error) { - //url := fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true", i.hostName) + //url := + //fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true", + //i.hostname) return nil, nil } */ diff --git a/instancelocator.go b/instancelocator.go index 4f34202..b619004 100644 --- a/instancelocator.go +++ b/instancelocator.go @@ -1,7 +1,6 @@ package main import "encoding/json" -import "fmt" import "io/ioutil" import "net/http" import "time" @@ -11,6 +10,8 @@ import "github.com/rs/zerolog/log" const INDEX_API_TIMEOUT = time.Second * 60 +var USER_AGENT = "https://github.com/sneak/feta indexer bot; sneak@sneak.berlin for feedback" + // check with indices only hourly var INDEX_CHECK_INTERVAL = time.Second * 60 * 60 @@ -26,13 +27,11 @@ type InstanceLocator struct { pleromaIndexNextRefresh *time.Time mastodonIndexNextRefresh *time.Time reportInstanceVia chan InstanceHostname - instances map[string]*Instance sync.Mutex } func NewInstanceLocator() *InstanceLocator { i := new(InstanceLocator) - i.instances = make(map[string]*Instance) n := time.Now() i.pleromaIndexNextRefresh = &n i.mastodonIndexNextRefresh = &n @@ -45,20 +44,17 @@ func (self *InstanceLocator) AddInstanceNotificationChannel(via chan InstanceHos self.reportInstanceVia = via } -func (self *InstanceLocator) addInstance(hostname string) { - self.Lock() - defer self.Unlock() - // only add it if we haven't seen the hostname before - if self.instances[hostname] == nil { - log.Info().Str("hostname", hostname).Msgf("adding discovered instance") - self.instances[hostname] = NewInstance(hostname) - } - +func (self *InstanceLocator) addInstance(hostname InstanceHostname) { + // receiver (InstanceManager) is responsible for de-duping against its + // map + self.reportInstanceVia <- hostname } func (self *InstanceLocator) Locate() { + log.Info().Msg("InstanceLocator starting") x := time.Now() for { + log.Info().Msg("InstanceLocator tick") if self.pleromaIndexNextRefresh.Before(time.Now()) { self.locatePleroma() } @@ -74,61 +70,24 @@ func (self *InstanceLocator) Locate() { log.Debug(). Str("nextPleromaIndexFetch", self.pleromaIndexNextRefresh.Format(time.RFC3339)). Send() - self.logInstanceReport() } } } -func (self *InstanceLocator) logInstanceReport() { - r := self.instanceReport() - log.Info(). - Uint("up", r.up). - Uint("total", r.total). - Uint("identified", r.identified). - Msg("instance report") -} - -type InstanceLocatorReport struct { - up uint - identified uint - total uint -} - -func (r *InstanceLocatorReport) String() string { - return fmt.Sprintf("up=%d identified=%d total=%d", r.up, r.identified, r.total) -} - -func (self *InstanceLocator) NumInstances() uint { - return self.instanceReport().total -} - -func (self *InstanceLocator) instanceReport() *InstanceLocatorReport { - self.Lock() - defer self.Unlock() - r := new(InstanceLocatorReport) - - r.total = uint(len(self.instances)) - - for _, elem := range self.instances { - if elem.identified == true { - r.identified = r.identified + 1 - } - - if elem.status == InstanceStatusAlive { - r.up = r.up + 1 - } - } - - return r -} - func (self *InstanceLocator) locateMastodon() { var c = &http.Client{ Timeout: INDEX_API_TIMEOUT, } - resp, err := c.Get(mastodonIndexUrl) + req, err := http.NewRequest("GET", mastodonIndexUrl, nil) + if err != nil { + panic(err) + } + + req.Header.Set("User-Agent", USER_AGENT) + + resp, err := c.Do(req) if err != nil { log.Error().Msgf("unable to fetch mastodon instance list: %s", err) t := time.Now().Add(INDEX_ERROR_INTERVAL) @@ -162,7 +121,7 @@ func (self *InstanceLocator) locateMastodon() { } for _, instance := range mi.Instances { - self.addInstance(instance.Name) + self.addInstance(InstanceHostname(instance.Name)) } t := time.Now().Add(INDEX_CHECK_INTERVAL) @@ -175,7 +134,14 @@ func (self *InstanceLocator) locatePleroma() { var c = &http.Client{ Timeout: INDEX_API_TIMEOUT, } - resp, err := c.Get(pleromaIndexUrl) + + req, err := http.NewRequest("GET", pleromaIndexUrl, nil) + if err != nil { + panic(err) + } + req.Header.Set("User-Agent", USER_AGENT) + + resp, err := c.Do(req) if err != nil { log.Error().Msgf("unable to fetch pleroma instance list: %s", err) @@ -211,7 +177,7 @@ func (self *InstanceLocator) locatePleroma() { } for _, instance := range *pi { - self.addInstance(instance.Domain) + self.addInstance(InstanceHostname(instance.Domain)) } t := time.Now().Add(INDEX_CHECK_INTERVAL) self.Lock() diff --git a/instancemanager.go b/instancemanager.go index daddc53..5bb1bf3 100644 --- a/instancemanager.go +++ b/instancemanager.go @@ -2,24 +2,63 @@ package main import "sync" import "time" +import "fmt" +import "runtime" -//import "github.com/rs/zerolog/log" +import "github.com/rs/zerolog/log" type InstanceBackend interface { //FIXME } type InstanceManager struct { - sync.Mutex + mu sync.Mutex instances map[InstanceHostname]*Instance newInstanceNotifications chan InstanceHostname + startup time.Time } func NewInstanceManager() *InstanceManager { i := new(InstanceManager) + i.instances = make(map[InstanceHostname]*Instance) return i } +func (self *InstanceManager) logCaller(msg string) { + fpcs := make([]uintptr, 1) + // Skip 2 levels to get the caller + n := runtime.Callers(3, fpcs) + if n == 0 { + log.Debug().Msg("MSG: NO CALLER") + } + + caller := runtime.FuncForPC(fpcs[0] - 1) + if caller == nil { + log.Debug().Msg("MSG CALLER WAS NIL") + } + + // Print the file name and line number + filename, line := caller.FileLine(fpcs[0] - 1) + function := caller.Name() + + log.Debug(). + Str("filename", filename). + Int("linenum", line). + Str("function", function). + Msg(msg) +} + +func (self *InstanceManager) Lock() { + self.logCaller("instancemanager attempting to lock") + self.mu.Lock() + self.logCaller("instancemanager locked") +} + +func (self *InstanceManager) Unlock() { + self.mu.Unlock() + self.logCaller("instancemanager unlocked") +} + func (self *InstanceManager) AddInstanceNotificationChannel(via chan InstanceHostname) { self.Lock() defer self.Unlock() @@ -27,8 +66,121 @@ func (self *InstanceManager) AddInstanceNotificationChannel(via chan InstanceHos } func (self *InstanceManager) Manage() { + self.managerLoop() +} + +func (self *InstanceManager) managerLoop() { + log.Info().Msg("InstanceManager starting") + go self.receiveNewInstanceHostnames() + self.startup = time.Now() for { - // FIXME(sneak) + log.Info().Msg("InstanceManager tick") + self.Lock() + for _, v := range self.instances { + go func() { + if v.dueForFetch() { + v.Fetch() + } + }() + } + self.Unlock() time.Sleep(1 * time.Second) } } + +func (self *InstanceManager) hostnameExists(newhn InstanceHostname) bool { + self.Lock() + for k, _ := range self.instances { + if newhn == k { + self.Unlock() + return true + } + } + self.Unlock() + return false +} + +func (self *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { + // only add it if we haven't seen the hostname before + if !self.hostnameExists(newhn) { + i := NewInstance(newhn) + self.Lock() + self.instances[newhn] = i + self.Unlock() + } +} + +func (self *InstanceManager) receiveNewInstanceHostnames() { + var newhn InstanceHostname + for { + newhn = <-self.newInstanceNotifications + self.addInstanceByHostname(newhn) + } +} + +func (self *InstanceManager) logInstanceReport() { + r := self.instanceReport() + log.Info(). + Uint("up", r.up). + Uint("total", r.total). + Uint("identified", r.identified). + Msg("instance report") +} + +type InstanceReport struct { + up uint + identified uint + total uint +} + +func (r *InstanceReport) String() string { + return fmt.Sprintf("up=%d identified=%d total=%d", r.up, r.identified, r.total) +} + +func (self *InstanceManager) NumInstances() uint { + return self.instanceReport().total +} + +type InstanceListReport []*InstanceDetail + +type InstanceDetail struct { + hostname string + up bool + nextFetch string +} + +func (self *InstanceManager) instanceListForApi() InstanceListReport { + var output InstanceListReport + self.Lock() + defer self.Unlock() + for _, v := range self.instances { + id := &InstanceDetail{ + hostname: v.hostname, + } + id.up = v.Up() + id.nextFetch = string(time.Now().Sub(v.nextFetch)) + output = append(output, id) + fmt.Printf("%s", output) + } + return output +} + +func (self *InstanceManager) instanceReport() *InstanceReport { + self.Lock() + defer self.Unlock() + r := new(InstanceReport) + + r.total = uint(len(self.instances)) + + for _, elem := range self.instances { + if elem.identified == true { + r.identified = r.identified + 1 + } + + if elem.status == InstanceStatusAlive { + r.up = r.up + 1 + } + } + + return r +} diff --git a/main.go b/main.go index 1b3a747..96a7c82 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,7 @@ package main import "os" import "sync" +import "time" import "github.com/rs/zerolog" import "github.com/rs/zerolog/log" @@ -19,6 +20,11 @@ func app() int { identify() + // always log in UTC + zerolog.TimestampFunc = func() time.Time { + return time.Now().UTC() + } + zerolog.SetGlobalLevel(zerolog.InfoLevel) if os.Getenv("DEBUG") != "" { zerolog.SetGlobalLevel(zerolog.DebugLevel)