From 7ae15aa18d591fb3f74f47a5dfa6e2c2101a250c Mon Sep 17 00:00:00 2001 From: sneak Date: Sat, 14 Dec 2019 08:34:13 -0800 Subject: [PATCH] starting work on fetch --- feta.go | 6 ++-- ingester.go | 4 +-- instance.go | 92 +++++++++++++++++++++++++++++++++++++++++------------ jsonapis.go | 66 ++++++++++++++++++++++++++++++++++++++ locator.go | 2 +- manager.go | 9 +++--- 6 files changed, 148 insertions(+), 31 deletions(-) diff --git a/feta.go b/feta.go index 6213289..167a348 100644 --- a/feta.go +++ b/feta.go @@ -106,10 +106,10 @@ func (f *Process) runForever() int { f.api = new(fetaAPIServer) f.api.setFeta(f) // api needs to get to us to access data - f.locator.addInstanceNotificationChannel(newInstanceHostnameNotifications) - f.manager.addInstanceNotificationChannel(newInstanceHostnameNotifications) + f.locator.setInstanceNotificationChannel(newInstanceHostnameNotifications) + f.manager.setInstanceNotificationChannel(newInstanceHostnameNotifications) - f.manager.addTootDestination(f.ingester.getDeliveryChannel()) + f.manager.setTootDestination(f.ingester.getDeliveryChannel()) // ingester goroutine: go f.ingester.ingest() diff --git a/ingester.go b/ingester.go index b84a519..9f9cc3d 100644 --- a/ingester.go +++ b/ingester.go @@ -4,8 +4,8 @@ import "time" import "github.com/rs/zerolog/log" type tootIngester struct { - inbound chan *toot - recentlySeen + inbound chan *toot + recentlySeen []*seenTootMemo } type tootHash string diff --git a/instance.go b/instance.go index 74b6012..220d2d7 100644 --- a/instance.go +++ b/instance.go @@ -17,7 +17,7 @@ const nodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schem const instanceNodeinfoTimeout = time.Second * 50 -const instanceHTTPTimeout = time.Second * 50 +const instanceHTTPTimeout = time.Second * 120 const instanceSpiderInterval = time.Second * 120 @@ -33,6 +33,7 @@ const ( type instance struct { structLock sync.Mutex + tootDestination chan *toot errorCount uint successCount uint highestID int @@ -62,9 +63,11 @@ func newInstance(options ...func(i *instance)) *instance { {Name: "BEGIN_NODEINFO_FETCH", Src: []string{"PRE_NODEINFO_FETCH"}, Dst: "FETCHING_NODEINFO"}, {Name: "GOT_NODEINFO", Src: []string{"FETCHING_NODEINFO"}, Dst: "READY_FOR_TOOTFETCH"}, {Name: "FETCH_TIME_REACHED", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "READY_AND_DUE_FETCH"}, + {Name: "BEGIN_TOOT_FETCH", Src: []string{"READY_AND_DUE_FETCH"}, Dst: "FETCHING"}, {Name: "WEIRD_NODE_RESPONSE", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "WEIRD_NODE"}, {Name: "EARLY_FETCH_ERROR", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "EARLY_ERROR"}, - {Name: "TOOT_FETCH_ERROR", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "TOOT_FETCH_ERROR"}, + {Name: "TOOT_FETCH_ERROR", Src: []string{"FETCHING"}, Dst: "TOOT_FETCH_ERROR"}, + {Name: "TOOTS_FETCHED", Src: []string{"FETCHING"}, Dst: "READY_FOR_TOOTFETCH"}, }, fsm.Callbacks{ "enter_state": func(e *fsm.Event) { i.fsmEnterState(e) }, @@ -83,6 +86,10 @@ func (i *instance) Status() string { return i.fsm.Current() } +func (i *instance) setTootDestination(d chan *toot) { + i.tootDestination = d +} + func (i *instance) Event(eventname string) { i.fsmLock.Lock() defer i.fsmLock.Unlock() @@ -148,6 +155,9 @@ func (i *instance) isNowPastFetchTime() bool { return time.Now().After(i.nextFetch) } +// Tick is responsible for pushing idle instance records between states. +// The instances will transition between states when doing stuff (e.g. +// investigating, fetching, et c) as well. func (i *instance) Tick() { if i.Status() == "READY_FOR_TOOTFETCH" { if i.isNowPastFetchTime() { @@ -155,6 +165,8 @@ func (i *instance) Tick() { } } else if i.Status() == "STATUS_UNKNOWN" { i.Fetch() + } else if i.Status() == "READY_AND_DUE_FETCH" { + i.fetchRecentToots() } } @@ -370,21 +382,66 @@ func (i *instance) fetchNodeInfo() error { } } -/* -func (i *Instance) fetchRecentToots() ([]byte, error) { - i.Lock() - impl := i.impl - i.Unlock() +func (i *instance) fetchRecentToots() error { + // it turns out pleroma supports the mastodon api so we'll just use that + // for everything for now + url := fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true", + i.hostname) - if impl == Mastodon { - return i.fetchRecentTootsJsonFromMastodon() - } else if impl == Pleroma { - return i.fetchRecentTootsJsonFromPleroma() - } else { - panic("unimplemented") + var c = &http.Client{ + Timeout: instanceHTTPTimeout, } + + i.Event("BEGIN_TOOT_FETCH") + // we set the interval now to the error interval regardless here as a + // safety against bugs to avoid fetching too frequently by logic + // bug. if the fetch is successful, we will conditionally re-update the + // next fetch to now+successInterval. + i.setNextFetchAfter(instanceErrorInterval) + resp, err := c.Get(url) + + if err != nil { + log.Debug(). + Str("hostname", i.hostname). + Err(err). + Msgf("unable to fetch recent toots") + i.registerError() + i.Event("TOOT_FETCH_ERROR") + return err + } + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + + if err != nil { + log.Debug(). + Str("hostname", i.hostname). + Err(err). + Msgf("unable to read recent toots from response") + i.registerError() + i.Event("TOOT_FETCH_ERROR") + return err + } + + tootList := new(apTootList) + err = json.Unmarshal(body, &tootList) + + if err != nil { + log.Error(). + Str("hostname", i.hostname). + Err(err). + Msgf("unable to parse recent toot list") + i.registerError() + i.Event("TOOT_FETCH_ERROR") + return err + } + + log.Info(). + Str("toots", tootList.String()). + Msgf("toots") + + panic("unimplemented") } -*/ /* func (i *PleromaBackend) fetchRecentToots() ([]byte, error) { @@ -393,11 +450,4 @@ func (i *PleromaBackend) fetchRecentToots() ([]byte, error) { //i.hostname) return nil, nil } - -func (i *MastodonBackend) fetchRecentTootsJsonFromMastodon() ([]byte, error) { - //url := - //fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true", - //i.hostname) - return nil, nil -} */ diff --git a/jsonapis.go b/jsonapis.go index c71da00..f1b3393 100644 --- a/jsonapis.go +++ b/jsonapis.go @@ -1,6 +1,7 @@ package feta import "time" +import "fmt" // thank fuck for https://mholt.github.io/json-to-go/ otherwise // this would have been a giant pain in the dick @@ -86,3 +87,68 @@ type nodeInfoWellKnownResponse struct { Href string `json:"href"` } `json:"links"` } + +func (atl *apTootList) String() string { + return fmt.Sprintf("%+v", atl) +} + +type apTootList []struct { + Account struct { + Acct string `json:"acct"` + Avatar string `json:"avatar"` + AvatarStatic string `json:"avatar_static"` + Bot bool `json:"bot"` + CreatedAt time.Time `json:"created_at"` + DisplayName string `json:"display_name"` + Fields []interface{} `json:"fields"` + FollowersCount int `json:"followers_count"` + FollowingCount int `json:"following_count"` + Header string `json:"header"` + HeaderStatic string `json:"header_static"` + ID string `json:"id"` + Locked bool `json:"locked"` + Note string `json:"note"` + Source struct { + Fields []interface{} `json:"fields"` + Note string `json:"note"` + Sensitive bool `json:"sensitive"` + } `json:"source"` + StatusesCount int `json:"statuses_count"` + URL string `json:"url"` + Username string `json:"username"` + } `json:"account"` + Application struct { + Name string `json:"name"` + Website interface{} `json:"website"` + } `json:"application"` + Bookmarked bool `json:"bookmarked"` + Card interface{} `json:"card"` + Content string `json:"content"` + CreatedAt time.Time `json:"created_at"` + Favourited bool `json:"favourited"` + FavouritesCount int `json:"favourites_count"` + ID string `json:"id"` + InReplyToAccountID string `json:"in_reply_to_account_id"` + InReplyToID string `json:"in_reply_to_id"` + Language interface{} `json:"language"` + MediaAttachments []interface{} `json:"media_attachments"` + Mentions []struct { + Acct string `json:"acct"` + ID string `json:"id"` + URL string `json:"url"` + Username string `json:"username"` + } `json:"mentions"` + Muted bool `json:"muted"` + Pinned bool `json:"pinned"` + Poll interface{} `json:"poll"` + Reblog interface{} `json:"reblog"` + Reblogged bool `json:"reblogged"` + ReblogsCount int `json:"reblogs_count"` + RepliesCount int `json:"replies_count"` + Sensitive bool `json:"sensitive"` + SpoilerText string `json:"spoiler_text"` + Tags []interface{} `json:"tags"` + URI string `json:"uri"` + URL string `json:"url"` + Visibility string `json:"visibility"` +} diff --git a/locator.go b/locator.go index 3efaf22..c44cae7 100644 --- a/locator.go +++ b/locator.go @@ -56,7 +56,7 @@ func (il *InstanceLocator) unlock() { il.mu.Unlock() } -func (il *InstanceLocator) addInstanceNotificationChannel(via chan InstanceHostname) { +func (il *InstanceLocator) setInstanceNotificationChannel(via chan InstanceHostname) { il.lock() defer il.unlock() il.reportInstanceVia = via diff --git a/manager.go b/manager.go index 3cf9a28..98ddd12 100644 --- a/manager.go +++ b/manager.go @@ -19,7 +19,7 @@ type InstanceManager struct { mu sync.Mutex instances map[InstanceHostname]*instance newInstanceNotifications chan InstanceHostname - newToots chan *toot + tootDestination chan *toot startup time.Time hostAdderSemaphore chan bool } @@ -31,8 +31,8 @@ func newInstanceManager() *InstanceManager { return i } -func (im *InstanceManager) addTootDestination(td chan *toot) { - im.newToots = td +func (im *InstanceManager) setTootDestination(td chan *toot) { + im.tootDestination = td } func (im *InstanceManager) logCaller(msg string) { @@ -67,7 +67,7 @@ func (im *InstanceManager) unlock() { im.mu.Unlock() } -func (im *InstanceManager) addInstanceNotificationChannel(via chan InstanceHostname) { +func (im *InstanceManager) setInstanceNotificationChannel(via chan InstanceHostname) { im.lock() defer im.unlock() im.newInstanceNotifications = via @@ -129,6 +129,7 @@ func (im *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { i := newInstance(func(x *instance) { x.hostname = string(newhn) + x.setTootDestination(im.tootDestination) }) // we do node detection under the addLock to avoid thundering // on startup