making progress, almost ready to write to disk

master
Jeffrey Paul 4 years ago
parent 255554db97
commit d2bd99801d
  1. 6
      Makefile
  2. 2
      apihandlers.go
  3. 2
      apiserver.go
  4. 4
      cmd/feta/main.go
  5. 23
      feta.go
  6. 33
      ingester.go
  7. 34
      ingester/ingester.go
  8. 41
      instance.go
  9. 10
      jsonapis/helpers.go
  10. 30
      jsonapis/structures.go
  11. 11
      locator.go
  12. 24
      manager.go
  13. 16
      seeds/seeds.go
  14. 88
      storage/tootstore.go
  15. 37
      toot.go
  16. 107
      toot/toot.go

@ -12,8 +12,6 @@ IMAGENAME := sneak/$(FN)
UNAME_S := $(shell uname -s)
GOLDFLAGS += -X main.Version=$(VERSION)
GOLDFLAGS += -X main.Buildtime=$(BUILDTIME)
GOLDFLAGS += -X main.Builduser=$(BUILDUSER)@$(BUILDHOST)
GOLDFLAGS += -X main.Buildarch=$(BUILDARCH)
# osx can't statically link apparently?!
@ -39,7 +37,7 @@ clean:
build: ./$(FN)
.lintsetup:
go get -u golang.org/x/lint/golint
go get -v -u golang.org/x/lint/golint
go get -u github.com/GeertJohan/fgt
touch .lintsetup
@ -53,7 +51,7 @@ go-get:
cd cmd/$(FN) && go build -o ../../$(FN) $(GOFLAGS) .
fmt:
go fmt *.go
gofmt -s -w .
test: lint build-docker-image

@ -78,9 +78,7 @@ func (a *fetaAPIServer) getIndexHandler() http.HandlerFunc {
"goroutines": runtime.NumGoroutine(),
"goversion": runtime.Version(),
"version": a.feta.version,
"buildtime": a.feta.buildtime,
"buildarch": a.feta.buildarch,
"builduser": a.feta.builduser,
},
"instanceSummary": a.instanceSummary(),
}

