sacrifice a dead chicken to the idiom gods

* passes linting now so should pass CI
This commit is contained in:
Jeffrey Paul 2019-12-14 07:24:42 -08:00
parent 4b34043e07
commit 4d15e61fd6
11 changed files with 235 additions and 215 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
feta feta
output/ output/
feta.sqlite feta.sqlite
.lintsetup

View File

@ -25,7 +25,7 @@ ifneq ($(UNAME_S),Darwin)
GOFLAGS = -ldflags "-linkmode external -extldflags -static $(GOLDFLAGS)" GOFLAGS = -ldflags "-linkmode external -extldflags -static $(GOLDFLAGS)"
endif endif
default: rundebug default: run
rundebug: build rundebug: build
GOTRACEBACK=all DEBUG=1 ./$(FN) GOTRACEBACK=all DEBUG=1 ./$(FN)
@ -38,9 +38,12 @@ clean:
build: ./$(FN) build: ./$(FN)
lint: .lintsetup:
go get -u golang.org/x/lint/golint go get -u golang.org/x/lint/golint
go get -u github.com/GeertJohan/fgt go get -u github.com/GeertJohan/fgt
touch .lintsetup
lint: .lintsetup
fgt golint fgt golint
go-get: go-get:
@ -52,7 +55,7 @@ go-get:
fmt: fmt:
go fmt *.go go fmt *.go
test: build-docker-image test: lint build-docker-image
is_uncommitted: is_uncommitted:
git diff --exit-code >/dev/null 2>&1 git diff --exit-code >/dev/null 2>&1

View File

