From 7a9d7a5e5b4442220f162f42532b323d0b073774 Mon Sep 17 00:00:00 2001 From: sneak Date: Thu, 19 Dec 2019 06:24:26 -0800 Subject: [PATCH] refactored, linted, formatted --- Makefile | 4 +- cmd/feta/main.go | 4 +- config.go | 19 +++ dbmodel.go => db/dbmodel.go | 5 +- ingester/ingester.go | 3 + instance.go => instance/instance.go | 169 +++++++++++++++----------- locator.go => locator/locator.go | 26 ++-- manager.go => manager/manager.go | 60 +++++---- feta.go => process/feta.go | 65 ++++++---- apihandlers.go => process/handlers.go | 38 +++--- apiserver.go => process/server.go | 19 +-- 11 files changed, 247 insertions(+), 165 deletions(-) create mode 100644 config.go rename dbmodel.go => db/dbmodel.go (69%) rename instance.go => instance/instance.go (69%) rename locator.go => locator/locator.go (89%) rename manager.go => manager/manager.go (65%) rename feta.go => process/feta.go (58%) rename apihandlers.go => process/handlers.go (69%) rename apiserver.go => process/server.go (75%) diff --git a/Makefile b/Makefile index 6db7a3b..d778d9b 100644 --- a/Makefile +++ b/Makefile @@ -45,9 +45,9 @@ lint: fmt .lintsetup fgt golint ./... go-get: - go get -v + cd cmd/$(FN) && go get -v -./$(FN): *.go cmd/*/*.go go-get +./$(FN): */*.go cmd/*/*.go go-get cd cmd/$(FN) && go build -o ../../$(FN) $(GOFLAGS) . fmt: diff --git a/cmd/feta/main.go b/cmd/feta/main.go index 1eeeefc..5f40b5f 100644 --- a/cmd/feta/main.go +++ b/cmd/feta/main.go @@ -2,7 +2,7 @@ package main import "os" -import "github.com/sneak/feta" +import "github.com/sneak/feta/process" // these are filled in at link-time by the build scripts @@ -13,5 +13,5 @@ var Version string var Buildarch string func main() { - os.Exit(feta.CLIEntry(Version, Buildarch)) + os.Exit(process.CLIEntry(Version, Buildarch)) } diff --git a/config.go b/config.go new file mode 100644 index 0000000..746b5a7 --- /dev/null +++ b/config.go @@ -0,0 +1,19 @@ +package feta + +import "time" + +// FIXME this should use viper or something + +// Config stores the configuration for the feta process +type Config struct { + LogReportInterval time.Duration + FSStorageLocation string +} + +// GetConfig returns the config +func GetConfig() *Config { + c := new(Config) + c.LogReportInterval = time.Second * 10 + c.FSStorageLocation = "/home/sneak/Library/ApplicationSupport/feta/tootarchive" + return c +} diff --git a/dbmodel.go b/db/dbmodel.go similarity index 69% rename from dbmodel.go rename to db/dbmodel.go index d025a58..2201f43 100644 --- a/dbmodel.go +++ b/db/dbmodel.go @@ -1,5 +1,6 @@ -package feta +package db +import "github.com/sneak/feta/process" import "github.com/jinzhu/gorm" import _ "github.com/jinzhu/gorm/dialects/sqlite" // required for orm @@ -9,6 +10,6 @@ type savedInstance struct { software string } -func (f *Process) databaseMigrations() { +func (f *process.Feta) databaseMigrations() { f.db.AutoMigrate(&savedInstance{}) } diff --git a/ingester/ingester.go b/ingester/ingester.go index 2800dad..60556d7 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -52,5 +52,8 @@ func (ti *TootIngester) readFromInboundChannel() { func (ti *TootIngester) storeToot(t *toot.Toot) { // FIXME first check for dupes in recentlySeen + if ti.storageBackend == nil { + panic("no storage backend") + } ti.storageBackend.StoreToot(*t) } diff --git a/instance.go b/instance/instance.go similarity index 69% rename from instance.go rename to instance/instance.go index 75dc179..4254eec 100644 --- a/instance.go +++ b/instance/instance.go @@ -1,4 +1,4 @@ -package feta +package instance import "encoding/json" import "fmt" @@ -24,35 +24,41 @@ const instanceErrorInterval = time.Second * 60 * 30 type instanceImplementation int +// Hostname is a special type for holding the hostname of an +// instance (string) +type Hostname string + const ( implUnknown instanceImplementation = iota implMastodon implPleroma ) -type instance struct { +// Instance stores all the information we know about an instance +type Instance struct { structLock sync.Mutex tootDestination chan *toot.Toot - errorCount uint - successCount uint + ErrorCount uint + SuccessCount uint highestID int - hostname string - identified bool + Hostname string + Identified bool fetching bool implementation instanceImplementation - backend *instanceBackend storageBackend *storage.TootStorageBackend - nextFetch time.Time + NextFetch time.Time nodeInfoURL string - serverVersionString string - serverImplementationString string + ServerVersionString string + ServerImplementationString string fetchingLock sync.Mutex fsm *fsm.FSM fsmLock sync.Mutex } -func newInstance(options ...func(i *instance)) *instance { - i := new(instance) +// New returns a new instance, argument is a function that operates on the +// new instance +func New(options ...func(i *Instance)) *Instance { + i := new(Instance) i.setNextFetchAfter(1 * time.Second) i.fsm = fsm.NewFSM( @@ -80,69 +86,82 @@ func newInstance(options ...func(i *instance)) *instance { return i } -func (i *instance) Status() string { +// Status returns the instance's state in the FSM +func (i *Instance) Status() string { i.fsmLock.Lock() defer i.fsmLock.Unlock() return i.fsm.Current() } -func (i *instance) setTootDestination(d chan *toot.Toot) { +// SetTootDestination takes a channel from the manager that all toots +// fetched from this instance should be pushed into. The instance is not +// responsible for deduplication, it should shove all toots on every fetch +// into the channel. +func (i *Instance) SetTootDestination(d chan *toot.Toot) { i.tootDestination = d } -func (i *instance) Event(eventname string) { +// Event is the method that alters the FSM +func (i *Instance) Event(eventname string) { i.fsmLock.Lock() defer i.fsmLock.Unlock() i.fsm.Event(eventname) } -func (i *instance) fsmEnterState(e *fsm.Event) { +func (i *Instance) fsmEnterState(e *fsm.Event) { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Str("state", e.Dst). Msg("instance changed state") } -func (i *instance) Lock() { +// Lock locks the instance's mutex for reading/writing from the structure +func (i *Instance) Lock() { i.structLock.Lock() } -func (i *instance) Unlock() { +// Unlock unlocks the instance's mutex for reading/writing from the structure +func (i *Instance) Unlock() { i.structLock.Unlock() } -func (i *instance) bumpFetch() { +func (i *Instance) bumpFetch() { i.Lock() defer i.Unlock() - i.nextFetch = time.Now().Add(120 * time.Second) + i.NextFetch = time.Now().Add(120 * time.Second) } -func (i *instance) setNextFetchAfter(d time.Duration) { +func (i *Instance) setNextFetchAfter(d time.Duration) { i.Lock() defer i.Unlock() - i.nextFetch = time.Now().Add(d) + i.NextFetch = time.Now().Add(d) } -func (i *instance) Fetch() { +// Fetch prepares an instance for fetching. Bad name, fix it. +// FIXME(sneak) +func (i *Instance) Fetch() { i.fetchingLock.Lock() defer i.fetchingLock.Unlock() i.setNextFetchAfter(instanceErrorInterval) - err := i.detectNodeTypeIfNecessary() + err := i.DetectNodeTypeIfNecessary() if err != nil { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Err(err). Msg("unable to fetch instance metadata") return } i.setNextFetchAfter(instanceSpiderInterval) - log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", i.hostname) + log.Info(). + Str("hostname", i.Hostname). + Msg("instance now ready for fetch") } -func (i *instance) dueForFetch() bool { +// FIXME rename this function +func (i *Instance) dueForFetch() bool { // this just checks FSM state, the ticker must update it and do time // calcs if i.Status() == "READY_AND_DUE_FETCH" { @@ -151,14 +170,14 @@ func (i *instance) dueForFetch() bool { return false } -func (i *instance) isNowPastFetchTime() bool { - return time.Now().After(i.nextFetch) +func (i *Instance) isNowPastFetchTime() bool { + return time.Now().After(i.NextFetch) } // Tick is responsible for pushing idle instance records between states. // The instances will transition between states when doing stuff (e.g. // investigating, fetching, et c) as well. -func (i *instance) Tick() { +func (i *Instance) Tick() { if i.Status() == "READY_FOR_TOOTFETCH" { if i.isNowPastFetchTime() { i.Event("FETCH_TIME_REACHED") @@ -170,7 +189,7 @@ func (i *instance) Tick() { } } -func (i *instance) nodeIdentified() bool { +func (i *Instance) nodeIdentified() bool { i.Lock() defer i.Unlock() if i.implementation > implUnknown { @@ -179,47 +198,50 @@ func (i *instance) nodeIdentified() bool { return false } -func (i *instance) detectNodeTypeIfNecessary() error { +// DetectNodeTypeIfNecessary does some network requests if the node is as +// yet unidenfitied. No-op otherwise. +func (i *Instance) DetectNodeTypeIfNecessary() error { if !i.nodeIdentified() { return i.fetchNodeInfo() } return nil } -func (i *instance) registerError() { +func (i *Instance) registerError() { i.Lock() defer i.Unlock() - i.errorCount++ + i.ErrorCount++ } -func (i *instance) registerSuccess() { +func (i *Instance) registerSuccess() { i.Lock() defer i.Unlock() - i.successCount++ + i.SuccessCount++ } -func (i *instance) Up() bool { +// Up returns true if the success count is >0 +func (i *Instance) Up() bool { i.Lock() defer i.Unlock() - return i.successCount > 0 + return i.SuccessCount > 0 } -func (i *instance) fetchNodeInfoURL() error { - url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.hostname) +func (i *Instance) fetchNodeInfoURL() error { + url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.Hostname) var c = &http.Client{ Timeout: instanceNodeinfoTimeout, } log.Debug(). Str("url", url). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Msg("fetching nodeinfo reference URL") i.Event("BEGIN_NODEINFO_URL_FETCH") resp, err := c.Get(url) if err != nil { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Err(err). Msg("unable to fetch nodeinfo, node is down?") i.registerError() @@ -232,7 +254,7 @@ func (i *instance) fetchNodeInfoURL() error { if err != nil { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Err(err). Msg("unable to read nodeinfo") i.registerError() @@ -244,7 +266,7 @@ func (i *instance) fetchNodeInfoURL() error { err = json.Unmarshal(body, &nir) if err != nil { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Err(err). Msg("unable to parse nodeinfo, node is weird") i.registerError() @@ -255,7 +277,7 @@ func (i *instance) fetchNodeInfoURL() error { for _, item := range nir.Links { if item.Rel == nodeInfoSchemaVersionTwoName { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Str("nodeinfourl", item.Href). Msg("success fetching url for nodeinfo") @@ -267,21 +289,21 @@ func (i *instance) fetchNodeInfoURL() error { return nil } log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Str("item-rel", item.Rel). Str("item-href", item.Href). Msg("nodeinfo entry") } log.Error(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Msg("incomplete nodeinfo") i.registerError() i.Event("WEIRD_NODE_RESPONSE") return errors.New("incomplete nodeinfo") } -func (i *instance) fetchNodeInfo() error { +func (i *Instance) fetchNodeInfo() error { err := i.fetchNodeInfoURL() if err != nil { @@ -303,7 +325,7 @@ func (i *instance) fetchNodeInfo() error { if err != nil { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Err(err). Msgf("unable to fetch nodeinfo data") i.registerError() @@ -316,7 +338,7 @@ func (i *instance) fetchNodeInfo() error { if err != nil { log.Error(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Err(err). Msgf("unable to read nodeinfo data") i.registerError() @@ -328,7 +350,7 @@ func (i *instance) fetchNodeInfo() error { err = json.Unmarshal(body, &ni) if err != nil { log.Error(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Err(err). Msgf("unable to parse nodeinfo") i.registerError() @@ -339,21 +361,21 @@ func (i *instance) fetchNodeInfo() error { log.Debug(). Str("serverVersion", ni.Software.Version). Str("software", ni.Software.Name). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Str("nodeInfoURL", i.nodeInfoURL). Msg("received nodeinfo from instance") i.Lock() - i.serverVersionString = ni.Software.Version - i.serverImplementationString = ni.Software.Name + i.ServerVersionString = ni.Software.Version + i.ServerImplementationString = ni.Software.Name ni.Software.Name = strings.ToLower(ni.Software.Name) if ni.Software.Name == "pleroma" { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Str("software", ni.Software.Name). Msg("detected server software") - i.identified = true + i.Identified = true i.implementation = implPleroma i.Unlock() i.registerSuccess() @@ -361,10 +383,10 @@ func (i *instance) fetchNodeInfo() error { return nil } else if ni.Software.Name == "mastodon" { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Str("software", ni.Software.Name). Msg("detected server software") - i.identified = true + i.Identified = true i.implementation = implMastodon i.Unlock() i.registerSuccess() @@ -372,7 +394,7 @@ func (i *instance) fetchNodeInfo() error { return nil } else { log.Error(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Str("software", ni.Software.Name). Msg("FIXME unknown server implementation") i.Unlock() @@ -382,13 +404,13 @@ func (i *instance) fetchNodeInfo() error { } } -func (i *instance) fetchRecentToots() error { +func (i *Instance) fetchRecentToots() error { // this would have been about a billion times shorter in python // it turns out pleroma supports the mastodon api so we'll just use that // for everything for now url := fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true", - i.hostname) + i.Hostname) var c = &http.Client{ Timeout: instanceHTTPTimeout, @@ -404,7 +426,7 @@ func (i *instance) fetchRecentToots() error { if err != nil { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Err(err). Msgf("unable to fetch recent toots") i.registerError() @@ -417,7 +439,7 @@ func (i *instance) fetchRecentToots() error { if err != nil { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Err(err). Msgf("unable to read recent toots from response") i.registerError() @@ -425,11 +447,11 @@ func (i *instance) fetchRecentToots() error { return err } - tc, err := toot.NewTootCollectionFromMastodonAPIResponse(body, i.hostname) + tc, err := toot.NewTootCollectionFromMastodonAPIResponse(body, i.Hostname) if err != nil { log.Error(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Err(err). Msgf("unable to parse recent toot list") i.registerError() @@ -440,15 +462,22 @@ func (i *instance) fetchRecentToots() error { } log.Info(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Int("tootCount", len(tc)). Msgf("got and parsed toots") i.registerSuccess() i.Event("TOOTS_FETCHED") i.setNextFetchAfter(instanceSpiderInterval) - // FIXME stream toots to ingester attached to manager instead - //i.storeToots(tc) - panic("lol") + // this should go fast as either the channel is buffered bigly or the + // ingester receives fast and does its own buffering, but run it in its + // own goroutine anyway because why not + go i.sendTootsToIngester(tc) return nil } + +func (i *Instance) sendTootsToIngester(tc []*toot.Toot) { + for _, item := range tc { + i.tootDestination <- item + } +} diff --git a/locator.go b/locator/locator.go similarity index 89% rename from locator.go rename to locator/locator.go index ae88f4b..1efad65 100644 --- a/locator.go +++ b/locator/locator.go @@ -1,4 +1,4 @@ -package feta +package locator import "encoding/json" import "io/ioutil" @@ -9,6 +9,8 @@ import "sync" import "github.com/rs/zerolog/log" import "golang.org/x/sync/semaphore" import "github.com/sneak/feta/jsonapis" +import "github.com/sneak/feta/instance" +import "github.com/sneak/feta" // IndexAPITimeout is the timeout for fetching json instance lists // from the listing servers @@ -25,10 +27,6 @@ var IndexCheckInterval = time.Second * 60 * 60 // (default: 10m) var IndexErrorInterval = time.Second * 60 * 10 -// LogReportInterval defines how long between logging internal -// stats/reporting for user supervision -var LogReportInterval = time.Second * 10 - const mastodonIndexURL = "https://instances.social/list.json?q%5Busers%5D=&q%5Bsearch%5D=&strict=false" const pleromaIndexURL = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api.cgi" @@ -37,11 +35,12 @@ const pleromaIndexURL = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api type InstanceLocator struct { pleromaIndexNextRefresh *time.Time mastodonIndexNextRefresh *time.Time - reportInstanceVia chan InstanceHostname + reportInstanceVia chan instance.Hostname mu sync.Mutex } -func newInstanceLocator() *InstanceLocator { +// New returns an InstanceLocator for use by the process. +func New() *InstanceLocator { il := new(InstanceLocator) n := time.Now() il.pleromaIndexNextRefresh = &n @@ -57,13 +56,15 @@ func (il *InstanceLocator) unlock() { il.mu.Unlock() } -func (il *InstanceLocator) setInstanceNotificationChannel(via chan InstanceHostname) { +// SetInstanceNotificationChannel is the way the instanceLocator returns +// newly discovered instances back to the manager for query/addition +func (il *InstanceLocator) SetInstanceNotificationChannel(via chan instance.Hostname) { il.lock() defer il.unlock() il.reportInstanceVia = via } -func (il *InstanceLocator) addInstance(hostname InstanceHostname) { +func (il *InstanceLocator) addInstance(hostname instance.Hostname) { // receiver (InstanceManager) is responsible for de-duping against its // map, we just locate and spray, it manages il.reportInstanceVia <- hostname @@ -119,7 +120,8 @@ func (il *InstanceLocator) Locate() { time.Sleep(1 * time.Second) - if time.Now().After(x.Add(LogReportInterval)) { + c := feta.GetConfig() + if time.Now().After(x.Add(c.LogReportInterval)) { x = time.Now() log.Debug(). Str("nextMastodonIndexRefresh", il.durationUntilNextMastodonIndexRefresh().String()). @@ -196,7 +198,7 @@ func (il *InstanceLocator) locateMastodon() { Msg("received hosts from mastodon index") for k := range hosts { - il.addInstance(InstanceHostname(k)) + il.addInstance(instance.Hostname(k)) } } @@ -264,7 +266,7 @@ func (il *InstanceLocator) locatePleroma() { Msg("received hosts from pleroma index") for k := range hosts { - il.addInstance(InstanceHostname(k)) + il.addInstance(instance.Hostname(k)) } } diff --git a/manager.go b/manager/manager.go similarity index 65% rename from manager.go rename to manager/manager.go index 39ba4bb..a8570cc 100644 --- a/manager.go +++ b/manager/manager.go @@ -1,4 +1,4 @@ -package feta +package manager import "sync" import "time" @@ -8,32 +8,36 @@ import "runtime" import "github.com/rs/zerolog/log" import "github.com/sneak/feta/toot" import "github.com/sneak/feta/seeds" +import "github.com/sneak/feta/instance" const hostDiscoveryParallelism = 20 -type instanceBackend interface { - //FIXME -} +// LogReportInterval defines how long between logging internal +// stats/reporting for user supervision +var LogReportInterval = time.Second * 10 // InstanceManager is the main data structure for the goroutine that manages // the list of all known instances, fed by the locator type InstanceManager struct { mu sync.Mutex - instances map[InstanceHostname]*instance - newInstanceNotifications chan InstanceHostname + instances map[instance.Hostname]*instance.Instance + newInstanceNotifications chan instance.Hostname tootDestination chan *toot.Toot startup time.Time hostAdderSemaphore chan bool } -func newInstanceManager() *InstanceManager { +// New returns a new InstanceManager for use by the Process +func New() *InstanceManager { i := new(InstanceManager) i.hostAdderSemaphore = make(chan bool, hostDiscoveryParallelism) - i.instances = make(map[InstanceHostname]*instance) + i.instances = make(map[instance.Hostname]*instance.Instance) return i } -func (im *InstanceManager) setTootDestination(td chan *toot.Toot) { +// SetTootDestination provides the instancemanager with a channel to the +// ingester that it can give to its instances +func (im *InstanceManager) SetTootDestination(td chan *toot.Toot) { im.tootDestination = td } @@ -69,7 +73,11 @@ func (im *InstanceManager) unlock() { im.mu.Unlock() } -func (im *InstanceManager) setInstanceNotificationChannel(via chan InstanceHostname) { +// SetInstanceNotificationChannel is how the Process tells the +// InstanceManager about the channel from the InstanceLocator so that the +// InstanceLocator can provide it/us (the InstanceManager) with new +// instance.Hostnames. We (the manager) deduplicate the list ourselves. +func (im *InstanceManager) SetInstanceNotificationChannel(via chan instance.Hostname) { im.lock() defer im.unlock() im.newInstanceNotifications = via @@ -77,9 +85,9 @@ func (im *InstanceManager) setInstanceNotificationChannel(via chan InstanceHostn func (im *InstanceManager) receiveSeedInstanceHostnames() { for _, x := range seeds.SeedInstances { - go func(tmp InstanceHostname) { + go func(tmp instance.Hostname) { im.addInstanceByHostname(tmp) - }(InstanceHostname(x)) + }(instance.Hostname(x)) } } @@ -111,7 +119,7 @@ func (im *InstanceManager) Manage() { func (im *InstanceManager) managerLoop() { im.lock() - il := make([]*instance, 0) + il := make([]*instance.Instance, 0) for _, v := range im.instances { il = append(il, v) } @@ -119,13 +127,13 @@ func (im *InstanceManager) managerLoop() { // FIXME is this a bug outside of the mutex above? for _, v := range il { - go func(i *instance) { + go func(i *instance.Instance) { i.Tick() }(v) } } -func (im *InstanceManager) hostnameExists(newhn InstanceHostname) bool { +func (im *InstanceManager) hostnameExists(newhn instance.Hostname) bool { im.lock() defer im.unlock() for k := range im.instances { @@ -136,7 +144,7 @@ func (im *InstanceManager) hostnameExists(newhn InstanceHostname) bool { return false } -func (im *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { +func (im *InstanceManager) addInstanceByHostname(newhn instance.Hostname) { if im.hostnameExists(newhn) { // ignore adding new if we already know about it return @@ -145,13 +153,13 @@ func (im *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { // this blocks on the channel size, limiting concurrency im.hostAdderSemaphore <- true - i := newInstance(func(x *instance) { - x.hostname = string(newhn) - x.setTootDestination(im.tootDestination) + i := instance.New(func(x *instance.Instance) { + x.Hostname = string(newhn) // set hostname + x.SetTootDestination(im.tootDestination) // copy ingester input channel from manager to instance }) - // we do node detection under the addLock to avoid thundering + // we do node detection under the adderSemaphore to avoid thundering // on startup - i.detectNodeTypeIfNecessary() + i.DetectNodeTypeIfNecessary() // pop an item from the buffered channel <-im.hostAdderSemaphore @@ -164,7 +172,7 @@ func (im *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { } func (im *InstanceManager) receiveNewInstanceHostnames() { - var newhn InstanceHostname + var newhn instance.Hostname for { newhn = <-im.newInstanceNotifications // receive them fast out of the channel, let the adding function lock to add @@ -187,8 +195,10 @@ func (im *InstanceManager) logInstanceReport() { Msg("instance report") } -func (im *InstanceManager) listInstances() []*instance { - var out []*instance +// ListInstances dumps a slice of all Instances the InstanceManager knows +// about +func (im *InstanceManager) ListInstances() []*instance.Instance { + var out []*instance.Instance im.lock() defer im.unlock() for _, v := range im.instances { @@ -199,7 +209,7 @@ func (im *InstanceManager) listInstances() []*instance { func (im *InstanceManager) instanceSummaryReport() map[string]uint { r := make(map[string]uint) - for _, v := range im.listInstances() { + for _, v := range im.ListInstances() { v.Lock() r[v.Status()]++ v.Unlock() diff --git a/feta.go b/process/feta.go similarity index 58% rename from feta.go rename to process/feta.go index 716297d..5f61ffd 100644 --- a/feta.go +++ b/process/feta.go @@ -1,4 +1,4 @@ -package feta +package process import "os" import "time" @@ -9,41 +9,42 @@ import _ "github.com/jinzhu/gorm/dialects/sqlite" // required for orm import "github.com/rs/zerolog" import "github.com/rs/zerolog/log" import "github.com/mattn/go-isatty" -import "github.com/sneak/feta/ingester" -// InstanceHostname is a special type for holding the hostname of an -// instance (string) -type InstanceHostname string +import "github.com/sneak/feta/ingester" +import "github.com/sneak/feta/storage" +import "github.com/sneak/feta/locator" +import "github.com/sneak/feta/manager" +import "github.com/sneak/feta/instance" // CLIEntry is the main entrypoint for the feta process from the cli func CLIEntry(version string, buildarch string) int { - f := new(Process) + f := new(Feta) f.version = version f.buildarch = buildarch f.setupLogging() return f.runForever() } -// Process is the main structure/process of this app -type Process struct { +// Feta is the main structure/process of this app +type Feta struct { version string buildarch string - locator *InstanceLocator - manager *InstanceManager + locator *locator.InstanceLocator + manager *manager.InstanceManager ingester *ingester.TootIngester - api *fetaAPIServer + api *Server db *gorm.DB startup time.Time } -func (f *Process) identify() { +func (f *Feta) identify() { log.Info(). Str("version", f.version). Str("buildarch", f.buildarch). Msg("starting") } -func (f *Process) setupLogging() { +func (f *Feta) setupLogging() { log.Logger = log.With().Caller().Logger() @@ -72,11 +73,12 @@ func (f *Process) setupLogging() { f.identify() } -func (f *Process) uptime() time.Duration { +func (f *Feta) uptime() time.Duration { return time.Since(f.startup) } -func (f *Process) setupDatabase() { +/* +func (f *Feta) setupDatabase() { var err error f.db, err = gorm.Open("sqlite3", "feta.sqlite") @@ -84,27 +86,38 @@ func (f *Process) setupDatabase() { panic(err) } - f.databaseMigrations() + //f.databaseMigrations() } +*/ -func (f *Process) runForever() int { +func (f *Feta) runForever() int { f.startup = time.Now() - f.setupDatabase() + //f.setupDatabase() - newInstanceHostnameNotifications := make(chan InstanceHostname) + // FIXME move this channel creation into the manager's constructor + // and add getters/setters on the manager/locator + newInstanceHostnameNotifications := make(chan instance.Hostname) - f.locator = newInstanceLocator() - f.manager = newInstanceManager() + f.locator = locator.New() + f.manager = manager.New() f.ingester = ingester.NewTootIngester() - f.api = new(fetaAPIServer) - f.api.setFeta(f) // api needs to get to us to access data + home := os.Getenv("HOME") + if home == "" { + panic("can't find home directory") + } - f.locator.setInstanceNotificationChannel(newInstanceHostnameNotifications) - f.manager.setInstanceNotificationChannel(newInstanceHostnameNotifications) + diskBackend := storage.NewTootFSStorage(home + "/.local/feta") + f.ingester.SetStorageBackend(diskBackend) - f.manager.setTootDestination(f.ingester.GetDeliveryChannel()) + f.api = new(Server) + f.api.SetFeta(f) // api needs to get to us to access data + + f.locator.SetInstanceNotificationChannel(newInstanceHostnameNotifications) + f.manager.SetInstanceNotificationChannel(newInstanceHostnameNotifications) + + f.manager.SetTootDestination(f.ingester.GetDeliveryChannel()) // ingester goroutine: go f.ingester.Ingest() diff --git a/apihandlers.go b/process/handlers.go similarity index 69% rename from apihandlers.go rename to process/handlers.go index 554ac50..a1ac43d 100644 --- a/apihandlers.go +++ b/process/handlers.go @@ -1,4 +1,4 @@ -package feta +package process import "time" import "net/http" @@ -11,25 +11,25 @@ import "github.com/gin-gonic/gin" type hash map[string]interface{} -func (a *fetaAPIServer) instances() []hash { +func (a *Server) instances() []hash { resp := make([]hash, 0) now := time.Now() - for _, v := range a.feta.manager.listInstances() { + for _, v := range a.feta.manager.ListInstances() { i := make(hash) // FIXME figure out why a very short lock here deadlocks v.Lock() - i["hostname"] = v.hostname - i["nextCheck"] = v.nextFetch.UTC().Format(time.RFC3339) - i["nextCheckAfter"] = (-1 * now.Sub(v.nextFetch)).String() - i["successCount"] = v.successCount - i["errorCount"] = v.errorCount - i["identified"] = v.identified + i["hostname"] = v.Hostname + i["nextCheck"] = v.NextFetch.UTC().Format(time.RFC3339) + i["nextCheckAfter"] = (-1 * now.Sub(v.NextFetch)).String() + i["successCount"] = v.SuccessCount + i["errorCount"] = v.ErrorCount + i["identified"] = v.Identified i["status"] = v.Status() i["software"] = "unknown" i["version"] = "unknown" - if v.identified { - i["software"] = v.serverImplementationString - i["version"] = v.serverVersionString + if v.Identified { + i["software"] = v.ServerImplementationString + i["version"] = v.ServerVersionString } v.Unlock() resp = append(resp, i) @@ -37,21 +37,21 @@ func (a *fetaAPIServer) instances() []hash { return resp } -func (a *fetaAPIServer) instanceSummary() map[string]int { +func (a *Server) instanceSummary() map[string]int { resp := make(map[string]int) - for _, v := range a.feta.manager.listInstances() { + for _, v := range a.feta.manager.ListInstances() { v.Lock() resp[fmt.Sprintf("STATUS_%s", v.Status())]++ - if v.serverImplementationString != "" { + if v.ServerImplementationString != "" { //FIXME(sneak) sanitize this to a-z0-9, it is server-provided - resp[fmt.Sprintf("SOFTWARE_%s", strings.ToUpper(v.serverImplementationString))]++ + resp[fmt.Sprintf("SOFTWARE_%s", strings.ToUpper(v.ServerImplementationString))]++ } v.Unlock() } return resp } -func (a *fetaAPIServer) getInstanceListHandler() http.HandlerFunc { +func (a *Server) getInstanceListHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { result := &gin.H{ @@ -69,7 +69,7 @@ func (a *fetaAPIServer) getInstanceListHandler() http.HandlerFunc { } } -func (a *fetaAPIServer) getIndexHandler() http.HandlerFunc { +func (a *Server) getIndexHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { index := &gin.H{ "server": &gin.H{ @@ -94,7 +94,7 @@ func (a *fetaAPIServer) getIndexHandler() http.HandlerFunc { } } -func (a *fetaAPIServer) getHealthCheckHandler() http.HandlerFunc { +func (a *Server) getHealthCheckHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { resp := &gin.H{ "status": "ok", diff --git a/apiserver.go b/process/server.go similarity index 75% rename from apiserver.go rename to process/server.go index 5053ebd..ddff565 100644 --- a/apiserver.go +++ b/process/server.go @@ -1,4 +1,4 @@ -package feta +package process import "fmt" import "net/http" @@ -10,19 +10,24 @@ import "github.com/rs/zerolog/log" import "github.com/gin-gonic/gin" import "github.com/dn365/gin-zerolog" -type fetaAPIServer struct { - feta *Process +// Server is the HTTP webserver object +type Server struct { + feta *Feta port uint router *gin.Engine server *http.Server debug bool } -func (a *fetaAPIServer) setFeta(feta *Process) { +// SetFeta tells the http Server where to find the Process object so that it +// can pull stats and other information for serving via http +func (a *Server) SetFeta(feta *Feta) { a.feta = feta } -func (a *fetaAPIServer) Serve() { +// Serve is the entrypoint for the Server, which should run in its own +// goroutine (started by the Process) +func (a *Server) Serve() { if a.feta == nil { panic("must have feta app from which to serve stats") } @@ -50,7 +55,7 @@ func (a *fetaAPIServer) Serve() { } } -func (a *fetaAPIServer) initRouter() { +func (a *Server) initRouter() { // empty router r := gin.New() @@ -69,7 +74,7 @@ func (a *fetaAPIServer) initRouter() { a.router = r } -func (a *fetaAPIServer) initServer() { +func (a *Server) initServer() { if !a.debug { gin.SetMode(gin.ReleaseMode) }