From 1c7e2f11e048280d12c01a7174a16c5a14432bb5 Mon Sep 17 00:00:00 2001 From: Jeffrey Paul Date: Tue, 5 Nov 2019 15:32:09 -0800 Subject: [PATCH] latest --- Dockerfile | 1 + Makefile | 12 ++++-- apihandlers.go | 76 +++++++++++++++++++++++++++++++++++ api.go => apiserver.go | 26 +----------- archiver.go | 1 - cmd/feta/identity.go | 36 ----------------- cmd/feta/main.go | 56 ++++---------------------- feta.go | 91 ++++++++++++++++++++++++++++++++++++++++++ instance.go | 59 ++++++++++++++++----------- locator.go | 33 +++++++++++---- manager.go | 68 +++++++++++++++---------------- 11 files changed, 279 insertions(+), 180 deletions(-) create mode 100644 apihandlers.go rename api.go => apiserver.go (67%) delete mode 100644 cmd/feta/identity.go create mode 100644 feta.go diff --git a/Dockerfile b/Dockerfile index 644b94a..0ead377 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,6 +3,7 @@ FROM golang:1.13 as builder WORKDIR /go/src/github.com/sneak/feta COPY . . +#RUN make lint && make build RUN make build WORKDIR /go diff --git a/Makefile b/Makefile index e851db3..0a04f06 100644 --- a/Makefile +++ b/Makefile @@ -25,10 +25,10 @@ ifneq ($(UNAME_S),Darwin) GOFLAGS = -ldflags "-linkmode external -extldflags -static $(GOLDFLAGS)" endif -default: run +default: rundebug rundebug: build - DEBUG=1 ./$(FN) + GOTRACEBACK=all DEBUG=1 ./$(FN) run: build ./$(FN) @@ -38,9 +38,13 @@ clean: build: ./$(FN) +lint: + go get -u golang.org/x/lint/golint + go get -u github.com/GeertJohan/fgt + fgt golint + go-get: go get -v - cd cmd/$(FN) && go get -v ./$(FN): *.go cmd/*/*.go go-get cd cmd/$(FN) && go build -o ../../$(FN) $(GOFLAGS) . @@ -59,7 +63,7 @@ build-docker-image: clean build-docker-image-dist: is_uncommitted clean docker build -t $(IMAGENAME):$(VERSION) -t $(IMAGENAME):latest -t $(IMAGENAME):$(BUILDTIMETAG) . -dist: build-docker-image +dist: lint build-docker-image -mkdir -p ./output docker run --rm --entrypoint cat $(IMAGENAME) /bin/$(FN) > output/$(FN) docker save $(IMAGENAME) | bzip2 > output/$(BUILDTIMEFILENAME).$(FN).tbz2 diff --git a/apihandlers.go b/apihandlers.go new file mode 100644 index 0000000..0c89086 --- /dev/null +++ b/apihandlers.go @@ -0,0 +1,76 @@ +package feta + +import "time" +import "net/http" +import "encoding/json" + +import "github.com/gin-gonic/gin" + +type hash map[string]interface{} + +func (a *TootArchiverAPIServer) instances() []hash { + resp := make([]hash, 0) + now := time.Now() + for _, v := range a.archiver.manager.listInstances() { + i := make(hash) + // FIXME figure out why a very short lock here deadlocks + v.Lock() + i["hostname"] = v.hostname + i["nextCheck"] = v.nextFetch.UTC().Format(time.RFC3339) + i["nextCheckAfter"] = (-1 * now.Sub(v.nextFetch)).String() + i["successCount"] = v.successCount + i["errorCount"] = v.errorCount + i["identified"] = v.identified + i["status"] = v.status + i["software"] = "unknown" + i["version"] = "unknown" + if v.identified { + i["software"] = v.serverImplementationString + i["version"] = v.serverVersionString + } + v.Unlock() + resp = append(resp, i) + } + return resp +} + +func (a *TootArchiverAPIServer) getIndexHandler() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + + index := &gin.H{ + "page": "index", + "instances": a.instances(), + "status": "ok", + "now": time.Now().UTC().Format(time.RFC3339), + "uptime": a.archiver.Uptime().String(), + } + + json, err := json.Marshal(index) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.Write(json) + + } +} + +func (a *TootArchiverAPIServer) getHealthCheckHandler() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + resp := &gin.H{ + "status": "ok", + "now": time.Now().UTC().Format(time.RFC3339), + "uptime": a.archiver.Uptime().String(), + } + + json, err := json.Marshal(resp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.Write(json) + } +} diff --git a/api.go b/apiserver.go similarity index 67% rename from api.go rename to apiserver.go index c31cb11..0c9e7ff 100644 --- a/api.go +++ b/apiserver.go @@ -43,30 +43,8 @@ func (a *TootArchiverAPIServer) getRouter() *gin.Engine { // attach logger middleware r.Use(ginzerolog.Logger("gin")) - r.GET("/.well-known/healthcheck.json", func(c *gin.Context) { - c.JSON(200, gin.H{ - "status": "ok", - "now": time.Now().UTC().Format(time.RFC3339), - "uptime": a.archiver.Uptime().String(), - }) - }) - - r.GET("/", func(c *gin.Context) { - ir := a.archiver.manager.instanceSummaryReport() - - il := a.archiver.manager.instanceListForApi() - c.JSON(200, gin.H{ - "status": "ok", - "now": time.Now().UTC().Format(time.RFC3339), - "uptime": a.archiver.Uptime().String(), - "instanceSummary": gin.H{ - "total": ir.total, - "up": ir.up, - "identified": ir.identified, - }, - "instanceList": il, - }) - }) + r.GET("/.well-known/healthcheck.json", gin.WrapF(a.getHealthCheckHandler())) + r.GET("/", gin.WrapF(a.getIndexHandler())) return r } diff --git a/archiver.go b/archiver.go index 3525845..11ca0b9 100644 --- a/archiver.go +++ b/archiver.go @@ -26,7 +26,6 @@ func (a *TootArchiver) RunForever() { newInstanceHostnameNotifications := make(chan InstanceHostname, 10000) a.locator = NewInstanceLocator() - a.manager = NewInstanceManager() a.locator.AddInstanceNotificationChannel(newInstanceHostnameNotifications) diff --git a/cmd/feta/identity.go b/cmd/feta/identity.go deleted file mode 100644 index 964dd11..0000000 --- a/cmd/feta/identity.go +++ /dev/null @@ -1,36 +0,0 @@ -package main - -import ( - "github.com/rs/zerolog/log" -) - -var Version string -var Buildtime string -var Builduser string -var Buildarch string - -type AppIdentity struct { - Version string - Buildtime string - Builduser string - Buildarch string -} - -func GetAppIdentity() *AppIdentity { - i := new(AppIdentity) - i.Version = Version - i.Buildtime = Buildtime - i.Builduser = Builduser - i.Buildarch = Buildarch - return i -} - -func identify() { - i := GetAppIdentity() - log.Info(). - Str("version", i.Version). - Str("buildarch", i.Buildarch). - Str("buildtime", i.Buildtime). - Str("builduser", i.Builduser). - Msg("starting") -} diff --git a/cmd/feta/main.go b/cmd/feta/main.go index 69873a9..2ec65e9 100644 --- a/cmd/feta/main.go +++ b/cmd/feta/main.go @@ -1,57 +1,15 @@ package main import "os" -import "sync" -import "time" - -import "github.com/rs/zerolog" -import "github.com/rs/zerolog/log" -import "golang.org/x/crypto/ssh/terminal" import "github.com/sneak/feta" +// these are filled in at link-time by the build scripts +var Version string +var Buildtime string +var Builduser string +var Buildarch string + func main() { - os.Exit(app()) -} - -func app() int { - log.Logger = log.With().Caller().Logger() - if terminal.IsTerminal(int(os.Stdout.Fd())) { - log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) - } - - identify() - - // always log in UTC - zerolog.TimestampFunc = func() time.Time { - return time.Now().UTC() - } - - zerolog.SetGlobalLevel(zerolog.InfoLevel) - if os.Getenv("DEBUG") != "" { - zerolog.SetGlobalLevel(zerolog.DebugLevel) - } - - archiver := feta.NewTootArchiver() - - api := new(feta.TootArchiverAPIServer) - api.SetArchiver(archiver) - - var wg sync.WaitGroup - - // start api webserver goroutine - wg.Add(1) - go func() { - api.Serve() - wg.Done() - }() - - wg.Add(1) - go func() { - archiver.RunForever() - wg.Done() - }() - - wg.Wait() - return 0 + os.Exit(feta.CLIEntry(Version, Buildtime, Builduser, Buildarch)) } diff --git a/feta.go b/feta.go new file mode 100644 index 0000000..1fad0cd --- /dev/null +++ b/feta.go @@ -0,0 +1,91 @@ +package feta + +import "os" + +//import "os/signal" +import "sync" + +//import "syscall" +import "time" + +import "github.com/rs/zerolog" +import "github.com/rs/zerolog/log" +import "golang.org/x/crypto/ssh/terminal" + +func CLIEntry(version string, buildtime string, buildarch string, builduser string) int { + f := new(FetaProcess) + f.version = version + f.buildtime = buildtime + f.buildarch = buildarch + f.builduser = builduser + f.setupLogging() + return f.runForever() +} + +// FetaProcess is the main structure/process of this app +type FetaProcess struct { + version string + buildtime string + buildarch string + builduser string + archiver *TootArchiver + api *TootArchiverAPIServer + wg *sync.WaitGroup + //quit chan os.Signal +} + +func (f *FetaProcess) identify() { + log.Info(). + Str("version", f.version). + Str("buildtime", f.buildtime). + Str("buildarch", f.buildarch). + Str("builduser", f.builduser). + Msg("starting") +} + +func (f *FetaProcess) setupLogging() { + log.Logger = log.With().Caller().Logger() + if terminal.IsTerminal(int(os.Stdout.Fd())) { + log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) + } + + // always log in UTC + zerolog.TimestampFunc = func() time.Time { + return time.Now().UTC() + } + + zerolog.SetGlobalLevel(zerolog.InfoLevel) + if os.Getenv("DEBUG") != "" { + zerolog.SetGlobalLevel(zerolog.DebugLevel) + } + + f.identify() +} + +func (f *FetaProcess) runForever() int { + f.archiver = NewTootArchiver() + + f.api = new(TootArchiverAPIServer) + f.api.SetArchiver(f.archiver) + + //FIXME(sneak) get this channel into places that need to be shut down + //f.quit = make(chan os.Signal) + //signal.Notify(f.quit, syscall.SIGINT, syscall.SIGTERM) + + f.wg = new(sync.WaitGroup) + // start api webserver goroutine + f.wg.Add(1) + go func() { + f.api.Serve() + f.wg.Done() + }() + + f.wg.Add(1) + go func() { + f.archiver.RunForever() + f.wg.Done() + }() + + f.wg.Wait() + return 0 +} diff --git a/instance.go b/instance.go index 2608593..5e96bd3 100644 --- a/instance.go +++ b/instance.go @@ -14,6 +14,8 @@ import "github.com/rs/zerolog/log" const NodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0" +const INSTANCE_NODEINFO_TIMEOUT = time.Second * 5 + const INSTANCE_HTTP_TIMEOUT = time.Second * 60 const INSTANCE_SPIDER_INTERVAL = time.Second * 60 @@ -40,25 +42,27 @@ const ( type Instance struct { sync.RWMutex - errorCount uint - successCount uint - highestId int - hostname string - identified bool - fetching bool - impl InstanceImplementation - backend *InstanceBackend - status InstanceStatus - nextFetch time.Time - nodeInfoUrl string - serverVersion string + errorCount uint + successCount uint + highestId int + hostname string + identified bool + fetching bool + implementation InstanceImplementation + backend *InstanceBackend + status InstanceStatus + nextFetch time.Time + nodeInfoUrl string + serverVersionString string + serverImplementationString string + fetchingLock sync.Mutex } func NewInstance(hostname InstanceHostname) *Instance { self := new(Instance) self.hostname = string(hostname) self.status = InstanceStatusUnknown - self.nextFetch = time.Now().Add(-1 * time.Second) + self.setNextFetchAfter(1 * time.Second) return self } @@ -75,6 +79,9 @@ func (self *Instance) setNextFetchAfter(d time.Duration) { } func (self *Instance) Fetch() { + self.fetchingLock.Lock() + defer self.fetchingLock.Unlock() + err := self.detectNodeTypeIfNecessary() if err != nil { self.setNextFetchAfter(INSTANCE_ERROR_INTERVAL) @@ -85,8 +92,8 @@ func (self *Instance) Fetch() { return } - //self.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL) - log.Info().Msgf("i (%s) should check for toots", self.hostname) + self.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL) + //log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", self.hostname) } func (self *Instance) dueForFetch() bool { @@ -99,7 +106,7 @@ func (self *Instance) dueForFetch() bool { func (self *Instance) nodeIdentified() bool { self.RLock() defer self.RUnlock() - if self.impl > Unknown { + if self.implementation > Unknown { return true } return false @@ -127,7 +134,7 @@ func (self *Instance) registerSuccess() { self.successCount++ } -func (self *Instance) ApiReport() *gin.H { +func (self *Instance) APIReport() *gin.H { r := gin.H{} return &r } @@ -141,7 +148,7 @@ func (i *Instance) Up() bool { func (i *Instance) fetchNodeInfoURL() error { url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.hostname) var c = &http.Client{ - Timeout: INSTANCE_HTTP_TIMEOUT, + Timeout: INSTANCE_NODEINFO_TIMEOUT, } log.Debug(). @@ -217,7 +224,7 @@ func (i *Instance) fetchNodeInfo() error { } var c = &http.Client{ - Timeout: INSTANCE_HTTP_TIMEOUT, + Timeout: INSTANCE_NODEINFO_TIMEOUT, } //FIXME make sure the nodeinfourl is on the same domain as the instance @@ -268,8 +275,8 @@ func (i *Instance) fetchNodeInfo() error { Msg("received nodeinfo from instance") i.Lock() - defer i.Unlock() - i.serverVersion = ni.Software.Version + i.serverVersionString = ni.Software.Version + i.serverImplementationString = ni.Software.Name ni.Software.Name = strings.ToLower(ni.Software.Name) @@ -278,22 +285,26 @@ func (i *Instance) fetchNodeInfo() error { Str("hostname", i.hostname). Str("software", ni.Software.Name). Msg("detected server software") - i.registerSuccess() i.identified = true - i.impl = Pleroma + i.implementation = Pleroma i.status = InstanceStatusIdentified + i.Unlock() + i.registerSuccess() return nil } else if ni.Software.Name == "mastodon" { i.registerSuccess() i.identified = true - i.impl = Mastodon + i.implementation = Mastodon i.status = InstanceStatusIdentified + i.Unlock() + i.registerSuccess() return nil } else { log.Error(). Str("hostname", i.hostname). Str("software", ni.Software.Name). Msg("FIXME unknown server implementation") + i.Unlock() i.registerError() return errors.New("FIXME unknown server implementation") } diff --git a/locator.go b/locator.go index b1b6886..14c5389 100644 --- a/locator.go +++ b/locator.go @@ -12,12 +12,16 @@ const INDEX_API_TIMEOUT = time.Second * 60 var USER_AGENT = "https://github.com/sneak/feta indexer bot; sneak@sneak.berlin for feedback" -// check with indices only hourly +// INDEX_CHECK_INTERVAL defines the interval for downloading new lists from +// the index APIs run by mastodon/pleroma (default: 1h) var INDEX_CHECK_INTERVAL = time.Second * 60 * 60 -// check with indices after 10 mins if they failed +// INDEX_ERROR_INTERVAL is used for when the index fetch/parse fails +// (default: 10m) var INDEX_ERROR_INTERVAL = time.Second * 60 * 10 +// LOG_REPORT_INTERVAL defines how long between logging internal +// stats/reporting for user supervision var LOG_REPORT_INTERVAL = time.Second * 60 const mastodonIndexUrl = "https://instances.social/list.json?q%5Busers%5D=&q%5Bsearch%5D=&strict=false" @@ -26,6 +30,8 @@ const pleromaIndexUrl = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api type InstanceLocator struct { pleromaIndexNextRefresh *time.Time mastodonIndexNextRefresh *time.Time + mastodonIndexFetchLock sync.Mutex + pleromaIndexFetchLock sync.Mutex reportInstanceVia chan InstanceHostname sync.Mutex } @@ -55,12 +61,20 @@ func (self *InstanceLocator) Locate() { x := time.Now() for { log.Info().Msg("InstanceLocator tick") - if self.pleromaIndexNextRefresh.Before(time.Now()) { - self.locatePleroma() - } - if self.mastodonIndexNextRefresh.Before(time.Now()) { - self.locateMastodon() - } + go func() { + self.pleromaIndexFetchLock.Lock() + if self.pleromaIndexNextRefresh.Before(time.Now()) { + self.locatePleroma() + } + self.pleromaIndexFetchLock.Unlock() + }() + go func() { + self.mastodonIndexFetchLock.Lock() + if self.mastodonIndexNextRefresh.Before(time.Now()) { + self.locateMastodon() + } + self.mastodonIndexFetchLock.Unlock() + }() time.Sleep(1 * time.Second) if time.Now().After(x.Add(LOG_REPORT_INTERVAL)) { x = time.Now() @@ -95,6 +109,9 @@ func (self *InstanceLocator) locateMastodon() { self.mastodonIndexNextRefresh = &t self.Unlock() return + } else { + log.Info(). + Msg("fetched mastodon index") } defer resp.Body.Close() diff --git a/manager.go b/manager.go index 5211dac..615ac4d 100644 --- a/manager.go +++ b/manager.go @@ -5,7 +5,7 @@ import "time" import "fmt" import "runtime" -import "github.com/gin-gonic/gin" +//import "github.com/gin-gonic/gin" import "github.com/rs/zerolog/log" type InstanceBackend interface { @@ -17,6 +17,7 @@ type InstanceManager struct { instances map[InstanceHostname]*Instance newInstanceNotifications chan InstanceHostname startup time.Time + addLock sync.Mutex } func NewInstanceManager() *InstanceManager { @@ -50,14 +51,14 @@ func (self *InstanceManager) logCaller(msg string) { } func (self *InstanceManager) Lock() { - self.logCaller("instancemanager attempting to lock") + //self.logCaller("instancemanager attempting to lock") self.mu.Lock() - self.logCaller("instancemanager locked") + //self.logCaller("instancemanager locked") } func (self *InstanceManager) Unlock() { self.mu.Unlock() - self.logCaller("instancemanager unlocked") + //self.logCaller("instancemanager unlocked") } func (self *InstanceManager) AddInstanceNotificationChannel(via chan InstanceHostname) { @@ -67,12 +68,10 @@ func (self *InstanceManager) AddInstanceNotificationChannel(via chan InstanceHos } func (self *InstanceManager) Manage() { - self.managerInfiniteLoop() -} - -func (self *InstanceManager) managerInfiniteLoop() { log.Info().Msg("InstanceManager starting") - go self.receiveNewInstanceHostnames() + go func() { + self.receiveNewInstanceHostnames() + }() self.startup = time.Now() for { log.Info().Msg("InstanceManager tick") @@ -83,16 +82,20 @@ func (self *InstanceManager) managerInfiniteLoop() { func (self *InstanceManager) managerLoop() { self.Lock() - defer self.Unlock() + il := make([]*Instance, 0) for _, v := range self.instances { - // wrap in a new goroutine because this needs to iterate - // fast and unlock fast - go func() { - if v.dueForFetch() { - go v.Fetch() - } - }() + il = append(il, v) } + self.Unlock() + + for _, v := range il { + if v.dueForFetch() { + go func(i *Instance) { + i.Fetch() + }(v) + } + } + } func (self *InstanceManager) hostnameExists(newhn InstanceHostname) bool { @@ -107,11 +110,19 @@ func (self *InstanceManager) hostnameExists(newhn InstanceHostname) bool { } func (self *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { - // only add it if we haven't seen the hostname before + // we do these one at a time + self.addLock.Lock() + defer self.addLock.Unlock() + if self.hostnameExists(newhn) { return } + i := NewInstance(newhn) + // we do node detection under the addLock to avoid thundering + // on startup + i.detectNodeTypeIfNecessary() + self.Lock() defer self.Unlock() self.instances[newhn] = i @@ -121,7 +132,11 @@ func (self *InstanceManager) receiveNewInstanceHostnames() { var newhn InstanceHostname for { newhn = <-self.newInstanceNotifications - self.addInstanceByHostname(newhn) + // receive them fast out of the channel, let the adding function lock to add + // them one at a time + go func() { + self.addInstanceByHostname(newhn) + }() } } @@ -166,21 +181,6 @@ func (self *InstanceManager) listInstances() []*Instance { return out } -func (self *InstanceManager) instanceListForApi() []*gin.H { - var output []*gin.H - - l := self.listInstances() - for _, v := range l { - id := &gin.H{ - "hostname": v.hostname, - "up": v.Up(), - "nextFetch": string(time.Now().Sub(v.nextFetch)), - } - output = append(output, id) - } - return output -} - func (self *InstanceManager) instanceSummaryReport() *InstanceSummaryReport { self.Lock() defer self.Unlock()