diff --git a/Makefile b/Makefile index a3bef2c..d778d9b 100644 --- a/Makefile +++ b/Makefile @@ -12,8 +12,6 @@ IMAGENAME := sneak/$(FN) UNAME_S := $(shell uname -s) GOLDFLAGS += -X main.Version=$(VERSION) -GOLDFLAGS += -X main.Buildtime=$(BUILDTIME) -GOLDFLAGS += -X main.Builduser=$(BUILDUSER)@$(BUILDHOST) GOLDFLAGS += -X main.Buildarch=$(BUILDARCH) # osx can't statically link apparently?! @@ -39,21 +37,21 @@ clean: build: ./$(FN) .lintsetup: - go get -u golang.org/x/lint/golint + go get -v -u golang.org/x/lint/golint go get -u github.com/GeertJohan/fgt touch .lintsetup -lint: .lintsetup - fgt golint +lint: fmt .lintsetup + fgt golint ./... go-get: - go get -v + cd cmd/$(FN) && go get -v -./$(FN): *.go cmd/*/*.go go-get +./$(FN): */*.go cmd/*/*.go go-get cd cmd/$(FN) && go build -o ../../$(FN) $(GOFLAGS) . fmt: - go fmt *.go + gofmt -s -w . test: lint build-docker-image diff --git a/README.md b/README.md index 4ce570c..ae85347 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,27 @@ archives the fediverse [![CircleCI](https://circleci.com/gh/sneak/feta.svg?style=svg)](https://circleci.com/gh/sneak/feta) -# author +# ethics statement + +It seems that some splinter groups are not well acquainted with the norms of +publishing data on the web. + +Publishing your toots/messages on a server without marking them private or +requiring authentication and thus making them available to the web is an act +of affirmative consent to allowing others to download those toots/messages +(usually by viewing them in a browser on your profile page). If you don't +want your toots downloaded by remote/unauthenticated users on the web, do +not publish them to the web. + +If you publish them to the whole web (and your home instance serves them to +all comers), do not be surprised or feel violated when people download (and +optionally save) them, as your home instance permits them to. 
+ +We do not have a right to be forgotten, as we do not have a right to delete +legitimately-obtained files from the hard drives of other people. + +# Author Jeffrey Paul <[sneak@sneak.berlin](mailto:sneak@sneak.berlin)> +[@sneak@sneak.berlin](https://s.sneak.berlin/@sneak) diff --git a/cmd/feta/main.go b/cmd/feta/main.go index 0826e32..5f40b5f 100644 --- a/cmd/feta/main.go +++ b/cmd/feta/main.go @@ -2,14 +2,16 @@ package main import "os" -import "github.com/sneak/feta" +import "github.com/sneak/feta/process" // these are filled in at link-time by the build scripts + +// Version is the git version of the app var Version string -var Buildtime string -var Builduser string + +// Buildarch contains the architecture it is compiled for var Buildarch string func main() { - os.Exit(feta.CLIEntry(Version, Buildtime, Buildarch, Builduser)) + os.Exit(process.CLIEntry(Version, Buildarch)) } diff --git a/config.go b/config.go new file mode 100644 index 0000000..746b5a7 --- /dev/null +++ b/config.go @@ -0,0 +1,19 @@ +package feta + +import "time" + +// FIXME this should use viper or something + +// Config stores the configuration for the feta process +type Config struct { + LogReportInterval time.Duration + FSStorageLocation string +} + +// GetConfig returns the config +func GetConfig() *Config { + c := new(Config) + c.LogReportInterval = time.Second * 10 + c.FSStorageLocation = "/home/sneak/Library/ApplicationSupport/feta/tootarchive" + return c +} diff --git a/dbmodel.go b/db/dbmodel.go similarity index 69% rename from dbmodel.go rename to db/dbmodel.go index d025a58..2201f43 100644 --- a/dbmodel.go +++ b/db/dbmodel.go @@ -1,5 +1,6 @@ -package feta +package db +import "github.com/sneak/feta/process" import "github.com/jinzhu/gorm" import _ "github.com/jinzhu/gorm/dialects/sqlite" // required for orm @@ -9,6 +10,6 @@ type savedInstance struct { software string } -func (f *Process) databaseMigrations() { +func (f *process.Feta) databaseMigrations() { 
f.db.AutoMigrate(&savedInstance{}) } diff --git a/feta.go b/feta.go deleted file mode 100644 index d6ee5b0..0000000 --- a/feta.go +++ /dev/null @@ -1,124 +0,0 @@ -package feta - -import "os" -import "time" - -import "github.com/jinzhu/gorm" -import _ "github.com/jinzhu/gorm/dialects/sqlite" // required for orm - -import "github.com/rs/zerolog" -import "github.com/rs/zerolog/log" -import "github.com/mattn/go-isatty" - -// InstanceHostname is a special type for holding the hostname of an -// instance (string) -type InstanceHostname string - -// CLIEntry is the main entrypoint for the feta process from the cli -func CLIEntry(version string, buildtime string, buildarch string, builduser string) int { - f := new(Process) - f.version = version - f.buildtime = buildtime - f.buildarch = buildarch - f.builduser = builduser - f.setupLogging() - return f.runForever() -} - -// Process is the main structure/process of this app -type Process struct { - version string - buildtime string - buildarch string - builduser string - locator *InstanceLocator - manager *InstanceManager - api *fetaAPIServer - db *gorm.DB - startup time.Time -} - -func (f *Process) identify() { - log.Info(). - Str("version", f.version). - Str("buildtime", f.buildtime). - Str("buildarch", f.buildarch). - Str("builduser", f.builduser). 
- Msg("starting") -} - -func (f *Process) setupLogging() { - - log.Logger = log.With().Caller().Logger() - - tty := isatty.IsTerminal(os.Stdin.Fd()) || isatty.IsCygwinTerminal(os.Stdin.Fd()) - - if tty { - out := zerolog.NewConsoleWriter( - func(w *zerolog.ConsoleWriter) { - // Customize time format - w.TimeFormat = time.RFC3339 - }, - ) - log.Logger = log.Output(out) - } - - // always log in UTC - zerolog.TimestampFunc = func() time.Time { - return time.Now().UTC() - } - - zerolog.SetGlobalLevel(zerolog.InfoLevel) - if os.Getenv("DEBUG") != "" { - zerolog.SetGlobalLevel(zerolog.DebugLevel) - } - - f.identify() -} - -func (f *Process) uptime() time.Duration { - return time.Since(f.startup) -} - -func (f *Process) setupDatabase() { - var err error - f.db, err = gorm.Open("sqlite3", "feta.sqlite") - - if err != nil { - panic(err) - } - - f.databaseMigrations() -} - -func (f *Process) runForever() int { - f.startup = time.Now() - - f.setupDatabase() - - newInstanceHostnameNotifications := make(chan InstanceHostname) - - f.locator = newInstanceLocator() - f.manager = newInstanceManager() - f.api = new(fetaAPIServer) - f.api.setFeta(f) // api needs to get to us to access data - - f.locator.addInstanceNotificationChannel(newInstanceHostnameNotifications) - f.manager.addInstanceNotificationChannel(newInstanceHostnameNotifications) - - // locator goroutine: - go f.locator.locate() - - // manager goroutine: - go f.manager.manage() - - go f.api.serve() - - // this goroutine (main) does nothing until we handle signals - // FIXME(sneak) - for { - time.Sleep(1 * time.Second) - } - - return 0 -} diff --git a/ingester/ingester.go b/ingester/ingester.go new file mode 100644 index 0000000..60556d7 --- /dev/null +++ b/ingester/ingester.go @@ -0,0 +1,59 @@ +package ingester + +import "time" +import "github.com/rs/zerolog/log" +import "github.com/sneak/feta/toot" +import "github.com/sneak/feta/storage" + +// TootIngester is the data structure for the ingester process that is +// 
responsible for storing the discovered toots +type TootIngester struct { + inbound chan *toot.Toot + recentlySeen []*seenTootMemo + storageBackend storage.TootStorageBackend +} + +type seenTootMemo struct { + lastSeen time.Time + tootHash toot.Hash +} + +// NewTootIngester returns a fresh TootIngester for your use +func NewTootIngester() *TootIngester { + ti := new(TootIngester) + ti.inbound = make(chan *toot.Toot, 10000) + return ti +} + +// SetStorageBackend takes a type conforming to TootStorageBackend for +// persisting toots somewhere/somehow +func (ti *TootIngester) SetStorageBackend(be storage.TootStorageBackend) { + ti.storageBackend = be +} + +// GetDeliveryChannel returns a channel that receives pointers to toots +// which the ingester will dedupe and store +func (ti *TootIngester) GetDeliveryChannel() chan *toot.Toot { + return ti.inbound +} + +// Ingest is the main entrypoint for the TootIngester goroutine +func (ti *TootIngester) Ingest() { + log.Info().Msg("TootIngester starting") + go ti.readFromInboundChannel() +} + +func (ti *TootIngester) readFromInboundChannel() { + for { + nt := <-ti.inbound + go ti.storeToot(nt) + } +} + +func (ti *TootIngester) storeToot(t *toot.Toot) { + // FIXME first check for dupes in recentlySeen + if ti.storageBackend == nil { + panic("no storage backend") + } + ti.storageBackend.StoreToot(*t) +} diff --git a/instance.go b/instance/instance.go similarity index 50% rename from instance.go rename to instance/instance.go index 727bbb9..67e5de1 100644 --- a/instance.go +++ b/instance/instance.go @@ -1,4 +1,4 @@ -package feta +package instance import "encoding/json" import "fmt" @@ -12,46 +12,53 @@ import "errors" //import "github.com/gin-gonic/gin" import "github.com/looplab/fsm" import "github.com/rs/zerolog/log" +import "github.com/sneak/feta/storage" +import "github.com/sneak/feta/toot" +import "github.com/sneak/feta/jsonapis" const nodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0" - const 
instanceNodeinfoTimeout = time.Second * 50 - -const instanceHTTPTimeout = time.Second * 50 - +const instanceHTTPTimeout = time.Second * 120 const instanceSpiderInterval = time.Second * 120 - const instanceErrorInterval = time.Second * 60 * 30 type instanceImplementation int +// Hostname is a special type for holding the hostname of an +// instance (string) +type Hostname string + const ( implUnknown instanceImplementation = iota implMastodon implPleroma ) -type instance struct { +// Instance stores all the information we know about an instance +type Instance struct { structLock sync.Mutex - errorCount uint - successCount uint + tootDestination chan *toot.Toot + ErrorCount uint + SuccessCount uint highestID int - hostname string - identified bool + Hostname string + Identified bool fetching bool implementation instanceImplementation - backend *instanceBackend - nextFetch time.Time + storageBackend *storage.TootStorageBackend + NextFetch time.Time nodeInfoURL string - serverVersionString string - serverImplementationString string + ServerVersionString string + ServerImplementationString string fetchingLock sync.Mutex fsm *fsm.FSM fsmLock sync.Mutex } -func newInstance(options ...func(i *instance)) *instance { - i := new(instance) +// New returns a new instance, argument is a function that operates on the +// new instance +func New(options ...func(i *Instance)) *Instance { + i := new(Instance) i.setNextFetchAfter(1 * time.Second) i.fsm = fsm.NewFSM( @@ -62,9 +69,11 @@ func newInstance(options ...func(i *instance)) *instance { {Name: "BEGIN_NODEINFO_FETCH", Src: []string{"PRE_NODEINFO_FETCH"}, Dst: "FETCHING_NODEINFO"}, {Name: "GOT_NODEINFO", Src: []string{"FETCHING_NODEINFO"}, Dst: "READY_FOR_TOOTFETCH"}, {Name: "FETCH_TIME_REACHED", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "READY_AND_DUE_FETCH"}, + {Name: "BEGIN_TOOT_FETCH", Src: []string{"READY_AND_DUE_FETCH"}, Dst: "FETCHING"}, {Name: "WEIRD_NODE_RESPONSE", Src: []string{"FETCHING_NODEINFO_URL", 
"PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "WEIRD_NODE"}, {Name: "EARLY_FETCH_ERROR", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "EARLY_ERROR"}, - {Name: "TOOT_FETCH_ERROR", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "TOOT_FETCH_ERROR"}, + {Name: "TOOT_FETCH_ERROR", Src: []string{"FETCHING"}, Dst: "TOOT_FETCH_ERROR"}, + {Name: "TOOTS_FETCHED", Src: []string{"FETCHING"}, Dst: "READY_FOR_TOOTFETCH"}, }, fsm.Callbacks{ "enter_state": func(e *fsm.Event) { i.fsmEnterState(e) }, @@ -77,65 +86,82 @@ func newInstance(options ...func(i *instance)) *instance { return i } -func (i *instance) Status() string { +// Status returns the instance's state in the FSM +func (i *Instance) Status() string { i.fsmLock.Lock() defer i.fsmLock.Unlock() return i.fsm.Current() } -func (i *instance) Event(eventname string) { +// SetTootDestination takes a channel from the manager that all toots +// fetched from this instance should be pushed into. The instance is not +// responsible for deduplication, it should shove all toots on every fetch +// into the channel. +func (i *Instance) SetTootDestination(d chan *toot.Toot) { + i.tootDestination = d +} + +// Event is the method that alters the FSM +func (i *Instance) Event(eventname string) { i.fsmLock.Lock() defer i.fsmLock.Unlock() i.fsm.Event(eventname) } -func (i *instance) fsmEnterState(e *fsm.Event) { +func (i *Instance) fsmEnterState(e *fsm.Event) { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Str("state", e.Dst). 
Msg("instance changed state") } -func (i *instance) Lock() { +// Lock locks the instance's mutex for reading/writing from the structure +func (i *Instance) Lock() { i.structLock.Lock() } -func (i *instance) Unlock() { +// Unlock unlocks the instance's mutex for reading/writing from the structure +func (i *Instance) Unlock() { i.structLock.Unlock() } -func (i *instance) bumpFetch() { +func (i *Instance) bumpFetch() { i.Lock() defer i.Unlock() - i.nextFetch = time.Now().Add(120 * time.Second) + i.NextFetch = time.Now().Add(120 * time.Second) } -func (i *instance) setNextFetchAfter(d time.Duration) { +func (i *Instance) setNextFetchAfter(d time.Duration) { i.Lock() defer i.Unlock() - i.nextFetch = time.Now().Add(d) + i.NextFetch = time.Now().Add(d) } -func (i *instance) Fetch() { +// Fetch prepares an instance for fetching. Bad name, fix it. +// FIXME(sneak) +func (i *Instance) Fetch() { i.fetchingLock.Lock() defer i.fetchingLock.Unlock() i.setNextFetchAfter(instanceErrorInterval) - err := i.detectNodeTypeIfNecessary() + err := i.DetectNodeTypeIfNecessary() if err != nil { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Err(err). Msg("unable to fetch instance metadata") return } i.setNextFetchAfter(instanceSpiderInterval) - log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", i.hostname) + log.Info(). + Str("hostname", i.Hostname). 
+ Msg("instance now ready for fetch") } -func (i *instance) dueForFetch() bool { +// FIXME rename this function +func (i *Instance) dueForFetch() bool { // this just checks FSM state, the ticker must update it and do time // calcs if i.Status() == "READY_AND_DUE_FETCH" { @@ -144,21 +170,26 @@ func (i *instance) dueForFetch() bool { return false } -func (i *instance) isNowPastFetchTime() bool { - return time.Now().After(i.nextFetch) +func (i *Instance) isNowPastFetchTime() bool { + return time.Now().After(i.NextFetch) } -func (i *instance) Tick() { +// Tick is responsible for pushing idle instance records between states. +// The instances will transition between states when doing stuff (e.g. +// investigating, fetching, et c) as well. +func (i *Instance) Tick() { if i.Status() == "READY_FOR_TOOTFETCH" { if i.isNowPastFetchTime() { i.Event("FETCH_TIME_REACHED") } } else if i.Status() == "STATUS_UNKNOWN" { i.Fetch() + } else if i.Status() == "READY_AND_DUE_FETCH" { + i.fetchRecentToots() } } -func (i *instance) nodeIdentified() bool { +func (i *Instance) nodeIdentified() bool { i.Lock() defer i.Unlock() if i.implementation > implUnknown { @@ -167,47 +198,50 @@ func (i *instance) nodeIdentified() bool { return false } -func (i *instance) detectNodeTypeIfNecessary() error { +// DetectNodeTypeIfNecessary does some network requests if the node is as +// yet unidentified. No-op otherwise. 
+func (i *Instance) DetectNodeTypeIfNecessary() error { if !i.nodeIdentified() { return i.fetchNodeInfo() - } - return nil + } + return nil } -func (i *instance) registerError() { +func (i *Instance) registerError() { i.Lock() defer i.Unlock() - i.errorCount++ + i.ErrorCount++ } -func (i *instance) registerSuccess() { +func (i *Instance) registerSuccess() { i.Lock() defer i.Unlock() - i.successCount++ + i.SuccessCount++ } -func (i *instance) Up() bool { +// Up returns true if the success count is >0 +func (i *Instance) Up() bool { i.Lock() defer i.Unlock() - return i.successCount > 0 + return i.SuccessCount > 0 } -func (i *instance) fetchNodeInfoURL() error { - url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.hostname) +func (i *Instance) fetchNodeInfoURL() error { + url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.Hostname) var c = &http.Client{ Timeout: instanceNodeinfoTimeout, } log.Debug(). Str("url", url). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Msg("fetching nodeinfo reference URL") i.Event("BEGIN_NODEINFO_URL_FETCH") resp, err := c.Get(url) if err != nil { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Err(err). Msg("unable to fetch nodeinfo, node is down?") i.registerError() @@ -220,7 +254,7 @@ func (i *instance) fetchNodeInfoURL() error { if err != nil { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Err(err). Msg("unable to read nodeinfo") i.registerError() @@ -228,11 +262,11 @@ func (i *instance) fetchNodeInfoURL() error { return err } - nir := new(nodeInfoWellKnownResponse) + nir := new(jsonapis.NodeInfoWellKnownResponse) err = json.Unmarshal(body, &nir) if err != nil { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Err(err). 
Msg("unable to parse nodeinfo, node is weird") i.registerError() @@ -243,7 +277,7 @@ func (i *instance) fetchNodeInfoURL() error { for _, item := range nir.Links { if item.Rel == nodeInfoSchemaVersionTwoName { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Str("nodeinfourl", item.Href). Msg("success fetching url for nodeinfo") @@ -255,21 +289,21 @@ func (i *instance) fetchNodeInfoURL() error { return nil } log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Str("item-rel", item.Rel). Str("item-href", item.Href). Msg("nodeinfo entry") } log.Error(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Msg("incomplete nodeinfo") i.registerError() i.Event("WEIRD_NODE_RESPONSE") return errors.New("incomplete nodeinfo") } -func (i *instance) fetchNodeInfo() error { +func (i *Instance) fetchNodeInfo() error { err := i.fetchNodeInfoURL() if err != nil { @@ -291,7 +325,7 @@ func (i *instance) fetchNodeInfo() error { if err != nil { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Err(err). Msgf("unable to fetch nodeinfo data") i.registerError() @@ -304,7 +338,7 @@ func (i *instance) fetchNodeInfo() error { if err != nil { log.Error(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Err(err). Msgf("unable to read nodeinfo data") i.registerError() @@ -312,11 +346,11 @@ func (i *instance) fetchNodeInfo() error { return err } - ni := new(nodeInfoVersionTwoSchema) + ni := new(jsonapis.NodeInfoVersionTwoSchema) err = json.Unmarshal(body, &ni) if err != nil { log.Error(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Err(err). Msgf("unable to parse nodeinfo") i.registerError() @@ -327,21 +361,21 @@ func (i *instance) fetchNodeInfo() error { log.Debug(). Str("serverVersion", ni.Software.Version). Str("software", ni.Software.Name). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Str("nodeInfoURL", i.nodeInfoURL). 
Msg("received nodeinfo from instance") i.Lock() - i.serverVersionString = ni.Software.Version - i.serverImplementationString = ni.Software.Name + i.ServerVersionString = ni.Software.Version + i.ServerImplementationString = ni.Software.Name ni.Software.Name = strings.ToLower(ni.Software.Name) if ni.Software.Name == "pleroma" { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Str("software", ni.Software.Name). Msg("detected server software") - i.identified = true + i.Identified = true i.implementation = implPleroma i.Unlock() i.registerSuccess() @@ -349,10 +383,10 @@ func (i *instance) fetchNodeInfo() error { return nil } else if ni.Software.Name == "mastodon" { log.Debug(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Str("software", ni.Software.Name). Msg("detected server software") - i.identified = true + i.Identified = true i.implementation = implMastodon i.Unlock() i.registerSuccess() @@ -360,7 +394,7 @@ func (i *instance) fetchNodeInfo() error { return nil } else { log.Error(). - Str("hostname", i.hostname). + Str("hostname", i.Hostname). Str("software", ni.Software.Name). 
Msg("FIXME unknown server implementation") i.Unlock() @@ -370,34 +404,78 @@ func (i *instance) fetchNodeInfo() error { } } -/* -func (i *Instance) fetchRecentToots() ([]byte, error) { - i.Lock() - impl := i.impl - i.Unlock() +func (i *Instance) fetchRecentToots() error { + // this would have been about a billion times shorter in python - if impl == Mastodon { - return i.fetchRecentTootsJsonFromMastodon() - } else if impl == Pleroma { - return i.fetchRecentTootsJsonFromPleroma() - } else { - panic("unimplemented") + // it turns out pleroma supports the mastodon api so we'll just use that + // for everything for now + url := fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true", + i.Hostname) + + var c = &http.Client{ + Timeout: instanceHTTPTimeout, + } + + i.Event("BEGIN_TOOT_FETCH") + // we set the interval now to the error interval regardless here as a + // safety against bugs to avoid fetching too frequently by logic + // bug. if the fetch is successful, we will conditionally re-update the + // next fetch to now+successInterval. + i.setNextFetchAfter(instanceErrorInterval) + resp, err := c.Get(url) + + if err != nil { + log.Debug(). + Str("hostname", i.Hostname). + Err(err). + Msgf("unable to fetch recent toots") + i.registerError() + i.Event("TOOT_FETCH_ERROR") + return err + } + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + + if err != nil { + log.Debug(). + Str("hostname", i.Hostname). + Err(err). + Msgf("unable to read recent toots from response") + i.registerError() + i.Event("TOOT_FETCH_ERROR") + return err + } + + tc, err := toot.NewTootCollectionFromMastodonAPIResponse(body, i.Hostname) + + if err != nil { + log.Error(). + Str("hostname", i.Hostname). + Err(err). + Msgf("unable to parse recent toot list") + i.registerError() + i.Event("TOOT_FETCH_ERROR") + return err + } + + log.Info(). + Str("hostname", i.Hostname). + Int("tootCount", len(tc)). 
+ Msgf("got and parsed toots") + i.registerSuccess() + i.Event("TOOTS_FETCHED") + i.setNextFetchAfter(instanceSpiderInterval) + + // this should go fast as either the channel is buffered bigly or the + // ingester receives fast and does its own buffering, but run it in its + // own goroutine anyway because why not + go i.sendTootsToIngester(tc) + return nil +} + +func (i *Instance) sendTootsToIngester(tc []*toot.Toot) { + for _, item := range tc { + i.tootDestination <- item } } -*/ - -/* -func (i *PleromaBackend) fetchRecentToots() ([]byte, error) { - //url := - //fmt.Sprintf("https://%s/api/statuses/public_and_external_timeline.json?count=100", - //i.hostname) - return nil, nil -} - -func (i *MastodonBackend) fetchRecentTootsJsonFromMastodon() ([]byte, error) { - //url := - //fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true", - //i.hostname) - return nil, nil -} -*/ diff --git a/jsonapis/helpers.go b/jsonapis/helpers.go new file mode 100644 index 0000000..bf533fc --- /dev/null +++ b/jsonapis/helpers.go @@ -0,0 +1,10 @@ +package jsonapis + +import "fmt" +import "encoding/json" + +func (atl *apTootList) String() string { + return fmt.Sprintf("%+v", atl) +} + +type apTootList []json.RawMessage diff --git a/jsonapis.go b/jsonapis/structures.go similarity index 71% rename from jsonapis.go rename to jsonapis/structures.go index c71da00..96c4110 100644 --- a/jsonapis.go +++ b/jsonapis/structures.go @@ -1,10 +1,13 @@ -package feta +package jsonapis import "time" // thank fuck for https://mholt.github.io/json-to-go/ otherwise // this would have been a giant pain in the dick -type mastodonIndexResponse struct { + +// MastodonIndexResponse is the json api shape from the mastodon instance +// indexer +type MastodonIndexResponse struct { Instances []struct { ID string `json:"_id"` AddedAt time.Time `json:"addedAt"` @@ -48,7 +51,9 @@ type mastodonIndexResponse struct { } `json:"instances"` } -type pleromaIndexResponse []struct { +// PleromaIndexResponse is 
the json api shape from the pleroma instance +// indexer +type PleromaIndexResponse []struct { Domain string `json:"domain"` Title string `json:"title"` Thumbnail string `json:"thumbnail"` @@ -62,7 +67,8 @@ type pleromaIndexResponse []struct { TextLimit int `json:"text_limit"` } -type nodeInfoVersionTwoSchema struct { +// NodeInfoVersionTwoSchema is the json format of nodeinfo 2.0 +type NodeInfoVersionTwoSchema struct { Version string `json:"version"` Software struct { Name string `json:"name"` @@ -80,9 +86,34 @@ type nodeInfoVersionTwoSchema struct { OpenRegistrations bool `json:"openRegistrations"` } -type nodeInfoWellKnownResponse struct { +// NodeInfoWellKnownResponse is the json format of the nodeinfo schema +type NodeInfoWellKnownResponse struct { Links []struct { Rel string `json:"rel"` Href string `json:"href"` } `json:"links"` } + +// APISerializedToot is a partial shape of the json serialized form of a +// toot from the mastodon api (also used by pleroma). We save the original +// json from the server though so this is just a minimal subset that we need +// to deserialize for purposes of this indexer app. 
+type APISerializedToot struct { + Account struct { + Acct string `json:"acct"` + ID string `json:"id"` + URL string `json:"url"` + Username string `json:"username"` + } `json:"account"` + Content string `json:"content"` + CreatedAt time.Time `json:"created_at"` + ID string `json:"id"` + Mentions []struct { + Acct string `json:"acct"` + ID string `json:"id"` + URL string `json:"url"` + Username string `json:"username"` + } `json:"mentions"` + URI string `json:"uri"` + URL string `json:"url"` +} diff --git a/locator.go b/locator/locator.go similarity index 83% rename from locator.go rename to locator/locator.go index d8af1cd..1efad65 100644 --- a/locator.go +++ b/locator/locator.go @@ -1,4 +1,4 @@ -package feta +package locator import "encoding/json" import "io/ioutil" @@ -8,10 +8,13 @@ import "sync" import "github.com/rs/zerolog/log" import "golang.org/x/sync/semaphore" +import "github.com/sneak/feta/jsonapis" +import "github.com/sneak/feta/instance" +import "github.com/sneak/feta" // IndexAPITimeout is the timeout for fetching json instance lists // from the listing servers -const IndexAPITimeout = time.Second * 60 +const IndexAPITimeout = time.Second * 60 * 3 // UserAgent is the user-agent string we provide to servers var UserAgent = "feta indexer bot, sneak@sneak.berlin for feedback" @@ -24,10 +27,6 @@ var IndexCheckInterval = time.Second * 60 * 60 // (default: 10m) var IndexErrorInterval = time.Second * 60 * 10 -// LogReportInterval defines how long between logging internal -// stats/reporting for user supervision -var LogReportInterval = time.Second * 10 - const mastodonIndexURL = "https://instances.social/list.json?q%5Busers%5D=&q%5Bsearch%5D=&strict=false" const pleromaIndexURL = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api.cgi" @@ -36,11 +35,12 @@ const pleromaIndexURL = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api type InstanceLocator struct { pleromaIndexNextRefresh *time.Time mastodonIndexNextRefresh *time.Time - reportInstanceVia 
chan InstanceHostname - mu sync.Mutex + reportInstanceVia chan instance.Hostname + mu sync.Mutex } -func newInstanceLocator() *InstanceLocator { +// New returns an InstanceLocator for use by the process. +func New() *InstanceLocator { il := new(InstanceLocator) n := time.Now() il.pleromaIndexNextRefresh = &n @@ -49,20 +49,22 @@ func newInstanceLocator() *InstanceLocator { } func (il *InstanceLocator) lock() { - il.mu.Lock() + il.mu.Lock() } func (il *InstanceLocator) unlock() { - il.mu.Unlock() + il.mu.Unlock() } -func (il *InstanceLocator) addInstanceNotificationChannel(via chan InstanceHostname) { +// SetInstanceNotificationChannel is the way the instanceLocator returns +// newly discovered instances back to the manager for query/addition +func (il *InstanceLocator) SetInstanceNotificationChannel(via chan instance.Hostname) { il.lock() defer il.unlock() il.reportInstanceVia = via } -func (il *InstanceLocator) addInstance(hostname InstanceHostname) { +func (il *InstanceLocator) addInstance(hostname instance.Hostname) { // receiver (InstanceManager) is responsible for de-duping against its // map, we just locate and spray, it manages il.reportInstanceVia <- hostname @@ -84,7 +86,9 @@ func (il *InstanceLocator) durationUntilNextPleromaIndexRefresh() time.Duration return (time.Duration(-1) * time.Now().Sub(*il.pleromaIndexNextRefresh)) } -func (il *InstanceLocator) locate() { +// Locate is the main entrypoint for the instancelocator, designed to be +// called once in its own goroutine. +func (il *InstanceLocator) Locate() { log.Info().Msg("InstanceLocator starting") x := time.Now() var pleromaSemaphore = semaphore.NewWeighted(1) @@ -116,7 +120,8 @@ func (il *InstanceLocator) locate() { time.Sleep(1 * time.Second) - if time.Now().After(x.Add(LogReportInterval)) { + c := feta.GetConfig() + if time.Now().After(x.Add(c.LogReportInterval)) { x = time.Now() log.Debug(). Str("nextMastodonIndexRefresh", il.durationUntilNextMastodonIndexRefresh().String()). 
@@ -151,8 +156,8 @@ func (il *InstanceLocator) locateMastodon() { return } - log.Info(). - Msg("fetched mastodon index") + log.Info(). + Msg("fetched mastodon index") defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) @@ -171,7 +176,7 @@ func (il *InstanceLocator) locateMastodon() { il.mastodonIndexNextRefresh = &t il.unlock() - mi := new(mastodonIndexResponse) + mi := new(jsonapis.MastodonIndexResponse) err = json.Unmarshal(body, &mi) if err != nil { log.Error().Msgf("unable to parse mastodon instance list: %s", err) @@ -193,7 +198,7 @@ func (il *InstanceLocator) locateMastodon() { Msg("received hosts from mastodon index") for k := range hosts { - il.addInstance(InstanceHostname(k)) + il.addInstance(instance.Hostname(k)) } } @@ -239,7 +244,7 @@ func (il *InstanceLocator) locatePleroma() { il.pleromaIndexNextRefresh = &t il.unlock() - pi := new(pleromaIndexResponse) + pi := new(jsonapis.PleromaIndexResponse) err = json.Unmarshal(body, &pi) if err != nil { log.Warn().Msgf("unable to parse pleroma instance list: %s", err) @@ -261,7 +266,7 @@ func (il *InstanceLocator) locatePleroma() { Msg("received hosts from pleroma index") for k := range hosts { - il.addInstance(InstanceHostname(k)) + il.addInstance(instance.Hostname(k)) } } diff --git a/manager.go b/manager/manager.go similarity index 54% rename from manager.go rename to manager/manager.go index 330da3a..fa09afd 100644 --- a/manager.go +++ b/manager/manager.go @@ -1,4 +1,4 @@ -package feta +package manager import "sync" import "time" @@ -6,30 +6,41 @@ import "runtime" //import "github.com/gin-gonic/gin" import "github.com/rs/zerolog/log" +import "github.com/sneak/feta/toot" +import "github.com/sneak/feta/seeds" +import "github.com/sneak/feta/instance" -const hostDiscoveryParallelism = 20 +const hostDiscoveryParallelism = 5 -type instanceBackend interface { - //FIXME -} +// LogReportInterval defines how long between logging internal +// stats/reporting for user supervision +var LogReportInterval = 
time.Second * 10 // InstanceManager is the main data structure for the goroutine that manages // the list of all known instances, fed by the locator type InstanceManager struct { mu sync.Mutex - instances map[InstanceHostname]*instance - newInstanceNotifications chan InstanceHostname + instances map[instance.Hostname]*instance.Instance + newInstanceNotifications chan instance.Hostname + tootDestination chan *toot.Toot startup time.Time hostAdderSemaphore chan bool } -func newInstanceManager() *InstanceManager { +// New returns a new InstanceManager for use by the Process +func New() *InstanceManager { i := new(InstanceManager) i.hostAdderSemaphore = make(chan bool, hostDiscoveryParallelism) - i.instances = make(map[InstanceHostname]*instance) + i.instances = make(map[instance.Hostname]*instance.Instance) return i } +// SetTootDestination provides the instancemanager with a channel to the +// ingester that it can give to its instances +func (im *InstanceManager) SetTootDestination(td chan *toot.Toot) { + im.tootDestination = td +} + func (im *InstanceManager) logCaller(msg string) { fpcs := make([]uintptr, 1) // Skip 2 levels to get the caller @@ -62,19 +73,39 @@ func (im *InstanceManager) unlock() { im.mu.Unlock() } -func (im *InstanceManager) addInstanceNotificationChannel(via chan InstanceHostname) { +// SetInstanceNotificationChannel is how the Process tells the +// InstanceManager about the channel from the InstanceLocator so that the +// InstanceLocator can provide it/us (the InstanceManager) with new +// instance.Hostnames. We (the manager) deduplicate the list ourselves. 
+func (im *InstanceManager) SetInstanceNotificationChannel(via chan instance.Hostname) { im.lock() defer im.unlock() im.newInstanceNotifications = via } -func (im *InstanceManager) manage() { +func (im *InstanceManager) receiveSeedInstanceHostnames() { + for _, x := range seeds.SeedInstances { + go func(tmp instance.Hostname) { + im.addInstanceByHostname(tmp) + }(instance.Hostname(x)) + } +} + +// Manage is the main entrypoint of the InstanceManager, designed to be +// called once in its own goroutine. +func (im *InstanceManager) Manage() { log.Info().Msg("InstanceManager starting") go func() { im.receiveNewInstanceHostnames() }() + im.startup = time.Now() x := im.startup + + go func() { + im.receiveSeedInstanceHostnames() + }() + for { log.Info().Msg("InstanceManager tick") im.managerLoop() @@ -88,21 +119,21 @@ func (im *InstanceManager) manage() { func (im *InstanceManager) managerLoop() { im.lock() - il := make([]*instance, 0) + il := make([]*instance.Instance, 0) for _, v := range im.instances { il = append(il, v) } im.unlock() - // FIXME is this a bug outside of the mutex above? + // FIXME is this a bug outside of the mutex above? 
for _, v := range il { - go func(i *instance) { + go func(i *instance.Instance) { i.Tick() }(v) } } -func (im *InstanceManager) hostnameExists(newhn InstanceHostname) bool { +func (im *InstanceManager) hostnameExists(newhn instance.Hostname) bool { im.lock() defer im.unlock() for k := range im.instances { @@ -113,21 +144,22 @@ func (im *InstanceManager) hostnameExists(newhn InstanceHostname) bool { return false } -func (im *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { +func (im *InstanceManager) addInstanceByHostname(newhn instance.Hostname) { if im.hostnameExists(newhn) { - // ignore adding new if we already know about it + // ignore adding new if we already know about it return } // this blocks on the channel size, limiting concurrency im.hostAdderSemaphore <- true - i := newInstance(func(x *instance) { - x.hostname = string(newhn) + i := instance.New(func(x *instance.Instance) { + x.Hostname = string(newhn) // set hostname + x.SetTootDestination(im.tootDestination) // copy ingester input channel from manager to instance }) - // we do node detection under the addLock to avoid thundering + // we do node detection under the adderSemaphore to avoid thundering // on startup - i.detectNodeTypeIfNecessary() + i.DetectNodeTypeIfNecessary() // pop an item from the buffered channel <-im.hostAdderSemaphore @@ -140,12 +172,12 @@ func (im *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { } func (im *InstanceManager) receiveNewInstanceHostnames() { - var newhn InstanceHostname + var newhn instance.Hostname for { newhn = <-im.newInstanceNotifications // receive them fast out of the channel, let the adding function lock to add // them one at a time, using a bunch of blocked goroutines as our - // modification queue + // modification queue go im.addInstanceByHostname(newhn) } } @@ -163,8 +195,10 @@ func (im *InstanceManager) logInstanceReport() { Msg("instance report") } -func (im *InstanceManager) listInstances() []*instance { - var out 
[]*instance +// ListInstances dumps a slice of all Instances the InstanceManager knows +// about +func (im *InstanceManager) ListInstances() []*instance.Instance { + var out []*instance.Instance im.lock() defer im.unlock() for _, v := range im.instances { @@ -175,7 +209,7 @@ func (im *InstanceManager) listInstances() []*instance { func (im *InstanceManager) instanceSummaryReport() map[string]uint { r := make(map[string]uint) - for _, v := range im.listInstances() { + for _, v := range im.ListInstances() { v.Lock() r[v.Status()]++ v.Unlock() diff --git a/process/feta.go b/process/feta.go new file mode 100644 index 0000000..5f61ffd --- /dev/null +++ b/process/feta.go @@ -0,0 +1,140 @@ +package process + +import "os" +import "time" + +import "github.com/jinzhu/gorm" +import _ "github.com/jinzhu/gorm/dialects/sqlite" // required for orm + +import "github.com/rs/zerolog" +import "github.com/rs/zerolog/log" +import "github.com/mattn/go-isatty" + +import "github.com/sneak/feta/ingester" +import "github.com/sneak/feta/storage" +import "github.com/sneak/feta/locator" +import "github.com/sneak/feta/manager" +import "github.com/sneak/feta/instance" + +// CLIEntry is the main entrypoint for the feta process from the cli +func CLIEntry(version string, buildarch string) int { + f := new(Feta) + f.version = version + f.buildarch = buildarch + f.setupLogging() + return f.runForever() +} + +// Feta is the main structure/process of this app +type Feta struct { + version string + buildarch string + locator *locator.InstanceLocator + manager *manager.InstanceManager + ingester *ingester.TootIngester + api *Server + db *gorm.DB + startup time.Time +} + +func (f *Feta) identify() { + log.Info(). + Str("version", f.version). + Str("buildarch", f.buildarch). 
+ Msg("starting") +} + +func (f *Feta) setupLogging() { + + log.Logger = log.With().Caller().Logger() + + tty := isatty.IsTerminal(os.Stdin.Fd()) || isatty.IsCygwinTerminal(os.Stdin.Fd()) + + if tty { + out := zerolog.NewConsoleWriter( + func(w *zerolog.ConsoleWriter) { + // Customize time format + w.TimeFormat = time.RFC3339 + }, + ) + log.Logger = log.Output(out) + } + + // always log in UTC + zerolog.TimestampFunc = func() time.Time { + return time.Now().UTC() + } + + zerolog.SetGlobalLevel(zerolog.InfoLevel) + if os.Getenv("DEBUG") != "" { + zerolog.SetGlobalLevel(zerolog.DebugLevel) + } + + f.identify() +} + +func (f *Feta) uptime() time.Duration { + return time.Since(f.startup) +} + +/* +func (f *Feta) setupDatabase() { + var err error + f.db, err = gorm.Open("sqlite3", "feta.sqlite") + + if err != nil { + panic(err) + } + + //f.databaseMigrations() +} +*/ + +func (f *Feta) runForever() int { + f.startup = time.Now() + + //f.setupDatabase() + + // FIXME move this channel creation into the manager's constructor + // and add getters/setters on the manager/locator + newInstanceHostnameNotifications := make(chan instance.Hostname) + + f.locator = locator.New() + f.manager = manager.New() + f.ingester = ingester.NewTootIngester() + + home := os.Getenv("HOME") + if home == "" { + panic("can't find home directory") + } + + diskBackend := storage.NewTootFSStorage(home + "/.local/feta") + f.ingester.SetStorageBackend(diskBackend) + + f.api = new(Server) + f.api.SetFeta(f) // api needs to get to us to access data + + f.locator.SetInstanceNotificationChannel(newInstanceHostnameNotifications) + f.manager.SetInstanceNotificationChannel(newInstanceHostnameNotifications) + + f.manager.SetTootDestination(f.ingester.GetDeliveryChannel()) + + // ingester goroutine: + go f.ingester.Ingest() + + // locator goroutine: + go f.locator.Locate() + + // manager goroutine: + go f.manager.Manage() + + go f.api.Serve() + + // this goroutine (main) does nothing until we handle signals + 
// FIXME(sneak) + for { + time.Sleep(1 * time.Second) + } + + return 0 +} diff --git a/apihandlers.go b/process/handlers.go similarity index 67% rename from apihandlers.go rename to process/handlers.go index 027e1dc..a1ac43d 100644 --- a/apihandlers.go +++ b/process/handlers.go @@ -1,4 +1,4 @@ -package feta +package process import "time" import "net/http" @@ -11,25 +11,25 @@ import "github.com/gin-gonic/gin" type hash map[string]interface{} -func (a *fetaAPIServer) instances() []hash { +func (a *Server) instances() []hash { resp := make([]hash, 0) now := time.Now() - for _, v := range a.feta.manager.listInstances() { + for _, v := range a.feta.manager.ListInstances() { i := make(hash) // FIXME figure out why a very short lock here deadlocks v.Lock() - i["hostname"] = v.hostname - i["nextCheck"] = v.nextFetch.UTC().Format(time.RFC3339) - i["nextCheckAfter"] = (-1 * now.Sub(v.nextFetch)).String() - i["successCount"] = v.successCount - i["errorCount"] = v.errorCount - i["identified"] = v.identified + i["hostname"] = v.Hostname + i["nextCheck"] = v.NextFetch.UTC().Format(time.RFC3339) + i["nextCheckAfter"] = (-1 * now.Sub(v.NextFetch)).String() + i["successCount"] = v.SuccessCount + i["errorCount"] = v.ErrorCount + i["identified"] = v.Identified i["status"] = v.Status() i["software"] = "unknown" i["version"] = "unknown" - if v.identified { - i["software"] = v.serverImplementationString - i["version"] = v.serverVersionString + if v.Identified { + i["software"] = v.ServerImplementationString + i["version"] = v.ServerVersionString } v.Unlock() resp = append(resp, i) @@ -37,21 +37,21 @@ func (a *fetaAPIServer) instances() []hash { return resp } -func (a *fetaAPIServer) instanceSummary() map[string]int { +func (a *Server) instanceSummary() map[string]int { resp := make(map[string]int) - for _, v := range a.feta.manager.listInstances() { + for _, v := range a.feta.manager.ListInstances() { v.Lock() resp[fmt.Sprintf("STATUS_%s", v.Status())]++ - if 
v.serverImplementationString != "" { + if v.ServerImplementationString != "" { //FIXME(sneak) sanitize this to a-z0-9, it is server-provided - resp[fmt.Sprintf("SOFTWARE_%s", strings.ToUpper(v.serverImplementationString))]++ + resp[fmt.Sprintf("SOFTWARE_%s", strings.ToUpper(v.ServerImplementationString))]++ } v.Unlock() } return resp } -func (a *fetaAPIServer) getInstanceListHandler() http.HandlerFunc { +func (a *Server) getInstanceListHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { result := &gin.H{ @@ -69,7 +69,7 @@ func (a *fetaAPIServer) getInstanceListHandler() http.HandlerFunc { } } -func (a *fetaAPIServer) getIndexHandler() http.HandlerFunc { +func (a *Server) getIndexHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { index := &gin.H{ "server": &gin.H{ @@ -78,9 +78,7 @@ func (a *fetaAPIServer) getIndexHandler() http.HandlerFunc { "goroutines": runtime.NumGoroutine(), "goversion": runtime.Version(), "version": a.feta.version, - "buildtime": a.feta.buildtime, "buildarch": a.feta.buildarch, - "builduser": a.feta.builduser, }, "instanceSummary": a.instanceSummary(), } @@ -96,7 +94,7 @@ func (a *fetaAPIServer) getIndexHandler() http.HandlerFunc { } } -func (a *fetaAPIServer) getHealthCheckHandler() http.HandlerFunc { +func (a *Server) getHealthCheckHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { resp := &gin.H{ "status": "ok", diff --git a/apiserver.go b/process/server.go similarity index 75% rename from apiserver.go rename to process/server.go index fd8e91c..ddff565 100644 --- a/apiserver.go +++ b/process/server.go @@ -1,4 +1,4 @@ -package feta +package process import "fmt" import "net/http" @@ -10,19 +10,24 @@ import "github.com/rs/zerolog/log" import "github.com/gin-gonic/gin" import "github.com/dn365/gin-zerolog" -type fetaAPIServer struct { - feta *Process +// Server is the HTTP webserver object +type Server struct { + feta *Feta port uint router *gin.Engine 
server *http.Server debug bool } -func (a *fetaAPIServer) setFeta(feta *Process) { +// SetFeta tells the http Server where to find the Process object so that it +// can pull stats and other information for serving via http +func (a *Server) SetFeta(feta *Feta) { a.feta = feta } -func (a *fetaAPIServer) serve() { +// Serve is the entrypoint for the Server, which should run in its own +// goroutine (started by the Process) +func (a *Server) Serve() { if a.feta == nil { panic("must have feta app from which to serve stats") } @@ -50,7 +55,7 @@ func (a *fetaAPIServer) serve() { } } -func (a *fetaAPIServer) initRouter() { +func (a *Server) initRouter() { // empty router r := gin.New() @@ -69,7 +74,7 @@ func (a *fetaAPIServer) initRouter() { a.router = r } -func (a *fetaAPIServer) initServer() { +func (a *Server) initServer() { if !a.debug { gin.SetMode(gin.ReleaseMode) } diff --git a/seeds/seeds.go b/seeds/seeds.go new file mode 100644 index 0000000..dbee079 --- /dev/null +++ b/seeds/seeds.go @@ -0,0 +1,553 @@ +package seeds + +// SeedInstances is a list of instance hostnames used to seed the indexer. +// This list so far is a bunch of instances that have been +// banned/defederated by others so it's important to seed them so that we +// can always get their toots for archiving; they will likely not appear in +// common mentions/public indices. 
+// update: now includes a bunch of other instances too +var SeedInstances = [...]string{ + "blobturtle.club", + "busshi.moe", + "fedi.valkyrie.world", + "gnosis.systems", + "iscute.moe", + "kink.town", + "kinky.business", + "kinkyelephant.com", + "kiwec.net", + "kiwifarms.cc", + "kiwifarms.is", + "kiwifarms.net", + "kneegrows.top", + "knzk.me", + "kowai.youkai.town", + "koyu.space", + "krauser.org", + "kuko.hamburg", + "kune.gouge.re", + "kyot.me", + "kys.moe", + "lanners.uk", + "larvata.com", + "latinos.social", + "layer8.space", + "leftlibertarian.club", + "lesbian.energy", + "lets.saynoto.lgbt", + "letsalllovela.in", + "lgbtq.cool", + "lgbtqia.is", + "liberdon.com", + "libertarianism.club", + "librem.one", + "librenet.co.za", + "ligma.pro", + "likeable.space", + "linuxrocks.online", + "litodon.de", + "littles.space", + "liveview.cf", + "loci.onl", + "logjam.city", + "lol5.tun.a4.io", + "loli.estate", + "lolis.world", + "lost-angles.im", + "luvdon.cc", + "luvdon.ddns.net", + "m.1994.io", + "m.apertron.com", + "m.bnolet.me", + "m.danq.me", + "m.eula.dev", + "m.fratm.com", + "m.kretschmann.social", + "m.xorkle.com", + "magikarp.fun", + "maik.social", + "majak.de", + "makito.me", + "maly.io", + "manx.social", + "marchgenso.me", + "mares.cafe", + "mas.korrigan.tech", + "mas.to", + "mast.astragroup.info", + "mastadon.ml", + "masto.lost-angles.im", + "masto.misell.cymru", + "masto.ml", + "masto.mywebprojects.co.uk", + "masto.polarisfm.net", + "masto.powerlot.net", + "masto.stanisic.nl", + "mastoc.net", + "mastodon-network.com", + "mastodon.aekrylov.me", + "mastodon.alienlebarge.ch", + "mastodon.amaseto.com", + "mastodon.art", + "mastodon.aventer.biz", + "mastodon.blue", + "mastodon.cipherbliss.com", + "mastodon.circlelinego.com", + "mastodon.codeplumbers.eu", + "mastodon.coder.town", + "mastodon.com.pl", + "mastodon.corecoding.dev", + "mastodon.cyber-tribal.com", + "mastodon.dlitz.net", + "mastodon.echoz.io", + "mastodon.eric.ovh", + "mastodon.ericbeckers.nl", + 
"mastodon.fail", + "mastodon.freifunk-minden.de", + "mastodon.fricloud.dk", + "mastodon.funigtor.fr", + "mastodon.gamedev.place", + "mastodon.gargantia.fr", + "mastodon.geofox.org", + "mastodon.globalrevolution.tv", + "mastodon.gougere.fr", + "mastodon.grin.hu", + "mastodon.h.etbus.ch", + "mastodon.host", + "mastodon.hugolecourt.fr", + "mastodon.ie", + "mastodon.immae.eu", + "mastodon.inferiorlattice.com", + "mastodon.inhji.de", + "mastodon.jectrum.de", + "mastodon.jeder.pl", + "mastodon.kerenon.com", + "mastodon.kliu.io", + "mastodon.kosebamse.com", + "mastodon.leptonics.com", + "mastodon.local.lubar.me", + "mastodon.loliandstuff.moe", + "mastodon.lubar.me", + "mastodon.lunorian.is", + "mastodon.macsnet.cz", + "mastodon.maescool.be", + "mastodon.me.uk", + "mastodon.mynameisivan.ru", + "mastodon.naoy.fr", + "mastodon.nobodysstuff.de", + "mastodon.ocf.berkeley.edu", + "mastodon.openpsychology.net", + "mastodon.org.ua", + "mastodon.org.uk", + "mastodon.otherreality.net", + "mastodon.owls.io", + "mastodon.redflag.social", + "mastodon.roocita.com", + "mastodon.rylees.net", + "mastodon.scarletsisters.xyz", + "mastodon.schemacs.com", + "mastodon.scuttle.org", + "mastodon.sebbo.net", + "mastodon.sedryk.info", + "mastodon.social", + "mastodon.social", + "mastodon.soses.ca", + "mastodon.spiderden.net", + "mastodon.starrevolution.org", + "mastodon.syntik.fr", + "mastodon.technology", + "mastodon.technology", + "mastodon.toni.im", + "mastodon.toniozz75.fr", + "mastodon.truf-kin.com", + "mastodon.xhrpb.com", + "mastodon.yolovision-inc.com", + "mastodon.zapashcanon.fr", + "mastodon.zwei.net", + "mastofant.de", + "masttest.zwei.net", + "mcphail.uk", + "me.frankmeeuwsen.xyz", + "megadon.net", + "melalandia.tk", + "menzel-it.social", + "meow.social", + "mgub.yt", + "mikep.ro", + "ministry.moonbutt.science", + "misskey.io", + "misskey.nl", + "mmorpg.social", + "mobile.co", + "monsterpit.net", + "moytura.org", + "mst.mpdevel.com", + "mst.thewebzone.net", + "mst.vsta.org", + 
"mstdn.alternanet.fr", + "mstdn.ikebuku.ro", + "mstdn.io", + "mstdn.jp", + "mstdn.maud.io", + "mstdn.mx", + "mstdn.novium.pw", + "mstdn.openalgeria.org", + "mstdn.social", + "mstdn.tsukiyono.0am.jp", + "mstdn.waifu.space", + "mstdn.xxil.cc", + "mu.zaitcev.nu", + "mudl.us", + "multicast.social", + "music.pawoo.net", + "myflog.net", + "mypolis.zapto.org", + "myprayer.center", + "neckbeard.xyz", + "neenster.org", + "nerdynate.live", + "networked.space", + "netzsphaere.xyz", + "newjack.city", + "newsbots.eu", + "niedersachsen.social", + "ninja.social", + "nitro.horse", + "niu.moe", + "noagenda.social", + "noagendasocial.com", + "nojack.easydns.ca", + "nomoresha.me", + "nonexiste.net", + "norden.social", + "nordenmedia.com", + "not-develop.gab.com", + "not.phrack.fyi", + "npf.mlpol.net", + "nsfw.social", + "nudie.social", + "nyaa.social", + "octodon.social", + "odin.run", + "ohhi.icu", + "oneway.masto.host", + "opensim.fun", + "order.life", + "oslo.town", + "our.wtf", + "oursquad.rocks", + "outaouais.club", + "pachyder.me", + "pars.ee", + "patch.cx", + "pawoo.net", + "penguicon.social", + "pettingzoo.co", + "photodon.org", + "phreedom.tk", + "pieville.net", + "pifke.social", + "pigeon.town", + "pixfed.com", + "pl.765racing.com", + "pl.apelsin.la", + "pl.knotteye.cc", + "pl.kotobank.ch", + "pl.koyu.space", + "pl.kys.moe", + "pl.ohno.host", + "pl.smuglo.li", + "pl.wowana.me", + "pla.social", + "plag.masto.host", + "plankton.cz", + "playvicious.social", + "pleroma.1d4.us", + "pleroma.ch405.xyz", + "pleroma.cloud", + "pleroma.comfy.moe", + "pleroma.cucked.me", + "pleroma.fr", + "pleroma.kiwifarms.net", + "pleroma.miniwa.moe", + "pleroma.quaylessed.icu", + "pleroma.rareome.ga", + "pleroma.soykaf.com", + "pleroma.teromene.fr", + "pleroma.travnewmatic.com", + "pleroma.tuxcrafting.cf", + "pleroma.yorha.club", + "pltest.feminism.lgbt", + "plural.cafe", + "pokemon.men", + "polycule.club", + "pornfed.social", + "porntoot.com", + "post.mashek.net", + "pouet.jablon.fr", + 
"ppl.town", + "preteengirls.biz", + "pridelands.io", + "princess.cat", + "privacytools.io", + "producers.masto.host", + "programmer.technology", + "programmingsocks.com", + "project.social", + "protohype.net", + "prsm.space", + "psyopshop.com", + "pumba.space", + "pyyython.org", + "qoto.org", + "quasi.social", + "queer.farm", + "queersin.space", + "quey.org", + "quitter.pw", + "r3bl.social", + "rainbowdash.net", + "raki.social", + "rapefeminists.network", + "rebels.rest", + "redliberal.com", + "redroo.ml", + "redterrorcollective.net", + "relay-mypolis.zapto.org", + "relay.selfhosting.rocks", + "remotenode.host", + "rhubarb.land", + "rigcz.club", + "rightmastodon.com", + "rivals.space", + "rly.wtf", + "roar.killtheradio.net", + "ronin.world", + "roughseas.xyz", + "rrfarmbot.appspot.com", + "rubber.social", + "rva.party", + "s.b252.gq", + "s.huggingservers.uk", + "sackheads.social", + "sadposting.space", + "sammiesweetie.com", + "sangha.social", + "sapphos.be", + "scouts.devosmium.xyz", + "sealion.club", + "secure.kiwi", + "serious.ferret.business", + "shigusegubu.club", + "shinomiya.group", + "shiro.dog", + "shitasstits.life", + "shitposter.club", + "shpposter.club", + "simstodon.com", + "simulacron.de", + "sinblr.com", + "skippers-bin.com", + "skoops.social", + "slum.cloud", + "smuglo.li", + "sn.angry.im", + "snabelen.no", + "snaggletooth.life", + "snel.social", + "snuskete.net", + "soc.psychedelic.cat", + "social.1in9.net", + "social.adlerweb.info", + "social.allthefallen.ninja", + "social.au2pb.net", + "social.avareborn.de", + "social.azkware.net", + "social.b252.gq", + "social.backbord.net", + "social.bam.yt", + "social.bau-ha.us", + "social.beepboop.ga", + "social.cereza.de", + "social.cloudsumu.com", + "social.culturewar.us", + "social.cutienaut.club", + "social.digimortal.org", + "social.elqhost.net", + "social.end-the-stigma.com", + "social.enyutech.io", + "social.fab-l3.org", + "social.fedi.farm", + "social.fff-du.de", + "social.firc.de", + 
"social.florianjensen.com", + "social.foxfam.club", + "social.gattai.net", + "social.gnu.one", + "social.guizzyordi.info", + "social.headsca.la", + "social.heldscal.la", + "social.heroicwisdom.com", + "social.hidamari.blue", + "social.hodakov.me", + "social.homunyan.com", + "social.i2p.rocks", + "social.imirhil.fr", + "social.ingobernable.net", + "social.joshuacasey.net", + "social.lansky.name", + "social.librem.one", + "social.longden.me", + "social.louisoft01.moe", + "social.lucci.xyz", + "social.luschmar.ch", + "social.lyte.dev", + "social.mark.atwood.name", + "social.mhtube.de", + "social.minkenstein.de", + "social.mjb.im", + "social.mochi.academy", + "social.moseskaranja.com", + "social.mylinux.cz", + "social.net.ua", + "social.netdc.ca", + "social.niicow974.fr", + "social.nobodyhasthe.biz", + "social.nofftopia.com", + "social.noscraft.cf", + "social.offline.network", + "social.omniatv.com", + "social.panthermodern.net", + "social.privacytools.io", + "social.proyectolanuevatierra.com", + "social.puri.sm", + "social.putz.space", + "social.quodverum.com", + "social.radio.af", + "social.raptorengineering.io", + "social.rosnovsky.us", + "social.ryankes.eu", + "social.seattle.wa.us", + "social.secline.de", + "social.skankhunt42.pw", + "social.sunshinegardens.org", + "social.super-niche.club", + "social.taker.fr", + "social.targaryen.house", + "social.tchncs.de", + "social.thisisjoes.site", + "social.tomica.me", + "social.troll.academy", + "social.wiuwiu.de", + "social.zwei.net", + "sociala.me", + "socialnetwork.ninja", + "socl.win", + "socnet.supes.com", + "soderstrom.social", + "soteria.mastodon.host", + "souk.getloci.com", + "southflorida.social", + "spacetime.social", + "speakfree.world", + "spinster.dev", + "spinster.xyz", + "splat.soy", + "sprocket.group", + "starship.coffee", + "stereophonic.space", + "sunbeam.city", + "sunshinegardens.org", + "sweet.sugarcube.pw", + "swingset.social", + "switter.at", + "switter.co", + "syrup.zone", + "take.iteasy.club", + 
"takeoverthe.world", + "tamiltoot.online", + "tank.im", + "taosforum.com", + "tardis.world", + "tassaron.com", + "techflake.ch", + "the.hedgehoghunter.club", + "the.scream.zone", + "thechad.zone", + "thefreestate.xyz", + "thelballwiki.gq", + "thetower.xyz", + "thewired.xyz", + "thicc.horse", + "toot.brussels", + "toot.cat", + "toot.chemnitz.social", + "toot.devfs.xyz", + "toot.flairy.de", + "toot.forumanalogue.fr", + "toot.kiez.xyz", + "toot.love", + "toot.my", + "toot.nx-pod.de", + "toot.onl", + "toot.party", + "toot.site", + "toot.temsa.me", + "toot.wales", + "toot.world", + "toot.worldrovine.com", + "toot.ws", + "tooting.ch", + "toots.slothy.win", + "toucans.social", + "travel-friends.chat", + "tri.cash", + "triangletoot.party", + "triggerhub.ru", + "tron.buzz", + "twimblr.xyz", + "twitter.1d4.us", + "uelfte.club", + "underscore.world", + "unsafe.space", + "unsocial.pztrn.name", + "va11hal.la", + "vampire.estate", + "veenus.art", + "veenus.art", + "voice.masto.host", + "voluntaryism.club", + "vulpine.club", + "wagesofsinisdeath.com", + "waifu.social", + "waifuappreciation.club", + "warc.space", + "weeaboo.space", + "weedis.life", + "weirder.earth", + "welovela.in", + "wetfish.space", + "whitespashe.uk", + "witches.live", + "witches.town", + "wogan.im", + "woofer.alfter.us", + "wrestlr.social", + "wrongthink.net", + "www.misanthropebazaar.com", + "wxw.moe", + "x0r.stream", + "xa0.uk", + "xn--6r8h.tk", + "xoldie.com", + "yang.social", + "yarr.io", + "yeehaw.town", + "yeet.social", + "yiff.rocks", + "yorishiro.space", + "youkai.town", + "zerohack.xyz", + "zion-techs.com", + "zomglol.wtf", +} diff --git a/storage/tootstore.go b/storage/tootstore.go new file mode 100644 index 0000000..d55fd22 --- /dev/null +++ b/storage/tootstore.go @@ -0,0 +1,110 @@ +package storage + +import "errors" +import "io/ioutil" +import "path/filepath" +import "os" +import "strings" +import "sync" + +import "github.com/sneak/feta/toot" + +// TootStorageBackend is the interface to which 
storage backends must +// conform for storing toots +type TootStorageBackend interface { + TootExists(t toot.Toot) bool + StoreToot(t toot.Toot) error + StoreToots(tc []*toot.Toot) error +} + +// TootFSStorage is a TootStorageBackend that writes to the local +// filesystem. +type TootFSStorage struct { + root string +} + +// NewTootFSStorage returns a *TootFSStorage for writing toots to the +// local filesystem +func NewTootFSStorage(root string) *TootFSStorage { + ts := new(TootFSStorage) + ts.root = root + return ts +} + +// StoreToots writes a slice of pointers to toots to disk +func (ts *TootFSStorage) StoreToots(tc []*toot.Toot) error { + var returnErrors []string + for _, item := range tc { + err := ts.StoreToot(*item) + if err != nil { + returnErrors = append(returnErrors, err.Error()) + continue + } + } + if len(returnErrors) == 0 { + return nil + } + return errors.New(strings.Join(returnErrors, "; ")) +} + +// TootExists checks to see if we have already written a toot to disk or +// not. 
Note that the ingester de-dupes with a table in memory so that this +// will only really get used on app restarts +func (ts *TootFSStorage) TootExists(t toot.Toot) bool { + path := t.DiskStoragePath() + full := ts.root + "/" + path + _, err := os.Stat(full) + if os.IsNotExist(err) { + return false + } + return true +} + +// StoreToot writes a single toot to disk +func (ts *TootFSStorage) StoreToot(t toot.Toot) error { + path := t.DiskStoragePath() + full := ts.root + "/" + path + dir := filepath.Dir(full) + err := os.MkdirAll(dir, 0755) + if err != nil { + return err + } + return ioutil.WriteFile(full, t.Original, 0644) +} + +// TootMemoryStorage is a TootStorageBackend that just stores all ingested +// toots in ram forever until the computer fills up and catches fire and explodes +type TootMemoryStorage struct { + sync.Mutex + toots map[toot.Hash]toot.Toot + //maxSize uint // FIXME support eviction +} + +// NewTootMemoryStorage returns a *TootMemoryStorage for storing toots in +// ram forever +func NewTootMemoryStorage() *TootMemoryStorage { + ts := new(TootMemoryStorage) + ts.toots = make(map[toot.Hash]toot.Toot) + return ts +} + +// StoreToot saves a single toot into an in-memory hashtable +func (ts *TootMemoryStorage) StoreToot(t toot.Toot) { + if ts.TootExists(t) { + return + } + ts.Lock() + defer ts.Unlock() + ts.toots[t.Hash] = t + return +} + +// TootExists checks to see if we have a toot in memory already +func (ts *TootMemoryStorage) TootExists(t toot.Toot) bool { + ts.Lock() + defer ts.Unlock() + if _, ok := ts.toots[t.Hash]; ok { //this syntax is so gross + return true + } + return false +} diff --git a/toot.go b/toot.go deleted file mode 100644 index 7a58330..0000000 --- a/toot.go +++ /dev/null @@ -1,11 +0,0 @@ -package feta - -//import "github.com/rs/zerolog/log" - -type toot struct { -} - -func newToot(input []byte) *toot { - t := new(toot) - return t -} diff --git a/toot/toot.go b/toot/toot.go new file mode 100644 index 0000000..83a1e08 --- 
/dev/null +++ b/toot/toot.go @@ -0,0 +1,117 @@ +package toot + +import "fmt" +import "encoding/json" +import "errors" +import "strings" +import "github.com/sneak/feta/jsonapis" + +//import "github.com/davecgh/go-spew/spew" +import "github.com/rs/zerolog/log" + +//import "encoding/hex" +import mh "github.com/multiformats/go-multihash" +import mhopts "github.com/multiformats/go-multihash/opts" + +// Hash is a type for storing a string-based base58 multihash of a +// toot's identity +type Hash string + +// Toot is an object we use internally for storing a discovered toot +type Toot struct { + Original []byte + Parsed *jsonapis.APISerializedToot + Hash Hash + FromHost string +} + +// NewTootCollectionFromMastodonAPIResponse takes a byte array from a masto +// api response and provides you with a nice array of pointers to parsed +// toots +func NewTootCollectionFromMastodonAPIResponse(in []byte, hostname string) ([]*Toot, error) { + var rt []json.RawMessage + err := json.Unmarshal(in, &rt) + if err != nil { + return nil, errors.New("unable to parse api response") + } + + var tc []*Toot + + // iterate over rawtoots from api + for _, item := range rt { + parsed := new(jsonapis.APISerializedToot) + err := json.Unmarshal(item, parsed) + if err != nil { + log.Error().Msg("unable to parse toot, skipping") + continue + } + t := new(Toot) + t.Parsed = parsed + o, err := item.MarshalJSON() + if err != nil { + panic(err) + } + t.Original = o + t.FromHost = hostname + t.calcHash() + tc = append(tc, t) + } + return tc, nil +} + +func (t *Toot) String() string { + return fmt.Sprintf("%#v", t) +} + +func (t *Toot) multiHash(in []byte) string { + opts := new(mhopts.Options) + opts.Algorithm = "sha2-256" + opts.Encoding = "base58" + var found bool + opts.AlgorithmCode, found = mh.Names[opts.Algorithm] + if !found { + panic("oops") + } + opts.Length = mh.DefaultLengths[opts.AlgorithmCode] + r := strings.NewReader(string(in)) + h, err := opts.Multihash(r) + if err != nil { + panic(err) + 
} + return h.B58String() +} + +// DiskStoragePath is a helper function on a Toot that allows it to provide +// a storage path on disk. This should probably be moved into the FSStorage +// backend instead. FIXME +// It's here because it's a pure function that just formats its own toot attributes +// into a string. +func (t *Toot) DiskStoragePath() string { + // FIXME make this error if fields are missing + // '/YYYYMMDD/example.com/username/YYYY-MM-DD.HHMMSS.username@fromHost.multihash.json' + return fmt.Sprintf("%s/%s/%s/%s.%s@%s.%s.json", + t.Parsed.CreatedAt.Format("20060102"), + strings.ToLower(t.FromHost), + t.Parsed.Account.Acct, + t.Parsed.CreatedAt.Format("2006-01-02.150405"), + t.Parsed.Account.Acct, + strings.ToLower(t.FromHost), + t.Hash, + ) +} + +func (t *Toot) identityHashInput() string { + return fmt.Sprintf( + "%s.%s.%s.%s.%s", + t.Parsed.Account.URL, + t.Parsed.CreatedAt, + t.Parsed.ID, + t.Parsed.Content, + strings.ToLower(t.FromHost), + ) +} + +func (t *Toot) calcHash() { + hi := t.identityHashInput() + t.Hash = Hash(t.multiHash([]byte(hi))) +}