You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
feta/instance/instance.go

477 lines
12 KiB

package instance
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"
"sync"
"time"
"git.eeqj.de/sneak/feta/jsonapis"
"git.eeqj.de/sneak/feta/toot"
"github.com/google/uuid"
"github.com/looplab/fsm"
"github.com/rs/zerolog/log"
)
//import "github.com/gin-gonic/gin"
const (
	// nodeInfoSchemaVersionTwoName is the "rel" value that marks the
	// nodeinfo 2.0 schema link in a /.well-known/nodeinfo response.
	nodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0"

	// instanceNodeinfoTimeout is the HTTP client timeout used for both
	// nodeinfo requests (the well-known document and the nodeinfo itself).
	instanceNodeinfoTimeout = 50 * time.Second

	// instanceHTTPTimeout is the HTTP client timeout used when fetching
	// the public timeline.
	instanceHTTPTimeout = 120 * time.Second

	// instanceSpiderInterval is the delay until the next fetch after a
	// successful operation.
	instanceSpiderInterval = 120 * time.Second

	// instanceErrorInterval is the delay until the next fetch that is set
	// before an attempt is made, and therefore stays in effect on failure.
	instanceErrorInterval = 30 * time.Minute
)
// Instance stores all the information we know about an instance, together
// with the state machine and the mutexes used to coordinate access to it.
type Instance struct {
// Disabled is not read or written in this part of the file.
Disabled bool
// ErrorCount is incremented by registerError on every failed operation.
ErrorCount uint
// FSM is the state machine built in New; access goes through Status and
// Event, which hold fsmLock.
FSM *fsm.FSM
// Fetching is not read or written in this part of the file.
Fetching bool
// HighestID is not read or written in this part of the file.
HighestID uint
// Hostname is the host used to build the nodeinfo and timeline URLs.
Hostname string
// Identified is set to true once the server software has been recognised.
Identified bool
// Implementation is the recognised software name ("pleroma" or
// "mastodon"); empty means the node has not been identified yet.
Implementation string
// InitialFSMState is the state the FSM is created in; New defaults it to
// "STATUS_UNKNOWN" before the options run.
InitialFSMState string
// NextFetch is the earliest time at which the next fetch is considered due.
NextFetch time.Time
// LastError is not read or written in this part of the file.
LastError string
// NodeInfoURL is the nodeinfo 2.0 document URL taken from the well-known
// nodeinfo response.
NodeInfoURL string
// ServerImplementationString is the software name exactly as reported by
// nodeinfo (before lowercasing).
ServerImplementationString string
// ServerVersionString is the software version as reported by nodeinfo.
ServerVersionString string
// SuccessCount is incremented by registerSuccess; Up reports whether it
// is non-zero.
SuccessCount uint
// UUID is a random identifier assigned in New.
UUID uuid.UUID
// fetchingLock is held for the duration of Fetch.
fetchingLock sync.Mutex
// fsmLock is held by Status and Event while they use FSM.
fsmLock sync.Mutex
// structLock is the mutex behind Lock and Unlock.
structLock sync.Mutex
// tootDestination is the channel fetched toots are sent into; set through
// SetTootDestination.
tootDestination chan *toot.Toot
}
// New builds an Instance with a fresh UUID, a first fetch scheduled one
// second from now and an initial FSM state of "STATUS_UNKNOWN". Each
// supplied option is then applied in order (so an option may override any of
// those defaults, including InitialFSMState), and finally the state machine
// is constructed starting from InitialFSMState.
func New(options ...func(i *Instance)) *Instance {
	inst := &Instance{
		UUID:            uuid.New(),
		InitialFSMState: "STATUS_UNKNOWN",
	}
	inst.setNextFetchAfter(1 * time.Second)

	for _, apply := range options {
		apply(inst)
	}

	transitions := fsm.Events{
		{Name: "BEGIN_NODEINFO_URL_FETCH", Src: []string{"STATUS_UNKNOWN"}, Dst: "FETCHING_NODEINFO_URL"},
		{Name: "GOT_NODEINFO_URL", Src: []string{"FETCHING_NODEINFO_URL"}, Dst: "PRE_NODEINFO_FETCH"},
		{Name: "BEGIN_NODEINFO_FETCH", Src: []string{"PRE_NODEINFO_FETCH"}, Dst: "FETCHING_NODEINFO"},
		{Name: "GOT_NODEINFO", Src: []string{"FETCHING_NODEINFO"}, Dst: "READY_FOR_TOOTFETCH"},
		{Name: "FETCH_TIME_REACHED", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "READY_AND_DUE_FETCH"},
		{Name: "BEGIN_TOOT_FETCH", Src: []string{"READY_AND_DUE_FETCH"}, Dst: "FETCHING"},
		{Name: "WEIRD_NODE_RESPONSE", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "WEIRD_NODE"},
		{Name: "EARLY_FETCH_ERROR", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "EARLY_ERROR"},
		{Name: "TOOT_FETCH_ERROR", Src: []string{"FETCHING"}, Dst: "TOOT_FETCH_ERROR"},
		{Name: "TOOTS_FETCHED", Src: []string{"FETCHING"}, Dst: "READY_FOR_TOOTFETCH"},
	}
	// Every state entry is reported through fsmEnterState.
	callbacks := fsm.Callbacks{
		"enter_state": inst.fsmEnterState,
	}

	inst.FSM = fsm.NewFSM(inst.InitialFSMState, transitions, callbacks)
	return inst
}
// Status reports the name of the FSM state the instance is currently in.
// The read is performed while holding fsmLock.
func (i *Instance) Status() string {
	i.fsmLock.Lock()
	defer i.fsmLock.Unlock()

	current := i.FSM.Current()
	return current
}
// SetTootDestination stores the channel, supplied by the manager, into which
// every toot fetched from this instance is pushed. The instance does not
// deduplicate: each fetch sends everything it parsed into the channel.
//
// The assignment is made without taking any of the instance's locks.
func (i *Instance) SetTootDestination(d chan *toot.Toot) {
i.tootDestination = d
}
// Event fires the named event on the instance's FSM while holding fsmLock.
//
// The FSM rejects events that are not valid from the current state. The
// method has no return value, so a rejection is logged here instead of being
// silently discarded; otherwise a state machine that has drifted out of sync
// with the code driving it would go unnoticed.
func (i *Instance) Event(eventname string) {
	i.fsmLock.Lock()
	defer i.fsmLock.Unlock()

	if err := i.FSM.Event(eventname); err != nil {
		log.Warn().
			Str("hostname", i.Hostname).
			Str("event", eventname).
			Str("state", i.FSM.Current()).
			Err(err).
			Msg("fsm rejected event")
	}
}
// fsmEnterState is the FSM "enter_state" callback. It writes a debug log
// entry carrying the hostname and the state that was just entered.
func (i *Instance) fsmEnterState(e *fsm.Event) {
	entry := log.Debug()
	entry = entry.Str("hostname", i.Hostname)
	entry = entry.Str("state", e.Dst)
	entry.Msg("instance changed state")
}
// Lock acquires structLock, the mutex guarding reads and writes of the
// instance's fields. It blocks until the mutex is available and must be
// paired with a call to Unlock.
func (i *Instance) Lock() {
i.structLock.Lock()
}
// Unlock releases structLock, the mutex guarding reads and writes of the
// instance's fields. It must only be called after a matching Lock.
func (i *Instance) Unlock() {
i.structLock.Unlock()
}
// bumpFetch pushes the next fetch out by the regular spider interval.
//
// It delegates to setNextFetchAfter so the locking and the time arithmetic
// live in one place, and it uses instanceSpiderInterval instead of repeating
// the literal 120-second value.
func (i *Instance) bumpFetch() {
	i.setNextFetchAfter(instanceSpiderInterval)
}
// setNextFetchAfter schedules the next fetch for d from now. The timestamp
// is computed and stored while the struct lock is held.
func (i *Instance) setNextFetchAfter(d time.Duration) {
	i.Lock()
	due := time.Now().Add(d)
	i.NextFetch = due
	i.Unlock()
}
// Fetch prepares an instance for fetching by making sure its server software
// has been identified. Bad name, fix it.
// FIXME(sneak)
//
// fetchingLock is held for the whole call. The next fetch is first pushed
// out by the error interval; only when identification succeeds (or was not
// needed) is it pulled back in to the regular spider interval.
func (i *Instance) Fetch() {
	i.fetchingLock.Lock()
	defer i.fetchingLock.Unlock()

	// Assume failure up front so an early return leaves the long delay set.
	i.setNextFetchAfter(instanceErrorInterval)

	if err := i.DetectNodeTypeIfNecessary(); err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to fetch instance metadata")
		return
	}

	i.setNextFetchAfter(instanceSpiderInterval)
	log.Info().
		Str("hostname", i.Hostname).
		Msg("instance now ready for fetch")
}
// FIXME rename this function
//
// dueForFetch reports whether the FSM is in the READY_AND_DUE_FETCH state.
// It performs no time arithmetic itself; moving the instance into that state
// when the fetch time has passed is the ticker's job.
func (i *Instance) dueForFetch() bool {
	return i.Status() == "READY_AND_DUE_FETCH"
}
// isNowPastFetchTime reports whether the current time is later than
// NextFetch.
//
// NextFetch is written under the struct lock, so it is read under the same
// lock here; an unlocked read would race with those writes. Callers must not
// already hold the struct lock.
func (i *Instance) isNowPastFetchTime() bool {
	i.Lock()
	defer i.Unlock()
	return time.Now().After(i.NextFetch)
}
// Tick is responsible for pushing idle instance records between states.
// The instances will transition between states when doing stuff (e.g.
// investigating, fetching, etc.) as well.
//
// The FSM state is read once and dispatched on, so a single Tick acts on one
// consistent view of the state rather than re-reading it for each branch:
//
//   - READY_FOR_TOOTFETCH: fire FETCH_TIME_REACHED once the fetch time has passed
//   - STATUS_UNKNOWN:      run Fetch to identify the node
//   - READY_AND_DUE_FETCH: fetch the recent toots
//
// Any other state is left untouched.
func (i *Instance) Tick() {
	switch i.Status() {
	case "READY_FOR_TOOTFETCH":
		if i.isNowPastFetchTime() {
			i.Event("FETCH_TIME_REACHED")
		}
	case "STATUS_UNKNOWN":
		i.Fetch()
	case "READY_AND_DUE_FETCH":
		// fetchRecentToots logs and records its own failures (error
		// counter and FSM event), so the returned error is intentionally
		// not handled again here.
		_ = i.fetchRecentToots()
	}
}
// nodeIdentified reports whether the server implementation is known, i.e.
// whether Implementation is non-empty. The field is read under the struct
// lock.
func (i *Instance) nodeIdentified() bool {
	i.Lock()
	defer i.Unlock()
	return i.Implementation != ""
}
// DetectNodeTypeIfNecessary performs the nodeinfo network requests when the
// node is as yet unidentified and returns their error, if any. When the node
// has already been identified it is a no-op and returns nil.
func (i *Instance) DetectNodeTypeIfNecessary() error {
	if i.nodeIdentified() {
		return nil
	}
	return i.fetchNodeInfo()
}
// registerError adds one to ErrorCount while holding the struct lock.
func (i *Instance) registerError() {
	i.Lock()
	i.ErrorCount += 1
	i.Unlock()
}
// registerSuccess adds one to SuccessCount while holding the struct lock.
func (i *Instance) registerSuccess() {
	i.Lock()
	i.SuccessCount += 1
	i.Unlock()
}
// Up reports whether at least one operation against this instance has
// succeeded, i.e. whether SuccessCount is non-zero. The counter is read under
// the struct lock.
func (i *Instance) Up() bool {
	i.Lock()
	up := i.SuccessCount != 0
	i.Unlock()
	return up
}
// fetchNodeInfoURL requests https://<Hostname>/.well-known/nodeinfo and
// looks for the link whose rel is the nodeinfo 2.0 schema name. On success
// the link's href is stored in NodeInfoURL, a success is registered and the
// FSM receives GOT_NODEINFO_URL.
//
// Failures register an error and move the FSM as follows:
//   - request or body-read failure: EARLY_FETCH_ERROR
//   - unparsable JSON, or no 2.0 link present: WEIRD_NODE_RESPONSE
//
// The returned error is the underlying transport/parse error, or
// "incomplete nodeinfo" when no suitable link was found.
func (i *Instance) fetchNodeInfoURL() error {
	wellKnownURL := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.Hostname)
	client := &http.Client{
		Timeout: instanceNodeinfoTimeout,
	}

	log.Debug().
		Str("url", wellKnownURL).
		Str("hostname", i.Hostname).
		Msg("fetching nodeinfo reference URL")

	i.Event("BEGIN_NODEINFO_URL_FETCH")

	resp, err := client.Get(wellKnownURL)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to fetch nodeinfo, node is down?")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to read nodeinfo")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}

	// Unmarshal into the struct pointer itself, not a pointer to it: with a
	// pointer-to-pointer a JSON "null" body would set the pointer to nil and
	// the loop over Links below would panic on remote-controlled input. This
	// way a null body leaves Links empty and is reported as incomplete.
	nir := new(jsonapis.NodeInfoWellKnownResponse)
	if err := json.Unmarshal(body, nir); err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to parse nodeinfo, node is weird")
		i.registerError()
		i.Event("WEIRD_NODE_RESPONSE")
		return err
	}

	for _, item := range nir.Links {
		if item.Rel == nodeInfoSchemaVersionTwoName {
			log.Debug().
				Str("hostname", i.Hostname).
				Str("nodeinfourl", item.Href).
				Msg("success fetching url for nodeinfo")

			i.Lock()
			i.NodeInfoURL = item.Href
			i.Unlock()

			i.registerSuccess()
			i.Event("GOT_NODEINFO_URL")
			return nil
		}
		// Not the schema we want; note it and keep looking.
		log.Debug().
			Str("hostname", i.Hostname).
			Str("item-rel", item.Rel).
			Str("item-href", item.Href).
			Msg("nodeinfo entry")
	}

	log.Error().
		Str("hostname", i.Hostname).
		Msg("incomplete nodeinfo")
	i.registerError()
	i.Event("WEIRD_NODE_RESPONSE")
	return errors.New("incomplete nodeinfo")
}
// fetchNodeInfo resolves the nodeinfo URL (via fetchNodeInfoURL), downloads
// the nodeinfo document and records what software the instance runs.
//
// ServerVersionString and ServerImplementationString are always stored once
// the document parses. If the lowercased software name is "pleroma" or
// "mastodon" the instance is marked Identified, Implementation is set to that
// name, a success is registered and the FSM receives GOT_NODEINFO.
//
// Failures register an error and move the FSM as follows:
//   - request or body-read failure: EARLY_FETCH_ERROR
//   - unparsable JSON or unrecognised software: WEIRD_NODE_RESPONSE
//
// Errors from fetchNodeInfoURL are returned unchanged (that function has
// already done its own bookkeeping).
func (i *Instance) fetchNodeInfo() error {
	if err := i.fetchNodeInfoURL(); err != nil {
		return err
	}

	client := &http.Client{
		Timeout: instanceNodeinfoTimeout,
	}

	//FIXME make sure the nodeinfourl is on the same domain as the instance
	//hostname
	i.Lock()
	nodeInfoURL := i.NodeInfoURL
	i.Unlock()

	i.Event("BEGIN_NODEINFO_FETCH")

	resp, err := client.Get(nodeInfoURL)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to fetch nodeinfo data")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Error().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to read nodeinfo data")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}

	// Unmarshal into the struct pointer itself, not a pointer to it: with a
	// pointer-to-pointer a JSON "null" body would set the pointer to nil and
	// the field accesses below would panic on remote-controlled input. This
	// way a null body leaves the software name empty and takes the
	// unknown-implementation path.
	ni := new(jsonapis.NodeInfoVersionTwoSchema)
	if err := json.Unmarshal(body, ni); err != nil {
		log.Error().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to parse nodeinfo")
		i.registerError()
		i.Event("WEIRD_NODE_RESPONSE")
		return err
	}

	// Log the URL copy taken under the lock rather than re-reading the
	// field unlocked.
	log.Debug().
		Str("serverVersion", ni.Software.Version).
		Str("software", ni.Software.Name).
		Str("hostname", i.Hostname).
		Str("nodeInfoURL", nodeInfoURL).
		Msg("received nodeinfo from instance")

	// The raw name is what gets stored; matching is case-insensitive.
	software := strings.ToLower(ni.Software.Name)

	i.Lock()
	i.ServerVersionString = ni.Software.Version
	i.ServerImplementationString = ni.Software.Name

	switch software {
	case "pleroma", "mastodon":
		log.Debug().
			Str("hostname", i.Hostname).
			Str("software", software).
			Msg("detected server software")
		i.Identified = true
		i.Implementation = software
		i.Unlock()
		i.registerSuccess()
		i.Event("GOT_NODEINFO")
		return nil
	default:
		log.Error().
			Str("hostname", i.Hostname).
			Str("software", software).
			Msg("FIXME unknown server implementation")
		i.Unlock()
		i.registerError()
		i.Event("WEIRD_NODE_RESPONSE")
		return errors.New("unknown server implementation")
	}
}
// fetchRecentToots downloads the instance's local public timeline (up to 40
// entries), parses it and hands the resulting toots to the ingester.
//
// The mastodon API endpoint is used for every instance because pleroma
// supports that API too.
//
// The FSM receives BEGIN_TOOT_FETCH up front. Any failure (request, body
// read or parse) registers an error, fires TOOT_FETCH_ERROR and returns the
// underlying error. On success a success is registered, TOOTS_FETCHED is
// fired and the next fetch is scheduled after the spider interval.
func (i *Instance) fetchRecentToots() error {
	client := &http.Client{
		Timeout: instanceHTTPTimeout,
	}
	timelineURL := fmt.Sprintf(
		"https://%s/api/v1/timelines/public?limit=40&local=true",
		i.Hostname,
	)

	// fail does the bookkeeping shared by every error path and passes the
	// error through so call sites can return it directly.
	fail := func(cause error) error {
		i.registerError()
		i.Event("TOOT_FETCH_ERROR")
		return cause
	}

	i.Event("BEGIN_TOOT_FETCH")

	// Schedule the next attempt at the error interval before doing any
	// work, as a safety net against a logic bug causing overly frequent
	// fetches. A successful fetch re-schedules with the shorter spider
	// interval further down.
	i.setNextFetchAfter(instanceErrorInterval)

	resp, err := client.Get(timelineURL)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to fetch recent toots")
		return fail(err)
	}
	defer resp.Body.Close()

	payload, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to read recent toots from response")
		return fail(err)
	}

	toots, err := toot.NewTootCollectionFromMastodonAPIResponse(payload, i.Hostname)
	if err != nil {
		log.Error().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to parse recent toot list")
		return fail(err)
	}

	log.Info().
		Str("hostname", i.Hostname).
		Int("tootCount", len(toots)).
		Msg("got and parsed toots")

	i.registerSuccess()
	i.Event("TOOTS_FETCHED")
	i.setNextFetchAfter(instanceSpiderInterval)

	// Delivery is expected to be quick (a generously buffered channel or a
	// fast receiver), but it runs in its own goroutine regardless so this
	// function never waits on the ingester.
	go i.sendTootsToIngester(toots)
	return nil
}
// sendTootsToIngester pushes every toot in tc, in order, into the
// destination channel configured via SetTootDestination. Each send blocks
// until the receiver (or the channel's buffer) accepts the item.
//
// If no destination has been configured the toots are dropped and an error
// is logged. A send on a nil channel blocks forever, so without this guard
// every fetch would leak a permanently blocked goroutine.
func (i *Instance) sendTootsToIngester(tc []*toot.Toot) {
	dest := i.tootDestination
	if dest == nil {
		log.Error().
			Str("hostname", i.Hostname).
			Int("tootCount", len(tc)).
			Msg("no toot destination set, dropping fetched toots")
		return
	}
	for _, item := range tc {
		dest <- item
	}
}