From d2bd99801dd6a51c56119445f62470090d5d61ca Mon Sep 17 00:00:00 2001 From: sneak Date: Thu, 19 Dec 2019 04:39:42 -0800 Subject: [PATCH] making progress, almost ready to write to disk --- Makefile | 6 +- apihandlers.go | 2 - apiserver.go | 2 +- cmd/feta/main.go | 4 +- feta.go | 23 +++--- ingester.go | 33 -------- ingester/ingester.go | 34 ++++++++ instance.go | 41 ++++------ jsonapis/helpers.go | 10 +++ jsonapis.go => jsonapis/structures.go | 30 +++----- locator.go | 11 ++- manager.go | 24 +++++- seeds/seeds.go | 16 ++++ storage/tootstore.go | 88 +++++++++++++++++++++ toot.go | 37 --------- toot/toot.go | 107 ++++++++++++++++++++++++++ 16 files changed, 323 insertions(+), 145 deletions(-) delete mode 100644 ingester.go create mode 100644 ingester/ingester.go create mode 100644 jsonapis/helpers.go rename jsonapis.go => jsonapis/structures.go (85%) create mode 100644 seeds/seeds.go create mode 100644 storage/tootstore.go delete mode 100644 toot.go create mode 100644 toot/toot.go diff --git a/Makefile b/Makefile index 7b1ebf2..f3a44df 100644 --- a/Makefile +++ b/Makefile @@ -12,8 +12,6 @@ IMAGENAME := sneak/$(FN) UNAME_S := $(shell uname -s) GOLDFLAGS += -X main.Version=$(VERSION) -GOLDFLAGS += -X main.Buildtime=$(BUILDTIME) -GOLDFLAGS += -X main.Builduser=$(BUILDUSER)@$(BUILDHOST) GOLDFLAGS += -X main.Buildarch=$(BUILDARCH) # osx can't statically link apparently?! @@ -39,7 +37,7 @@ clean: build: ./$(FN) .lintsetup: - go get -u golang.org/x/lint/golint + go get -v -u golang.org/x/lint/golint go get -u github.com/GeertJohan/fgt touch .lintsetup @@ -53,7 +51,7 @@ go-get: cd cmd/$(FN) && go build -o ../../$(FN) $(GOFLAGS) . fmt: - go fmt *.go + gofmt -s -w . 
test: lint build-docker-image diff --git a/apihandlers.go b/apihandlers.go index 027e1dc..554ac50 100644 --- a/apihandlers.go +++ b/apihandlers.go @@ -78,9 +78,7 @@ func (a *fetaAPIServer) getIndexHandler() http.HandlerFunc { "goroutines": runtime.NumGoroutine(), "goversion": runtime.Version(), "version": a.feta.version, - "buildtime": a.feta.buildtime, "buildarch": a.feta.buildarch, - "builduser": a.feta.builduser, }, "instanceSummary": a.instanceSummary(), } diff --git a/apiserver.go b/apiserver.go index fd8e91c..5053ebd 100644 --- a/apiserver.go +++ b/apiserver.go @@ -22,7 +22,7 @@ func (a *fetaAPIServer) setFeta(feta *Process) { a.feta = feta } -func (a *fetaAPIServer) serve() { +func (a *fetaAPIServer) Serve() { if a.feta == nil { panic("must have feta app from which to serve stats") } diff --git a/cmd/feta/main.go b/cmd/feta/main.go index 0826e32..14ff4b3 100644 --- a/cmd/feta/main.go +++ b/cmd/feta/main.go @@ -6,10 +6,8 @@ import "github.com/sneak/feta" // these are filled in at link-time by the build scripts var Version string -var Buildtime string -var Builduser string var Buildarch string func main() { - os.Exit(feta.CLIEntry(Version, Buildtime, Buildarch, Builduser)) + os.Exit(feta.CLIEntry(Version, Buildarch)) } diff --git a/feta.go b/feta.go index 167a348..716297d 100644 --- a/feta.go +++ b/feta.go @@ -9,18 +9,17 @@ import _ "github.com/jinzhu/gorm/dialects/sqlite" // required for orm import "github.com/rs/zerolog" import "github.com/rs/zerolog/log" import "github.com/mattn/go-isatty" +import "github.com/sneak/feta/ingester" // InstanceHostname is a special type for holding the hostname of an // instance (string) type InstanceHostname string // CLIEntry is the main entrypoint for the feta process from the cli -func CLIEntry(version string, buildtime string, buildarch string, builduser string) int { +func CLIEntry(version string, buildarch string) int { f := new(Process) f.version = version - f.buildtime = buildtime f.buildarch = buildarch - f.builduser 
= builduser f.setupLogging() return f.runForever() } @@ -28,12 +27,10 @@ func CLIEntry(version string, buildtime string, buildarch string, builduser stri // Process is the main structure/process of this app type Process struct { version string - buildtime string buildarch string - builduser string locator *InstanceLocator manager *InstanceManager - ingester *tootIngester + ingester *ingester.TootIngester api *fetaAPIServer db *gorm.DB startup time.Time @@ -42,9 +39,7 @@ type Process struct { func (f *Process) identify() { log.Info(). Str("version", f.version). - Str("buildtime", f.buildtime). Str("buildarch", f.buildarch). - Str("builduser", f.builduser). Msg("starting") } @@ -101,7 +96,7 @@ func (f *Process) runForever() int { f.locator = newInstanceLocator() f.manager = newInstanceManager() - f.ingester = newTootIngester() + f.ingester = ingester.NewTootIngester() f.api = new(fetaAPIServer) f.api.setFeta(f) // api needs to get to us to access data @@ -109,18 +104,18 @@ func (f *Process) runForever() int { f.locator.setInstanceNotificationChannel(newInstanceHostnameNotifications) f.manager.setInstanceNotificationChannel(newInstanceHostnameNotifications) - f.manager.setTootDestination(f.ingester.getDeliveryChannel()) + f.manager.setTootDestination(f.ingester.GetDeliveryChannel()) // ingester goroutine: - go f.ingester.ingest() + go f.ingester.Ingest() // locator goroutine: - go f.locator.locate() + go f.locator.Locate() // manager goroutine: - go f.manager.manage() + go f.manager.Manage() - go f.api.serve() + go f.api.Serve() // this goroutine (main) does nothing until we handle signals // FIXME(sneak) diff --git a/ingester.go b/ingester.go deleted file mode 100644 index 9f9cc3d..0000000 --- a/ingester.go +++ /dev/null @@ -1,33 +0,0 @@ -package feta - -import "time" -import "github.com/rs/zerolog/log" - -type tootIngester struct { - inbound chan *toot - recentlySeen []*seenTootMemo -} - -type tootHash string - -type seenTootMemo struct { - lastSeen time.Time - 
tootHash tootHash -} - -func newTootIngester() *tootIngester { - ti := new(tootIngester) - ti.inbound = make(chan *toot, 1) - return ti -} - -func (ti *tootIngester) getDeliveryChannel() chan *toot { - return ti.inbound -} - -func (ti *tootIngester) ingest() { - log.Info().Msg("tootIngester starting") - for { - time.Sleep(1 * time.Second) // FIXME do something - } -} diff --git a/ingester/ingester.go b/ingester/ingester.go new file mode 100644 index 0000000..d9ca2cb --- /dev/null +++ b/ingester/ingester.go @@ -0,0 +1,34 @@ +package ingester + +import "time" +import "github.com/rs/zerolog/log" +import "github.com/sneak/feta/toot" +import "github.com/sneak/feta/storage" + +type TootIngester struct { + inbound chan *toot.Toot + recentlySeen []*seenTootMemo + storageBackend *storage.TootStorageBackend +} + +type seenTootMemo struct { + lastSeen time.Time + tootHash toot.TootHash +} + +func NewTootIngester() *TootIngester { + ti := new(TootIngester) + ti.inbound = make(chan *toot.Toot, 1) + return ti +} + +func (ti *TootIngester) GetDeliveryChannel() chan *toot.Toot { + return ti.inbound +} + +func (ti *TootIngester) Ingest() { + log.Info().Msg("TootIngester starting") + for { + time.Sleep(1 * time.Second) // FIXME do something + } +} diff --git a/instance.go b/instance.go index b7877cd..ffd5fcc 100644 --- a/instance.go +++ b/instance.go @@ -12,15 +12,14 @@ import "errors" //import "github.com/gin-gonic/gin" import "github.com/looplab/fsm" import "github.com/rs/zerolog/log" +import "github.com/sneak/feta/storage" +import "github.com/sneak/feta/toot" +import "github.com/sneak/feta/jsonapis" const nodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0" - const instanceNodeinfoTimeout = time.Second * 50 - const instanceHTTPTimeout = time.Second * 120 - const instanceSpiderInterval = time.Second * 120 - const instanceErrorInterval = time.Second * 60 * 30 type instanceImplementation int @@ -33,7 +32,7 @@ const ( type instance struct { structLock 
sync.Mutex - tootDestination chan *toot + tootDestination chan *toot.Toot errorCount uint successCount uint highestID int @@ -42,6 +41,7 @@ type instance struct { fetching bool implementation instanceImplementation backend *instanceBackend + storageBackend *storage.TootStorageBackend nextFetch time.Time nodeInfoURL string serverVersionString string @@ -86,7 +86,7 @@ func (i *instance) Status() string { return i.fsm.Current() } -func (i *instance) setTootDestination(d chan *toot) { +func (i *instance) setTootDestination(d chan *toot.Toot) { i.tootDestination = d } @@ -240,7 +240,7 @@ func (i *instance) fetchNodeInfoURL() error { return err } - nir := new(nodeInfoWellKnownResponse) + nir := new(jsonapis.NodeInfoWellKnownResponse) err = json.Unmarshal(body, &nir) if err != nil { log.Debug(). @@ -324,7 +324,7 @@ func (i *instance) fetchNodeInfo() error { return err } - ni := new(nodeInfoVersionTwoSchema) + ni := new(jsonapis.NodeInfoVersionTwoSchema) err = json.Unmarshal(body, &ni) if err != nil { log.Error(). @@ -383,6 +383,8 @@ func (i *instance) fetchNodeInfo() error { } func (i *instance) fetchRecentToots() error { + // this would have been about a billion times shorter in python + // it turns out pleroma supports the mastodon api so we'll just use that // for everything for now url := fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true", @@ -423,8 +425,7 @@ func (i *instance) fetchRecentToots() error { return err } - tootList := new(apTootList) - err = json.Unmarshal(body, &tootList) + tc, err := toot.NewTootCollectionFromMastodonAPIResponse(body, i.hostname) if err != nil { log.Error(). @@ -440,22 +441,14 @@ func (i *instance) fetchRecentToots() error { log.Info(). Str("hostname", i.hostname). - Int("tootCount", len(*tootList)). + Int("tootCount", len(*tc)). 
Msgf("got and parsed toots") i.registerSuccess() i.Event("TOOTS_FETCHED") + i.setNextFetchAfter(instanceSpiderInterval) - for _, x := range *tootList { - fmt.Printf("%s\n", x.Content) - } - panic("unimplemented") + // FIXME stream toots to ingester attached to manager instead + //i.storeToots(tc) + panic("lol") + return nil } - -/* -func (i *PleromaBackend) fetchRecentToots() ([]byte, error) { - //url := - //fmt.Sprintf("https://%s/api/statuses/public_and_external_timeline.json?count=100", - //i.hostname) - return nil, nil -} -*/ diff --git a/jsonapis/helpers.go b/jsonapis/helpers.go new file mode 100644 index 0000000..bf533fc --- /dev/null +++ b/jsonapis/helpers.go @@ -0,0 +1,10 @@ +package jsonapis + +import "fmt" +import "encoding/json" + +func (atl *apTootList) String() string { + return fmt.Sprintf("%+v", atl) +} + +type apTootList []json.RawMessage diff --git a/jsonapis.go b/jsonapis/structures.go similarity index 85% rename from jsonapis.go rename to jsonapis/structures.go index 3799302..b436868 100644 --- a/jsonapis.go +++ b/jsonapis/structures.go @@ -1,12 +1,10 @@ -package feta +package jsonapis import "time" -import "fmt" -import "encoding/json" // thank fuck for https://mholt.github.io/json-to-go/ otherwise // this would have been a giant pain in the dick -type mastodonIndexResponse struct { +type MastodonIndexResponse struct { Instances []struct { ID string `json:"_id"` AddedAt time.Time `json:"addedAt"` @@ -50,7 +48,7 @@ type mastodonIndexResponse struct { } `json:"instances"` } -type pleromaIndexResponse []struct { +type PleromaIndexResponse []struct { Domain string `json:"domain"` Title string `json:"title"` Thumbnail string `json:"thumbnail"` @@ -64,7 +62,7 @@ type pleromaIndexResponse []struct { TextLimit int `json:"text_limit"` } -type nodeInfoVersionTwoSchema struct { +type NodeInfoVersionTwoSchema struct { Version string `json:"version"` Software struct { Name string `json:"name"` @@ -82,32 +80,24 @@ type nodeInfoVersionTwoSchema struct { 
OpenRegistrations bool `json:"openRegistrations"` } -type nodeInfoWellKnownResponse struct { +type NodeInfoWellKnownResponse struct { Links []struct { Rel string `json:"rel"` Href string `json:"href"` } `json:"links"` } -func (atl *apTootList) String() string { - return fmt.Sprintf("%+v", atl) -} - -type apTootList []json.RawMessage - -type tootFromAPI struct { +type APISerializedToot struct { Account struct { Acct string `json:"acct"` ID string `json:"id"` URL string `json:"url"` Username string `json:"username"` } `json:"account"` - Content string `json:"content"` - CreatedAt time.Time `json:"created_at"` - ID string `json:"id"` - InReplyToAccountID string `json:"in_reply_to_account_id"` - InReplyToID string `json:"in_reply_to_id"` - Mentions []struct { + Content string `json:"content"` + CreatedAt time.Time `json:"created_at"` + ID string `json:"id"` + Mentions []struct { Acct string `json:"acct"` ID string `json:"id"` URL string `json:"url"` diff --git a/locator.go b/locator.go index c44cae7..ae88f4b 100644 --- a/locator.go +++ b/locator.go @@ -8,10 +8,11 @@ import "sync" import "github.com/rs/zerolog/log" import "golang.org/x/sync/semaphore" +import "github.com/sneak/feta/jsonapis" // IndexAPITimeout is the timeout for fetching json instance lists // from the listing servers -const IndexAPITimeout = time.Second * 60 +const IndexAPITimeout = time.Second * 60 * 3 // UserAgent is the user-agent string we provide to servers var UserAgent = "feta indexer bot, sneak@sneak.berlin for feedback" @@ -84,7 +85,9 @@ func (il *InstanceLocator) durationUntilNextPleromaIndexRefresh() time.Duration return (time.Duration(-1) * time.Now().Sub(*il.pleromaIndexNextRefresh)) } -func (il *InstanceLocator) locate() { +// Locate is the main entrypoint for the instancelocator, designed to be +// called once in its own goroutine.
+func (il *InstanceLocator) Locate() { log.Info().Msg("InstanceLocator starting") x := time.Now() var pleromaSemaphore = semaphore.NewWeighted(1) @@ -171,7 +174,7 @@ func (il *InstanceLocator) locateMastodon() { il.mastodonIndexNextRefresh = &t il.unlock() - mi := new(mastodonIndexResponse) + mi := new(jsonapis.MastodonIndexResponse) err = json.Unmarshal(body, &mi) if err != nil { log.Error().Msgf("unable to parse mastodon instance list: %s", err) @@ -239,7 +242,7 @@ func (il *InstanceLocator) locatePleroma() { il.pleromaIndexNextRefresh = &t il.unlock() - pi := new(pleromaIndexResponse) + pi := new(jsonapis.PleromaIndexResponse) err = json.Unmarshal(body, &pi) if err != nil { log.Warn().Msgf("unable to parse pleroma instance list: %s", err) diff --git a/manager.go b/manager.go index 98ddd12..39ba4bb 100644 --- a/manager.go +++ b/manager.go @@ -6,6 +6,8 @@ import "runtime" //import "github.com/gin-gonic/gin" import "github.com/rs/zerolog/log" +import "github.com/sneak/feta/toot" +import "github.com/sneak/feta/seeds" const hostDiscoveryParallelism = 20 @@ -19,7 +21,7 @@ type InstanceManager struct { mu sync.Mutex instances map[InstanceHostname]*instance newInstanceNotifications chan InstanceHostname - tootDestination chan *toot + tootDestination chan *toot.Toot startup time.Time hostAdderSemaphore chan bool } @@ -31,7 +33,7 @@ func newInstanceManager() *InstanceManager { return i } -func (im *InstanceManager) setTootDestination(td chan *toot) { +func (im *InstanceManager) setTootDestination(td chan *toot.Toot) { im.tootDestination = td } @@ -73,13 +75,29 @@ func (im *InstanceManager) setInstanceNotificationChannel(via chan InstanceHostn im.newInstanceNotifications = via } -func (im *InstanceManager) manage() { +func (im *InstanceManager) receiveSeedInstanceHostnames() { + for _, x := range seeds.SeedInstances { + go func(tmp InstanceHostname) { + im.addInstanceByHostname(tmp) + }(InstanceHostname(x)) + } +} + +// Manage is the main entrypoint of the InstanceManager, 
designed to be +// called once in its own goroutine. +func (im *InstanceManager) Manage() { log.Info().Msg("InstanceManager starting") go func() { im.receiveNewInstanceHostnames() }() + im.startup = time.Now() x := im.startup + + go func() { + im.receiveSeedInstanceHostnames() + }() + for { log.Info().Msg("InstanceManager tick") im.managerLoop() diff --git a/seeds/seeds.go b/seeds/seeds.go new file mode 100644 index 0000000..d532aba --- /dev/null +++ b/seeds/seeds.go @@ -0,0 +1,16 @@ +package seeds + +var SeedInstances = [...]string{ + "splat.soy", + "veenus.art", + "iscute.moe", + "order.life", + "princess.cat", + "blobturtle.club", + "busshi.moe", + "thewired.xyz", + "wetfish.space", + "underscore.world", + "fedi.valkyrie.world", + "gnosis.systems", +} diff --git a/storage/tootstore.go b/storage/tootstore.go new file mode 100644 index 0000000..668285e --- /dev/null +++ b/storage/tootstore.go @@ -0,0 +1,88 @@ +package storage + +import "errors" +import "io/ioutil" +import "os" +import "strings" +import "sync" + +import "github.com/sneak/feta/toot" + +type TootStorageBackend interface { + TootExists(t toot.Toot) bool + StoreToot(t toot.Toot) error + StoreToots(tc []*toot.Toot) error +} + +type TootFSStorage struct { + root string +} + +func NewTootFSStorage(root string) *TootFSStorage { + ts := new(TootFSStorage) + ts.root = root + return ts +} + +func (ts *TootFSStorage) StoreToots(tc []*toot.Toot) error { + var returnErrors []string + for _, item := range tc { + err := ts.StoreToot(*item) + if err != nil { + returnErrors = append(returnErrors, err.Error()) + continue + } + } + if len(returnErrors) == 0 { + return nil + } + return errors.New(strings.Join(returnErrors, "; ")) +} + +func (ts *TootFSStorage) TootExists(t toot.Toot) bool { + path := t.DiskStoragePath() + full := ts.root + "/" + path + _, err := os.Stat(full) + if os.IsNotExist(err) { + return false + } + return true +} + +func (ts *TootFSStorage) StoreToot(t toot.Toot) error { + path := 
t.DiskStoragePath() + full := ts.root + "/" + path + return ioutil.WriteFile(full, t.Original, 0644) +} + +type TootMemoryStorage struct { + sync.Mutex + toots map[toot.TootHash]toot.Toot + //maxSize uint // FIXME support eviction +} + +func NewTootMemoryStorage() *TootMemoryStorage { + ts := new(TootMemoryStorage) + ts.toots = make(map[toot.TootHash]toot.Toot) + return ts +} + +func (ts *TootMemoryStorage) StoreToot(t toot.Toot) { + th := t.Hash + if ts.TootExists(th) { + return + } + ts.Lock() + defer ts.Unlock() + ts.toots[th] = t + return +} + +func (ts *TootMemoryStorage) TootExists(th toot.TootHash) bool { + ts.Lock() + defer ts.Unlock() + if _, ok := ts.toots[th]; ok { //this syntax is so gross + return true + } + return false +} diff --git a/toot.go b/toot.go deleted file mode 100644 index 2018d90..0000000 --- a/toot.go +++ /dev/null @@ -1,37 +0,0 @@ -package feta - -import "encoding/json" - -type toot struct { - original *json.RawMessage - parsed *tootFromAPI -} - -func newToots(input []*json.RawMessage) []*toot { - l := make([]*toot, 0) - for x := range input { - t := newToot(x) - if t != nil { - l = append(l, t) - } - } - return l -} - -func newToot(input *json.RawMessage) *toot { - t := new(toot) - t.original = input - t.parsed = new(tootFromAPI) - err = json.Unmarshal(*input, t.parsed) - if err != nil { - t.parsed = nil - } - return t -} - -func (t *toot) identityHashInput() string { - // id + datestamp + acct + content -} - -func (t *toot) hash() tootHash { -} diff --git a/toot/toot.go b/toot/toot.go new file mode 100644 index 0000000..8158fbc --- /dev/null +++ b/toot/toot.go @@ -0,0 +1,107 @@ +package toot + +import "fmt" +import "encoding/json" +import "errors" +import "strings" +import "github.com/sneak/feta/jsonapis" +import "github.com/davecgh/go-spew/spew" +import "github.com/rs/zerolog/log" + +//import "encoding/hex" +import mh "github.com/multiformats/go-multihash" +import mhopts "github.com/multiformats/go-multihash/opts" + +type TootHash 
string + +type Toot struct { + Original []byte + Parsed *jsonapis.APISerializedToot + Hash TootHash + FromHost string +} + +func NewTootCollectionFromMastodonAPIResponse(in []byte, hostname string) (*[]Toot, error) { + var rt []json.RawMessage + err := json.Unmarshal(in, &rt) + if err != nil { + return nil, errors.New("unable to parse api response") + } + + var tc []Toot + + // iterate over rawtoots from api + for _, item := range rt { + parsed := new(jsonapis.APISerializedToot) + err := json.Unmarshal(item, parsed) + if err != nil { + log.Error().Msg("unable to parse toot, skipping") + continue + } + t := new(Toot) + t.Parsed = parsed + o, err := item.MarshalJSON() + if err != nil { + panic(err) + } + t.Original = o + t.FromHost = hostname + t.calcHash() + tc = append(tc, *t) + } + spew.Dump(tc) + panic("") + return &tc, nil +} + +func (t *Toot) String() string { + return fmt.Sprintf("%#v", t) +} + +func (t *Toot) multiHash(in []byte) string { + opts := new(mhopts.Options) + opts.Algorithm = "sha2-256" + opts.Encoding = "base58" + var found bool + opts.AlgorithmCode, found = mh.Names[opts.Algorithm] + if !found { + panic("oops") + } + opts.Length = mh.DefaultLengths[opts.AlgorithmCode] + r := strings.NewReader(string(in)) + h, err := opts.Multihash(r) + if err != nil { + panic(err) + } + return h.B58String() +} + +func (t *Toot) DiskStoragePath() string { + // FIXME make this error if fields are missing + // '/YYYYMMDD/example.com/username/YYYY-MM-DD.HHMMSS.username@fromHost.multihash.json' + return fmt.Sprintf("%s/%s/%s/%s.%s@%s.%s.json", + t.Parsed.CreatedAt.Format("20060102"), + strings.ToLower(t.FromHost), + t.Parsed.Account.Acct, + t.Parsed.CreatedAt.Format("2006-01-02.150405"), + t.Parsed.Account.Acct, + strings.ToLower(t.FromHost), + t.Hash, + ) +} + +func (t *Toot) identityHashInput() string { + return fmt.Sprintf( + "%s.%s.%s.%s.%s", + t.Parsed.Account.URL, + t.Parsed.CreatedAt, + t.Parsed.ID, + t.Parsed.Content, + strings.ToLower(t.FromHost), + ) +} + 
+func (t *Toot) calcHash() { + hi := t.identityHashInput() + t.Hash = TootHash(t.multiHash([]byte(hi))) +}