feta/instance/instance.go

485 lines
12 KiB
Go

package instance
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"
"sync"
"time"
"git.eeqj.de/sneak/feta/jsonapis"
"git.eeqj.de/sneak/feta/storage"
"git.eeqj.de/sneak/feta/toot"
"github.com/google/uuid"
"github.com/looplab/fsm"
"github.com/rs/zerolog/log"
)
//import "github.com/gin-gonic/gin"
// Schema identifier and timing tunables used while identifying and
// spidering an instance.
const (
	// nodeInfoSchemaVersionTwoName is the "rel" value that marks the
	// nodeinfo 2.0 entry in a /.well-known/nodeinfo document.
	nodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0"

	// instanceNodeinfoTimeout bounds each HTTP request made while
	// locating and fetching nodeinfo.
	instanceNodeinfoTimeout = 50 * time.Second

	// instanceHTTPTimeout bounds the HTTP request that fetches toots.
	instanceHTTPTimeout = 120 * time.Second

	// instanceSpiderInterval is the delay scheduled before the next fetch
	// once a step has succeeded.
	instanceSpiderInterval = 120 * time.Second

	// instanceErrorInterval is the delay scheduled before an attempt is
	// made; it stays in effect when the attempt fails.
	instanceErrorInterval = 30 * time.Minute
)
// instanceImplementation enumerates the server software families this
// package tells apart.
type instanceImplementation int

