starting work on fetch

This commit is contained in:
Jeffrey Paul 2019-12-14 08:34:13 -08:00
parent ada7e38a95
commit 7ae15aa18d
6 changed files with 148 additions and 31 deletions

View File

@ -106,10 +106,10 @@ func (f *Process) runForever() int {
f.api = new(fetaAPIServer) f.api = new(fetaAPIServer)
f.api.setFeta(f) // api needs to get to us to access data f.api.setFeta(f) // api needs to get to us to access data
f.locator.addInstanceNotificationChannel(newInstanceHostnameNotifications) f.locator.setInstanceNotificationChannel(newInstanceHostnameNotifications)
f.manager.addInstanceNotificationChannel(newInstanceHostnameNotifications) f.manager.setInstanceNotificationChannel(newInstanceHostnameNotifications)
f.manager.addTootDestination(f.ingester.getDeliveryChannel()) f.manager.setTootDestination(f.ingester.getDeliveryChannel())
// ingester goroutine: // ingester goroutine:
go f.ingester.ingest() go f.ingester.ingest()

View File

@ -5,7 +5,7 @@ import "github.com/rs/zerolog/log"
type tootIngester struct { type tootIngester struct {
inbound chan *toot inbound chan *toot
recentlySeen recentlySeen []*seenTootMemo
} }
type tootHash string type tootHash string

View File

@ -17,7 +17,7 @@ const nodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schem
const instanceNodeinfoTimeout = time.Second * 50 const instanceNodeinfoTimeout = time.Second * 50
const instanceHTTPTimeout = time.Second * 50 const instanceHTTPTimeout = time.Second * 120
const instanceSpiderInterval = time.Second * 120 const instanceSpiderInterval = time.Second * 120
@ -33,6 +33,7 @@ const (
type instance struct { type instance struct {
structLock sync.Mutex structLock sync.Mutex
tootDestination chan *toot
errorCount uint errorCount uint
successCount uint successCount uint
highestID int highestID int
@ -62,9 +63,11 @@ func newInstance(options ...func(i *instance)) *instance {
{Name: "BEGIN_NODEINFO_FETCH", Src: []string{"PRE_NODEINFO_FETCH"}, Dst: "FETCHING_NODEINFO"}, {Name: "BEGIN_NODEINFO_FETCH", Src: []string{"PRE_NODEINFO_FETCH"}, Dst: "FETCHING_NODEINFO"},
{Name: "GOT_NODEINFO", Src: []string{"FETCHING_NODEINFO"}, Dst: "READY_FOR_TOOTFETCH"}, {Name: "GOT_NODEINFO", Src: []string{"FETCHING_NODEINFO"}, Dst: "READY_FOR_TOOTFETCH"},
{Name: "FETCH_TIME_REACHED", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "READY_AND_DUE_FETCH"}, {Name: "FETCH_TIME_REACHED", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "READY_AND_DUE_FETCH"},
{Name: "BEGIN_TOOT_FETCH", Src: []string{"READY_AND_DUE_FETCH"}, Dst: "FETCHING"},
{Name: "WEIRD_NODE_RESPONSE", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "WEIRD_NODE"}, {Name: "WEIRD_NODE_RESPONSE", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "WEIRD_NODE"},
{Name: "EARLY_FETCH_ERROR", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "EARLY_ERROR"}, {Name: "EARLY_FETCH_ERROR", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "EARLY_ERROR"},
{Name: "TOOT_FETCH_ERROR", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "TOOT_FETCH_ERROR"}, {Name: "TOOT_FETCH_ERROR", Src: []string{"FETCHING"}, Dst: "TOOT_FETCH_ERROR"},
{Name: "TOOTS_FETCHED", Src: []string{"FETCHING"}, Dst: "READY_FOR_TOOTFETCH"},
}, },
fsm.Callbacks{ fsm.Callbacks{
"enter_state": func(e *fsm.Event) { i.fsmEnterState(e) }, "enter_state": func(e *fsm.Event) { i.fsmEnterState(e) },
@ -83,6 +86,10 @@ func (i *instance) Status() string {
return i.fsm.Current() return i.fsm.Current()
} }
// setTootDestination stores d in the instance's tootDestination field.
// NOTE(review): presumably this is the channel fetched toots will be
// delivered on; nothing visible here sends on it yet -- confirm.
func (i *instance) setTootDestination(d chan *toot) {
i.tootDestination = d
}
func (i *instance) Event(eventname string) { func (i *instance) Event(eventname string) {
i.fsmLock.Lock() i.fsmLock.Lock()
defer i.fsmLock.Unlock() defer i.fsmLock.Unlock()
@ -148,6 +155,9 @@ func (i *instance) isNowPastFetchTime() bool {
return time.Now().After(i.nextFetch) return time.Now().After(i.nextFetch)
} }
// Tick is responsible for pushing idle instance records between states.
// The instances will also transition between states while doing work
// (e.g. investigating, fetching, etc.).
func (i *instance) Tick() { func (i *instance) Tick() {
if i.Status() == "READY_FOR_TOOTFETCH" { if i.Status() == "READY_FOR_TOOTFETCH" {
if i.isNowPastFetchTime() { if i.isNowPastFetchTime() {
@ -155,6 +165,8 @@ func (i *instance) Tick() {
} }
} else if i.Status() == "STATUS_UNKNOWN" { } else if i.Status() == "STATUS_UNKNOWN" {
i.Fetch() i.Fetch()
} else if i.Status() == "READY_AND_DUE_FETCH" {
i.fetchRecentToots()
} }
} }
@ -370,21 +382,66 @@ func (i *instance) fetchNodeInfo() error {
} }
} }
/* func (i *instance) fetchRecentToots() error {
func (i *Instance) fetchRecentToots() ([]byte, error) { // it turns out pleroma supports the mastodon api so we'll just use that
i.Lock() // for everything for now
impl := i.impl url := fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true",
i.Unlock() i.hostname)
if impl == Mastodon { var c = &http.Client{
return i.fetchRecentTootsJsonFromMastodon() Timeout: instanceHTTPTimeout,
} else if impl == Pleroma {
return i.fetchRecentTootsJsonFromPleroma()
} else {
panic("unimplemented")
} }
i.Event("BEGIN_TOOT_FETCH")
// we set the interval now to the error interval regardless here as a
// safety against bugs to avoid fetching too frequently by logic
// bug. if the fetch is successful, we will conditionally re-update the
// next fetch to now+successInterval.
i.setNextFetchAfter(instanceErrorInterval)
resp, err := c.Get(url)
if err != nil {
log.Debug().
Str("hostname", i.hostname).
Err(err).
Msgf("unable to fetch recent toots")
i.registerError()
i.Event("TOOT_FETCH_ERROR")
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Debug().
Str("hostname", i.hostname).
Err(err).
Msgf("unable to read recent toots from response")
i.registerError()
i.Event("TOOT_FETCH_ERROR")
return err
}
tootList := new(apTootList)
err = json.Unmarshal(body, &tootList)
if err != nil {
log.Error().
Str("hostname", i.hostname).
Err(err).
Msgf("unable to parse recent toot list")
i.registerError()
i.Event("TOOT_FETCH_ERROR")
return err
}
log.Info().
Str("toots", tootList.String()).
Msgf("toots")
panic("unimplemented")
} }
*/
/* /*
func (i *PleromaBackend) fetchRecentToots() ([]byte, error) { func (i *PleromaBackend) fetchRecentToots() ([]byte, error) {
@ -393,11 +450,4 @@ func (i *PleromaBackend) fetchRecentToots() ([]byte, error) {
//i.hostname) //i.hostname)
return nil, nil return nil, nil
} }
func (i *MastodonBackend) fetchRecentTootsJsonFromMastodon() ([]byte, error) {
//url :=
//fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true",
//i.hostname)
return nil, nil
}
*/ */

View File

@ -1,6 +1,7 @@
package feta package feta
import "time" import "time"
import "fmt"
// thank fuck for https://mholt.github.io/json-to-go/ otherwise // thank fuck for https://mholt.github.io/json-to-go/ otherwise
// this would have been a giant pain in the dick // this would have been a giant pain in the dick
@ -86,3 +87,68 @@ type nodeInfoWellKnownResponse struct {
Href string `json:"href"` Href string `json:"href"`
} `json:"links"` } `json:"links"`
} }
// String renders the toot list with field names ("%+v" style) for
// logging and debugging. A nil receiver renders as "<nil>".
//
// The receiver is converted to a method-less local type before it is
// handed to fmt. Passing atl itself would make fmt call String again
// (because *apTootList is a fmt.Stringer) and recurse until the stack
// overflows.
func (atl *apTootList) String() string {
	type plainTootList apTootList
	return fmt.Sprintf("%+v", (*plainTootList)(atl))
}

