diff --git a/.gitignore b/.gitignore index 1b38502..4a757fe 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ feta output/ feta.sqlite +.lintsetup diff --git a/Makefile b/Makefile index 0a04f06..a3bef2c 100644 --- a/Makefile +++ b/Makefile @@ -25,7 +25,7 @@ ifneq ($(UNAME_S),Darwin) GOFLAGS = -ldflags "-linkmode external -extldflags -static $(GOLDFLAGS)" endif -default: rundebug +default: run rundebug: build GOTRACEBACK=all DEBUG=1 ./$(FN) @@ -38,9 +38,12 @@ clean: build: ./$(FN) -lint: +.lintsetup: go get -u golang.org/x/lint/golint go get -u github.com/GeertJohan/fgt + touch .lintsetup + +lint: .lintsetup fgt golint go-get: @@ -52,7 +55,7 @@ go-get: fmt: go fmt *.go -test: build-docker-image +test: lint build-docker-image is_uncommitted: git diff --exit-code >/dev/null 2>&1 diff --git a/apihandlers.go b/apihandlers.go index 67d7dd3..027e1dc 100644 --- a/apihandlers.go +++ b/apihandlers.go @@ -11,7 +11,7 @@ import "github.com/gin-gonic/gin" type hash map[string]interface{} -func (a *FetaAPIServer) instances() []hash { +func (a *fetaAPIServer) instances() []hash { resp := make([]hash, 0) now := time.Now() for _, v := range a.feta.manager.listInstances() { @@ -37,7 +37,7 @@ func (a *FetaAPIServer) instances() []hash { return resp } -func (a *FetaAPIServer) instanceSummary() map[string]int { +func (a *fetaAPIServer) instanceSummary() map[string]int { resp := make(map[string]int) for _, v := range a.feta.manager.listInstances() { v.Lock() @@ -51,7 +51,7 @@ func (a *FetaAPIServer) instanceSummary() map[string]int { return resp } -func (a *FetaAPIServer) getInstanceListHandler() http.HandlerFunc { +func (a *fetaAPIServer) getInstanceListHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { result := &gin.H{ @@ -69,12 +69,12 @@ func (a *FetaAPIServer) getInstanceListHandler() http.HandlerFunc { } } -func (a *FetaAPIServer) getIndexHandler() http.HandlerFunc { +func (a *fetaAPIServer) getIndexHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { index := &gin.H{ "server": &gin.H{ "now": time.Now().UTC().Format(time.RFC3339), - "uptime": a.feta.Uptime().String(), + "uptime": a.feta.uptime().String(), "goroutines": runtime.NumGoroutine(), "goversion": runtime.Version(), "version": a.feta.version, @@ -96,12 +96,12 @@ func (a *FetaAPIServer) getIndexHandler() http.HandlerFunc { } } -func (a *FetaAPIServer) getHealthCheckHandler() http.HandlerFunc { +func (a *fetaAPIServer) getHealthCheckHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { resp := &gin.H{ "status": "ok", "now": time.Now().UTC().Format(time.RFC3339), - "uptime": a.feta.Uptime().String(), + "uptime": a.feta.uptime().String(), } json, err := json.Marshal(resp) diff --git a/apiserver.go b/apiserver.go index 9e55568..fd8e91c 100644 --- a/apiserver.go +++ b/apiserver.go @@ -10,19 +10,19 @@ import "github.com/rs/zerolog/log" import "github.com/gin-gonic/gin" import "github.com/dn365/gin-zerolog" -type FetaAPIServer struct { - feta *FetaProcess +type fetaAPIServer struct { + feta *Process port uint router *gin.Engine server *http.Server debug bool } -func (self *FetaAPIServer) SetFeta(feta *FetaProcess) { - self.feta = feta +func (a *fetaAPIServer) setFeta(feta *Process) { + a.feta = feta } -func (a *FetaAPIServer) Serve() { +func (a *fetaAPIServer) serve() { if a.feta == nil { panic("must have feta app from which to serve stats") } @@ -50,7 +50,7 @@ func (a *FetaAPIServer) Serve() { } } -func (a *FetaAPIServer) initRouter() { +func (a *fetaAPIServer) initRouter() { // empty router r := gin.New() @@ -69,7 +69,7 @@ func (a *FetaAPIServer) initRouter() { a.router = r } -func (a *FetaAPIServer) initServer() { +func (a *fetaAPIServer) initServer() { if !a.debug { gin.SetMode(gin.ReleaseMode) } diff --git a/dbmodel.go b/dbmodel.go index 7c50859..d025a58 100644 --- a/dbmodel.go +++ b/dbmodel.go @@ -1,14 +1,14 @@ package feta import "github.com/jinzhu/gorm" -import _ "github.com/jinzhu/gorm/dialects/sqlite" +import _ "github.com/jinzhu/gorm/dialects/sqlite" // required for orm -type SavedInstance struct { +type savedInstance struct { gorm.Model hostname string software string } -func (f *FetaProcess) databaseMigrations() { - f.db.AutoMigrate(&SavedInstance{}) +func (f *Process) databaseMigrations() { + f.db.AutoMigrate(&savedInstance{}) } diff --git a/feta.go b/feta.go index 257198a..d6ee5b0 100644 --- a/feta.go +++ b/feta.go @@ -4,38 +4,41 @@ import "os" import "time" import "github.com/jinzhu/gorm" -import _ "github.com/jinzhu/gorm/dialects/sqlite" +import _ "github.com/jinzhu/gorm/dialects/sqlite" // required for orm import "github.com/rs/zerolog" import "github.com/rs/zerolog/log" import "github.com/mattn/go-isatty" +// InstanceHostname is a special type for holding the hostname of an +// instance (string) type InstanceHostname string +// CLIEntry is the main entrypoint for the feta process from the cli func CLIEntry(version string, buildtime string, buildarch string, builduser string) int { - f := new(FetaProcess) + f := new(Process) f.version = version f.buildtime = buildtime f.buildarch = buildarch f.builduser = builduser f.setupLogging() - return f.RunForever() + return f.runForever() } -// FetaProcess is the main structure/process of this app -type FetaProcess struct { +// Process is the main structure/process of this app +type Process struct { version string buildtime string buildarch string builduser string locator *InstanceLocator manager *InstanceManager - api *FetaAPIServer + api *fetaAPIServer db *gorm.DB startup time.Time } -func (f *FetaProcess) identify() { +func (f *Process) identify() { log.Info(). Str("version", f.version). Str("buildtime", f.buildtime). @@ -44,7 +47,7 @@ func (f *FetaProcess) identify() { Msg("starting") } -func (f *FetaProcess) setupLogging() { +func (f *Process) setupLogging() { log.Logger = log.With().Caller().Logger() @@ -73,11 +76,11 @@ func (f *FetaProcess) setupLogging() { f.identify() } -func (f *FetaProcess) Uptime() time.Duration { +func (f *Process) uptime() time.Duration { return time.Since(f.startup) } -func (f *FetaProcess) setupDatabase() { +func (f *Process) setupDatabase() { var err error f.db, err = gorm.Open("sqlite3", "feta.sqlite") @@ -88,28 +91,28 @@ func (f *FetaProcess) setupDatabase() { f.databaseMigrations() } -func (f *FetaProcess) RunForever() int { +func (f *Process) runForever() int { f.startup = time.Now() f.setupDatabase() newInstanceHostnameNotifications := make(chan InstanceHostname) - f.locator = NewInstanceLocator() - f.manager = NewInstanceManager() - f.api = new(FetaAPIServer) - f.api.feta = f // api needs to get to us to access data + f.locator = newInstanceLocator() + f.manager = newInstanceManager() + f.api = new(fetaAPIServer) + f.api.setFeta(f) // api needs to get to us to access data - f.locator.AddInstanceNotificationChannel(newInstanceHostnameNotifications) - f.manager.AddInstanceNotificationChannel(newInstanceHostnameNotifications) + f.locator.addInstanceNotificationChannel(newInstanceHostnameNotifications) + f.manager.addInstanceNotificationChannel(newInstanceHostnameNotifications) // locator goroutine: - go f.locator.Locate() + go f.locator.locate() // manager goroutine: - go f.manager.Manage() + go f.manager.manage() - go f.api.Serve() + go f.api.serve() // this goroutine (main) does nothing until we handle signals // FIXME(sneak) diff --git a/instance.go b/instance.go index f34154e..727bbb9 100644 --- a/instance.go +++ b/instance.go @@ -13,36 +13,36 @@ import "errors" import "github.com/looplab/fsm" import "github.com/rs/zerolog/log" -const NodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0" +const nodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0" -const INSTANCE_NODEINFO_TIMEOUT = time.Second * 50 +const instanceNodeinfoTimeout = time.Second * 50 -const INSTANCE_HTTP_TIMEOUT = time.Second * 50 +const instanceHTTPTimeout = time.Second * 50 -const INSTANCE_SPIDER_INTERVAL = time.Second * 60 +const instanceSpiderInterval = time.Second * 120 -const INSTANCE_ERROR_INTERVAL = time.Second * 60 * 30 +const instanceErrorInterval = time.Second * 60 * 30 type instanceImplementation int const ( - Unknown instanceImplementation = iota - Mastodon - Pleroma + implUnknown instanceImplementation = iota + implMastodon + implPleroma ) type instance struct { structLock sync.Mutex errorCount uint successCount uint - highestId int + highestID int hostname string identified bool fetching bool implementation instanceImplementation - backend *InstanceBackend + backend *instanceBackend nextFetch time.Time - nodeInfoUrl string + nodeInfoURL string serverVersionString string serverImplementationString string fetchingLock sync.Mutex @@ -50,7 +50,7 @@ type instance struct { fsmLock sync.Mutex } -func NewInstance(options ...func(i *instance)) *instance { +func newInstance(options ...func(i *instance)) *instance { i := new(instance) i.setNextFetchAfter(1 * time.Second) @@ -107,7 +107,7 @@ func (i *instance) Unlock() { func (i *instance) bumpFetch() { i.Lock() defer i.Unlock() - i.nextFetch = time.Now().Add(100 * time.Second) + i.nextFetch = time.Now().Add(120 * time.Second) } func (i *instance) setNextFetchAfter(d time.Duration) { @@ -120,7 +120,7 @@ func (i *instance) Fetch() { i.fetchingLock.Lock() defer i.fetchingLock.Unlock() - i.setNextFetchAfter(INSTANCE_ERROR_INTERVAL) + i.setNextFetchAfter(instanceErrorInterval) err := i.detectNodeTypeIfNecessary() if err != nil { @@ -131,7 +131,7 @@ func (i *instance) Fetch() { return } - i.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL) + i.setNextFetchAfter(instanceSpiderInterval) log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", i.hostname) } @@ -161,7 +161,7 @@ func (i *instance) Tick() { func (i *instance) nodeIdentified() bool { i.Lock() defer i.Unlock() - if i.implementation > Unknown { + if i.implementation > implUnknown { return true } return false @@ -170,9 +170,8 @@ func (i *instance) nodeIdentified() bool { func (i *instance) detectNodeTypeIfNecessary() error { if !i.nodeIdentified() { return i.fetchNodeInfo() - } else { - return nil - } + } + return nil } func (i *instance) registerError() { @@ -196,7 +195,7 @@ func (i *instance) Up() bool { func (i *instance) fetchNodeInfoURL() error { url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.hostname) var c = &http.Client{ - Timeout: INSTANCE_NODEINFO_TIMEOUT, + Timeout: instanceNodeinfoTimeout, } log.Debug(). @@ -229,7 +228,7 @@ func (i *instance) fetchNodeInfoURL() error { return err } - nir := new(NodeInfoWellKnownResponse) + nir := new(nodeInfoWellKnownResponse) err = json.Unmarshal(body, &nir) if err != nil { log.Debug(). @@ -242,14 +241,14 @@ func (i *instance) fetchNodeInfoURL() error { } for _, item := range nir.Links { - if item.Rel == NodeInfoSchemaVersionTwoName { + if item.Rel == nodeInfoSchemaVersionTwoName { log.Debug(). Str("hostname", i.hostname). Str("nodeinfourl", item.Href). Msg("success fetching url for nodeinfo") i.Lock() - i.nodeInfoUrl = item.Href + i.nodeInfoURL = item.Href i.Unlock() i.registerSuccess() i.Event("GOT_NODEINFO_URL") @@ -278,13 +277,13 @@ func (i *instance) fetchNodeInfo() error { } var c = &http.Client{ - Timeout: INSTANCE_NODEINFO_TIMEOUT, + Timeout: instanceNodeinfoTimeout, } //FIXME make sure the nodeinfourl is on the same domain as the instance //hostname i.Lock() - url := i.nodeInfoUrl + url := i.nodeInfoURL i.Unlock() i.Event("BEGIN_NODEINFO_FETCH") @@ -313,7 +312,7 @@ func (i *instance) fetchNodeInfo() error { return err } - ni := new(NodeInfoVersionTwoSchema) + ni := new(nodeInfoVersionTwoSchema) err = json.Unmarshal(body, &ni) if err != nil { log.Error(). @@ -329,7 +328,7 @@ func (i *instance) fetchNodeInfo() error { Str("serverVersion", ni.Software.Version). Str("software", ni.Software.Name). Str("hostname", i.hostname). - Str("nodeInfoUrl", i.nodeInfoUrl). + Str("nodeInfoURL", i.nodeInfoURL). Msg("received nodeinfo from instance") i.Lock() @@ -343,7 +342,7 @@ func (i *instance) fetchNodeInfo() error { Str("software", ni.Software.Name). Msg("detected server software") i.identified = true - i.implementation = Pleroma + i.implementation = implPleroma i.Unlock() i.registerSuccess() i.Event("GOT_NODEINFO") @@ -354,7 +353,7 @@ func (i *instance) fetchNodeInfo() error { Str("software", ni.Software.Name). Msg("detected server software") i.identified = true - i.implementation = Mastodon + i.implementation = implMastodon i.Unlock() i.registerSuccess() i.Event("GOT_NODEINFO") diff --git a/jsonapis.go b/jsonapis.go index 2adeda0..c71da00 100644 --- a/jsonapis.go +++ b/jsonapis.go @@ -4,7 +4,7 @@ import "time" // thank fuck for https://mholt.github.io/json-to-go/ otherwise // this would have been a giant pain in the dick -type MastodonIndexResponse struct { +type mastodonIndexResponse struct { Instances []struct { ID string `json:"_id"` AddedAt time.Time `json:"addedAt"` @@ -48,7 +48,7 @@ type MastodonIndexResponse struct { } `json:"instances"` } -type PleromaIndexResponse []struct { +type pleromaIndexResponse []struct { Domain string `json:"domain"` Title string `json:"title"` Thumbnail string `json:"thumbnail"` @@ -62,7 +62,7 @@ type PleromaIndexResponse []struct { TextLimit int `json:"text_limit"` } -type NodeInfoVersionTwoSchema struct { +type nodeInfoVersionTwoSchema struct { Version string `json:"version"` Software struct { Name string `json:"name"` @@ -80,7 +80,7 @@ type NodeInfoVersionTwoSchema struct { OpenRegistrations bool `json:"openRegistrations"` } -type NodeInfoWellKnownResponse struct { +type nodeInfoWellKnownResponse struct { Links []struct { Rel string `json:"rel"` Href string `json:"href"` diff --git a/locator.go b/locator.go index 809d4f3..d8af1cd 100644 --- a/locator.go +++ b/locator.go @@ -9,69 +9,82 @@ import "sync" import "github.com/rs/zerolog/log" import "golang.org/x/sync/semaphore" -const INDEX_API_TIMEOUT = time.Second * 60 +// IndexAPITimeout is the timeout for fetching json instance lists +// from the listing servers +const IndexAPITimeout = time.Second * 60 -var USER_AGENT = "https://github.com/sneak/feta indexer bot; sneak@sneak.berlin for feedback" +// UserAgent is the user-agent string we provide to servers +var UserAgent = "feta indexer bot, sneak@sneak.berlin for feedback" -// INDEX_CHECK_INTERVAL defines the interval for downloading new lists from +// IndexCheckInterval defines the interval for downloading new lists from // the index APIs run by mastodon/pleroma (default: 1h) -var INDEX_CHECK_INTERVAL = time.Second * 60 * 60 +var IndexCheckInterval = time.Second * 60 * 60 -// INDEX_ERROR_INTERVAL is used for when the index fetch/parse fails +// IndexErrorInterval is used for when the index fetch/parse fails // (default: 10m) -var INDEX_ERROR_INTERVAL = time.Second * 60 * 10 +var IndexErrorInterval = time.Second * 60 * 10 -// LOG_REPORT_INTERVAL defines how long between logging internal +// LogReportInterval defines how long between logging internal // stats/reporting for user supervision -var LOG_REPORT_INTERVAL = time.Second * 10 +var LogReportInterval = time.Second * 10 -const mastodonIndexUrl = "https://instances.social/list.json?q%5Busers%5D=&q%5Bsearch%5D=&strict=false" -const pleromaIndexUrl = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api.cgi" +const mastodonIndexURL = "https://instances.social/list.json?q%5Busers%5D=&q%5Bsearch%5D=&strict=false" +const pleromaIndexURL = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api.cgi" +// InstanceLocator is the main data structure for the locator goroutine +// which sprays discovered instance hostnames into the manager type InstanceLocator struct { pleromaIndexNextRefresh *time.Time mastodonIndexNextRefresh *time.Time reportInstanceVia chan InstanceHostname - sync.Mutex + mu sync.Mutex } -func NewInstanceLocator() *InstanceLocator { - i := new(InstanceLocator) +func newInstanceLocator() *InstanceLocator { + il := new(InstanceLocator) n := time.Now() - i.pleromaIndexNextRefresh = &n - i.mastodonIndexNextRefresh = &n - return i + il.pleromaIndexNextRefresh = &n + il.mastodonIndexNextRefresh = &n + return il } -func (self *InstanceLocator) AddInstanceNotificationChannel(via chan InstanceHostname) { - self.Lock() - defer self.Unlock() - self.reportInstanceVia = via +func (il *InstanceLocator) lock() { + il.mu.Lock() } -func (self *InstanceLocator) addInstance(hostname InstanceHostname) { +func (il *InstanceLocator) unlock() { + il.mu.Unlock() +} + +func (il *InstanceLocator) addInstanceNotificationChannel(via chan InstanceHostname) { + il.lock() + defer il.unlock() + il.reportInstanceVia = via +} + +func (il *InstanceLocator) addInstance(hostname InstanceHostname) { // receiver (InstanceManager) is responsible for de-duping against its - // map, we just spray - self.reportInstanceVia <- hostname + // map, we just locate and spray, it manages + il.reportInstanceVia <- hostname } -func (self *InstanceLocator) mastodonIndexRefreshDue() bool { - return self.mastodonIndexNextRefresh.Before(time.Now()) +func (il *InstanceLocator) mastodonIndexRefreshDue() bool { + return il.mastodonIndexNextRefresh.Before(time.Now()) } -func (self *InstanceLocator) durationUntilNextMastodonIndexRefresh() time.Duration { - return (time.Duration(-1) * time.Now().Sub(*self.mastodonIndexNextRefresh)) +func (il *InstanceLocator) durationUntilNextMastodonIndexRefresh() time.Duration { + return (time.Duration(-1) * time.Now().Sub(*il.mastodonIndexNextRefresh)) } -func (self *InstanceLocator) pleromaIndexRefreshDue() bool { - return self.pleromaIndexNextRefresh.Before(time.Now()) +func (il *InstanceLocator) pleromaIndexRefreshDue() bool { + return il.pleromaIndexNextRefresh.Before(time.Now()) } -func (self *InstanceLocator) durationUntilNextPleromaIndexRefresh() time.Duration { - return (time.Duration(-1) * time.Now().Sub(*self.pleromaIndexNextRefresh)) +func (il *InstanceLocator) durationUntilNextPleromaIndexRefresh() time.Duration { + return (time.Duration(-1) * time.Now().Sub(*il.pleromaIndexNextRefresh)) } -func (self *InstanceLocator) Locate() { +func (il *InstanceLocator) locate() { log.Info().Msg("InstanceLocator starting") x := time.Now() var pleromaSemaphore = semaphore.NewWeighted(1) @@ -82,90 +95,90 @@ func (self *InstanceLocator) Locate() { log.Info().Msg("InstanceLocator tick") go func() { - if self.pleromaIndexRefreshDue() { + if il.pleromaIndexRefreshDue() { if !pleromaSemaphore.TryAcquire(1) { return } - self.locatePleroma() + il.locatePleroma() pleromaSemaphore.Release(1) } }() go func() { - if self.mastodonIndexRefreshDue() { + if il.mastodonIndexRefreshDue() { if !mastodonSemaphore.TryAcquire(1) { return } - self.locateMastodon() + il.locateMastodon() mastodonSemaphore.Release(1) } }() time.Sleep(1 * time.Second) - if time.Now().After(x.Add(LOG_REPORT_INTERVAL)) { + if time.Now().After(x.Add(LogReportInterval)) { x = time.Now() log.Debug(). - Str("nextMastodonIndexRefresh", self.durationUntilNextMastodonIndexRefresh().String()). + Str("nextMastodonIndexRefresh", il.durationUntilNextMastodonIndexRefresh().String()). Msg("refresh countdown") log.Debug(). - Str("nextPleromaIndexRefresh", self.durationUntilNextPleromaIndexRefresh().String()). + Str("nextPleromaIndexRefresh", il.durationUntilNextPleromaIndexRefresh().String()). Msg("refresh countdown") } } } -func (self *InstanceLocator) locateMastodon() { +func (il *InstanceLocator) locateMastodon() { var c = &http.Client{ - Timeout: INDEX_API_TIMEOUT, + Timeout: IndexAPITimeout, } - req, err := http.NewRequest("GET", mastodonIndexUrl, nil) + req, err := http.NewRequest("GET", mastodonIndexURL, nil) if err != nil { panic(err) } - req.Header.Set("User-Agent", USER_AGENT) + req.Header.Set("User-Agent", UserAgent) resp, err := c.Do(req) if err != nil { log.Error().Msgf("unable to fetch mastodon instance list: %s", err) - t := time.Now().Add(INDEX_ERROR_INTERVAL) - self.Lock() - self.mastodonIndexNextRefresh = &t - self.Unlock() + t := time.Now().Add(IndexErrorInterval) + il.lock() + il.mastodonIndexNextRefresh = &t + il.unlock() return - } else { - log.Info(). - Msg("fetched mastodon index") } + log.Info(). + Msg("fetched mastodon index") + defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { log.Error().Msgf("unable to fetch mastodon instance list: %s", err) - t := time.Now().Add(INDEX_ERROR_INTERVAL) - self.Lock() - self.mastodonIndexNextRefresh = &t - self.Unlock() + t := time.Now().Add(IndexErrorInterval) + il.lock() + il.mastodonIndexNextRefresh = &t + il.unlock() return } - t := time.Now().Add(INDEX_CHECK_INTERVAL) - self.Lock() - self.mastodonIndexNextRefresh = &t - self.Unlock() + t := time.Now().Add(IndexCheckInterval) + il.lock() + il.mastodonIndexNextRefresh = &t + il.unlock() - mi := new(MastodonIndexResponse) + mi := new(mastodonIndexResponse) err = json.Unmarshal(body, &mi) if err != nil { log.Error().Msgf("unable to parse mastodon instance list: %s", err) - t := time.Now().Add(INDEX_ERROR_INTERVAL) - self.Lock() - self.mastodonIndexNextRefresh = &t - self.Unlock() + t := time.Now().Add(IndexErrorInterval) + il.lock() + il.mastodonIndexNextRefresh = &t + il.unlock() return } @@ -179,31 +192,31 @@ func (self *InstanceLocator) locateMastodon() { Int("count", x). Msg("received hosts from mastodon index") - for k, _ := range hosts { - self.addInstance(InstanceHostname(k)) + for k := range hosts { + il.addInstance(InstanceHostname(k)) } } -func (self *InstanceLocator) locatePleroma() { +func (il *InstanceLocator) locatePleroma() { var c = &http.Client{ - Timeout: INDEX_API_TIMEOUT, + Timeout: IndexAPITimeout, } - req, err := http.NewRequest("GET", pleromaIndexUrl, nil) + req, err := http.NewRequest("GET", pleromaIndexURL, nil) if err != nil { panic(err) } - req.Header.Set("User-Agent", USER_AGENT) + req.Header.Set("User-Agent", UserAgent) resp, err := c.Do(req) if err != nil { log.Error().Msgf("unable to fetch pleroma instance list: %s", err) - t := time.Now().Add(INDEX_ERROR_INTERVAL) - self.Lock() - self.pleromaIndexNextRefresh = &t - self.Unlock() + t := time.Now().Add(IndexErrorInterval) + il.lock() + il.pleromaIndexNextRefresh = &t + il.unlock() return } @@ -212,28 +225,28 @@ func (self *InstanceLocator) locatePleroma() { if err != nil { log.Error().Msgf("unable to fetch pleroma instance list: %s", err) - t := time.Now().Add(INDEX_ERROR_INTERVAL) - self.Lock() - self.pleromaIndexNextRefresh = &t - self.Unlock() + t := time.Now().Add(IndexErrorInterval) + il.lock() + il.pleromaIndexNextRefresh = &t + il.unlock() return } // fetch worked - t := time.Now().Add(INDEX_CHECK_INTERVAL) - self.Lock() - self.pleromaIndexNextRefresh = &t - self.Unlock() + t := time.Now().Add(IndexCheckInterval) + il.lock() + il.pleromaIndexNextRefresh = &t + il.unlock() - pi := new(PleromaIndexResponse) + pi := new(pleromaIndexResponse) err = json.Unmarshal(body, &pi) if err != nil { log.Warn().Msgf("unable to parse pleroma instance list: %s", err) - t := time.Now().Add(INDEX_ERROR_INTERVAL) - self.Lock() - self.pleromaIndexNextRefresh = &t - self.Unlock() + t := time.Now().Add(IndexErrorInterval) + il.lock() + il.pleromaIndexNextRefresh = &t + il.unlock() return } @@ -247,8 +260,8 @@ func (self *InstanceLocator) locatePleroma() { Int("count", x). Msg("received hosts from pleroma index") - for k, _ := range hosts { - self.addInstance(InstanceHostname(k)) + for k := range hosts { + il.addInstance(InstanceHostname(k)) } } diff --git a/manager.go b/manager.go index e6c4aca..330da3a 100644 --- a/manager.go +++ b/manager.go @@ -7,13 +7,14 @@ import "runtime" //import "github.com/gin-gonic/gin" import "github.com/rs/zerolog/log" -const HOST_DISCOVERY_PARALLELISM = 1 - -type InstanceBackend interface { +const hostDiscoveryParallelism = 20 +type instanceBackend interface { //FIXME } +// InstanceManager is the main data structure for the goroutine that manages +// the list of all known instances, fed by the locator type InstanceManager struct { mu sync.Mutex instances map[InstanceHostname]*instance @@ -22,14 +23,14 @@ type InstanceManager struct { hostAdderSemaphore chan bool } -func NewInstanceManager() *InstanceManager { +func newInstanceManager() *InstanceManager { i := new(InstanceManager) - i.hostAdderSemaphore = make(chan bool, HOST_DISCOVERY_PARALLELISM) + i.hostAdderSemaphore = make(chan bool, hostDiscoveryParallelism) i.instances = make(map[InstanceHostname]*instance) return i } -func (self *InstanceManager) logCaller(msg string) { +func (im *InstanceManager) logCaller(msg string) { fpcs := make([]uintptr, 1) // Skip 2 levels to get the caller n := runtime.Callers(3, fpcs) @@ -53,49 +54,47 @@ func (self *InstanceManager) logCaller(msg string) { Msg(msg) } -func (self *InstanceManager) Lock() { - //self.logCaller("instancemanager attempting to lock") - self.mu.Lock() - //self.logCaller("instancemanager locked") +func (im *InstanceManager) lock() { + im.mu.Lock() } -func (self *InstanceManager) Unlock() { - self.mu.Unlock() - //self.logCaller("instancemanager unlocked") +func (im *InstanceManager) unlock() { + im.mu.Unlock() } -func (self *InstanceManager) AddInstanceNotificationChannel(via chan InstanceHostname) { - self.Lock() - defer self.Unlock() - self.newInstanceNotifications = via +func (im *InstanceManager) addInstanceNotificationChannel(via chan InstanceHostname) { + im.lock() + defer im.unlock() + im.newInstanceNotifications = via } -func (self *InstanceManager) Manage() { +func (im *InstanceManager) manage() { log.Info().Msg("InstanceManager starting") go func() { - self.receiveNewInstanceHostnames() + im.receiveNewInstanceHostnames() }() - self.startup = time.Now() - x := self.startup + im.startup = time.Now() + x := im.startup for { log.Info().Msg("InstanceManager tick") - self.managerLoop() + im.managerLoop() time.Sleep(1 * time.Second) - if time.Now().After(x.Add(LOG_REPORT_INTERVAL)) { + if time.Now().After(x.Add(LogReportInterval)) { x = time.Now() - self.logInstanceReport() + im.logInstanceReport() } } } -func (self *InstanceManager) managerLoop() { - self.Lock() +func (im *InstanceManager) managerLoop() { + im.lock() il := make([]*instance, 0) - for _, v := range self.instances { + for _, v := range im.instances { il = append(il, v) } - self.Unlock() + im.unlock() + // FIXME is this a bug outside of the mutex above? for _, v := range il { go func(i *instance) { i.Tick() @@ -103,10 +102,10 @@ func (self *InstanceManager) managerLoop() { } } -func (self *InstanceManager) hostnameExists(newhn InstanceHostname) bool { - self.Lock() - defer self.Unlock() - for k, _ := range self.instances { +func (im *InstanceManager) hostnameExists(newhn InstanceHostname) bool { + im.lock() + defer im.unlock() + for k := range im.instances { if newhn == k { return true } @@ -114,15 +113,16 @@ func (self *InstanceManager) hostnameExists(newhn InstanceHostname) bool { return false } -func (self *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { - if self.hostnameExists(newhn) { +func (im *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { + if im.hostnameExists(newhn) { + // ignore adding new if we already know about it return } // this blocks on the channel size, limiting concurrency - self.hostAdderSemaphore <- true + im.hostAdderSemaphore <- true - i := NewInstance(func(x *instance) { + i := newInstance(func(x *instance) { x.hostname = string(newhn) }) // we do node detection under the addLock to avoid thundering @@ -130,27 +130,28 @@ func (self *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { i.detectNodeTypeIfNecessary() // pop an item from the buffered channel - <-self.hostAdderSemaphore + <-im.hostAdderSemaphore // lock the map to insert - self.Lock() - self.instances[newhn] = i - self.Unlock() + im.lock() + im.instances[newhn] = i + im.unlock() } -func (self *InstanceManager) receiveNewInstanceHostnames() { +func (im *InstanceManager) receiveNewInstanceHostnames() { var newhn InstanceHostname for { - newhn = <-self.newInstanceNotifications + newhn = <-im.newInstanceNotifications // receive them fast out of the channel, let the adding function lock to add - // them one at a time - go self.addInstanceByHostname(newhn) + // them one at a time, using a bunch of blocked goroutines as our + // modification queue + go im.addInstanceByHostname(newhn) } } -func (self *InstanceManager) logInstanceReport() { - r := self.instanceSummaryReport() +func (im *InstanceManager) logInstanceReport() { + r := im.instanceSummaryReport() sublogger := log.With().Logger() @@ -162,11 +163,11 @@ func (self *InstanceManager) logInstanceReport() { Msg("instance report") } -func (self *InstanceManager) listInstances() []*instance { +func (im *InstanceManager) listInstances() []*instance { var out []*instance - self.Lock() - defer self.Unlock() - for _, v := range self.instances { + im.lock() + defer im.unlock() + for _, v := range im.instances { out = append(out, v) } return out diff --git a/toot.go b/toot.go index a41a464..7a58330 100644 --- a/toot.go +++ b/toot.go @@ -2,10 +2,10 @@ package feta //import "github.com/rs/zerolog/log" -type Toot struct { +type toot struct { } -func NewToot(input []byte) *Toot { - t := new(Toot) +func newToot(input []byte) *toot { + t := new(toot) return t }