This commit is contained in:
Jeffrey Paul 2019-11-05 15:32:09 -08:00
parent d33f093ab5
commit 1c7e2f11e0
11 changed files with 279 additions and 180 deletions

View File

@ -3,6 +3,7 @@ FROM golang:1.13 as builder
WORKDIR /go/src/github.com/sneak/feta
COPY . .
#RUN make lint && make build
RUN make build
WORKDIR /go

View File

@ -25,10 +25,10 @@ ifneq ($(UNAME_S),Darwin)
GOFLAGS = -ldflags "-linkmode external -extldflags -static $(GOLDFLAGS)"
endif
default: run
default: rundebug
rundebug: build
DEBUG=1 ./$(FN)
GOTRACEBACK=all DEBUG=1 ./$(FN)
run: build
./$(FN)
@ -38,9 +38,13 @@ clean:
build: ./$(FN)
lint:
go get -u golang.org/x/lint/golint
go get -u github.com/GeertJohan/fgt
fgt golint
go-get:
go get -v
cd cmd/$(FN) && go get -v
./$(FN): *.go cmd/*/*.go go-get
cd cmd/$(FN) && go build -o ../../$(FN) $(GOFLAGS) .
@ -59,7 +63,7 @@ build-docker-image: clean
build-docker-image-dist: is_uncommitted clean
docker build -t $(IMAGENAME):$(VERSION) -t $(IMAGENAME):latest -t $(IMAGENAME):$(BUILDTIMETAG) .
dist: build-docker-image
dist: lint build-docker-image
-mkdir -p ./output
docker run --rm --entrypoint cat $(IMAGENAME) /bin/$(FN) > output/$(FN)
docker save $(IMAGENAME) | bzip2 > output/$(BUILDTIMEFILENAME).$(FN).tbz2

76
apihandlers.go Normal file
View File

@ -0,0 +1,76 @@
package feta
import "time"
import "net/http"
import "encoding/json"
import "github.com/gin-gonic/gin"
type hash map[string]interface{}
func (a *TootArchiverAPIServer) instances() []hash {
resp := make([]hash, 0)
now := time.Now()
for _, v := range a.archiver.manager.listInstances() {
i := make(hash)
// FIXME figure out why a very short lock here deadlocks
v.Lock()
i["hostname"] = v.hostname
i["nextCheck"] = v.nextFetch.UTC().Format(time.RFC3339)
i["nextCheckAfter"] = (-1 * now.Sub(v.nextFetch)).String()
i["successCount"] = v.successCount
i["errorCount"] = v.errorCount
i["identified"] = v.identified
i["status"] = v.status
i["software"] = "unknown"
i["version"] = "unknown"
if v.identified {
i["software"] = v.serverImplementationString
i["version"] = v.serverVersionString
}
v.Unlock()
resp = append(resp, i)
}
return resp
}
func (a *TootArchiverAPIServer) getIndexHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
index := &gin.H{
"page": "index",
"instances": a.instances(),
"status": "ok",
"now": time.Now().UTC().Format(time.RFC3339),
"uptime": a.archiver.Uptime().String(),
}
json, err := json.Marshal(index)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(json)
}
}
func (a *TootArchiverAPIServer) getHealthCheckHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
resp := &gin.H{
"status": "ok",
"now": time.Now().UTC().Format(time.RFC3339),
"uptime": a.archiver.Uptime().String(),
}
json, err := json.Marshal(resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(json)
}
}

View File

@ -43,30 +43,8 @@ func (a *TootArchiverAPIServer) getRouter() *gin.Engine {
// attach logger middleware
r.Use(ginzerolog.Logger("gin"))
r.GET("/.well-known/healthcheck.json", func(c *gin.Context) {
c.JSON(200, gin.H{
"status": "ok",
"now": time.Now().UTC().Format(time.RFC3339),
"uptime": a.archiver.Uptime().String(),
})
})
r.GET("/", func(c *gin.Context) {
ir := a.archiver.manager.instanceSummaryReport()
il := a.archiver.manager.instanceListForApi()
c.JSON(200, gin.H{
"status": "ok",
"now": time.Now().UTC().Format(time.RFC3339),
"uptime": a.archiver.Uptime().String(),
"instanceSummary": gin.H{
"total": ir.total,
"up": ir.up,
"identified": ir.identified,
},
"instanceList": il,
})
})
r.GET("/.well-known/healthcheck.json", gin.WrapF(a.getHealthCheckHandler()))
r.GET("/", gin.WrapF(a.getIndexHandler()))
return r
}

View File

@ -26,7 +26,6 @@ func (a *TootArchiver) RunForever() {
newInstanceHostnameNotifications := make(chan InstanceHostname, 10000)
a.locator = NewInstanceLocator()
a.manager = NewInstanceManager()
a.locator.AddInstanceNotificationChannel(newInstanceHostnameNotifications)

View File

@ -1,36 +0,0 @@
package main
import (
"github.com/rs/zerolog/log"
)
var Version string
var Buildtime string
var Builduser string
var Buildarch string
type AppIdentity struct {
Version string
Buildtime string
Builduser string
Buildarch string
}
func GetAppIdentity() *AppIdentity {
i := new(AppIdentity)
i.Version = Version
i.Buildtime = Buildtime
i.Builduser = Builduser
i.Buildarch = Buildarch
return i
}
func identify() {
i := GetAppIdentity()
log.Info().
Str("version", i.Version).
Str("buildarch", i.Buildarch).
Str("buildtime", i.Buildtime).
Str("builduser", i.Builduser).
Msg("starting")
}

View File

@ -1,57 +1,15 @@
package main
import "os"
import "sync"
import "time"
import "github.com/rs/zerolog"
import "github.com/rs/zerolog/log"
import "golang.org/x/crypto/ssh/terminal"
import "github.com/sneak/feta"
// these are filled in at link-time by the build scripts
var Version string
var Buildtime string
var Builduser string
var Buildarch string
func main() {
os.Exit(app())
}
func app() int {
log.Logger = log.With().Caller().Logger()
if terminal.IsTerminal(int(os.Stdout.Fd())) {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
}
identify()
// always log in UTC
zerolog.TimestampFunc = func() time.Time {
return time.Now().UTC()
}
zerolog.SetGlobalLevel(zerolog.InfoLevel)
if os.Getenv("DEBUG") != "" {
zerolog.SetGlobalLevel(zerolog.DebugLevel)
}
archiver := feta.NewTootArchiver()
api := new(feta.TootArchiverAPIServer)
api.SetArchiver(archiver)
var wg sync.WaitGroup
// start api webserver goroutine
wg.Add(1)
go func() {
api.Serve()
wg.Done()
}()
wg.Add(1)
go func() {
archiver.RunForever()
wg.Done()
}()
wg.Wait()
return 0
os.Exit(feta.CLIEntry(Version, Buildtime, Builduser, Buildarch))
}

91
feta.go Normal file
View File

@ -0,0 +1,91 @@
package feta
import "os"
//import "os/signal"
import "sync"
//import "syscall"
import "time"
import "github.com/rs/zerolog"
import "github.com/rs/zerolog/log"
import "golang.org/x/crypto/ssh/terminal"
func CLIEntry(version string, buildtime string, buildarch string, builduser string) int {
f := new(FetaProcess)
f.version = version
f.buildtime = buildtime
f.buildarch = buildarch
f.builduser = builduser
f.setupLogging()
return f.runForever()
}
// FetaProcess is the main structure/process of this app
type FetaProcess struct {
version string
buildtime string
buildarch string
builduser string
archiver *TootArchiver
api *TootArchiverAPIServer
wg *sync.WaitGroup
//quit chan os.Signal
}
func (f *FetaProcess) identify() {
log.Info().
Str("version", f.version).
Str("buildtime", f.buildtime).
Str("buildarch", f.buildarch).
Str("builduser", f.builduser).
Msg("starting")
}
func (f *FetaProcess) setupLogging() {
log.Logger = log.With().Caller().Logger()
if terminal.IsTerminal(int(os.Stdout.Fd())) {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
}
// always log in UTC
zerolog.TimestampFunc = func() time.Time {
return time.Now().UTC()
}
zerolog.SetGlobalLevel(zerolog.InfoLevel)
if os.Getenv("DEBUG") != "" {
zerolog.SetGlobalLevel(zerolog.DebugLevel)
}
f.identify()
}
func (f *FetaProcess) runForever() int {
f.archiver = NewTootArchiver()
f.api = new(TootArchiverAPIServer)
f.api.SetArchiver(f.archiver)
//FIXME(sneak) get this channel into places that need to be shut down
//f.quit = make(chan os.Signal)
//signal.Notify(f.quit, syscall.SIGINT, syscall.SIGTERM)
f.wg = new(sync.WaitGroup)
// start api webserver goroutine
f.wg.Add(1)
go func() {
f.api.Serve()
f.wg.Done()
}()
f.wg.Add(1)
go func() {
f.archiver.RunForever()
f.wg.Done()
}()
f.wg.Wait()
return 0
}

View File

@ -14,6 +14,8 @@ import "github.com/rs/zerolog/log"
const NodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0"
const INSTANCE_NODEINFO_TIMEOUT = time.Second * 5
const INSTANCE_HTTP_TIMEOUT = time.Second * 60
const INSTANCE_SPIDER_INTERVAL = time.Second * 60
@ -40,25 +42,27 @@ const (
type Instance struct {
sync.RWMutex
errorCount uint
successCount uint
highestId int
hostname string
identified bool
fetching bool
impl InstanceImplementation
backend *InstanceBackend
status InstanceStatus
nextFetch time.Time
nodeInfoUrl string
serverVersion string
errorCount uint
successCount uint
highestId int
hostname string
identified bool
fetching bool
implementation InstanceImplementation
backend *InstanceBackend
status InstanceStatus
nextFetch time.Time
nodeInfoUrl string
serverVersionString string
serverImplementationString string
fetchingLock sync.Mutex
}
func NewInstance(hostname InstanceHostname) *Instance {
self := new(Instance)
self.hostname = string(hostname)
self.status = InstanceStatusUnknown
self.nextFetch = time.Now().Add(-1 * time.Second)
self.setNextFetchAfter(1 * time.Second)
return self
}
@ -75,6 +79,9 @@ func (self *Instance) setNextFetchAfter(d time.Duration) {
}
func (self *Instance) Fetch() {
self.fetchingLock.Lock()
defer self.fetchingLock.Unlock()
err := self.detectNodeTypeIfNecessary()
if err != nil {
self.setNextFetchAfter(INSTANCE_ERROR_INTERVAL)
@ -85,8 +92,8 @@ func (self *Instance) Fetch() {
return
}
//self.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL)
log.Info().Msgf("i (%s) should check for toots", self.hostname)
self.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL)
//log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", self.hostname)
}
func (self *Instance) dueForFetch() bool {
@ -99,7 +106,7 @@ func (self *Instance) dueForFetch() bool {
func (self *Instance) nodeIdentified() bool {
self.RLock()
defer self.RUnlock()
if self.impl > Unknown {
if self.implementation > Unknown {
return true
}
return false
@ -127,7 +134,7 @@ func (self *Instance) registerSuccess() {
self.successCount++
}
func (self *Instance) ApiReport() *gin.H {
func (self *Instance) APIReport() *gin.H {
r := gin.H{}
return &r
}
@ -141,7 +148,7 @@ func (i *Instance) Up() bool {
func (i *Instance) fetchNodeInfoURL() error {
url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.hostname)
var c = &http.Client{
Timeout: INSTANCE_HTTP_TIMEOUT,
Timeout: INSTANCE_NODEINFO_TIMEOUT,
}
log.Debug().
@ -217,7 +224,7 @@ func (i *Instance) fetchNodeInfo() error {
}
var c = &http.Client{
Timeout: INSTANCE_HTTP_TIMEOUT,
Timeout: INSTANCE_NODEINFO_TIMEOUT,
}
//FIXME make sure the nodeinfourl is on the same domain as the instance
@ -268,8 +275,8 @@ func (i *Instance) fetchNodeInfo() error {
Msg("received nodeinfo from instance")
i.Lock()
defer i.Unlock()
i.serverVersion = ni.Software.Version
i.serverVersionString = ni.Software.Version
i.serverImplementationString = ni.Software.Name
ni.Software.Name = strings.ToLower(ni.Software.Name)
@ -278,22 +285,26 @@ func (i *Instance) fetchNodeInfo() error {
Str("hostname", i.hostname).
Str("software", ni.Software.Name).
Msg("detected server software")
i.registerSuccess()
i.identified = true
i.impl = Pleroma
i.implementation = Pleroma
i.status = InstanceStatusIdentified
i.Unlock()
i.registerSuccess()
return nil
} else if ni.Software.Name == "mastodon" {
i.registerSuccess()
i.identified = true
i.impl = Mastodon
i.implementation = Mastodon
i.status = InstanceStatusIdentified
i.Unlock()
i.registerSuccess()
return nil
} else {
log.Error().
Str("hostname", i.hostname).
Str("software", ni.Software.Name).
Msg("FIXME unknown server implementation")
i.Unlock()
i.registerError()
return errors.New("FIXME unknown server implementation")
}

View File

@ -12,12 +12,16 @@ const INDEX_API_TIMEOUT = time.Second * 60
var USER_AGENT = "https://github.com/sneak/feta indexer bot; sneak@sneak.berlin for feedback"
// check with indices only hourly
// INDEX_CHECK_INTERVAL defines the interval for downloading new lists from
// the index APIs run by mastodon/pleroma (default: 1h)
var INDEX_CHECK_INTERVAL = time.Second * 60 * 60
// check with indices after 10 mins if they failed
// INDEX_ERROR_INTERVAL is used for when the index fetch/parse fails
// (default: 10m)
var INDEX_ERROR_INTERVAL = time.Second * 60 * 10
// LOG_REPORT_INTERVAL defines how long between logging internal
// stats/reporting for user supervision
var LOG_REPORT_INTERVAL = time.Second * 60
const mastodonIndexUrl = "https://instances.social/list.json?q%5Busers%5D=&q%5Bsearch%5D=&strict=false"
@ -26,6 +30,8 @@ const pleromaIndexUrl = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api
type InstanceLocator struct {
pleromaIndexNextRefresh *time.Time
mastodonIndexNextRefresh *time.Time
mastodonIndexFetchLock sync.Mutex
pleromaIndexFetchLock sync.Mutex
reportInstanceVia chan InstanceHostname
sync.Mutex
}
@ -55,12 +61,20 @@ func (self *InstanceLocator) Locate() {
x := time.Now()
for {
log.Info().Msg("InstanceLocator tick")
if self.pleromaIndexNextRefresh.Before(time.Now()) {
self.locatePleroma()
}
if self.mastodonIndexNextRefresh.Before(time.Now()) {
self.locateMastodon()
}
go func() {
self.pleromaIndexFetchLock.Lock()
if self.pleromaIndexNextRefresh.Before(time.Now()) {
self.locatePleroma()
}
self.pleromaIndexFetchLock.Unlock()
}()
go func() {
self.mastodonIndexFetchLock.Lock()
if self.mastodonIndexNextRefresh.Before(time.Now()) {
self.locateMastodon()
}
self.mastodonIndexFetchLock.Unlock()
}()
time.Sleep(1 * time.Second)
if time.Now().After(x.Add(LOG_REPORT_INTERVAL)) {
x = time.Now()
@ -95,6 +109,9 @@ func (self *InstanceLocator) locateMastodon() {
self.mastodonIndexNextRefresh = &t
self.Unlock()
return
} else {
log.Info().
Msg("fetched mastodon index")
}
defer resp.Body.Close()

View File

@ -5,7 +5,7 @@ import "time"
import "fmt"
import "runtime"
import "github.com/gin-gonic/gin"
//import "github.com/gin-gonic/gin"
import "github.com/rs/zerolog/log"
type InstanceBackend interface {
@ -17,6 +17,7 @@ type InstanceManager struct {
instances map[InstanceHostname]*Instance
newInstanceNotifications chan InstanceHostname
startup time.Time
addLock sync.Mutex
}
func NewInstanceManager() *InstanceManager {
@ -50,14 +51,14 @@ func (self *InstanceManager) logCaller(msg string) {
}
func (self *InstanceManager) Lock() {
self.logCaller("instancemanager attempting to lock")
//self.logCaller("instancemanager attempting to lock")
self.mu.Lock()
self.logCaller("instancemanager locked")
//self.logCaller("instancemanager locked")
}
func (self *InstanceManager) Unlock() {
self.mu.Unlock()
self.logCaller("instancemanager unlocked")
//self.logCaller("instancemanager unlocked")
}
func (self *InstanceManager) AddInstanceNotificationChannel(via chan InstanceHostname) {
@ -67,12 +68,10 @@ func (self *InstanceManager) AddInstanceNotificationChannel(via chan InstanceHos
}
func (self *InstanceManager) Manage() {
self.managerInfiniteLoop()
}
func (self *InstanceManager) managerInfiniteLoop() {
log.Info().Msg("InstanceManager starting")
go self.receiveNewInstanceHostnames()
go func() {
self.receiveNewInstanceHostnames()
}()
self.startup = time.Now()
for {
log.Info().Msg("InstanceManager tick")
@ -83,16 +82,20 @@ func (self *InstanceManager) managerInfiniteLoop() {
func (self *InstanceManager) managerLoop() {
self.Lock()
defer self.Unlock()
il := make([]*Instance, 0)
for _, v := range self.instances {
// wrap in a new goroutine because this needs to iterate
// fast and unlock fast
go func() {
if v.dueForFetch() {
go v.Fetch()
}
}()
il = append(il, v)
}
self.Unlock()
for _, v := range il {
if v.dueForFetch() {
go func(i *Instance) {
i.Fetch()
}(v)
}
}
}
func (self *InstanceManager) hostnameExists(newhn InstanceHostname) bool {
@ -107,11 +110,19 @@ func (self *InstanceManager) hostnameExists(newhn InstanceHostname) bool {
}
func (self *InstanceManager) addInstanceByHostname(newhn InstanceHostname) {
// only add it if we haven't seen the hostname before
// we do these one at a time
self.addLock.Lock()
defer self.addLock.Unlock()
if self.hostnameExists(newhn) {
return
}
i := NewInstance(newhn)
// we do node detection under the addLock to avoid thundering
// on startup
i.detectNodeTypeIfNecessary()
self.Lock()
defer self.Unlock()
self.instances[newhn] = i
@ -121,7 +132,11 @@ func (self *InstanceManager) receiveNewInstanceHostnames() {
var newhn InstanceHostname
for {
newhn = <-self.newInstanceNotifications
self.addInstanceByHostname(newhn)
// receive them fast out of the channel, let the adding function lock to add
// them one at a time
go func() {
self.addInstanceByHostname(newhn)
}()
}
}
@ -166,21 +181,6 @@ func (self *InstanceManager) listInstances() []*Instance {
return out
}
func (self *InstanceManager) instanceListForApi() []*gin.H {
var output []*gin.H
l := self.listInstances()
for _, v := range l {
id := &gin.H{
"hostname": v.hostname,
"up": v.Up(),
"nextFetch": string(time.Now().Sub(v.nextFetch)),
}
output = append(output, id)
}
return output
}
func (self *InstanceManager) instanceSummaryReport() *InstanceSummaryReport {
self.Lock()
defer self.Unlock()