// apTootList is the decoded form of the JSON array returned by the
// /api/v1/timelines/public endpoint of a Mastodon-compatible instance.
// Each element describes one status together with its author account,
// originating application, mentions and counters.
type apTootList []struct {
	Account struct {
		Acct           string        `json:"acct"`
		Avatar         string        `json:"avatar"`
		AvatarStatic   string        `json:"avatar_static"`
		Bot            bool          `json:"bot"`
		CreatedAt      time.Time     `json:"created_at"`
		DisplayName    string        `json:"display_name"`
		Fields         []interface{} `json:"fields"`
		FollowersCount int           `json:"followers_count"`
		FollowingCount int           `json:"following_count"`
		Header         string        `json:"header"`
		HeaderStatic   string        `json:"header_static"`
		ID             string        `json:"id"`
		Locked         bool          `json:"locked"`
		Note           string        `json:"note"`
		Source         struct {
			Fields    []interface{} `json:"fields"`
			Note      string        `json:"note"`
			Sensitive bool          `json:"sensitive"`
		} `json:"source"`
		StatusesCount int    `json:"statuses_count"`
		URL           string `json:"url"`
		Username      string `json:"username"`
	} `json:"account"`
	Application struct {
		Name    string      `json:"name"`
		Website interface{} `json:"website"`
	} `json:"application"`
	Bookmarked         bool          `json:"bookmarked"`
	Card               interface{}   `json:"card"`
	Content            string        `json:"content"`
	CreatedAt          time.Time     `json:"created_at"`
	Favourited         bool          `json:"favourited"`
	FavouritesCount    int           `json:"favourites_count"`
	ID                 string        `json:"id"`
	InReplyToAccountID string        `json:"in_reply_to_account_id"`
	InReplyToID        string        `json:"in_reply_to_id"`
	Language           interface{}   `json:"language"`
	MediaAttachments   []interface{} `json:"media_attachments"`
	Mentions           []struct {
		Acct     string `json:"acct"`
		ID       string `json:"id"`
		URL      string `json:"url"`
		Username string `json:"username"`
	} `json:"mentions"`
	Muted        bool          `json:"muted"`
	Pinned       bool          `json:"pinned"`
	Poll         interface{}   `json:"poll"`
	Reblog       interface{}   `json:"reblog"`
	Reblogged    bool          `json:"reblogged"`
	ReblogsCount int           `json:"reblogs_count"`
	RepliesCount int           `json:"replies_count"`
	Sensitive    bool          `json:"sensitive"`
	SpoilerText  string        `json:"spoiler_text"`
	Tags         []interface{} `json:"tags"`
	URI          string        `json:"uri"`
	URL          string        `json:"url"`
	Visibility   string        `json:"visibility"`
}

