feta/instance.go

405 lines
9.0 KiB
Go

package feta
import "encoding/json"
import "fmt"
import "io/ioutil"
import "net/http"
import "strings"
import "sync"
import "time"
import "errors"
//import "github.com/gin-gonic/gin"
import "github.com/looplab/fsm"
import "github.com/rs/zerolog/log"
// NodeInfoSchemaVersionTwoName is the link "rel" value that marks the
// NodeInfo schema 2.0 document in a /.well-known/nodeinfo response.
const NodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0"

// Timeouts and scheduling intervals used when talking to an instance.
const (
	// INSTANCE_NODEINFO_TIMEOUT bounds each nodeinfo HTTP request.
	INSTANCE_NODEINFO_TIMEOUT = 50 * time.Second

	// INSTANCE_HTTP_TIMEOUT is declared alongside the nodeinfo timeout; it is
	// not referenced in this part of the file.
	INSTANCE_HTTP_TIMEOUT = 50 * time.Second

	// INSTANCE_SPIDER_INTERVAL is the delay scheduled before the next fetch
	// after instance metadata was obtained successfully.
	INSTANCE_SPIDER_INTERVAL = time.Minute

	// INSTANCE_ERROR_INTERVAL is the delay scheduled before the next fetch
	// when obtaining instance metadata fails.
	INSTANCE_ERROR_INTERVAL = 30 * time.Minute
)
// instanceImplementation enumerates the server software an instance can be
// identified as.
type instanceImplementation int