@ -11,7 +11,7 @@ import "github.com/gin-gonic/gin"
type hash map[string]interface{} type hash map[string]interface{}
func (a *FetaAPIServer) instances() []hash { func (a *fetaAPIServer) instances() []hash {
resp := make([]hash, 0) resp := make([]hash, 0)
now := time.Now() now := time.Now()
for _, v := range a.feta.manager.listInstances() { for _, v := range a.feta.manager.listInstances() {
@ -37,7 +37,7 @@ func (a *FetaAPIServer) instances() []hash {
return resp return resp
} }
func (a *FetaAPIServer) instanceSummary() map[string]int { func (a *fetaAPIServer) instanceSummary() map[string]int {
resp := make(map[string]int) resp := make(map[string]int)
for _, v := range a.feta.manager.listInstances() { for _, v := range a.feta.manager.listInstances() {
v.Lock() v.Lock()
@ -51,7 +51,7 @@ func (a *FetaAPIServer) instanceSummary() map[string]int {
return resp return resp
} }
func (a *FetaAPIServer) getInstanceListHandler() http.HandlerFunc { func (a *fetaAPIServer) getInstanceListHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
result := &gin.H{ result := &gin.H{
@ -69,12 +69,12 @@ func (a *FetaAPIServer) getInstanceListHandler() http.HandlerFunc {
} }
} }
func (a *FetaAPIServer) getIndexHandler() http.HandlerFunc { func (a *fetaAPIServer) getIndexHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
index := &gin.H{ index := &gin.H{
"server": &gin.H{ "server": &gin.H{
"now": time.Now().UTC().Format(time.RFC3339), "now": time.Now().UTC().Format(time.RFC3339),
"uptime": a.feta.Uptime().String(), "uptime": a.feta.uptime().String(),
"goroutines": runtime.NumGoroutine(), "goroutines": runtime.NumGoroutine(),
"goversion": runtime.Version(), "goversion": runtime.Version(),
"version": a.feta.version, "version": a.feta.version,
@ -96,12 +96,12 @@ func (a *FetaAPIServer) getIndexHandler() http.HandlerFunc {
} }
} }
func (a *FetaAPIServer) getHealthCheckHandler() http.HandlerFunc { func (a *fetaAPIServer) getHealthCheckHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
resp := &gin.H{ resp := &gin.H{
"status": "ok", "status": "ok",
"now": time.Now().UTC().Format(time.RFC3339), "now": time.Now().UTC().Format(time.RFC3339),
"uptime": a.feta.Uptime().String(), "uptime": a.feta.uptime().String(),
} }
json, err := json.Marshal(resp) json, err := json.Marshal(resp)

View File

@ -10,19 +10,19 @@ import "github.com/rs/zerolog/log"
import "github.com/gin-gonic/gin" import "github.com/gin-gonic/gin"
import "github.com/dn365/gin-zerolog" import "github.com/dn365/gin-zerolog"
type FetaAPIServer struct { type fetaAPIServer struct {
feta *FetaProcess feta *Process
port uint port uint
router *gin.Engine router *gin.Engine
server *http.Server server *http.Server
debug bool debug bool
} }
func (self *FetaAPIServer) SetFeta(feta *FetaProcess) { func (a *fetaAPIServer) setFeta(feta *Process) {
self.feta = feta a.feta = feta
} }
func (a *FetaAPIServer) Serve() { func (a *fetaAPIServer) serve() {
if a.feta == nil { if a.feta == nil {
panic("must have feta app from which to serve stats") panic("must have feta app from which to serve stats")
} }
@ -50,7 +50,7 @@ func (a *FetaAPIServer) Serve() {
} }
} }
func (a *FetaAPIServer) initRouter() { func (a *fetaAPIServer) initRouter() {
// empty router // empty router
r := gin.New() r := gin.New()
@ -69,7 +69,7 @@ func (a *FetaAPIServer) initRouter() {
a.router = r a.router = r
} }
func (a *FetaAPIServer) initServer() { func (a *fetaAPIServer) initServer() {
if !a.debug { if !a.debug {
gin.SetMode(gin.ReleaseMode) gin.SetMode(gin.ReleaseMode)
} }

View File

@ -1,14 +1,14 @@
package feta package feta
import "github.com/jinzhu/gorm" import "github.com/jinzhu/gorm"
import _ "github.com/jinzhu/gorm/dialects/sqlite" import _ "github.com/jinzhu/gorm/dialects/sqlite" // required for orm
type SavedInstance struct { type savedInstance struct {
gorm.Model gorm.Model
hostname string hostname string
software string software string
} }
func (f *FetaProcess) databaseMigrations() { func (f *Process) databaseMigrations() {
f.db.AutoMigrate(&SavedInstance{}) f.db.AutoMigrate(&savedInstance{})
} }

43
feta.go
View File

@ -4,38 +4,41 @@ import "os"
import "time" import "time"
import "github.com/jinzhu/gorm" import "github.com/jinzhu/gorm"
import _ "github.com/jinzhu/gorm/dialects/sqlite" import _ "github.com/jinzhu/gorm/dialects/sqlite" // required for orm
import "github.com/rs/zerolog" import "github.com/rs/zerolog"
import "github.com/rs/zerolog/log" import "github.com/rs/zerolog/log"
import "github.com/mattn/go-isatty" import "github.com/mattn/go-isatty"
// InstanceHostname is a special type for holding the hostname of an
// instance (string)
type InstanceHostname string type InstanceHostname string
// CLIEntry is the main entrypoint for the feta process from the cli
func CLIEntry(version string, buildtime string, buildarch string, builduser string) int { func CLIEntry(version string, buildtime string, buildarch string, builduser string) int {
f := new(FetaProcess) f := new(Process)
f.version = version f.version = version
f.buildtime = buildtime f.buildtime = buildtime
f.buildarch = buildarch f.buildarch = buildarch
f.builduser = builduser f.builduser = builduser
f.setupLogging() f.setupLogging()
return f.RunForever() return f.runForever()
} }
// FetaProcess is the main structure/process of this app // Process is the main structure/process of this app
type FetaProcess struct { type Process struct {
version string version string
buildtime string buildtime string
buildarch string buildarch string
builduser string builduser string
locator *InstanceLocator locator *InstanceLocator
manager *InstanceManager manager *InstanceManager
api *FetaAPIServer api *fetaAPIServer
db *gorm.DB db *gorm.DB
startup time.Time startup time.Time
} }
func (f *FetaProcess) identify() { func (f *Process) identify() {
log.Info(). log.Info().
Str("version", f.version). Str("version", f.version).
Str("buildtime", f.buildtime). Str("buildtime", f.buildtime).
@ -44,7 +47,7 @@ func (f *FetaProcess) identify() {
Msg("starting") Msg("starting")
} }
func (f *FetaProcess) setupLogging() { func (f *Process) setupLogging() {
log.Logger = log.With().Caller().Logger() log.Logger = log.With().Caller().Logger()
@ -73,11 +76,11 @@ func (f *FetaProcess) setupLogging() {
f.identify() f.identify()
} }
func (f *FetaProcess) Uptime() time.Duration { func (f *Process) uptime() time.Duration {
return time.Since(f.startup) return time.Since(f.startup)
} }
func (f *FetaProcess) setupDatabase() { func (f *Process) setupDatabase() {
var err error var err error
f.db, err = gorm.Open("sqlite3", "feta.sqlite") f.db, err = gorm.Open("sqlite3", "feta.sqlite")
@ -88,28 +91,28 @@ func (f *FetaProcess) setupDatabase() {
f.databaseMigrations() f.databaseMigrations()
} }
func (f *FetaProcess) RunForever() int { func (f *Process) runForever() int {
f.startup = time.Now() f.startup = time.Now()
f.setupDatabase() f.setupDatabase()
newInstanceHostnameNotifications := make(chan InstanceHostname) newInstanceHostnameNotifications := make(chan InstanceHostname)
f.locator = NewInstanceLocator() f.locator = newInstanceLocator()
f.manager = NewInstanceManager() f.manager = newInstanceManager()
f.api = new(FetaAPIServer) f.api = new(fetaAPIServer)
f.api.feta = f // api needs to get to us to access data f.api.setFeta(f) // api needs to get to us to access data
f.locator.AddInstanceNotificationChannel(newInstanceHostnameNotifications) f.locator.addInstanceNotificationChannel(newInstanceHostnameNotifications)
f.manager.AddInstanceNotificationChannel(newInstanceHostnameNotifications) f.manager.addInstanceNotificationChannel(newInstanceHostnameNotifications)
// locator goroutine: // locator goroutine:
go f.locator.Locate() go f.locator.locate()
// manager goroutine: // manager goroutine:
go f.manager.Manage() go f.manager.manage()
go f.api.Serve() go f.api.serve()
// this goroutine (main) does nothing until we handle signals // this goroutine (main) does nothing until we handle signals
// FIXME(sneak) // FIXME(sneak)

View File

@ -13,36 +13,36 @@ import "errors"
import "github.com/looplab/fsm" import "github.com/looplab/fsm"
import "github.com/rs/zerolog/log" import "github.com/rs/zerolog/log"
const NodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0" const nodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0"
const INSTANCE_NODEINFO_TIMEOUT = time.Second * 50 const instanceNodeinfoTimeout = time.Second * 50
const INSTANCE_HTTP_TIMEOUT = time.Second * 50 const instanceHTTPTimeout = time.Second * 50
const INSTANCE_SPIDER_INTERVAL = time.Second * 60 const instanceSpiderInterval = time.Second * 120
const INSTANCE_ERROR_INTERVAL = time.Second * 60 * 30 const instanceErrorInterval = time.Second * 60 * 30
type instanceImplementation int type instanceImplementation int
const ( const (
Unknown instanceImplementation = iota implUnknown instanceImplementation = iota
Mastodon implMastodon
Pleroma implPleroma
) )
type instance struct { type instance struct {
structLock sync.Mutex structLock sync.Mutex
errorCount uint errorCount uint
successCount uint successCount uint
highestId int highestID int
hostname string hostname string
identified bool identified bool
fetching bool fetching bool
implementation instanceImplementation implementation instanceImplementation
backend *InstanceBackend backend *instanceBackend
nextFetch time.Time nextFetch time.Time
nodeInfoUrl string nodeInfoURL string
serverVersionString string serverVersionString string
serverImplementationString string serverImplementationString string
fetchingLock sync.Mutex fetchingLock sync.Mutex
@ -50,7 +50,7 @@ type instance struct {
fsmLock sync.Mutex fsmLock sync.Mutex
} }
func NewInstance(options ...func(i *instance)) *instance { func newInstance(options ...func(i *instance)) *instance {
i := new(instance) i := new(instance)
i.setNextFetchAfter(1 * time.Second) i.setNextFetchAfter(1 * time.Second)
@ -107,7 +107,7 @@ func (i *instance) Unlock() {
func (i *instance) bumpFetch() { func (i *instance) bumpFetch() {
i.Lock() i.Lock()
defer i.Unlock() defer i.Unlock()
i.nextFetch = time.Now().Add(100 * time.Second) i.nextFetch = time.Now().Add(120 * time.Second)
} }
func (i *instance) setNextFetchAfter(d time.Duration) { func (i *instance) setNextFetchAfter(d time.Duration) {
@ -120,7 +120,7 @@ func (i *instance) Fetch() {
i.fetchingLock.Lock() i.fetchingLock.Lock()
defer i.fetchingLock.Unlock() defer i.fetchingLock.Unlock()
i.setNextFetchAfter(INSTANCE_ERROR_INTERVAL) i.setNextFetchAfter(instanceErrorInterval)
err := i.detectNodeTypeIfNecessary() err := i.detectNodeTypeIfNecessary()
if err != nil { if err != nil {
@ -131,7 +131,7 @@ func (i *instance) Fetch() {
return return
} }
i.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL) i.setNextFetchAfter(instanceSpiderInterval)
log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", i.hostname) log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", i.hostname)
} }
@ -161,7 +161,7 @@ func (i *instance) Tick() {
func (i *instance) nodeIdentified() bool { func (i *instance) nodeIdentified() bool {
i.Lock() i.Lock()
defer i.Unlock() defer i.Unlock()
if i.implementation > Unknown { if i.implementation > implUnknown {
return true return true
} }
return false return false
@ -170,9 +170,8 @@ func (i *instance) nodeIdentified() bool {
func (i *instance) detectNodeTypeIfNecessary() error { func (i *instance) detectNodeTypeIfNecessary() error {
if !i.nodeIdentified() { if !i.nodeIdentified() {
return i.fetchNodeInfo() return i.fetchNodeInfo()
} else {
return nil
} }
return nil
} }
func (i *instance) registerError() { func (i *instance) registerError() {
@ -196,7 +195,7 @@ func (i *instance) Up() bool {
func (i *instance) fetchNodeInfoURL() error { func (i *instance) fetchNodeInfoURL() error {
url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.hostname) url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.hostname)
var c = &http.Client{ var c = &http.Client{
Timeout: INSTANCE_NODEINFO_TIMEOUT, Timeout: instanceNodeinfoTimeout,
} }
log.Debug(). log.Debug().
@ -229,7 +228,7 @@ func (i *instance) fetchNodeInfoURL() error {
return err return err
} }
nir := new(NodeInfoWellKnownResponse) nir := new(nodeInfoWellKnownResponse)
err = json.Unmarshal(body, &nir) err = json.Unmarshal(body, &nir)
if err != nil { if err != nil {
log.Debug(). log.Debug().
@ -242,14 +241,14 @@ func (i *instance) fetchNodeInfoURL() error {
} }
for _, item := range nir.Links { for _, item := range nir.Links {
if item.Rel == NodeInfoSchemaVersionTwoName { if item.Rel == nodeInfoSchemaVersionTwoName {
log.Debug(). log.Debug().
Str("hostname", i.hostname). Str("hostname", i.hostname).
Str("nodeinfourl", item.Href). Str("nodeinfourl", item.Href).
Msg("success fetching url for nodeinfo") Msg("success fetching url for nodeinfo")
i.Lock() i.Lock()
i.nodeInfoUrl = item.Href i.nodeInfoURL = item.Href
i.Unlock() i.Unlock()
i.registerSuccess() i.registerSuccess()
i.Event("GOT_NODEINFO_URL") i.Event("GOT_NODEINFO_URL")
@ -278,13 +277,13 @@ func (i *instance) fetchNodeInfo() error {
} }
var c = &http.Client{ var c = &http.Client{
Timeout: INSTANCE_NODEINFO_TIMEOUT, Timeout: instanceNodeinfoTimeout,
} }
//FIXME make sure the nodeinfourl is on the same domain as the instance //FIXME make sure the nodeinfourl is on the same domain as the instance
//hostname //hostname
i.Lock() i.Lock()
url := i.nodeInfoUrl url := i.nodeInfoURL
i.Unlock() i.Unlock()
i.Event("BEGIN_NODEINFO_FETCH") i.Event("BEGIN_NODEINFO_FETCH")
@ -313,7 +312,7 @@ func (i *instance) fetchNodeInfo() error {
return err return err
} }
ni := new(NodeInfoVersionTwoSchema) ni := new(nodeInfoVersionTwoSchema)
err = json.Unmarshal(body, &ni) err = json.Unmarshal(body, &ni)
if err != nil { if err != nil {
log.Error(). log.Error().
@ -329,7 +328,7 @@ func (i *instance) fetchNodeInfo() error {
Str("serverVersion", ni.Software.Version). Str("serverVersion", ni.Software.Version).
Str("software", ni.Software.Name). Str("software", ni.Software.Name).
Str("hostname", i.hostname). Str("hostname", i.hostname).
Str("nodeInfoUrl", i.nodeInfoUrl). Str("nodeInfoURL", i.nodeInfoURL).
Msg("received nodeinfo from instance") Msg("received nodeinfo from instance")
i.Lock() i.Lock()
@ -343,7 +342,7 @@ func (i *instance) fetchNodeInfo() error {
Str("software", ni.Software.Name). Str("software", ni.Software.Name).
Msg("detected server software") Msg("detected server software")
i.identified = true i.identified = true
i.implementation = Pleroma i.implementation = implPleroma
i.Unlock() i.Unlock()
i.registerSuccess() i.registerSuccess()
i.Event("GOT_NODEINFO") i.Event("GOT_NODEINFO")
@ -354,7 +353,7 @@ func (i *instance) fetchNodeInfo() error {
Str("software", ni.Software.Name). Str("software", ni.Software.Name).
Msg("detected server software") Msg("detected server software")
i.identified = true i.identified = true
i.implementation = Mastodon i.implementation = implMastodon
i.Unlock() i.Unlock()
i.registerSuccess() i.registerSuccess()
i.Event("GOT_NODEINFO") i.Event("GOT_NODEINFO")

View File

@ -4,7 +4,7 @@ import "time"
// thank fuck for https://mholt.github.io/json-to-go/ otherwise // thank fuck for https://mholt.github.io/json-to-go/ otherwise
// this would have been a giant pain in the dick // this would have been a giant pain in the dick
type MastodonIndexResponse struct { type mastodonIndexResponse struct {
Instances []struct { Instances []struct {
ID string `json:"_id"` ID string `json:"_id"`
AddedAt time.Time `json:"addedAt"` AddedAt time.Time `json:"addedAt"`
@ -48,7 +48,7 @@ type MastodonIndexResponse struct {
} `json:"instances"` } `json:"instances"`
} }
type PleromaIndexResponse []struct { type pleromaIndexResponse []struct {
Domain string `json:"domain"` Domain string `json:"domain"`
Title string `json:"title"` Title string `json:"title"`
Thumbnail string `json:"thumbnail"` Thumbnail string `json:"thumbnail"`
@ -62,7 +62,7 @@ type PleromaIndexResponse []struct {
TextLimit int `json:"text_limit"` TextLimit int `json:"text_limit"`
} }
type NodeInfoVersionTwoSchema struct { type nodeInfoVersionTwoSchema struct {
Version string `json:"version"` Version string `json:"version"`
Software struct { Software struct {
Name string `json:"name"` Name string `json:"name"`
@ -80,7 +80,7 @@ type NodeInfoVersionTwoSchema struct {
OpenRegistrations bool `json:"openRegistrations"` OpenRegistrations bool `json:"openRegistrations"`
} }
type NodeInfoWellKnownResponse struct { type nodeInfoWellKnownResponse struct {
Links []struct { Links []struct {
Rel string `json:"rel"` Rel string `json:"rel"`
Href string `json:"href"` Href string `json:"href"`

View File

@ -9,69 +9,82 @@ import "sync"
import "github.com/rs/zerolog/log" import "github.com/rs/zerolog/log"
import "golang.org/x/sync/semaphore" import "golang.org/x/sync/semaphore"
const INDEX_API_TIMEOUT = time.Second * 60 // IndexAPITimeout is the timeout for fetching json instance lists
// from the listing servers
const IndexAPITimeout = time.Second * 60
var USER_AGENT = "https://github.com/sneak/feta indexer bot; sneak@sneak.berlin for feedback" // UserAgent is the user-agent string we provide to servers
var UserAgent = "feta indexer bot, sneak@sneak.berlin for feedback"
// INDEX_CHECK_INTERVAL defines the interval for downloading new lists from // IndexCheckInterval defines the interval for downloading new lists from
// the index APIs run by mastodon/pleroma (default: 1h) // the index APIs run by mastodon/pleroma (default: 1h)
var INDEX_CHECK_INTERVAL = time.Second * 60 * 60 var IndexCheckInterval = time.Second * 60 * 60
// INDEX_ERROR_INTERVAL is used for when the index fetch/parse fails // IndexErrorInterval is used for when the index fetch/parse fails
// (default: 10m) // (default: 10m)
var INDEX_ERROR_INTERVAL = time.Second * 60 * 10 var IndexErrorInterval = time.Second * 60 * 10
// LOG_REPORT_INTERVAL defines how long between logging internal // LogReportInterval defines how long between logging internal
// stats/reporting for user supervision // stats/reporting for user supervision
var LOG_REPORT_INTERVAL = time.Second * 10 var LogReportInterval = time.Second * 10
const mastodonIndexUrl = "https://instances.social/list.json?q%5Busers%5D=&q%5Bsearch%5D=&strict=false" const mastodonIndexURL = "https://instances.social/list.json?q%5Busers%5D=&q%5Bsearch%5D=&strict=false"
const pleromaIndexUrl = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api.cgi" const pleromaIndexURL = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api.cgi"
// InstanceLocator is the main data structure for the locator goroutine
// which sprays discovered instance hostnames into the manager
type InstanceLocator struct { type InstanceLocator struct {
pleromaIndexNextRefresh *time.Time pleromaIndexNextRefresh *time.Time
mastodonIndexNextRefresh *time.Time mastodonIndexNextRefresh *time.Time
reportInstanceVia chan InstanceHostname reportInstanceVia chan InstanceHostname
sync.Mutex mu sync.Mutex
} }
func NewInstanceLocator() *InstanceLocator { func newInstanceLocator() *InstanceLocator {
i := new(InstanceLocator) il := new(InstanceLocator)
n := time.Now() n := time.Now()
i.pleromaIndexNextRefresh = &n il.pleromaIndexNextRefresh = &n
i.mastodonIndexNextRefresh = &n il.mastodonIndexNextRefresh = &n
return i return il
} }
func (self *InstanceLocator) AddInstanceNotificationChannel(via chan InstanceHostname) { func (il *InstanceLocator) lock() {
self.Lock() il.mu.Lock()
defer self.Unlock()
self.reportInstanceVia = via
} }
func (self *InstanceLocator) addInstance(hostname InstanceHostname) { func (il *InstanceLocator) unlock() {
il.mu.Unlock()
}
func (il *InstanceLocator) addInstanceNotificationChannel(via chan InstanceHostname) {
il.lock()
defer il.unlock()
il.reportInstanceVia = via
}
func (il *InstanceLocator) addInstance(hostname InstanceHostname) {
// receiver (InstanceManager) is responsible for de-duping against its // receiver (InstanceManager) is responsible for de-duping against its
// map, we just spray // map, we just locate and spray, it manages
self.reportInstanceVia <- hostname il.reportInstanceVia <- hostname
} }
func (self *InstanceLocator) mastodonIndexRefreshDue() bool { func (il *InstanceLocator) mastodonIndexRefreshDue() bool {
return self.mastodonIndexNextRefresh.Before(time.Now()) return il.mastodonIndexNextRefresh.Before(time.Now())
} }
func (self *InstanceLocator) durationUntilNextMastodonIndexRefresh() time.Duration { func (il *InstanceLocator) durationUntilNextMastodonIndexRefresh() time.Duration {
return (time.Duration(-1) * time.Now().Sub(*self.mastodonIndexNextRefresh)) return (time.Duration(-1) * time.Now().Sub(*il.mastodonIndexNextRefresh))
} }
func (self *InstanceLocator) pleromaIndexRefreshDue() bool { func (il *InstanceLocator) pleromaIndexRefreshDue() bool {
return self.pleromaIndexNextRefresh.Before(time.Now()) return il.pleromaIndexNextRefresh.Before(time.Now())
} }
func (self *InstanceLocator) durationUntilNextPleromaIndexRefresh() time.Duration { func (il *InstanceLocator) durationUntilNextPleromaIndexRefresh() time.Duration {
return (time.Duration(-1) * time.Now().Sub(*self.pleromaIndexNextRefresh)) return (time.Duration(-1) * time.Now().Sub(*il.pleromaIndexNextRefresh))
} }
func (self *InstanceLocator) Locate() { func (il *InstanceLocator) locate() {
log.Info().Msg("InstanceLocator starting") log.Info().Msg("InstanceLocator starting")
x := time.Now() x := time.Now()
var pleromaSemaphore = semaphore.NewWeighted(1) var pleromaSemaphore = semaphore.NewWeighted(1)
@ -82,90 +95,90 @@ func (self *InstanceLocator) Locate() {
log.Info().Msg("InstanceLocator tick") log.Info().Msg("InstanceLocator tick")
go func() { go func() {
if self.pleromaIndexRefreshDue() { if il.pleromaIndexRefreshDue() {
if !pleromaSemaphore.TryAcquire(1) { if !pleromaSemaphore.TryAcquire(1) {
return return
} }
self.locatePleroma() il.locatePleroma()
pleromaSemaphore.Release(1) pleromaSemaphore.Release(1)
} }
}() }()
go func() { go func() {
if self.mastodonIndexRefreshDue() { if il.mastodonIndexRefreshDue() {
if !mastodonSemaphore.TryAcquire(1) { if !mastodonSemaphore.TryAcquire(1) {
return return
} }
self.locateMastodon() il.locateMastodon()
mastodonSemaphore.Release(1) mastodonSemaphore.Release(1)
} }
}() }()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
if time.Now().After(x.Add(LOG_REPORT_INTERVAL)) { if time.Now().After(x.Add(LogReportInterval)) {
x = time.Now() x = time.Now()
log.Debug(). log.Debug().
Str("nextMastodonIndexRefresh", self.durationUntilNextMastodonIndexRefresh().String()). Str("nextMastodonIndexRefresh", il.durationUntilNextMastodonIndexRefresh().String()).
Msg("refresh countdown") Msg("refresh countdown")
log.Debug(). log.Debug().
Str("nextPleromaIndexRefresh", self.durationUntilNextPleromaIndexRefresh().String()). Str("nextPleromaIndexRefresh", il.durationUntilNextPleromaIndexRefresh().String()).
Msg("refresh countdown") Msg("refresh countdown")
} }
} }
} }
func (self *InstanceLocator) locateMastodon() { func (il *InstanceLocator) locateMastodon() {
var c = &http.Client{ var c = &http.Client{
Timeout: INDEX_API_TIMEOUT, Timeout: IndexAPITimeout,
} }
req, err := http.NewRequest("GET", mastodonIndexUrl, nil) req, err := http.NewRequest("GET", mastodonIndexURL, nil)
if err != nil { if err != nil {
panic(err) panic(err)
} }
req.Header.Set("User-Agent", USER_AGENT) req.Header.Set("User-Agent", UserAgent)
resp, err := c.Do(req) resp, err := c.Do(req)
if err != nil { if err != nil {
log.Error().Msgf("unable to fetch mastodon instance list: %s", err) log.Error().Msgf("unable to fetch mastodon instance list: %s", err)
t := time.Now().Add(INDEX_ERROR_INTERVAL) t := time.Now().Add(IndexErrorInterval)
self.Lock() il.lock()
self.mastodonIndexNextRefresh = &t il.mastodonIndexNextRefresh = &t
self.Unlock() il.unlock()
return return
} else { }
log.Info(). log.Info().
Msg("fetched mastodon index") Msg("fetched mastodon index")
}
defer resp.Body.Close() defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body) body, err := ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
log.Error().Msgf("unable to fetch mastodon instance list: %s", err) log.Error().Msgf("unable to fetch mastodon instance list: %s", err)
t := time.Now().Add(INDEX_ERROR_INTERVAL) t := time.Now().Add(IndexErrorInterval)
self.Lock() il.lock()
self.mastodonIndexNextRefresh = &t il.mastodonIndexNextRefresh = &t
self.Unlock() il.unlock()
return return
} }
t := time.Now().Add(INDEX_CHECK_INTERVAL) t := time.Now().Add(IndexCheckInterval)
self.Lock() il.lock()
self.mastodonIndexNextRefresh = &t il.mastodonIndexNextRefresh = &t
self.Unlock() il.unlock()
mi := new(MastodonIndexResponse) mi := new(mastodonIndexResponse)
err = json.Unmarshal(body, &mi) err = json.Unmarshal(body, &mi)
if err != nil { if err != nil {
log.Error().Msgf("unable to parse mastodon instance list: %s", err) log.Error().Msgf("unable to parse mastodon instance list: %s", err)
t := time.Now().Add(INDEX_ERROR_INTERVAL) t := time.Now().Add(IndexErrorInterval)
self.Lock() il.lock()
self.mastodonIndexNextRefresh = &t il.mastodonIndexNextRefresh = &t
self.Unlock() il.unlock()
return return
} }
@ -179,31 +192,31 @@ func (self *InstanceLocator) locateMastodon() {
Int("count", x). Int("count", x).
Msg("received hosts from mastodon index") Msg("received hosts from mastodon index")
for k, _ := range hosts { for k := range hosts {
self.addInstance(InstanceHostname(k)) il.addInstance(InstanceHostname(k))
} }
} }
func (self *InstanceLocator) locatePleroma() { func (il *InstanceLocator) locatePleroma() {
var c = &http.Client{ var c = &http.Client{
Timeout: INDEX_API_TIMEOUT, Timeout: IndexAPITimeout,
} }
req, err := http.NewRequest("GET", pleromaIndexUrl, nil) req, err := http.NewRequest("GET", pleromaIndexURL, nil)
if err != nil { if err != nil {
panic(err) panic(err)
} }
req.Header.Set("User-Agent", USER_AGENT) req.Header.Set("User-Agent", UserAgent)
resp, err := c.Do(req) resp, err := c.Do(req)
if err != nil { if err != nil {
log.Error().Msgf("unable to fetch pleroma instance list: %s", err) log.Error().Msgf("unable to fetch pleroma instance list: %s", err)
t := time.Now().Add(INDEX_ERROR_INTERVAL) t := time.Now().Add(IndexErrorInterval)
self.Lock() il.lock()
self.pleromaIndexNextRefresh = &t il.pleromaIndexNextRefresh = &t
self.Unlock() il.unlock()
return return
} }
@ -212,28 +225,28 @@ func (self *InstanceLocator) locatePleroma() {
if err != nil { if err != nil {
log.Error().Msgf("unable to fetch pleroma instance list: %s", err) log.Error().Msgf("unable to fetch pleroma instance list: %s", err)
t := time.Now().Add(INDEX_ERROR_INTERVAL) t := time.Now().Add(IndexErrorInterval)
self.Lock() il.lock()
self.pleromaIndexNextRefresh = &t il.pleromaIndexNextRefresh = &t
self.Unlock() il.unlock()
return return
} }
// fetch worked // fetch worked
t := time.Now().Add(INDEX_CHECK_INTERVAL) t := time.Now().Add(IndexCheckInterval)
self.Lock() il.lock()
self.pleromaIndexNextRefresh = &t il.pleromaIndexNextRefresh = &t
self.Unlock() il.unlock()
pi := new(PleromaIndexResponse) pi := new(pleromaIndexResponse)
err = json.Unmarshal(body, &pi) err = json.Unmarshal(body, &pi)
if err != nil { if err != nil {
log.Warn().Msgf("unable to parse pleroma instance list: %s", err) log.Warn().Msgf("unable to parse pleroma instance list: %s", err)
t := time.Now().Add(INDEX_ERROR_INTERVAL) t := time.Now().Add(IndexErrorInterval)
self.Lock() il.lock()
self.pleromaIndexNextRefresh = &t il.pleromaIndexNextRefresh = &t
self.Unlock() il.unlock()
return return
} }
@ -247,8 +260,8 @@ func (self *InstanceLocator) locatePleroma() {
Int("count", x). Int("count", x).
Msg("received hosts from pleroma index") Msg("received hosts from pleroma index")
for k, _ := range hosts { for k := range hosts {
self.addInstance(InstanceHostname(k)) il.addInstance(InstanceHostname(k))
} }
} }

View File

@ -7,13 +7,14 @@ import "runtime"
//import "github.com/gin-gonic/gin" //import "github.com/gin-gonic/gin"
import "github.com/rs/zerolog/log" import "github.com/rs/zerolog/log"
const HOST_DISCOVERY_PARALLELISM = 1 const hostDiscoveryParallelism = 20
type InstanceBackend interface {
type instanceBackend interface {
//FIXME //FIXME
} }
// InstanceManager is the main data structure for the goroutine that manages
// the list of all known instances, fed by the locator
type InstanceManager struct { type InstanceManager struct {
mu sync.Mutex mu sync.Mutex
instances map[InstanceHostname]*instance instances map[InstanceHostname]*instance
@ -22,14 +23,14 @@ type InstanceManager struct {
hostAdderSemaphore chan bool hostAdderSemaphore chan bool
} }
func NewInstanceManager() *InstanceManager { func newInstanceManager() *InstanceManager {
i := new(InstanceManager) i := new(InstanceManager)
i.hostAdderSemaphore = make(chan bool, HOST_DISCOVERY_PARALLELISM) i.hostAdderSemaphore = make(chan bool, hostDiscoveryParallelism)
i.instances = make(map[InstanceHostname]*instance) i.instances = make(map[InstanceHostname]*instance)
return i return i
} }
func (self *InstanceManager) logCaller(msg string) { func (im *InstanceManager) logCaller(msg string) {
fpcs := make([]uintptr, 1) fpcs := make([]uintptr, 1)
// Skip 2 levels to get the caller // Skip 2 levels to get the caller
n := runtime.Callers(3, fpcs) n := runtime.Callers(3, fpcs)
@ -53,49 +54,47 @@ func (self *InstanceManager) logCaller(msg string) {
Msg(msg) Msg(msg)
} }
func (self *InstanceManager) Lock() { func (im *InstanceManager) lock() {
//self.logCaller("instancemanager attempting to lock") im.mu.Lock()
self.mu.Lock()
//self.logCaller("instancemanager locked")
} }
func (self *InstanceManager) Unlock() { func (im *InstanceManager) unlock() {
self.mu.Unlock() im.mu.Unlock()
//self.logCaller("instancemanager unlocked")
} }
func (self *InstanceManager) AddInstanceNotificationChannel(via chan InstanceHostname) { func (im *InstanceManager) addInstanceNotificationChannel(via chan InstanceHostname) {
self.Lock() im.lock()
defer self.Unlock() defer im.unlock()
self.newInstanceNotifications = via im.newInstanceNotifications = via
} }
func (self *InstanceManager) Manage() { func (im *InstanceManager) manage() {
log.Info().Msg("InstanceManager starting") log.Info().Msg("InstanceManager starting")
go func() { go func() {
self.receiveNewInstanceHostnames() im.receiveNewInstanceHostnames()
}() }()
self.startup = time.Now() im.startup = time.Now()
x := self.startup x := im.startup
for { for {
log.Info().Msg("InstanceManager tick") log.Info().Msg("InstanceManager tick")
self.managerLoop() im.managerLoop()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
if time.Now().After(x.Add(LOG_REPORT_INTERVAL)) { if time.Now().After(x.Add(LogReportInterval)) {
x = time.Now() x = time.Now()
self.logInstanceReport() im.logInstanceReport()
} }
} }
} }
func (self *InstanceManager) managerLoop() { func (im *InstanceManager) managerLoop() {
self.Lock() im.lock()
il := make([]*instance, 0) il := make([]*instance, 0)
for _, v := range self.instances { for _, v := range im.instances {
il = append(il, v) il = append(il, v)
} }
self.Unlock() im.unlock()
// FIXME is this a bug outside of the mutex above?
for _, v := range il { for _, v := range il {
go func(i *instance) { go func(i *instance) {
i.Tick() i.Tick()
@ -103,10 +102,10 @@ func (self *InstanceManager) managerLoop() {
} }
} }
func (self *InstanceManager) hostnameExists(newhn InstanceHostname) bool { func (im *InstanceManager) hostnameExists(newhn InstanceHostname) bool {
self.Lock() im.lock()
defer self.Unlock() defer im.unlock()
for k, _ := range self.instances { for k := range im.instances {
if newhn == k { if newhn == k {
return true return true
} }
@ -114,15 +113,16 @@ func (self *InstanceManager) hostnameExists(newhn InstanceHostname) bool {
return false return false
} }
func (self *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { func (im *InstanceManager) addInstanceByHostname(newhn InstanceHostname) {
if self.hostnameExists(newhn) { if im.hostnameExists(newhn) {
// ignore adding new if we already know about it
return return
} }
// this blocks on the channel size, limiting concurrency // this blocks on the channel size, limiting concurrency
self.hostAdderSemaphore <- true im.hostAdderSemaphore <- true
i := NewInstance(func(x *instance) { i := newInstance(func(x *instance) {
x.hostname = string(newhn) x.hostname = string(newhn)
}) })
// we do node detection under the addLock to avoid thundering // we do node detection under the addLock to avoid thundering
@ -130,27 +130,28 @@ func (self *InstanceManager) addInstanceByHostname(newhn InstanceHostname) {
i.detectNodeTypeIfNecessary() i.detectNodeTypeIfNecessary()
// pop an item from the buffered channel // pop an item from the buffered channel
<-self.hostAdderSemaphore <-im.hostAdderSemaphore
// lock the map to insert // lock the map to insert
self.Lock() im.lock()
self.instances[newhn] = i im.instances[newhn] = i
self.Unlock() im.unlock()
} }
func (self *InstanceManager) receiveNewInstanceHostnames() { func (im *InstanceManager) receiveNewInstanceHostnames() {
var newhn InstanceHostname var newhn InstanceHostname
for { for {
newhn = <-self.newInstanceNotifications newhn = <-im.newInstanceNotifications
// receive them fast out of the channel, let the adding function lock to add // receive them fast out of the channel, let the adding function lock to add
// them one at a time // them one at a time, using a bunch of blocked goroutines as our
go self.addInstanceByHostname(newhn) // modification queue
go im.addInstanceByHostname(newhn)
} }
} }
func (self *InstanceManager) logInstanceReport() { func (im *InstanceManager) logInstanceReport() {
r := self.instanceSummaryReport() r := im.instanceSummaryReport()
sublogger := log.With().Logger() sublogger := log.With().Logger()
@ -162,11 +163,11 @@ func (self *InstanceManager) logInstanceReport() {
Msg("instance report") Msg("instance report")
} }
func (self *InstanceManager) listInstances() []*instance { func (im *InstanceManager) listInstances() []*instance {
var out []*instance var out []*instance
self.Lock() im.lock()
defer self.Unlock() defer im.unlock()
for _, v := range self.instances { for _, v := range im.instances {
out = append(out, v) out = append(out, v)
} }
return out return out

View File

@ -2,10 +2,10 @@ package feta
//import "github.com/rs/zerolog/log" //import "github.com/rs/zerolog/log"
type Toot struct { type toot struct {
} }
func NewToot(input []byte) *Toot { func newToot(input []byte) *toot {
t := new(Toot) t := new(toot)
return t return t
} }