// implUnknown is the zero value and marks an instance whose software has
// not been identified; any greater value counts as identified.
const (
	implUnknown instanceImplementation = iota
	implMastodon
	implPleroma
)
// Instance holds everything known about a single remote instance, plus
// the locks and state machine used while identifying and spidering it.
type Instance struct {
	// Disabled is not read or written in this part of the file.
	Disabled bool

	// ErrorCount is incremented by registerError.
	ErrorCount uint

	// FSM tracks where the instance is in its identify/fetch lifecycle.
	// Status and Event serialize access to it with fsmLock.
	FSM *fsm.FSM

	// Fetching is not read or written in this part of the file.
	Fetching bool

	// HighestID is not read or written in this part of the file.
	HighestID uint

	// Hostname is the host used to build the https:// URLs that are
	// requested from the instance.
	Hostname string

	// UUID is assigned randomly by New.
	UUID uuid.UUID

	// Identified is set to true once nodeinfo names a supported
	// implementation.
	Identified bool

	// Implementation records which supported software the instance runs;
	// implUnknown until identified.
	Implementation instanceImplementation

	// NextFetch is the time after which the instance is due to be fetched.
	NextFetch time.Time

	// NodeInfoURL is the href of the nodeinfo 2.0 document advertised by
	// the instance.
	NodeInfoURL string

	// ServerImplementationString is the software name exactly as reported
	// by nodeinfo.
	ServerImplementationString string

	// ServerVersionString is the software version reported by nodeinfo.
	ServerVersionString string

	// SuccessCount is incremented by registerSuccess.
	SuccessCount uint

	// fetchingLock is held for the duration of Fetch.
	fetchingLock sync.Mutex

	// fsmLock serializes calls into FSM.
	fsmLock sync.Mutex

	// storageBackend is not read or written in this part of the file.
	storageBackend *storage.TootStorageBackend

	// structLock is the mutex behind Lock and Unlock; it guards the plain
	// data fields above.
	structLock sync.Mutex

	// tootDestination receives every toot fetched from this instance.
	tootDestination chan *toot.Toot
}
// New builds an Instance with a fresh random UUID, a next-fetch time one
// second from now, and a state machine that starts in STATUS_UNKNOWN. Each
// function in options is then called with the new instance, in order,
// before the instance is returned.
func New(options ...func(i *Instance)) *Instance {
	inst := &Instance{UUID: uuid.New()}
	inst.setNextFetchAfter(1 * time.Second)

	// States from which the nodeinfo discovery can bail out.
	earlyStates := []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}

	transitions := fsm.Events{
		{Name: "BEGIN_NODEINFO_URL_FETCH", Src: []string{"STATUS_UNKNOWN"}, Dst: "FETCHING_NODEINFO_URL"},
		{Name: "GOT_NODEINFO_URL", Src: []string{"FETCHING_NODEINFO_URL"}, Dst: "PRE_NODEINFO_FETCH"},
		{Name: "BEGIN_NODEINFO_FETCH", Src: []string{"PRE_NODEINFO_FETCH"}, Dst: "FETCHING_NODEINFO"},
		{Name: "GOT_NODEINFO", Src: []string{"FETCHING_NODEINFO"}, Dst: "READY_FOR_TOOTFETCH"},
		{Name: "FETCH_TIME_REACHED", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "READY_AND_DUE_FETCH"},
		{Name: "BEGIN_TOOT_FETCH", Src: []string{"READY_AND_DUE_FETCH"}, Dst: "FETCHING"},
		{Name: "WEIRD_NODE_RESPONSE", Src: earlyStates, Dst: "WEIRD_NODE"},
		{Name: "EARLY_FETCH_ERROR", Src: earlyStates, Dst: "EARLY_ERROR"},
		{Name: "TOOT_FETCH_ERROR", Src: []string{"FETCHING"}, Dst: "TOOT_FETCH_ERROR"},
		{Name: "TOOTS_FETCHED", Src: []string{"FETCHING"}, Dst: "READY_FOR_TOOTFETCH"},
	}

	// Every state entry is reported through fsmEnterState.
	hooks := fsm.Callbacks{
		"enter_state": func(e *fsm.Event) { inst.fsmEnterState(e) },
	}

	inst.FSM = fsm.NewFSM("STATUS_UNKNOWN", transitions, hooks)

	for _, configure := range options {
		configure(inst)
	}
	return inst
}
// Status reports the name of the state the instance's FSM is currently in.
// The FSM is consulted while holding fsmLock.
func (i *Instance) Status() string {
	i.fsmLock.Lock()
	defer i.fsmLock.Unlock()

	current := i.FSM.Current()
	return current
}
// SetTootDestination records the channel, supplied by the manager, into
// which every toot fetched from this instance is pushed. The instance does
// no deduplication: each fetch sends all of its toots into the channel.
func (i *Instance) SetTootDestination(d chan *toot.Toot) {
	i.tootDestination = d
}
// Event fires the named event on the instance's FSM while holding fsmLock.
//
// The method has no return value, so an event the FSM refuses (for
// example one that is not valid in the current state) is logged rather
// than silently dropped; the FSM state is left as the FSM library left it.
func (i *Instance) Event(eventname string) {
	i.fsmLock.Lock()
	defer i.fsmLock.Unlock()

	if err := i.FSM.Event(eventname); err != nil {
		log.Warn().
			Str("hostname", i.Hostname).
			Str("event", eventname).
			Str("state", i.FSM.Current()).
			Err(err).
			Msg("instance FSM rejected event")
	}
}
// fsmEnterState is the FSM "enter_state" callback: it emits a debug log
// line naming the instance and the state being entered.
func (i *Instance) fsmEnterState(e *fsm.Event) {
	hostname, entered := i.Hostname, e.Dst
	log.Debug().Str("hostname", hostname).Str("state", entered).Msg("instance changed state")
}
// Lock acquires the mutex that guards reads and writes of the instance's
// fields. Pair every call with Unlock.
func (i *Instance) Lock() {
	i.structLock.Lock()
}
// Unlock releases the mutex acquired by Lock.
func (i *Instance) Unlock() {
	i.structLock.Unlock()
}
// bumpFetch pushes the next fetch time out by the regular spider interval,
// measured from now. It uses instanceSpiderInterval rather than a separate
// literal so the two cannot drift apart.
func (i *Instance) bumpFetch() {
	i.setNextFetchAfter(instanceSpiderInterval)
}
// setNextFetchAfter schedules the next fetch for d after the current time.
// The field is written while holding the instance lock.
func (i *Instance) setNextFetchAfter(d time.Duration) {
	i.Lock()
	defer i.Unlock()

	due := time.Now().Add(d)
	i.NextFetch = due
}
// Fetch gets an instance ready to have its toots fetched: it identifies
// the server software if that has not happened yet. It does not fetch any
// toots itself.
//
// The next fetch is first pushed out by instanceErrorInterval; only when
// identification succeeds is it pulled back to instanceSpiderInterval.
// fetchingLock is held for the whole call.
//
// FIXME(sneak): bad name, fix it.
func (i *Instance) Fetch() {
	i.fetchingLock.Lock()
	defer i.fetchingLock.Unlock()

	// Assume failure until proven otherwise.
	i.setNextFetchAfter(instanceErrorInterval)

	if err := i.DetectNodeTypeIfNecessary(); err != nil {
		log.Debug().Str("hostname", i.Hostname).Err(err).Msg("unable to fetch instance metadata")
		return
	}

	i.setNextFetchAfter(instanceSpiderInterval)
	log.Info().Str("hostname", i.Hostname).Msg("instance now ready for fetch")
}
// dueForFetch reports whether the FSM is in the READY_AND_DUE_FETCH state.
// It looks only at the FSM state; the ticker is responsible for the time
// comparison and for moving the instance into that state.
//
// FIXME rename this function
func (i *Instance) dueForFetch() bool {
	return i.Status() == "READY_AND_DUE_FETCH"
}
// isNowPastFetchTime reports whether the current time is after NextFetch.
//
// NextFetch is copied out under the instance lock because every writer
// (setNextFetchAfter, bumpFetch) updates it while holding that lock.
// Callers must not already hold the instance lock.
func (i *Instance) isNowPastFetchTime() bool {
	i.Lock()
	next := i.NextFetch
	i.Unlock()

	return time.Now().After(next)
}
// Tick pushes an idle instance record from one state to the next.
// Instances also change state on their own while they are doing work
// (investigating, fetching, etc.).
//
// The state is sampled once per call so that a single tick acts on one
// consistent view of the FSM:
//   - READY_FOR_TOOTFETCH: fire FETCH_TIME_REACHED once NextFetch has passed.
//   - STATUS_UNKNOWN: run Fetch to identify the instance.
//   - READY_AND_DUE_FETCH: fetch the recent toots.
//
// Any other state is left untouched.
func (i *Instance) Tick() {
	switch i.Status() {
	case "READY_FOR_TOOTFETCH":
		if i.isNowPastFetchTime() {
			i.Event("FETCH_TIME_REACHED")
		}
	case "STATUS_UNKNOWN":
		i.Fetch()
	case "READY_AND_DUE_FETCH":
		// fetchRecentToots logs its own failures, bumps the error count
		// and moves the FSM, so there is nothing left to do with the
		// error here.
		_ = i.fetchRecentToots()
	}
}
// nodeIdentified reports whether the instance's implementation has been
// determined, i.e. whether Implementation is anything beyond implUnknown.
func (i *Instance) nodeIdentified() bool {
	i.Lock()
	impl := i.Implementation
	i.Unlock()

	return impl > implUnknown
}
// DetectNodeTypeIfNecessary performs the nodeinfo network requests when
// the node is as yet unidentified and returns their error, if any. For an
// already identified node it is a no-op that returns nil.
func (i *Instance) DetectNodeTypeIfNecessary() error {
	if i.nodeIdentified() {
		return nil
	}
	return i.fetchNodeInfo()
}
// registerError adds one to the instance's error counter under the
// instance lock.
func (i *Instance) registerError() {
	i.Lock()
	defer i.Unlock()

	i.ErrorCount += 1
}
// registerSuccess adds one to the instance's success counter under the
// instance lock.
func (i *Instance) registerSuccess() {
	i.Lock()
	defer i.Unlock()

	i.SuccessCount += 1
}
// Up reports whether at least one success has been registered for this
// instance.
func (i *Instance) Up() bool {
	i.Lock()
	defer i.Unlock()

	successes := i.SuccessCount
	return successes > 0
}
// fetchNodeInfoURL requests https://<Hostname>/.well-known/nodeinfo and
// stores the href of the nodeinfo 2.0 link in NodeInfoURL.
//
// FSM events fired:
//   - BEGIN_NODEINFO_URL_FETCH before the request is sent;
//   - GOT_NODEINFO_URL on success (the success count is bumped);
//   - EARLY_FETCH_ERROR when the request or the body read fails;
//   - WEIRD_NODE_RESPONSE when the response has a non-2xx status, is not
//     valid JSON, or carries no nodeinfo 2.0 link.
//
// Every failure path bumps the error count and returns a non-nil error.
func (i *Instance) fetchNodeInfoURL() error {
	url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.Hostname)
	c := &http.Client{
		Timeout: instanceNodeinfoTimeout,
	}

	log.Debug().
		Str("url", url).
		Str("hostname", i.Hostname).
		Msg("fetching nodeinfo reference URL")

	i.Event("BEGIN_NODEINFO_URL_FETCH")
	resp, err := c.Get(url)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to fetch nodeinfo, node is down?")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}
	defer resp.Body.Close()

	// The HTTP client does not turn 4xx/5xx into errors. Such responses
	// previously surfaced as parse failures or "incomplete nodeinfo", so
	// they keep mapping to WEIRD_NODE_RESPONSE, just with a clear cause.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		err = fmt.Errorf("unexpected HTTP status %d fetching nodeinfo reference", resp.StatusCode)
		log.Debug().
			Str("hostname", i.Hostname).
			Int("status", resp.StatusCode).
			Err(err).
			Msg("unable to fetch nodeinfo, node is weird")
		i.registerError()
		i.Event("WEIRD_NODE_RESPONSE")
		return err
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to read nodeinfo")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}

	// nir is already a pointer, so it is passed as-is. Passing &nir would
	// let a body of JSON null reset nir to nil and make the loop below
	// dereference a nil pointer.
	nir := new(jsonapis.NodeInfoWellKnownResponse)
	if err = json.Unmarshal(body, nir); err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to parse nodeinfo, node is weird")
		i.registerError()
		i.Event("WEIRD_NODE_RESPONSE")
		return err
	}

	for _, item := range nir.Links {
		if item.Rel == nodeInfoSchemaVersionTwoName {
			log.Debug().
				Str("hostname", i.Hostname).
				Str("nodeinfourl", item.Href).
				Msg("success fetching url for nodeinfo")

			i.Lock()
			i.NodeInfoURL = item.Href
			i.Unlock()

			i.registerSuccess()
			i.Event("GOT_NODEINFO_URL")
			return nil
		}
		// Not the schema version we want; log it for diagnosis.
		log.Debug().
			Str("hostname", i.Hostname).
			Str("item-rel", item.Rel).
			Str("item-href", item.Href).
			Msg("nodeinfo entry")
	}

	log.Error().
		Str("hostname", i.Hostname).
		Msg("incomplete nodeinfo")
	i.registerError()
	i.Event("WEIRD_NODE_RESPONSE")
	return errors.New("incomplete nodeinfo")
}
// fetchNodeInfo discovers the instance's nodeinfo URL, downloads the
// nodeinfo document and records which server software the instance runs.
//
// ServerVersionString and ServerImplementationString are always updated
// from a parsed document. Identified and Implementation are set only when
// the lower-cased software name is "pleroma" or "mastodon".
//
// FSM events fired (after those of fetchNodeInfoURL):
//   - BEGIN_NODEINFO_FETCH before the request is sent;
//   - GOT_NODEINFO when a supported implementation is detected;
//   - EARLY_FETCH_ERROR when the request or the body read fails;
//   - WEIRD_NODE_RESPONSE when the response has a non-2xx status, is not
//     valid JSON, or names an unsupported implementation.
//
// Every failure path bumps the error count and returns a non-nil error.
func (i *Instance) fetchNodeInfo() error {
	if err := i.fetchNodeInfoURL(); err != nil {
		return err
	}

	c := &http.Client{
		Timeout: instanceNodeinfoTimeout,
	}

	//FIXME make sure the nodeinfourl is on the same domain as the instance
	//hostname
	i.Lock()
	url := i.NodeInfoURL
	i.Unlock()

	i.Event("BEGIN_NODEINFO_FETCH")
	resp, err := c.Get(url)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to fetch nodeinfo data")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}
	defer resp.Body.Close()

	// The HTTP client does not turn 4xx/5xx into errors. Such responses
	// previously surfaced as parse or "unknown implementation" failures,
	// so they keep mapping to WEIRD_NODE_RESPONSE.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		err = fmt.Errorf("unexpected HTTP status %d fetching nodeinfo", resp.StatusCode)
		log.Error().
			Str("hostname", i.Hostname).
			Int("status", resp.StatusCode).
			Err(err).
			Msg("unable to fetch nodeinfo data")
		i.registerError()
		i.Event("WEIRD_NODE_RESPONSE")
		return err
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Error().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to read nodeinfo data")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}

	// ni is already a pointer, so it is passed as-is. Passing &ni would
	// let a body of JSON null reset ni to nil and make the field accesses
	// below dereference a nil pointer.
	ni := new(jsonapis.NodeInfoVersionTwoSchema)
	if err = json.Unmarshal(body, ni); err != nil {
		log.Error().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to parse nodeinfo")
		i.registerError()
		i.Event("WEIRD_NODE_RESPONSE")
		return err
	}

	// Log the local copy of the URL; the struct field may only be read
	// under the instance lock.
	log.Debug().
		Str("serverVersion", ni.Software.Version).
		Str("software", ni.Software.Name).
		Str("hostname", i.Hostname).
		Str("nodeInfoURL", url).
		Msg("received nodeinfo from instance")

	// Work out the implementation before taking the lock so the critical
	// section is a handful of assignments with a single unlock.
	software := strings.ToLower(ni.Software.Name)
	impl := implUnknown
	switch software {
	case "pleroma":
		impl = implPleroma
	case "mastodon":
		impl = implMastodon
	}

	i.Lock()
	i.ServerVersionString = ni.Software.Version
	i.ServerImplementationString = ni.Software.Name
	if impl != implUnknown {
		i.Identified = true
		i.Implementation = impl
	}
	i.Unlock()

	if impl == implUnknown {
		log.Error().
			Str("hostname", i.Hostname).
			Str("software", software).
			Msg("FIXME unknown server implementation")
		i.registerError()
		i.Event("WEIRD_NODE_RESPONSE")
		return errors.New("unknown server implementation")
	}

	log.Debug().
		Str("hostname", i.Hostname).
		Str("software", software).
		Msg("detected server software")
	i.registerSuccess()
	i.Event("GOT_NODEINFO")
	return nil
}
// fetchRecentToots downloads up to 40 local toots from the instance's
// public timeline, parses them, and hands them to sendTootsToIngester in
// a separate goroutine.
//
// FSM events fired:
//   - BEGIN_TOOT_FETCH before the request is sent;
//   - TOOTS_FETCHED on success (the success count is bumped);
//   - TOOT_FETCH_ERROR when the request fails, the status is non-2xx, the
//     body cannot be read, or the toot list cannot be parsed.
//
// Every failure path bumps the error count and returns a non-nil error.
func (i *Instance) fetchRecentToots() error {
	// it turns out pleroma supports the mastodon api so we'll just use that
	// for everything for now
	url := fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true",
		i.Hostname)

	c := &http.Client{
		Timeout: instanceHTTPTimeout,
	}

	i.Event("BEGIN_TOOT_FETCH")

	// Schedule the next fetch at the error interval up front, as a safety
	// net against a logic bug making us fetch too often. A successful
	// fetch re-schedules it at the shorter spider interval below.
	i.setNextFetchAfter(instanceErrorInterval)

	resp, err := c.Get(url)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to fetch recent toots")
		i.registerError()
		i.Event("TOOT_FETCH_ERROR")
		return err
	}
	defer resp.Body.Close()

	// The HTTP client does not turn 4xx/5xx into errors; report them as
	// what they are instead of as a confusing parse failure further down.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		err = fmt.Errorf("unexpected HTTP status %d fetching recent toots", resp.StatusCode)
		log.Debug().
			Str("hostname", i.Hostname).
			Int("status", resp.StatusCode).
			Err(err).
			Msg("unable to fetch recent toots")
		i.registerError()
		i.Event("TOOT_FETCH_ERROR")
		return err
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to read recent toots from response")
		i.registerError()
		i.Event("TOOT_FETCH_ERROR")
		return err
	}

	tc, err := toot.NewTootCollectionFromMastodonAPIResponse(body, i.Hostname)
	if err != nil {
		log.Error().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to parse recent toot list")
		i.registerError()
		i.Event("TOOT_FETCH_ERROR")
		return err
	}

	log.Info().
		Str("hostname", i.Hostname).
		Int("tootCount", len(tc)).
		Msg("got and parsed toots")

	i.registerSuccess()
	i.Event("TOOTS_FETCHED")
	i.setNextFetchAfter(instanceSpiderInterval)

	// this should go fast as either the channel is buffered bigly or the
	// ingester receives fast and does its own buffering, but run it in its
	// own goroutine anyway so a slow receiver cannot stall the caller
	go i.sendTootsToIngester(tc)
	return nil
}
// sendTootsToIngester pushes every toot in tc, in order, into the
// instance's toot destination channel. Sends block until the receiver
// takes each toot.
//
// If no destination has been set the toots are dropped with a warning: a
// send on a nil channel blocks forever, which would leak the goroutine
// this method runs in.
func (i *Instance) sendTootsToIngester(tc []*toot.Toot) {
	dest := i.tootDestination
	if dest == nil {
		log.Warn().
			Str("hostname", i.Hostname).
			Int("tootCount", len(tc)).
			Msg("no toot destination set, dropping fetched toots")
		return
	}

	for _, item := range tc {
		dest <- item
	}
}