// Known implementations. Unknown is the zero value, so an instance that has
// not been identified yet reports Unknown.
const (
	// Unknown means the server software has not been detected.
	Unknown instanceImplementation = iota

	// Mastodon marks an instance whose nodeinfo reports "mastodon".
	Mastodon

	// Pleroma marks an instance whose nodeinfo reports "pleroma".
	Pleroma
)
// instance holds the crawl state for a single remote server: its hostname,
// the software detected from nodeinfo, success/error counters, the time of
// the next scheduled fetch, and a finite state machine tracking progress.
type instance struct {
// structLock guards the mutable fields below; it is taken via Lock/Unlock.
structLock sync.Mutex
// errorCount is incremented by registerError.
errorCount uint
// successCount is incremented by registerSuccess; Up reports whether it is
// greater than zero.
successCount uint
// highestId is not referenced in this part of the file.
// NOTE(review): presumably the highest toot id seen so far — confirm.
highestId int
// hostname is the host used to build the nodeinfo URL and to tag log lines.
hostname string
// identified is set to true once the server software has been recognised.
identified bool
// fetching is not referenced in this part of the file.
fetching bool
// implementation is the detected server software; Unknown until detected.
implementation instanceImplementation
// backend is not referenced in this part of the file.
backend *InstanceBackend
// nextFetch is the time after which the instance is considered due for a
// fetch; it is written by setNextFetchAfter and bumpFetch.
nextFetch time.Time
// nodeInfoUrl is the schema 2.0 href taken from /.well-known/nodeinfo.
nodeInfoUrl string
// serverVersionString is the software version reported by nodeinfo.
serverVersionString string
// serverImplementationString is the software name reported by nodeinfo,
// stored as received (before lowercasing).
serverImplementationString string
// fetchingLock is held for the whole duration of Fetch.
fetchingLock sync.Mutex
// fsm is the state machine created in NewInstance.
fsm *fsm.FSM
// fsmLock is held by Status and Event while they use fsm.
fsmLock sync.Mutex
}
// NewInstance builds an instance whose state machine starts in
// STATUS_UNKNOWN and whose first fetch is scheduled one second from now, then
// applies the given functional options in order and returns the result.
func NewInstance(options ...func(i *instance)) *instance {
	inst := &instance{}
	inst.setNextFetchAfter(1 * time.Second)

	// States from which the early failure events may fire.
	discoveryStates := []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}

	transitions := fsm.Events{
		{Name: "BEGIN_NODEINFO_URL_FETCH", Src: []string{"STATUS_UNKNOWN"}, Dst: "FETCHING_NODEINFO_URL"},
		{Name: "GOT_NODEINFO_URL", Src: []string{"FETCHING_NODEINFO_URL"}, Dst: "PRE_NODEINFO_FETCH"},
		{Name: "BEGIN_NODEINFO_FETCH", Src: []string{"PRE_NODEINFO_FETCH"}, Dst: "FETCHING_NODEINFO"},
		{Name: "GOT_NODEINFO", Src: []string{"FETCHING_NODEINFO"}, Dst: "READY_FOR_TOOTFETCH"},
		{Name: "FETCH_TIME_REACHED", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "READY_AND_DUE_FETCH"},
		{Name: "WEIRD_NODE_RESPONSE", Src: discoveryStates, Dst: "WEIRD_NODE"},
		{Name: "EARLY_FETCH_ERROR", Src: discoveryStates, Dst: "EARLY_ERROR"},
		{Name: "TOOT_FETCH_ERROR", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "TOOT_FETCH_ERROR"},
	}

	// Every state entry is reported through fsmEnterState.
	callbacks := fsm.Callbacks{
		"enter_state": func(ev *fsm.Event) { inst.fsmEnterState(ev) },
	}

	inst.fsm = fsm.NewFSM("STATUS_UNKNOWN", transitions, callbacks)

	for _, apply := range options {
		apply(inst)
	}
	return inst
}
// Status returns the name of the state the instance's state machine is
// currently in. The state machine lock is held for the duration of the read.
func (i *instance) Status() string {
	i.fsmLock.Lock()
	defer i.fsmLock.Unlock()

	current := i.fsm.Current()
	return current
}
// Event sends the named event to the instance's state machine while holding
// the state machine lock.
//
// The method has no return value, so a rejected event (for example one that
// is not valid in the current state) is logged instead of being silently
// dropped; the state machine is left as it was.
func (i *instance) Event(eventname string) {
	i.fsmLock.Lock()
	defer i.fsmLock.Unlock()

	if err := i.fsm.Event(eventname); err != nil {
		log.Warn().
			Str("hostname", i.hostname).
			Str("event", eventname).
			Str("state", i.fsm.Current()).
			Err(err).
			Msg("state machine rejected event")
	}
}
// fsmEnterState is the state machine's "enter_state" callback. It writes a
// debug log line naming the instance and the state being entered.
func (i *instance) fsmEnterState(e *fsm.Event) {
	entry := log.Debug().Str("hostname", i.hostname)
	entry = entry.Str("state", e.Dst)
	entry.Msg("instance changed state")
}
// Lock acquires the mutex that guards the instance's mutable fields.
// Each call must be paired with a call to Unlock.
func (i *instance) Lock() {
	i.structLock.Lock()
}
// Unlock releases the mutex acquired by Lock.
func (i *instance) Unlock() {
	i.structLock.Unlock()
}
// bumpFetch pushes the next scheduled fetch to 100 seconds from now.
func (i *instance) bumpFetch() {
	// setNextFetchAfter takes the struct lock and stores now+delay.
	i.setNextFetchAfter(100 * time.Second)
}
// setNextFetchAfter schedules the next fetch for d after the current time.
// The struct lock is held while nextFetch is updated.
func (i *instance) setNextFetchAfter(d time.Duration) {
	i.Lock()
	defer i.Unlock()

	deadline := time.Now().Add(d)
	i.nextFetch = deadline
}
// Fetch makes sure the instance's server software has been identified and
// schedules the next fetch accordingly. Only one Fetch runs at a time per
// instance; concurrent callers wait on fetchingLock.
func (i *instance) Fetch() {
	i.fetchingLock.Lock()
	defer i.fetchingLock.Unlock()

	// Schedule the long error back-off up front; it stays in effect if
	// identification fails below.
	i.setNextFetchAfter(INSTANCE_ERROR_INTERVAL)

	if err := i.detectNodeTypeIfNecessary(); err != nil {
		log.Debug().
			Str("hostname", i.hostname).
			Err(err).
			Msg("unable to fetch instance metadata")
		return
	}

	// Identification succeeded: switch to the regular spider interval.
	i.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL)
	log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", i.hostname)
}
// dueForFetch reports whether the state machine is in READY_AND_DUE_FETCH.
// It performs no time arithmetic itself; Tick is what moves the state
// machine into that state once the scheduled fetch time has passed.
func (i *instance) dueForFetch() bool {
	return i.Status() == "READY_AND_DUE_FETCH"
}
// isNowPastFetchTime reports whether the current time is after the scheduled
// nextFetch time.
//
// nextFetch is written under the struct lock by setNextFetchAfter and
// bumpFetch, so it is copied out under the same lock here to avoid a data
// race. Callers must not already hold the struct lock.
func (i *instance) isNowPastFetchTime() bool {
	i.Lock()
	scheduled := i.nextFetch
	i.Unlock()

	return time.Now().After(scheduled)
}
// Tick advances the instance by one scheduling step:
//   - in READY_FOR_TOOTFETCH it fires FETCH_TIME_REACHED once the scheduled
//     fetch time has passed;
//   - in STATUS_UNKNOWN it runs Fetch to identify the instance.
//
// In any other state it does nothing.
func (i *instance) Tick() {
	switch {
	case i.Status() == "READY_FOR_TOOTFETCH":
		if i.isNowPastFetchTime() {
			i.Event("FETCH_TIME_REACHED")
		}
	case i.Status() == "STATUS_UNKNOWN":
		i.Fetch()
	}
}
// nodeIdentified reports whether the instance's server software has been
// detected, i.e. whether implementation is anything above Unknown. The
// struct lock is held while the field is read.
func (i *instance) nodeIdentified() bool {
	i.Lock()
	defer i.Unlock()

	return i.implementation > Unknown
}
// detectNodeTypeIfNecessary fetches nodeinfo for an instance that has not
// been identified yet and returns the error from that fetch, if any. For an
// already identified instance it does nothing and returns nil.
func (i *instance) detectNodeTypeIfNecessary() error {
	if i.nodeIdentified() {
		return nil
	}
	return i.fetchNodeInfo()
}
// registerError adds one to the instance's error counter under the struct
// lock.
func (i *instance) registerError() {
	i.Lock()
	defer i.Unlock()

	i.errorCount += 1
}
// registerSuccess adds one to the instance's success counter under the
// struct lock.
func (i *instance) registerSuccess() {
	i.Lock()
	defer i.Unlock()

	i.successCount += 1
}
// Up reports whether at least one success has been registered for the
// instance.
func (i *instance) Up() bool {
	i.Lock()
	defer i.Unlock()

	// successCount is unsigned, so "non-zero" is the same as "greater than
	// zero".
	return i.successCount != 0
}
// fetchNodeInfoURL requests https://<hostname>/.well-known/nodeinfo, looks
// for the link whose rel is the NodeInfo schema 2.0 name, and stores its href
// in nodeInfoUrl.
//
// State machine events fired along the way:
//   - BEGIN_NODEINFO_URL_FETCH before the request is sent;
//   - EARLY_FETCH_ERROR if the request or reading the body fails;
//   - WEIRD_NODE_RESPONSE if the body is not valid JSON or has no schema 2.0
//     link;
//   - GOT_NODEINFO_URL on success.
//
// Each failure also increments the error counter and is returned to the
// caller; success increments the success counter and returns nil.
func (i *instance) fetchNodeInfoURL() error {
	url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.hostname)
	c := &http.Client{
		Timeout: INSTANCE_NODEINFO_TIMEOUT,
	}

	log.Debug().
		Str("url", url).
		Str("hostname", i.hostname).
		Msg("fetching nodeinfo reference URL")

	i.Event("BEGIN_NODEINFO_URL_FETCH")
	resp, err := c.Get(url)
	if err != nil {
		log.Debug().
			Str("hostname", i.hostname).
			Err(err).
			Msg("unable to fetch nodeinfo, node is down?")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Debug().
			Str("hostname", i.hostname).
			Err(err).
			Msg("unable to read nodeinfo")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}

	// Decode into a value, not a pointer to a pointer: with a **T target a
	// JSON body of `null` sets the inner pointer to nil without an error, and
	// the range over Links below would then panic. With a value target a
	// `null` body leaves Links empty and is reported as incomplete nodeinfo.
	var nir NodeInfoWellKnownResponse
	if err := json.Unmarshal(body, &nir); err != nil {
		log.Debug().
			Str("hostname", i.hostname).
			Err(err).
			Msg("unable to parse nodeinfo, node is weird")
		i.registerError()
		i.Event("WEIRD_NODE_RESPONSE")
		return err
	}

	for _, item := range nir.Links {
		if item.Rel == NodeInfoSchemaVersionTwoName {
			log.Debug().
				Str("hostname", i.hostname).
				Str("nodeinfourl", item.Href).
				Msg("success fetching url for nodeinfo")

			i.Lock()
			i.nodeInfoUrl = item.Href
			i.Unlock()

			i.registerSuccess()
			i.Event("GOT_NODEINFO_URL")
			return nil
		}

		// Not the schema 2.0 link; log it and keep looking.
		log.Debug().
			Str("hostname", i.hostname).
			Str("item-rel", item.Rel).
			Str("item-href", item.Href).
			Msg("nodeinfo entry")
	}

	log.Error().
		Str("hostname", i.hostname).
		Msg("incomplete nodeinfo")
	i.registerError()
	i.Event("WEIRD_NODE_RESPONSE")
	return errors.New("incomplete nodeinfo")
}
// fetchNodeInfo resolves the instance's nodeinfo URL via fetchNodeInfoURL,
// downloads the nodeinfo document from it, and records the reported software
// name and version. If the lowercased software name is "pleroma" or
// "mastodon" the instance is marked as identified with the matching
// implementation.
//
// State machine events fired along the way:
//   - BEGIN_NODEINFO_FETCH before the request is sent;
//   - EARLY_FETCH_ERROR if the request or reading the body fails;
//   - WEIRD_NODE_RESPONSE if the body is not valid JSON or the software is
//     not recognised;
//   - GOT_NODEINFO on success.
//
// Each failure in this function increments the error counter and is
// returned; an error from fetchNodeInfoURL is returned unchanged. Success
// increments the success counter and returns nil.
func (i *instance) fetchNodeInfo() error {
	if err := i.fetchNodeInfoURL(); err != nil {
		return err
	}

	c := &http.Client{
		Timeout: INSTANCE_NODEINFO_TIMEOUT,
	}

	//FIXME make sure the nodeinfourl is on the same domain as the instance
	//hostname
	i.Lock()
	url := i.nodeInfoUrl
	i.Unlock()

	i.Event("BEGIN_NODEINFO_FETCH")
	resp, err := c.Get(url)
	if err != nil {
		log.Debug().
			Str("hostname", i.hostname).
			Err(err).
			Msg("unable to fetch nodeinfo data")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Error().
			Str("hostname", i.hostname).
			Err(err).
			Msg("unable to read nodeinfo data")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}

	// Decode into a value, not a pointer to a pointer: with a **T target a
	// JSON body of `null` sets the inner pointer to nil without an error, and
	// the field accesses below would then panic. With a value target a `null`
	// body yields an empty software name, handled as an unknown
	// implementation.
	var ni NodeInfoVersionTwoSchema
	if err := json.Unmarshal(body, &ni); err != nil {
		log.Error().
			Str("hostname", i.hostname).
			Err(err).
			Msg("unable to parse nodeinfo")
		i.registerError()
		i.Event("WEIRD_NODE_RESPONSE")
		return err
	}

	// Log the locally copied URL rather than re-reading i.nodeInfoUrl
	// without the lock.
	log.Debug().
		Str("serverVersion", ni.Software.Version).
		Str("software", ni.Software.Name).
		Str("hostname", i.hostname).
		Str("nodeInfoUrl", url).
		Msg("received nodeinfo from instance")

	software := strings.ToLower(ni.Software.Name)

	// Update all fields in one short critical section; logging and the
	// counter/event calls happen after the lock is released.
	recognised := true
	i.Lock()
	i.serverVersionString = ni.Software.Version
	i.serverImplementationString = ni.Software.Name
	switch software {
	case "pleroma":
		i.identified = true
		i.implementation = Pleroma
	case "mastodon":
		i.identified = true
		i.implementation = Mastodon
	default:
		recognised = false
	}
	i.Unlock()

	if !recognised {
		log.Error().
			Str("hostname", i.hostname).
			Str("software", software).
			Msg("FIXME unknown server implementation")
		i.registerError()
		i.Event("WEIRD_NODE_RESPONSE")
		return errors.New("unknown server implementation")
	}

	log.Debug().
		Str("hostname", i.hostname).
		Str("software", software).
		Msg("detected server software")
	i.registerSuccess()
	i.Event("GOT_NODEINFO")
	return nil
}
/*
func (i *Instance) fetchRecentToots() ([]byte, error) {
i.Lock()
impl := i.impl
i.Unlock()
if impl == Mastodon {
return i.fetchRecentTootsJsonFromMastodon()
} else if impl == Pleroma {
return i.fetchRecentTootsJsonFromPleroma()
} else {
panic("unimplemented")
}
}
*/
/*
func (i *PleromaBackend) fetchRecentToots() ([]byte, error) {
//url :=
//fmt.Sprintf("https://%s/api/statuses/public_and_external_timeline.json?count=100",
//i.hostname)
return nil, nil
}
func (i *MastodonBackend) fetchRecentTootsJsonFromMastodon() ([]byte, error) {
//url :=
//fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true",
//i.hostname)
return nil, nil
}
*/