refactored, linted, formatted

This commit is contained in:
Jeffrey Paul 2019-12-19 06:24:26 -08:00
parent 5144a957e5
commit 7a9d7a5e5b
11 changed files with 247 additions and 165 deletions

View File

@ -45,9 +45,9 @@ lint: fmt .lintsetup
fgt golint ./... fgt golint ./...
go-get: go-get:
go get -v cd cmd/$(FN) && go get -v
./$(FN): *.go cmd/*/*.go go-get ./$(FN): */*.go cmd/*/*.go go-get
cd cmd/$(FN) && go build -o ../../$(FN) $(GOFLAGS) . cd cmd/$(FN) && go build -o ../../$(FN) $(GOFLAGS) .
fmt: fmt:

View File

@ -2,7 +2,7 @@ package main
import "os" import "os"
import "github.com/sneak/feta" import "github.com/sneak/feta/process"
// these are filled in at link-time by the build scripts // these are filled in at link-time by the build scripts
@ -13,5 +13,5 @@ var Version string
var Buildarch string var Buildarch string
func main() { func main() {
os.Exit(feta.CLIEntry(Version, Buildarch)) os.Exit(process.CLIEntry(Version, Buildarch))
} }

19
config.go Normal file
View File

@ -0,0 +1,19 @@
package feta
import "time"
// FIXME this should use viper or something
// Config stores the configuration for the feta process
type Config struct {
LogReportInterval time.Duration
FSStorageLocation string
}
// GetConfig returns the config
func GetConfig() *Config {
c := new(Config)
c.LogReportInterval = time.Second * 10
c.FSStorageLocation = "/home/sneak/Library/ApplicationSupport/feta/tootarchive"
return c
}

View File

@ -1,5 +1,6 @@
package feta package db
import "github.com/sneak/feta/process"
import "github.com/jinzhu/gorm" import "github.com/jinzhu/gorm"
import _ "github.com/jinzhu/gorm/dialects/sqlite" // required for orm import _ "github.com/jinzhu/gorm/dialects/sqlite" // required for orm
@ -9,6 +10,6 @@ type savedInstance struct {
software string software string
} }
func (f *Process) databaseMigrations() { func (f *process.Feta) databaseMigrations() {
f.db.AutoMigrate(&savedInstance{}) f.db.AutoMigrate(&savedInstance{})
} }

View File

@ -52,5 +52,8 @@ func (ti *TootIngester) readFromInboundChannel() {
func (ti *TootIngester) storeToot(t *toot.Toot) { func (ti *TootIngester) storeToot(t *toot.Toot) {
// FIXME first check for dupes in recentlySeen // FIXME first check for dupes in recentlySeen
if ti.storageBackend == nil {
panic("no storage backend")
}
ti.storageBackend.StoreToot(*t) ti.storageBackend.StoreToot(*t)
} }

View File

@ -1,4 +1,4 @@
package feta package instance
import "encoding/json" import "encoding/json"
import "fmt" import "fmt"
@ -24,35 +24,41 @@ const instanceErrorInterval = time.Second * 60 * 30
type instanceImplementation int type instanceImplementation int
// Hostname is a special type for holding the hostname of an
// instance (string)
type Hostname string
const ( const (
implUnknown instanceImplementation = iota implUnknown instanceImplementation = iota
implMastodon implMastodon
implPleroma implPleroma
) )
type instance struct { // Instance stores all the information we know about an instance
type Instance struct {
structLock sync.Mutex structLock sync.Mutex
tootDestination chan *toot.Toot tootDestination chan *toot.Toot
errorCount uint ErrorCount uint
successCount uint SuccessCount uint
highestID int highestID int
hostname string Hostname string
identified bool Identified bool
fetching bool fetching bool
implementation instanceImplementation implementation instanceImplementation
backend *instanceBackend
storageBackend *storage.TootStorageBackend storageBackend *storage.TootStorageBackend
nextFetch time.Time NextFetch time.Time
nodeInfoURL string nodeInfoURL string
serverVersionString string ServerVersionString string
serverImplementationString string ServerImplementationString string
fetchingLock sync.Mutex fetchingLock sync.Mutex
fsm *fsm.FSM fsm *fsm.FSM
fsmLock sync.Mutex fsmLock sync.Mutex
} }
func newInstance(options ...func(i *instance)) *instance { // New returns a new instance, argument is a function that operates on the
i := new(instance) // new instance
func New(options ...func(i *Instance)) *Instance {
i := new(Instance)
i.setNextFetchAfter(1 * time.Second) i.setNextFetchAfter(1 * time.Second)
i.fsm = fsm.NewFSM( i.fsm = fsm.NewFSM(
@ -80,69 +86,82 @@ func newInstance(options ...func(i *instance)) *instance {
return i return i
} }
func (i *instance) Status() string { // Status returns the instance's state in the FSM
func (i *Instance) Status() string {
i.fsmLock.Lock() i.fsmLock.Lock()
defer i.fsmLock.Unlock() defer i.fsmLock.Unlock()
return i.fsm.Current() return i.fsm.Current()
} }
func (i *instance) setTootDestination(d chan *toot.Toot) { // SetTootDestination takes a channel from the manager that all toots
// fetched from this instance should be pushed into. The instance is not
// responsible for deduplication, it should shove all toots on every fetch
// into the channel.
func (i *Instance) SetTootDestination(d chan *toot.Toot) {
i.tootDestination = d i.tootDestination = d
} }
func (i *instance) Event(eventname string) { // Event is the method that alters the FSM
func (i *Instance) Event(eventname string) {
i.fsmLock.Lock() i.fsmLock.Lock()
defer i.fsmLock.Unlock() defer i.fsmLock.Unlock()
i.fsm.Event(eventname) i.fsm.Event(eventname)
} }
func (i *instance) fsmEnterState(e *fsm.Event) { func (i *Instance) fsmEnterState(e *fsm.Event) {
log.Debug(). log.Debug().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Str("state", e.Dst). Str("state", e.Dst).
Msg("instance changed state") Msg("instance changed state")
} }
func (i *instance) Lock() { // Lock locks the instance's mutex for reading/writing from the structure
func (i *Instance) Lock() {
i.structLock.Lock() i.structLock.Lock()
} }
func (i *instance) Unlock() { // Unlock unlocks the instance's mutex for reading/writing from the structure
func (i *Instance) Unlock() {
i.structLock.Unlock() i.structLock.Unlock()
} }
func (i *instance) bumpFetch() { func (i *Instance) bumpFetch() {
i.Lock() i.Lock()
defer i.Unlock() defer i.Unlock()
i.nextFetch = time.Now().Add(120 * time.Second) i.NextFetch = time.Now().Add(120 * time.Second)
} }
func (i *instance) setNextFetchAfter(d time.Duration) { func (i *Instance) setNextFetchAfter(d time.Duration) {
i.Lock() i.Lock()
defer i.Unlock() defer i.Unlock()
i.nextFetch = time.Now().Add(d) i.NextFetch = time.Now().Add(d)
} }
func (i *instance) Fetch() { // Fetch prepares an instance for fetching. Bad name, fix it.
// FIXME(sneak)
func (i *Instance) Fetch() {
i.fetchingLock.Lock() i.fetchingLock.Lock()
defer i.fetchingLock.Unlock() defer i.fetchingLock.Unlock()
i.setNextFetchAfter(instanceErrorInterval) i.setNextFetchAfter(instanceErrorInterval)
err := i.detectNodeTypeIfNecessary() err := i.DetectNodeTypeIfNecessary()
if err != nil { if err != nil {
log.Debug(). log.Debug().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Err(err). Err(err).
Msg("unable to fetch instance metadata") Msg("unable to fetch instance metadata")
return return
} }
i.setNextFetchAfter(instanceSpiderInterval) i.setNextFetchAfter(instanceSpiderInterval)
log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", i.hostname) log.Info().
Str("hostname", i.Hostname).
Msg("instance now ready for fetch")
} }
func (i *instance) dueForFetch() bool { // FIXME rename this function
func (i *Instance) dueForFetch() bool {
// this just checks FSM state, the ticker must update it and do time // this just checks FSM state, the ticker must update it and do time
// calcs // calcs
if i.Status() == "READY_AND_DUE_FETCH" { if i.Status() == "READY_AND_DUE_FETCH" {
@ -151,14 +170,14 @@ func (i *instance) dueForFetch() bool {
return false return false
} }
func (i *instance) isNowPastFetchTime() bool { func (i *Instance) isNowPastFetchTime() bool {
return time.Now().After(i.nextFetch) return time.Now().After(i.NextFetch)
} }
// Tick is responsible for pushing idle instance records between states. // Tick is responsible for pushing idle instance records between states.
// The instances will transition between states when doing stuff (e.g. // The instances will transition between states when doing stuff (e.g.
// investigating, fetching, et c) as well. // investigating, fetching, et c) as well.
func (i *instance) Tick() { func (i *Instance) Tick() {
if i.Status() == "READY_FOR_TOOTFETCH" { if i.Status() == "READY_FOR_TOOTFETCH" {
if i.isNowPastFetchTime() { if i.isNowPastFetchTime() {
i.Event("FETCH_TIME_REACHED") i.Event("FETCH_TIME_REACHED")
@ -170,7 +189,7 @@ func (i *instance) Tick() {
} }
} }
func (i *instance) nodeIdentified() bool { func (i *Instance) nodeIdentified() bool {
i.Lock() i.Lock()
defer i.Unlock() defer i.Unlock()
if i.implementation > implUnknown { if i.implementation > implUnknown {
@ -179,47 +198,50 @@ func (i *instance) nodeIdentified() bool {
return false return false
} }
func (i *instance) detectNodeTypeIfNecessary() error { // DetectNodeTypeIfNecessary does some network requests if the node is as
// yet unidenfitied. No-op otherwise.
func (i *Instance) DetectNodeTypeIfNecessary() error {
if !i.nodeIdentified() { if !i.nodeIdentified() {
return i.fetchNodeInfo() return i.fetchNodeInfo()
} }
return nil return nil
} }
func (i *instance) registerError() { func (i *Instance) registerError() {
i.Lock() i.Lock()
defer i.Unlock() defer i.Unlock()
i.errorCount++ i.ErrorCount++
} }
func (i *instance) registerSuccess() { func (i *Instance) registerSuccess() {
i.Lock() i.Lock()
defer i.Unlock() defer i.Unlock()
i.successCount++ i.SuccessCount++
} }
func (i *instance) Up() bool { // Up returns true if the success count is >0
func (i *Instance) Up() bool {
i.Lock() i.Lock()
defer i.Unlock() defer i.Unlock()
return i.successCount > 0 return i.SuccessCount > 0
} }
func (i *instance) fetchNodeInfoURL() error { func (i *Instance) fetchNodeInfoURL() error {
url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.hostname) url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.Hostname)
var c = &http.Client{ var c = &http.Client{
Timeout: instanceNodeinfoTimeout, Timeout: instanceNodeinfoTimeout,
} }
log.Debug(). log.Debug().
Str("url", url). Str("url", url).
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Msg("fetching nodeinfo reference URL") Msg("fetching nodeinfo reference URL")
i.Event("BEGIN_NODEINFO_URL_FETCH") i.Event("BEGIN_NODEINFO_URL_FETCH")
resp, err := c.Get(url) resp, err := c.Get(url)
if err != nil { if err != nil {
log.Debug(). log.Debug().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Err(err). Err(err).
Msg("unable to fetch nodeinfo, node is down?") Msg("unable to fetch nodeinfo, node is down?")
i.registerError() i.registerError()
@ -232,7 +254,7 @@ func (i *instance) fetchNodeInfoURL() error {
if err != nil { if err != nil {
log.Debug(). log.Debug().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Err(err). Err(err).
Msg("unable to read nodeinfo") Msg("unable to read nodeinfo")
i.registerError() i.registerError()
@ -244,7 +266,7 @@ func (i *instance) fetchNodeInfoURL() error {
err = json.Unmarshal(body, &nir) err = json.Unmarshal(body, &nir)
if err != nil { if err != nil {
log.Debug(). log.Debug().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Err(err). Err(err).
Msg("unable to parse nodeinfo, node is weird") Msg("unable to parse nodeinfo, node is weird")
i.registerError() i.registerError()
@ -255,7 +277,7 @@ func (i *instance) fetchNodeInfoURL() error {
for _, item := range nir.Links { for _, item := range nir.Links {
if item.Rel == nodeInfoSchemaVersionTwoName { if item.Rel == nodeInfoSchemaVersionTwoName {
log.Debug(). log.Debug().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Str("nodeinfourl", item.Href). Str("nodeinfourl", item.Href).
Msg("success fetching url for nodeinfo") Msg("success fetching url for nodeinfo")
@ -267,21 +289,21 @@ func (i *instance) fetchNodeInfoURL() error {
return nil return nil
} }
log.Debug(). log.Debug().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Str("item-rel", item.Rel). Str("item-rel", item.Rel).
Str("item-href", item.Href). Str("item-href", item.Href).
Msg("nodeinfo entry") Msg("nodeinfo entry")
} }
log.Error(). log.Error().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Msg("incomplete nodeinfo") Msg("incomplete nodeinfo")
i.registerError() i.registerError()
i.Event("WEIRD_NODE_RESPONSE") i.Event("WEIRD_NODE_RESPONSE")
return errors.New("incomplete nodeinfo") return errors.New("incomplete nodeinfo")
} }
func (i *instance) fetchNodeInfo() error { func (i *Instance) fetchNodeInfo() error {
err := i.fetchNodeInfoURL() err := i.fetchNodeInfoURL()
if err != nil { if err != nil {
@ -303,7 +325,7 @@ func (i *instance) fetchNodeInfo() error {
if err != nil { if err != nil {
log.Debug(). log.Debug().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Err(err). Err(err).
Msgf("unable to fetch nodeinfo data") Msgf("unable to fetch nodeinfo data")
i.registerError() i.registerError()
@ -316,7 +338,7 @@ func (i *instance) fetchNodeInfo() error {
if err != nil { if err != nil {
log.Error(). log.Error().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Err(err). Err(err).
Msgf("unable to read nodeinfo data") Msgf("unable to read nodeinfo data")
i.registerError() i.registerError()
@ -328,7 +350,7 @@ func (i *instance) fetchNodeInfo() error {
err = json.Unmarshal(body, &ni) err = json.Unmarshal(body, &ni)
if err != nil { if err != nil {
log.Error(). log.Error().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Err(err). Err(err).
Msgf("unable to parse nodeinfo") Msgf("unable to parse nodeinfo")
i.registerError() i.registerError()
@ -339,21 +361,21 @@ func (i *instance) fetchNodeInfo() error {
log.Debug(). log.Debug().
Str("serverVersion", ni.Software.Version). Str("serverVersion", ni.Software.Version).
Str("software", ni.Software.Name). Str("software", ni.Software.Name).
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Str("nodeInfoURL", i.nodeInfoURL). Str("nodeInfoURL", i.nodeInfoURL).
Msg("received nodeinfo from instance") Msg("received nodeinfo from instance")
i.Lock() i.Lock()
i.serverVersionString = ni.Software.Version i.ServerVersionString = ni.Software.Version
i.serverImplementationString = ni.Software.Name i.ServerImplementationString = ni.Software.Name
ni.Software.Name = strings.ToLower(ni.Software.Name) ni.Software.Name = strings.ToLower(ni.Software.Name)
if ni.Software.Name == "pleroma" { if ni.Software.Name == "pleroma" {
log.Debug(). log.Debug().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Str("software", ni.Software.Name). Str("software", ni.Software.Name).
Msg("detected server software") Msg("detected server software")
i.identified = true i.Identified = true
i.implementation = implPleroma i.implementation = implPleroma
i.Unlock() i.Unlock()
i.registerSuccess() i.registerSuccess()
@ -361,10 +383,10 @@ func (i *instance) fetchNodeInfo() error {
return nil return nil
} else if ni.Software.Name == "mastodon" { } else if ni.Software.Name == "mastodon" {
log.Debug(). log.Debug().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Str("software", ni.Software.Name). Str("software", ni.Software.Name).
Msg("detected server software") Msg("detected server software")
i.identified = true i.Identified = true
i.implementation = implMastodon i.implementation = implMastodon
i.Unlock() i.Unlock()
i.registerSuccess() i.registerSuccess()
@ -372,7 +394,7 @@ func (i *instance) fetchNodeInfo() error {
return nil return nil
} else { } else {
log.Error(). log.Error().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Str("software", ni.Software.Name). Str("software", ni.Software.Name).
Msg("FIXME unknown server implementation") Msg("FIXME unknown server implementation")
i.Unlock() i.Unlock()
@ -382,13 +404,13 @@ func (i *instance) fetchNodeInfo() error {
} }
} }
func (i *instance) fetchRecentToots() error { func (i *Instance) fetchRecentToots() error {
// this would have been about a billion times shorter in python // this would have been about a billion times shorter in python
// it turns out pleroma supports the mastodon api so we'll just use that // it turns out pleroma supports the mastodon api so we'll just use that
// for everything for now // for everything for now
url := fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true", url := fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true",
i.hostname) i.Hostname)
var c = &http.Client{ var c = &http.Client{
Timeout: instanceHTTPTimeout, Timeout: instanceHTTPTimeout,
@ -404,7 +426,7 @@ func (i *instance) fetchRecentToots() error {
if err != nil { if err != nil {
log.Debug(). log.Debug().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Err(err). Err(err).
Msgf("unable to fetch recent toots") Msgf("unable to fetch recent toots")
i.registerError() i.registerError()
@ -417,7 +439,7 @@ func (i *instance) fetchRecentToots() error {
if err != nil { if err != nil {
log.Debug(). log.Debug().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Err(err). Err(err).
Msgf("unable to read recent toots from response") Msgf("unable to read recent toots from response")
i.registerError() i.registerError()
@ -425,11 +447,11 @@ func (i *instance) fetchRecentToots() error {
return err return err
} }
tc, err := toot.NewTootCollectionFromMastodonAPIResponse(body, i.hostname) tc, err := toot.NewTootCollectionFromMastodonAPIResponse(body, i.Hostname)
if err != nil { if err != nil {
log.Error(). log.Error().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Err(err). Err(err).
Msgf("unable to parse recent toot list") Msgf("unable to parse recent toot list")
i.registerError() i.registerError()
@ -440,15 +462,22 @@ func (i *instance) fetchRecentToots() error {
} }
log.Info(). log.Info().
Str("hostname", i.hostname). Str("hostname", i.Hostname).
Int("tootCount", len(tc)). Int("tootCount", len(tc)).
Msgf("got and parsed toots") Msgf("got and parsed toots")
i.registerSuccess() i.registerSuccess()
i.Event("TOOTS_FETCHED") i.Event("TOOTS_FETCHED")
i.setNextFetchAfter(instanceSpiderInterval) i.setNextFetchAfter(instanceSpiderInterval)
// FIXME stream toots to ingester attached to manager instead // this should go fast as either the channel is buffered bigly or the
//i.storeToots(tc) // ingester receives fast and does its own buffering, but run it in its
panic("lol") // own goroutine anyway because why not
go i.sendTootsToIngester(tc)
return nil return nil
} }
func (i *Instance) sendTootsToIngester(tc []*toot.Toot) {
for _, item := range tc {
i.tootDestination <- item
}
}

View File

@ -1,4 +1,4 @@
package feta package locator
import "encoding/json" import "encoding/json"
import "io/ioutil" import "io/ioutil"
@ -9,6 +9,8 @@ import "sync"
import "github.com/rs/zerolog/log" import "github.com/rs/zerolog/log"
import "golang.org/x/sync/semaphore" import "golang.org/x/sync/semaphore"
import "github.com/sneak/feta/jsonapis" import "github.com/sneak/feta/jsonapis"
import "github.com/sneak/feta/instance"
import "github.com/sneak/feta"
// IndexAPITimeout is the timeout for fetching json instance lists // IndexAPITimeout is the timeout for fetching json instance lists
// from the listing servers // from the listing servers
@ -25,10 +27,6 @@ var IndexCheckInterval = time.Second * 60 * 60
// (default: 10m) // (default: 10m)
var IndexErrorInterval = time.Second * 60 * 10 var IndexErrorInterval = time.Second * 60 * 10
// LogReportInterval defines how long between logging internal
// stats/reporting for user supervision
var LogReportInterval = time.Second * 10
const mastodonIndexURL = "https://instances.social/list.json?q%5Busers%5D=&q%5Bsearch%5D=&strict=false" const mastodonIndexURL = "https://instances.social/list.json?q%5Busers%5D=&q%5Bsearch%5D=&strict=false"
const pleromaIndexURL = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api.cgi" const pleromaIndexURL = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api.cgi"
@ -37,11 +35,12 @@ const pleromaIndexURL = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api
type InstanceLocator struct { type InstanceLocator struct {
pleromaIndexNextRefresh *time.Time pleromaIndexNextRefresh *time.Time
mastodonIndexNextRefresh *time.Time mastodonIndexNextRefresh *time.Time
reportInstanceVia chan InstanceHostname reportInstanceVia chan instance.Hostname
mu sync.Mutex mu sync.Mutex
} }
func newInstanceLocator() *InstanceLocator { // New returns an InstanceLocator for use by the process.
func New() *InstanceLocator {
il := new(InstanceLocator) il := new(InstanceLocator)
n := time.Now() n := time.Now()
il.pleromaIndexNextRefresh = &n il.pleromaIndexNextRefresh = &n
@ -57,13 +56,15 @@ func (il *InstanceLocator) unlock() {
il.mu.Unlock() il.mu.Unlock()
} }
func (il *InstanceLocator) setInstanceNotificationChannel(via chan InstanceHostname) { // SetInstanceNotificationChannel is the way the instanceLocator returns
// newly discovered instances back to the manager for query/addition
func (il *InstanceLocator) SetInstanceNotificationChannel(via chan instance.Hostname) {
il.lock() il.lock()
defer il.unlock() defer il.unlock()
il.reportInstanceVia = via il.reportInstanceVia = via
} }
func (il *InstanceLocator) addInstance(hostname InstanceHostname) { func (il *InstanceLocator) addInstance(hostname instance.Hostname) {
// receiver (InstanceManager) is responsible for de-duping against its // receiver (InstanceManager) is responsible for de-duping against its
// map, we just locate and spray, it manages // map, we just locate and spray, it manages
il.reportInstanceVia <- hostname il.reportInstanceVia <- hostname
@ -119,7 +120,8 @@ func (il *InstanceLocator) Locate() {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
if time.Now().After(x.Add(LogReportInterval)) { c := feta.GetConfig()
if time.Now().After(x.Add(c.LogReportInterval)) {
x = time.Now() x = time.Now()
log.Debug(). log.Debug().
Str("nextMastodonIndexRefresh", il.durationUntilNextMastodonIndexRefresh().String()). Str("nextMastodonIndexRefresh", il.durationUntilNextMastodonIndexRefresh().String()).
@ -196,7 +198,7 @@ func (il *InstanceLocator) locateMastodon() {
Msg("received hosts from mastodon index") Msg("received hosts from mastodon index")
for k := range hosts { for k := range hosts {
il.addInstance(InstanceHostname(k)) il.addInstance(instance.Hostname(k))
} }
} }
@ -264,7 +266,7 @@ func (il *InstanceLocator) locatePleroma() {
Msg("received hosts from pleroma index") Msg("received hosts from pleroma index")
for k := range hosts { for k := range hosts {
il.addInstance(InstanceHostname(k)) il.addInstance(instance.Hostname(k))
} }
} }

View File

@ -1,4 +1,4 @@
package feta package manager
import "sync" import "sync"
import "time" import "time"
@ -8,32 +8,36 @@ import "runtime"
import "github.com/rs/zerolog/log" import "github.com/rs/zerolog/log"
import "github.com/sneak/feta/toot" import "github.com/sneak/feta/toot"
import "github.com/sneak/feta/seeds" import "github.com/sneak/feta/seeds"
import "github.com/sneak/feta/instance"
const hostDiscoveryParallelism = 20 const hostDiscoveryParallelism = 20
type instanceBackend interface { // LogReportInterval defines how long between logging internal
//FIXME // stats/reporting for user supervision
} var LogReportInterval = time.Second * 10
// InstanceManager is the main data structure for the goroutine that manages // InstanceManager is the main data structure for the goroutine that manages
// the list of all known instances, fed by the locator // the list of all known instances, fed by the locator
type InstanceManager struct { type InstanceManager struct {
mu sync.Mutex mu sync.Mutex
instances map[InstanceHostname]*instance instances map[instance.Hostname]*instance.Instance
newInstanceNotifications chan InstanceHostname newInstanceNotifications chan instance.Hostname
tootDestination chan *toot.Toot tootDestination chan *toot.Toot
startup time.Time startup time.Time
hostAdderSemaphore chan bool hostAdderSemaphore chan bool
} }
func newInstanceManager() *InstanceManager { // New returns a new InstanceManager for use by the Process
func New() *InstanceManager {
i := new(InstanceManager) i := new(InstanceManager)
i.hostAdderSemaphore = make(chan bool, hostDiscoveryParallelism) i.hostAdderSemaphore = make(chan bool, hostDiscoveryParallelism)
i.instances = make(map[InstanceHostname]*instance) i.instances = make(map[instance.Hostname]*instance.Instance)
return i return i
} }
func (im *InstanceManager) setTootDestination(td chan *toot.Toot) { // SetTootDestination provides the instancemanager with a channel to the
// ingester that it can give to its instances
func (im *InstanceManager) SetTootDestination(td chan *toot.Toot) {
im.tootDestination = td im.tootDestination = td
} }
@ -69,7 +73,11 @@ func (im *InstanceManager) unlock() {
im.mu.Unlock() im.mu.Unlock()
} }
func (im *InstanceManager) setInstanceNotificationChannel(via chan InstanceHostname) { // SetInstanceNotificationChannel is how the Process tells the
// InstanceManager about the channel from the InstanceLocator so that the
// InstanceLocator can provide it/us (the InstanceManager) with new
// instance.Hostnames. We (the manager) deduplicate the list ourselves.
func (im *InstanceManager) SetInstanceNotificationChannel(via chan instance.Hostname) {
im.lock() im.lock()
defer im.unlock() defer im.unlock()
im.newInstanceNotifications = via im.newInstanceNotifications = via
@ -77,9 +85,9 @@ func (im *InstanceManager) setInstanceNotificationChannel(via chan InstanceHostn
func (im *InstanceManager) receiveSeedInstanceHostnames() { func (im *InstanceManager) receiveSeedInstanceHostnames() {
for _, x := range seeds.SeedInstances { for _, x := range seeds.SeedInstances {
go func(tmp InstanceHostname) { go func(tmp instance.Hostname) {
im.addInstanceByHostname(tmp) im.addInstanceByHostname(tmp)
}(InstanceHostname(x)) }(instance.Hostname(x))
} }
} }
@ -111,7 +119,7 @@ func (im *InstanceManager) Manage() {
func (im *InstanceManager) managerLoop() { func (im *InstanceManager) managerLoop() {
im.lock() im.lock()
il := make([]*instance, 0) il := make([]*instance.Instance, 0)
for _, v := range im.instances { for _, v := range im.instances {
il = append(il, v) il = append(il, v)
} }
@ -119,13 +127,13 @@ func (im *InstanceManager) managerLoop() {
// FIXME is this a bug outside of the mutex above? // FIXME is this a bug outside of the mutex above?
for _, v := range il { for _, v := range il {
go func(i *instance) { go func(i *instance.Instance) {
i.Tick() i.Tick()
}(v) }(v)
} }
} }
func (im *InstanceManager) hostnameExists(newhn InstanceHostname) bool { func (im *InstanceManager) hostnameExists(newhn instance.Hostname) bool {
im.lock() im.lock()
defer im.unlock() defer im.unlock()
for k := range im.instances { for k := range im.instances {
@ -136,7 +144,7 @@ func (im *InstanceManager) hostnameExists(newhn InstanceHostname) bool {
return false return false
} }
func (im *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { func (im *InstanceManager) addInstanceByHostname(newhn instance.Hostname) {
if im.hostnameExists(newhn) { if im.hostnameExists(newhn) {
// ignore adding new if we already know about it // ignore adding new if we already know about it
return return
@ -145,13 +153,13 @@ func (im *InstanceManager) addInstanceByHostname(newhn InstanceHostname) {
// this blocks on the channel size, limiting concurrency // this blocks on the channel size, limiting concurrency
im.hostAdderSemaphore <- true im.hostAdderSemaphore <- true
i := newInstance(func(x *instance) { i := instance.New(func(x *instance.Instance) {
x.hostname = string(newhn) x.Hostname = string(newhn) // set hostname
x.setTootDestination(im.tootDestination) x.SetTootDestination(im.tootDestination) // copy ingester input channel from manager to instance
}) })
// we do node detection under the addLock to avoid thundering // we do node detection under the adderSemaphore to avoid thundering
// on startup // on startup
i.detectNodeTypeIfNecessary() i.DetectNodeTypeIfNecessary()
// pop an item from the buffered channel // pop an item from the buffered channel
<-im.hostAdderSemaphore <-im.hostAdderSemaphore
@ -164,7 +172,7 @@ func (im *InstanceManager) addInstanceByHostname(newhn InstanceHostname) {
} }
func (im *InstanceManager) receiveNewInstanceHostnames() { func (im *InstanceManager) receiveNewInstanceHostnames() {
var newhn InstanceHostname var newhn instance.Hostname
for { for {
newhn = <-im.newInstanceNotifications newhn = <-im.newInstanceNotifications
// receive them fast out of the channel, let the adding function lock to add // receive them fast out of the channel, let the adding function lock to add
@ -187,8 +195,10 @@ func (im *InstanceManager) logInstanceReport() {
Msg("instance report") Msg("instance report")
} }
func (im *InstanceManager) listInstances() []*instance { // ListInstances dumps a slice of all Instances the InstanceManager knows
var out []*instance // about
func (im *InstanceManager) ListInstances() []*instance.Instance {
var out []*instance.Instance
im.lock() im.lock()
defer im.unlock() defer im.unlock()
for _, v := range im.instances { for _, v := range im.instances {
@ -199,7 +209,7 @@ func (im *InstanceManager) listInstances() []*instance {
func (im *InstanceManager) instanceSummaryReport() map[string]uint { func (im *InstanceManager) instanceSummaryReport() map[string]uint {
r := make(map[string]uint) r := make(map[string]uint)
for _, v := range im.listInstances() { for _, v := range im.ListInstances() {
v.Lock() v.Lock()
r[v.Status()]++ r[v.Status()]++
v.Unlock() v.Unlock()

View File

@ -1,4 +1,4 @@
package feta package process
import "os" import "os"
import "time" import "time"
@ -9,41 +9,42 @@ import _ "github.com/jinzhu/gorm/dialects/sqlite" // required for orm
import "github.com/rs/zerolog" import "github.com/rs/zerolog"
import "github.com/rs/zerolog/log" import "github.com/rs/zerolog/log"
import "github.com/mattn/go-isatty" import "github.com/mattn/go-isatty"
import "github.com/sneak/feta/ingester"
// InstanceHostname is a special type for holding the hostname of an import "github.com/sneak/feta/ingester"
// instance (string) import "github.com/sneak/feta/storage"
type InstanceHostname string import "github.com/sneak/feta/locator"
import "github.com/sneak/feta/manager"
import "github.com/sneak/feta/instance"
// CLIEntry is the main entrypoint for the feta process from the cli // CLIEntry is the main entrypoint for the feta process from the cli
func CLIEntry(version string, buildarch string) int { func CLIEntry(version string, buildarch string) int {
f := new(Process) f := new(Feta)
f.version = version f.version = version
f.buildarch = buildarch f.buildarch = buildarch
f.setupLogging() f.setupLogging()
return f.runForever() return f.runForever()
} }
// Process is the main structure/process of this app // Feta is the main structure/process of this app
type Process struct { type Feta struct {
version string version string
buildarch string buildarch string
locator *InstanceLocator locator *locator.InstanceLocator
manager *InstanceManager manager *manager.InstanceManager
ingester *ingester.TootIngester ingester *ingester.TootIngester
api *fetaAPIServer api *Server
db *gorm.DB db *gorm.DB
startup time.Time startup time.Time
} }
func (f *Process) identify() { func (f *Feta) identify() {
log.Info(). log.Info().
Str("version", f.version). Str("version", f.version).
Str("buildarch", f.buildarch). Str("buildarch", f.buildarch).
Msg("starting") Msg("starting")
} }
func (f *Process) setupLogging() { func (f *Feta) setupLogging() {
log.Logger = log.With().Caller().Logger() log.Logger = log.With().Caller().Logger()
@ -72,11 +73,12 @@ func (f *Process) setupLogging() {
f.identify() f.identify()
} }
func (f *Process) uptime() time.Duration { func (f *Feta) uptime() time.Duration {
return time.Since(f.startup) return time.Since(f.startup)
} }
func (f *Process) setupDatabase() { /*
func (f *Feta) setupDatabase() {
var err error var err error
f.db, err = gorm.Open("sqlite3", "feta.sqlite") f.db, err = gorm.Open("sqlite3", "feta.sqlite")
@ -84,27 +86,38 @@ func (f *Process) setupDatabase() {
panic(err) panic(err)
} }
f.databaseMigrations() //f.databaseMigrations()
} }
*/
func (f *Process) runForever() int { func (f *Feta) runForever() int {
f.startup = time.Now() f.startup = time.Now()
f.setupDatabase() //f.setupDatabase()
newInstanceHostnameNotifications := make(chan InstanceHostname) // FIXME move this channel creation into the manager's constructor
// and add getters/setters on the manager/locator
newInstanceHostnameNotifications := make(chan instance.Hostname)
f.locator = newInstanceLocator() f.locator = locator.New()
f.manager = newInstanceManager() f.manager = manager.New()
f.ingester = ingester.NewTootIngester() f.ingester = ingester.NewTootIngester()
f.api = new(fetaAPIServer) home := os.Getenv("HOME")
f.api.setFeta(f) // api needs to get to us to access data if home == "" {
panic("can't find home directory")
}
f.locator.setInstanceNotificationChannel(newInstanceHostnameNotifications) diskBackend := storage.NewTootFSStorage(home + "/.local/feta")
f.manager.setInstanceNotificationChannel(newInstanceHostnameNotifications) f.ingester.SetStorageBackend(diskBackend)
f.manager.setTootDestination(f.ingester.GetDeliveryChannel()) f.api = new(Server)
f.api.SetFeta(f) // api needs to get to us to access data
f.locator.SetInstanceNotificationChannel(newInstanceHostnameNotifications)
f.manager.SetInstanceNotificationChannel(newInstanceHostnameNotifications)
f.manager.SetTootDestination(f.ingester.GetDeliveryChannel())
// ingester goroutine: // ingester goroutine:
go f.ingester.Ingest() go f.ingester.Ingest()

View File

@ -1,4 +1,4 @@
package feta package process
import "time" import "time"
import "net/http" import "net/http"
@ -11,25 +11,25 @@ import "github.com/gin-gonic/gin"
type hash map[string]interface{} type hash map[string]interface{}
func (a *fetaAPIServer) instances() []hash { func (a *Server) instances() []hash {
resp := make([]hash, 0) resp := make([]hash, 0)
now := time.Now() now := time.Now()
for _, v := range a.feta.manager.listInstances() { for _, v := range a.feta.manager.ListInstances() {
i := make(hash) i := make(hash)
// FIXME figure out why a very short lock here deadlocks // FIXME figure out why a very short lock here deadlocks
v.Lock() v.Lock()
i["hostname"] = v.hostname i["hostname"] = v.Hostname
i["nextCheck"] = v.nextFetch.UTC().Format(time.RFC3339) i["nextCheck"] = v.NextFetch.UTC().Format(time.RFC3339)
i["nextCheckAfter"] = (-1 * now.Sub(v.nextFetch)).String() i["nextCheckAfter"] = (-1 * now.Sub(v.NextFetch)).String()
i["successCount"] = v.successCount i["successCount"] = v.SuccessCount
i["errorCount"] = v.errorCount i["errorCount"] = v.ErrorCount
i["identified"] = v.identified i["identified"] = v.Identified
i["status"] = v.Status() i["status"] = v.Status()
i["software"] = "unknown" i["software"] = "unknown"
i["version"] = "unknown" i["version"] = "unknown"
if v.identified { if v.Identified {
i["software"] = v.serverImplementationString i["software"] = v.ServerImplementationString
i["version"] = v.serverVersionString i["version"] = v.ServerVersionString
} }
v.Unlock() v.Unlock()
resp = append(resp, i) resp = append(resp, i)
@ -37,21 +37,21 @@ func (a *fetaAPIServer) instances() []hash {
return resp return resp
} }
func (a *fetaAPIServer) instanceSummary() map[string]int { func (a *Server) instanceSummary() map[string]int {
resp := make(map[string]int) resp := make(map[string]int)
for _, v := range a.feta.manager.listInstances() { for _, v := range a.feta.manager.ListInstances() {
v.Lock() v.Lock()
resp[fmt.Sprintf("STATUS_%s", v.Status())]++ resp[fmt.Sprintf("STATUS_%s", v.Status())]++
if v.serverImplementationString != "" { if v.ServerImplementationString != "" {
//FIXME(sneak) sanitize this to a-z0-9, it is server-provided //FIXME(sneak) sanitize this to a-z0-9, it is server-provided
resp[fmt.Sprintf("SOFTWARE_%s", strings.ToUpper(v.serverImplementationString))]++ resp[fmt.Sprintf("SOFTWARE_%s", strings.ToUpper(v.ServerImplementationString))]++
} }
v.Unlock() v.Unlock()
} }
return resp return resp
} }
func (a *fetaAPIServer) getInstanceListHandler() http.HandlerFunc { func (a *Server) getInstanceListHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
result := &gin.H{ result := &gin.H{
@ -69,7 +69,7 @@ func (a *fetaAPIServer) getInstanceListHandler() http.HandlerFunc {
} }
} }
func (a *fetaAPIServer) getIndexHandler() http.HandlerFunc { func (a *Server) getIndexHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
index := &gin.H{ index := &gin.H{
"server": &gin.H{ "server": &gin.H{
@ -94,7 +94,7 @@ func (a *fetaAPIServer) getIndexHandler() http.HandlerFunc {
} }
} }
func (a *fetaAPIServer) getHealthCheckHandler() http.HandlerFunc { func (a *Server) getHealthCheckHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
resp := &gin.H{ resp := &gin.H{
"status": "ok", "status": "ok",

View File

@ -1,4 +1,4 @@
package feta package process
import "fmt" import "fmt"
import "net/http" import "net/http"
@ -10,19 +10,24 @@ import "github.com/rs/zerolog/log"
import "github.com/gin-gonic/gin" import "github.com/gin-gonic/gin"
import "github.com/dn365/gin-zerolog" import "github.com/dn365/gin-zerolog"
type fetaAPIServer struct { // Server is the HTTP webserver object
feta *Process type Server struct {
feta *Feta
port uint port uint
router *gin.Engine router *gin.Engine
server *http.Server server *http.Server
debug bool debug bool
} }
func (a *fetaAPIServer) setFeta(feta *Process) { // SetFeta tells the http Server where to find the Process object so that it
// can pull stats and other information for serving via http
func (a *Server) SetFeta(feta *Feta) {
a.feta = feta a.feta = feta
} }
func (a *fetaAPIServer) Serve() { // Serve is the entrypoint for the Server, which should run in its own
// goroutine (started by the Process)
func (a *Server) Serve() {
if a.feta == nil { if a.feta == nil {
panic("must have feta app from which to serve stats") panic("must have feta app from which to serve stats")
} }
@ -50,7 +55,7 @@ func (a *fetaAPIServer) Serve() {
} }
} }
func (a *fetaAPIServer) initRouter() { func (a *Server) initRouter() {
// empty router // empty router
r := gin.New() r := gin.New()
@ -69,7 +74,7 @@ func (a *fetaAPIServer) initRouter() {
a.router = r a.router = r
} }
func (a *fetaAPIServer) initServer() { func (a *Server) initServer() {
if !a.debug { if !a.debug {
gin.SetMode(gin.ReleaseMode) gin.SetMode(gin.ReleaseMode)
} }