@ -22,7 +22,7 @@ func (a *fetaAPIServer) setFeta(feta *Process) {
a.feta = feta
}
func (a *fetaAPIServer) serve() {
func (a *fetaAPIServer) Serve() {
if a.feta == nil {
panic("must have feta app from which to serve stats")
}

@ -6,10 +6,8 @@ import "github.com/sneak/feta"
// these are filled in at link-time by the build scripts
var Version string
var Buildtime string
var Builduser string
var Buildarch string
func main() {
os.Exit(feta.CLIEntry(Version, Buildtime, Buildarch, Builduser))
os.Exit(feta.CLIEntry(Version, Buildarch))
}

@ -9,18 +9,17 @@ import _ "github.com/jinzhu/gorm/dialects/sqlite" // required for orm
import "github.com/rs/zerolog"
import "github.com/rs/zerolog/log"
import "github.com/mattn/go-isatty"
import "github.com/sneak/feta/ingester"
// InstanceHostname is a special type for holding the hostname of an
// instance (string)
type InstanceHostname string
// CLIEntry is the main entrypoint for the feta process from the cli
func CLIEntry(version string, buildtime string, buildarch string, builduser string) int {
func CLIEntry(version string, buildarch string) int {
f := new(Process)
f.version = version
f.buildtime = buildtime
f.buildarch = buildarch
f.builduser = builduser
f.setupLogging()
return f.runForever()
}
@ -28,12 +27,10 @@ func CLIEntry(version string, buildtime string, buildarch string, builduser stri
// Process is the main structure/process of this app
type Process struct {
version string
buildtime string
buildarch string
builduser string
locator *InstanceLocator
manager *InstanceManager
ingester *tootIngester
ingester *ingester.TootIngester
api *fetaAPIServer
db *gorm.DB
startup time.Time
@ -42,9 +39,7 @@ type Process struct {
func (f *Process) identify() {
log.Info().
Str("version", f.version).
Str("buildtime", f.buildtime).
Str("buildarch", f.buildarch).
Str("builduser", f.builduser).
Msg("starting")
}
@ -101,7 +96,7 @@ func (f *Process) runForever() int {
f.locator = newInstanceLocator()
f.manager = newInstanceManager()
f.ingester = newTootIngester()
f.ingester = ingester.NewTootIngester()
f.api = new(fetaAPIServer)
f.api.setFeta(f) // api needs to get to us to access data
@ -109,18 +104,18 @@ func (f *Process) runForever() int {
f.locator.setInstanceNotificationChannel(newInstanceHostnameNotifications)
f.manager.setInstanceNotificationChannel(newInstanceHostnameNotifications)
f.manager.setTootDestination(f.ingester.getDeliveryChannel())
f.manager.setTootDestination(f.ingester.GetDeliveryChannel())
// ingester goroutine:
go f.ingester.ingest()
go f.ingester.Ingest()
// locator goroutine:
go f.locator.locate()
go f.locator.Locate()
// manager goroutine:
go f.manager.manage()
go f.manager.Manage()
go f.api.serve()
go f.api.Serve()
// this goroutine (main) does nothing until we handle signals
// FIXME(sneak)

@ -1,33 +0,0 @@
package feta
import "time"
import "github.com/rs/zerolog/log"
type tootIngester struct {
inbound chan *toot
recentlySeen []*seenTootMemo
}
type tootHash string
type seenTootMemo struct {
lastSeen time.Time
tootHash tootHash
}
func newTootIngester() *tootIngester {
ti := new(tootIngester)
ti.inbound = make(chan *toot, 1)
return ti
}
func (ti *tootIngester) getDeliveryChannel() chan *toot {
return ti.inbound
}
func (ti *tootIngester) ingest() {
log.Info().Msg("tootIngester starting")
for {
time.Sleep(1 * time.Second) // FIXME do something
}
}

@ -0,0 +1,34 @@
package ingester
import "time"
import "github.com/rs/zerolog/log"
import "github.com/sneak/feta/toot"
import "github.com/sneak/feta/storage"
type TootIngester struct {
inbound chan *toot.Toot
recentlySeen []*seenTootMemo
storageBackend *storage.TootStorageBackend
}
type seenTootMemo struct {
lastSeen time.Time
tootHash toot.TootHash
}
func NewTootIngester() *TootIngester {
ti := new(TootIngester)
ti.inbound = make(chan *toot.Toot, 1)
return ti
}
func (ti *TootIngester) GetDeliveryChannel() chan *toot.Toot {
return ti.inbound
}
func (ti *TootIngester) Ingest() {
log.Info().Msg("TootIngester starting")
for {
time.Sleep(1 * time.Second) // FIXME do something
}
}

@ -12,15 +12,14 @@ import "errors"
//import "github.com/gin-gonic/gin"
import "github.com/looplab/fsm"
import "github.com/rs/zerolog/log"
import "github.com/sneak/feta/storage"
import "github.com/sneak/feta/toot"
import "github.com/sneak/feta/jsonapis"
const nodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0"
const instanceNodeinfoTimeout = time.Second * 50
const instanceHTTPTimeout = time.Second * 120
const instanceSpiderInterval = time.Second * 120
const instanceErrorInterval = time.Second * 60 * 30
type instanceImplementation int
@ -33,7 +32,7 @@ const (
type instance struct {
structLock sync.Mutex
tootDestination chan *toot
tootDestination chan *toot.Toot
errorCount uint
successCount uint
highestID int
@ -42,6 +41,7 @@ type instance struct {
fetching bool
implementation instanceImplementation
backend *instanceBackend
storageBackend *storage.TootStorageBackend
nextFetch time.Time
nodeInfoURL string
serverVersionString string
@ -86,7 +86,7 @@ func (i *instance) Status() string {
return i.fsm.Current()
}
func (i *instance) setTootDestination(d chan *toot) {
func (i *instance) setTootDestination(d chan *toot.Toot) {
i.tootDestination = d
}
@ -240,7 +240,7 @@ func (i *instance) fetchNodeInfoURL() error {
return err
}
nir := new(nodeInfoWellKnownResponse)
nir := new(jsonapis.NodeInfoWellKnownResponse)
err = json.Unmarshal(body, &nir)
if err != nil {
log.Debug().
@ -324,7 +324,7 @@ func (i *instance) fetchNodeInfo() error {
return err
}
ni := new(nodeInfoVersionTwoSchema)
ni := new(jsonapis.NodeInfoVersionTwoSchema)
err = json.Unmarshal(body, &ni)
if err != nil {
log.Error().
@ -383,6 +383,8 @@ func (i *instance) fetchNodeInfo() error {
}
func (i *instance) fetchRecentToots() error {
// this would have been about a billion times shorter in python
// it turns out pleroma supports the mastodon api so we'll just use that
// for everything for now
url := fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true",
@ -423,8 +425,7 @@ func (i *instance) fetchRecentToots() error {
return err
}
tootList := new(apTootList)
err = json.Unmarshal(body, &tootList)
tc, err := toot.NewTootCollectionFromMastodonAPIResponse(body, i.hostname)
if err != nil {
log.Error().
@ -440,22 +441,14 @@ func (i *instance) fetchRecentToots() error {
log.Info().
Str("hostname", i.hostname).
Int("tootCount", len(*tootList)).
Int("tootCount", len(*tc)).
Msgf("got and parsed toots")
i.registerSuccess()
i.Event("TOOTS_FETCHED")
i.setNextFetchAfter(instanceSpiderInterval)
for _, x := range *tootList {
fmt.Printf("%s\n", x.Content)
}
panic("unimplemented")
}
/*
func (i *PleromaBackend) fetchRecentToots() ([]byte, error) {
//url :=
//fmt.Sprintf("https://%s/api/statuses/public_and_external_timeline.json?count=100",
//i.hostname)
return nil, nil
// FIXME stream toots to ingester attached to manager instead
//i.storeToots(tc)
panic("lol")
return nil
}
*/

@ -0,0 +1,10 @@
package jsonapis
import "fmt"
import "encoding/json"
func (atl *apTootList) String() string {
return fmt.Sprintf("%+v", atl)
}
type apTootList []json.RawMessage

@ -1,12 +1,10 @@
package feta
package jsonapis
import "time"
import "fmt"
import "encoding/json"
// thank fuck for https://mholt.github.io/json-to-go/ otherwise
// this would have been a giant pain in the dick
type mastodonIndexResponse struct {
type MastodonIndexResponse struct {
Instances []struct {
ID string `json:"_id"`
AddedAt time.Time `json:"addedAt"`
@ -50,7 +48,7 @@ type mastodonIndexResponse struct {
} `json:"instances"`
}
type pleromaIndexResponse []struct {
type PleromaIndexResponse []struct {
Domain string `json:"domain"`
Title string `json:"title"`
Thumbnail string `json:"thumbnail"`
@ -64,7 +62,7 @@ type pleromaIndexResponse []struct {
TextLimit int `json:"text_limit"`
}
type nodeInfoVersionTwoSchema struct {
type NodeInfoVersionTwoSchema struct {
Version string `json:"version"`
Software struct {
Name string `json:"name"`
@ -82,32 +80,24 @@ type nodeInfoVersionTwoSchema struct {
OpenRegistrations bool `json:"openRegistrations"`
}
type nodeInfoWellKnownResponse struct {
type NodeInfoWellKnownResponse struct {
Links []struct {
Rel string `json:"rel"`
Href string `json:"href"`
} `json:"links"`
}
func (atl *apTootList) String() string {
return fmt.Sprintf("%+v", atl)
}
type apTootList []json.RawMessage
type tootFromAPI struct {
type APISerializedToot struct {
Account struct {
Acct string `json:"acct"`
ID string `json:"id"`
URL string `json:"url"`
Username string `json:"username"`
} `json:"account"`
Content string `json:"content"`
CreatedAt time.Time `json:"created_at"`
ID string `json:"id"`
InReplyToAccountID string `json:"in_reply_to_account_id"`
InReplyToID string `json:"in_reply_to_id"`
Mentions []struct {
Content string `json:"content"`
CreatedAt time.Time `json:"created_at"`
ID string `json:"id"`
Mentions []struct {
Acct string `json:"acct"`
ID string `json:"id"`
URL string `json:"url"`

@ -8,10 +8,11 @@ import "sync"
import "github.com/rs/zerolog/log"
import "golang.org/x/sync/semaphore"
import "github.com/sneak/feta/jsonapis"
// IndexAPITimeout is the timeout for fetching json instance lists
// from the listing servers
const IndexAPITimeout = time.Second * 60
const IndexAPITimeout = time.Second * 60 * 3
// UserAgent is the user-agent string we provide to servers
var UserAgent = "feta indexer bot, sneak@sneak.berlin for feedback"
@ -84,7 +85,9 @@ func (il *InstanceLocator) durationUntilNextPleromaIndexRefresh() time.Duration
return (time.Duration(-1) * time.Now().Sub(*il.pleromaIndexNextRefresh))
}
func (il *InstanceLocator) locate() {
// Locate is the main entrypoint for the instancelocator, designed to be
// called once in its own gorutine.
func (il *InstanceLocator) Locate() {
log.Info().Msg("InstanceLocator starting")
x := time.Now()
var pleromaSemaphore = semaphore.NewWeighted(1)
@ -171,7 +174,7 @@ func (il *InstanceLocator) locateMastodon() {
il.mastodonIndexNextRefresh = &t
il.unlock()
mi := new(mastodonIndexResponse)
mi := new(jsonapis.MastodonIndexResponse)
err = json.Unmarshal(body, &mi)
if err != nil {
log.Error().Msgf("unable to parse mastodon instance list: %s", err)
@ -239,7 +242,7 @@ func (il *InstanceLocator) locatePleroma() {
il.pleromaIndexNextRefresh = &t
il.unlock()
pi := new(pleromaIndexResponse)
pi := new(jsonapis.PleromaIndexResponse)
err = json.Unmarshal(body, &pi)
if err != nil {
log.Warn().Msgf("unable to parse pleroma instance list: %s", err)

@ -6,6 +6,8 @@ import "runtime"
//import "github.com/gin-gonic/gin"
import "github.com/rs/zerolog/log"
import "github.com/sneak/feta/toot"
import "github.com/sneak/feta/seeds"
const hostDiscoveryParallelism = 20
@ -19,7 +21,7 @@ type InstanceManager struct {
mu sync.Mutex
instances map[InstanceHostname]*instance
newInstanceNotifications chan InstanceHostname
tootDestination chan *toot
tootDestination chan *toot.Toot
startup time.Time
hostAdderSemaphore chan bool
}
@ -31,7 +33,7 @@ func newInstanceManager() *InstanceManager {
return i
}
func (im *InstanceManager) setTootDestination(td chan *toot) {
func (im *InstanceManager) setTootDestination(td chan *toot.Toot) {
im.tootDestination = td
}
@ -73,13 +75,29 @@ func (im *InstanceManager) setInstanceNotificationChannel(via chan InstanceHostn
im.newInstanceNotifications = via
}
func (im *InstanceManager) manage() {
func (im *InstanceManager) receiveSeedInstanceHostnames() {
for _, x := range seeds.SeedInstances {
go func(tmp InstanceHostname) {
im.addInstanceByHostname(tmp)
}(InstanceHostname(x))
}
}
// Manage is the main entrypoint of the InstanceManager, designed to be
// called once in its own goroutine.
func (im *InstanceManager) Manage() {
log.Info().Msg("InstanceManager starting")
go func() {
im.receiveNewInstanceHostnames()
}()
im.startup = time.Now()
x := im.startup
go func() {
im.receiveSeedInstanceHostnames()
}()
for {
log.Info().Msg("InstanceManager tick")
im.managerLoop()

@ -0,0 +1,16 @@
package seeds
var SeedInstances = [...]string{
"splat.soy",
"veenus.art",
"iscute.moe",
"order.life",
"princess.cat",
"blobturtle.club",
"busshi.moe",
"thewired.xyz",
"wetfish.space",
"underscore.world",
"fedi.valkyrie.world",
"gnosis.systems",
}

@ -0,0 +1,88 @@
package storage
import "errors"
import "io/ioutil"
import "os"
import "strings"
import "sync"
import "github.com/sneak/feta/toot"
type TootStorageBackend interface {
TootExists(t toot.Toot) bool
StoreToot(t toot.Toot) error
StoreToots(tc []*toot.Toot) error
}
type TootFSStorage struct {
root string
}
func NewTootFSStorage(root string) *TootFSStorage {
ts := new(TootFSStorage)
ts.root = root
return ts
}
func (ts *TootFSStorage) StoreToots(tc []*toot.Toot) error {
var returnErrors []string
for _, item := range tc {
err := ts.StoreToot(*item)
if err != nil {
returnErrors = append(returnErrors, err.Error())
continue
}
}
if len(returnErrors) == 0 {
return nil
}
return errors.New(strings.Join(returnErrors, "; "))
}
func (ts *TootFSStorage) TootExists(t toot.Toot) bool {
path := t.DiskStoragePath()
full := ts.root + "/" + path
_, err := os.Stat(full)
if os.IsNotExist(err) {
return false
}
return true
}
func (ts *TootFSStorage) StoreToot(t toot.Toot) error {
path := t.DiskStoragePath()
full := ts.root + "/" + path
return ioutil.WriteFile(full, t.Original, 0644)
}
type TootMemoryStorage struct {
sync.Mutex
toots map[toot.TootHash]toot.Toot
//maxSize uint // FIXME support eviction
}
func NewTootMemoryStorage() *TootMemoryStorage {
ts := new(TootMemoryStorage)
ts.toots = make(map[toot.TootHash]toot.Toot)
return ts
}
func (ts *TootMemoryStorage) StoreToot(t toot.Toot) {
th := t.Hash
if ts.TootExists(th) {
return
}
ts.Lock()
defer ts.Unlock()
ts.toots[th] = t
return
}
func (ts *TootMemoryStorage) TootExists(th toot.TootHash) bool {
ts.Lock()
defer ts.Unlock()
if _, ok := ts.toots[th]; ok { //this syntax is so gross
return true
}
return false
}

@ -1,37 +0,0 @@
package feta
import "encoding/json"
type toot struct {
original *json.RawMessage
parsed *tootFromAPI
}
func newToots(input []*json.RawMessage) []*toot {
l := make([]*toot, 0)
for x := range input {
t := newToot(x)
if t != nil {
l = append(l, t)
}
}
return l
}
func newToot(input *json.RawMessage) *toot {
t := new(toot)
t.original = input
t.parsed = new(tootFromAPI)
err = json.Unmarshal(*input, t.parsed)
if err != nil {
t.parsed = nil
}
return t
}
func (t *toot) identityHashInput() string {
// id + datestamp + acct + content
}
func (t *toot) hash() tootHash {
}

@ -0,0 +1,107 @@
package toot
import "fmt"
import "encoding/json"
import "errors"
import "strings"
import "github.com/sneak/feta/jsonapis"
import "github.com/davecgh/go-spew/spew"
import "github.com/rs/zerolog/log"
//import "encoding/hex"
import mh "github.com/multiformats/go-multihash"
import mhopts "github.com/multiformats/go-multihash/opts"
type TootHash string
type Toot struct {
Original []byte
Parsed *jsonapis.APISerializedToot
Hash TootHash
FromHost string
}
func NewTootCollectionFromMastodonAPIResponse(in []byte, hostname string) (*[]Toot, error) {
var rt []json.RawMessage
err := json.Unmarshal(in, &rt)
if err != nil {
return nil, errors.New("unable to parse api response")
}
var tc []Toot
// iterate over rawtoots from api
for _, item := range rt {
parsed := new(jsonapis.APISerializedToot)
err := json.Unmarshal(item, parsed)
if err != nil {
log.Error().Msg("unable to parse toot, skipping")
continue
}
t := new(Toot)
t.Parsed = parsed
o, err := item.MarshalJSON()
if err != nil {
panic(err)
}
t.Original = o
t.FromHost = hostname
t.calcHash()
tc = append(tc, *t)
}
spew.Dump(tc)
panic("")
return &tc, nil
}
func (t *Toot) String() string {
return fmt.Sprintf("%#v", t)
}
func (t *Toot) multiHash(in []byte) string {
opts := new(mhopts.Options)
opts.Algorithm = "sha2-256"
opts.Encoding = "base58"
var found bool
opts.AlgorithmCode, found = mh.Names[opts.Algorithm]
if !found {
panic("oops")
}
opts.Length = mh.DefaultLengths[opts.AlgorithmCode]
r := strings.NewReader(string(in))
h, err := opts.Multihash(r)
if err != nil {
panic(err)
}
return h.B58String()
}
func (t *Toot) DiskStoragePath() string {
// FIXME make this error if fields are missing
// '/YYYYMMDD/example.com/username/YYYY-MM-DD.HHMMSS.username@fromHost.multihash.json'
return fmt.Sprintf("%s/%s/%s/%s.%s@%s.%s.json",
t.Parsed.CreatedAt.Format("20060102"),
strings.ToLower(t.FromHost),
t.Parsed.Account.Acct,
t.Parsed.CreatedAt.Format("2006-01-02.150405"),
t.Parsed.Account.Acct,
strings.ToLower(t.FromHost),
t.Hash,
)
}
func (t *Toot) identityHashInput() string {
return fmt.Sprintf(
"%s.%s.%s.%s.%s",
t.Parsed.Account.URL,
t.Parsed.CreatedAt,
t.Parsed.ID,
t.Parsed.Content,
strings.ToLower(t.FromHost),
)
}
func (t *Toot) calcHash() {
hi := t.identityHashInput()
t.Hash = TootHash(t.multiHash([]byte(hi)))
}
Loading…
Cancel
Save