View File

@ -56,7 +56,7 @@ func (il *InstanceLocator) unlock() {
il.mu.Unlock() il.mu.Unlock()
} }
func (il *InstanceLocator) addInstanceNotificationChannel(via chan InstanceHostname) { func (il *InstanceLocator) setInstanceNotificationChannel(via chan InstanceHostname) {
il.lock() il.lock()
defer il.unlock() defer il.unlock()
il.reportInstanceVia = via il.reportInstanceVia = via

View File

@ -19,7 +19,7 @@ type InstanceManager struct {
mu sync.Mutex mu sync.Mutex
instances map[InstanceHostname]*instance instances map[InstanceHostname]*instance
newInstanceNotifications chan InstanceHostname newInstanceNotifications chan InstanceHostname
newToots chan *toot tootDestination chan *toot
startup time.Time startup time.Time
hostAdderSemaphore chan bool hostAdderSemaphore chan bool
} }
@ -31,8 +31,8 @@ func newInstanceManager() *InstanceManager {
return i return i
} }
func (im *InstanceManager) addTootDestination(td chan *toot) { func (im *InstanceManager) setTootDestination(td chan *toot) {
im.newToots = td im.tootDestination = td
} }
func (im *InstanceManager) logCaller(msg string) { func (im *InstanceManager) logCaller(msg string) {
@ -67,7 +67,7 @@ func (im *InstanceManager) unlock() {
im.mu.Unlock() im.mu.Unlock()
} }
func (im *InstanceManager) addInstanceNotificationChannel(via chan InstanceHostname) { func (im *InstanceManager) setInstanceNotificationChannel(via chan InstanceHostname) {
im.lock() im.lock()
defer im.unlock() defer im.unlock()
im.newInstanceNotifications = via im.newInstanceNotifications = via
@ -129,6 +129,7 @@ func (im *InstanceManager) addInstanceByHostname(newhn InstanceHostname) {
i := newInstance(func(x *instance) { i := newInstance(func(x *instance) {
x.hostname = string(newhn) x.hostname = string(newhn)
x.setTootDestination(im.tootDestination)
}) })
// we do node detection under the addLock to avoid thundering // we do node detection under the addLock to avoid thundering
// on startup // on startup