feta/instance.go

404 lines
9.0 KiB
Go
Raw Normal View History

2019-11-05 02:53:11 +00:00
package feta
2019-10-24 10:38:16 +00:00
2019-11-02 06:56:17 +00:00
import "encoding/json"
import "fmt"
import "io/ioutil"
2019-11-02 06:56:17 +00:00
import "net/http"
import "strings"
2019-11-03 10:56:50 +00:00
import "sync"
2019-11-02 06:56:17 +00:00
import "time"
2019-11-03 18:00:01 +00:00
import "errors"
2019-11-02 06:56:17 +00:00
2019-11-06 07:03:42 +00:00
//import "github.com/gin-gonic/gin"
import "github.com/looplab/fsm"
2019-11-02 06:56:17 +00:00
import "github.com/rs/zerolog/log"
2019-10-24 10:38:16 +00:00
// Constants used while discovering an instance's nodeinfo document.
const (
	// nodeInfoSchemaVersionTwoName is the "rel" value that marks the
	// nodeinfo 2.0 link in a /.well-known/nodeinfo response.
	nodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0"

	// instanceNodeinfoTimeout is the HTTP client timeout applied to the
	// nodeinfo requests.
	instanceNodeinfoTimeout = 50 * time.Second
)
2019-11-05 23:32:09 +00:00
// Timing parameters for talking to and re-visiting an instance.
const (
	// instanceHTTPTimeout is a general HTTP timeout for instance requests.
	instanceHTTPTimeout = 50 * time.Second

	// instanceSpiderInterval is how long to wait before the next fetch
	// after a successful one.
	instanceSpiderInterval = 120 * time.Second

	// instanceErrorInterval is how long to wait before the next fetch
	// after a failed one.
	instanceErrorInterval = 30 * time.Minute
)
2019-11-06 07:03:42 +00:00
// instanceImplementation enumerates the server software an instance can be
// identified as running.
type instanceImplementation int

// The zero value is implUnknown, so a freshly created instance counts as
// "not yet identified".
const (
	implUnknown = instanceImplementation(iota)
	implMastodon
	implPleroma
)
2019-11-06 07:03:42 +00:00
type instance struct {
2019-11-06 00:46:52 +00:00
structLock sync.Mutex
2019-11-05 23:32:09 +00:00
errorCount uint
successCount uint
highestID int
2019-11-05 23:32:09 +00:00
hostname string
identified bool
fetching bool
2019-11-06 07:03:42 +00:00
implementation instanceImplementation
backend *instanceBackend
2019-11-05 23:32:09 +00:00
nextFetch time.Time
nodeInfoURL string
2019-11-05 23:32:09 +00:00
serverVersionString string
serverImplementationString string
fetchingLock sync.Mutex
2019-11-06 07:03:42 +00:00
fsm *fsm.FSM
fsmLock sync.Mutex
2019-10-24 10:38:16 +00:00
}
func newInstance(options ...func(i *instance)) *instance {
2019-11-06 07:03:42 +00:00
i := new(instance)
i.setNextFetchAfter(1 * time.Second)
i.fsm = fsm.NewFSM(
"STATUS_UNKNOWN",
fsm.Events{
{Name: "BEGIN_NODEINFO_URL_FETCH", Src: []string{"STATUS_UNKNOWN"}, Dst: "FETCHING_NODEINFO_URL"},
{Name: "GOT_NODEINFO_URL", Src: []string{"FETCHING_NODEINFO_URL"}, Dst: "PRE_NODEINFO_FETCH"},
{Name: "BEGIN_NODEINFO_FETCH", Src: []string{"PRE_NODEINFO_FETCH"}, Dst: "FETCHING_NODEINFO"},
{Name: "GOT_NODEINFO", Src: []string{"FETCHING_NODEINFO"}, Dst: "READY_FOR_TOOTFETCH"},
{Name: "FETCH_TIME_REACHED", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "READY_AND_DUE_FETCH"},
{Name: "WEIRD_NODE_RESPONSE", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "WEIRD_NODE"},
{Name: "EARLY_FETCH_ERROR", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "EARLY_ERROR"},
{Name: "TOOT_FETCH_ERROR", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "TOOT_FETCH_ERROR"},
},
fsm.Callbacks{
"enter_state": func(e *fsm.Event) { i.fsmEnterState(e) },
},
)
for _, opt := range options {
opt(i)
}
return i
2019-10-24 11:56:44 +00:00
}
2019-11-06 07:03:42 +00:00
func (i *instance) Status() string {
i.fsmLock.Lock()
defer i.fsmLock.Unlock()
return i.fsm.Current()
2019-11-06 00:46:52 +00:00
}
2019-11-06 07:03:42 +00:00
func (i *instance) Event(eventname string) {
i.fsmLock.Lock()
defer i.fsmLock.Unlock()
i.fsm.Event(eventname)
2019-11-06 00:46:52 +00:00
}
2019-11-06 07:03:42 +00:00
func (i *instance) fsmEnterState(e *fsm.Event) {
log.Debug().
Str("hostname", i.hostname).
Str("state", e.Dst).
Msg("instance changed state")
2019-11-03 10:56:50 +00:00
}
2019-11-06 07:03:42 +00:00
func (i *instance) Lock() {
i.structLock.Lock()
2019-11-03 10:56:50 +00:00
}
2019-11-06 07:03:42 +00:00
// Unlock releases the instance's struct-level mutex previously acquired
// with Lock.
func (i *instance) Unlock() {
	i.structLock.Unlock()
}
2019-11-05 23:32:09 +00:00
2019-11-06 07:03:42 +00:00
func (i *instance) bumpFetch() {
i.Lock()
defer i.Unlock()
i.nextFetch = time.Now().Add(120 * time.Second)
2019-11-06 07:03:42 +00:00
}
// setNextFetchAfter schedules the next fetch for d after the current time.
// The update is made while holding the struct lock.
func (i *instance) setNextFetchAfter(d time.Duration) {
	i.Lock()
	defer i.Unlock()

	due := time.Now().Add(d)
	i.nextFetch = due
}
func (i *instance) Fetch() {
i.fetchingLock.Lock()
defer i.fetchingLock.Unlock()
2019-11-06 00:46:52 +00:00
i.setNextFetchAfter(instanceErrorInterval)
2019-11-06 07:03:42 +00:00
err := i.detectNodeTypeIfNecessary()
2019-11-03 18:00:01 +00:00
if err != nil {
log.Debug().
2019-11-06 07:03:42 +00:00
Str("hostname", i.hostname).
2019-11-03 18:00:01 +00:00
Err(err).
Msg("unable to fetch instance metadata")
2019-10-24 11:56:44 +00:00
return
}
2019-11-03 18:00:01 +00:00
i.setNextFetchAfter(instanceSpiderInterval)
2019-11-06 07:03:42 +00:00
log.Info().Msgf("i (%s) IS NOW READY FOR FETCH", i.hostname)
2019-11-03 18:00:01 +00:00
}
2019-11-06 07:03:42 +00:00
// dueForFetch reports whether the state machine is in READY_AND_DUE_FETCH.
// It looks at the FSM state only; the time comparison that moves the
// instance into that state is the ticker's job.
func (i *instance) dueForFetch() bool {
	return i.Status() == "READY_AND_DUE_FETCH"
}
// isNowPastFetchTime reports whether the scheduled next-fetch time has
// already passed.
//
// nextFetch is written under the struct lock (see setNextFetchAfter), so it
// is read under the same lock here to avoid a data race. The caller must not
// already hold the struct lock.
func (i *instance) isNowPastFetchTime() bool {
	i.Lock()
	defer i.Unlock()
	return time.Now().After(i.nextFetch)
}
func (i *instance) Tick() {
if i.Status() == "READY_FOR_TOOTFETCH" {
if i.isNowPastFetchTime() {
i.Event("FETCH_TIME_REACHED")
}
} else if i.Status() == "STATUS_UNKNOWN" {
i.Fetch()
2019-11-06 00:46:52 +00:00
}
2019-11-04 17:07:04 +00:00
}
2019-11-06 07:03:42 +00:00
func (i *instance) nodeIdentified() bool {
i.Lock()
defer i.Unlock()
if i.implementation > implUnknown {
2019-11-04 17:07:04 +00:00
return true
2019-11-03 18:00:01 +00:00
}
2019-11-04 17:07:04 +00:00
return false
2019-11-03 18:00:01 +00:00
}
2019-11-06 07:03:42 +00:00
func (i *instance) detectNodeTypeIfNecessary() error {
if !i.nodeIdentified() {
return i.fetchNodeInfo()
2019-12-14 15:49:35 +00:00
}
return nil
2019-10-24 11:56:44 +00:00
}
2019-11-06 07:03:42 +00:00
func (i *instance) registerError() {
i.Lock()
defer i.Unlock()
i.errorCount++
2019-11-04 17:07:04 +00:00
}
2019-11-06 07:03:42 +00:00
// registerSuccess adds one to the instance's success counter under the
// struct lock.
func (i *instance) registerSuccess() {
	i.Lock()
	defer i.Unlock()
	i.successCount += 1
}
2019-11-06 07:03:42 +00:00
func (i *instance) Up() bool {
2019-11-03 18:00:01 +00:00
i.Lock()
defer i.Unlock()
return i.successCount > 0
}
2019-11-06 07:03:42 +00:00
func (i *instance) fetchNodeInfoURL() error {
2019-11-03 18:00:01 +00:00
url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.hostname)
2019-10-24 11:56:44 +00:00
var c = &http.Client{
Timeout: instanceNodeinfoTimeout,
2019-10-24 11:56:44 +00:00
}
log.Debug().
Str("url", url).
2019-11-03 18:00:01 +00:00
Str("hostname", i.hostname).
Msg("fetching nodeinfo reference URL")
2019-11-06 07:03:42 +00:00
i.Event("BEGIN_NODEINFO_URL_FETCH")
resp, err := c.Get(url)
2019-10-24 11:56:44 +00:00
if err != nil {
log.Debug().
2019-11-03 18:00:01 +00:00
Str("hostname", i.hostname).
Err(err).
Msg("unable to fetch nodeinfo, node is down?")
i.registerError()
2019-11-06 07:03:42 +00:00
i.Event("EARLY_FETCH_ERROR")
2019-11-03 18:00:01 +00:00
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Debug().
2019-11-03 18:00:01 +00:00
Str("hostname", i.hostname).
Err(err).
Msg("unable to read nodeinfo")
i.registerError()
2019-11-06 07:03:42 +00:00
i.Event("EARLY_FETCH_ERROR")
2019-11-03 18:00:01 +00:00
return err
}
nir := new(nodeInfoWellKnownResponse)
err = json.Unmarshal(body, &nir)
if err != nil {
2019-11-03 18:00:01 +00:00
log.Debug().
Str("hostname", i.hostname).
Err(err).
Msg("unable to parse nodeinfo, node is weird")
i.registerError()
2019-11-06 07:03:42 +00:00
i.Event("WEIRD_NODE_RESPONSE")
2019-11-03 18:00:01 +00:00
return err
}
for _, item := range nir.Links {
if item.Rel == nodeInfoSchemaVersionTwoName {
2019-11-03 18:00:01 +00:00
log.Debug().
Str("hostname", i.hostname).
Str("nodeinfourl", item.Href).
Msg("success fetching url for nodeinfo")
i.Lock()
i.nodeInfoURL = item.Href
i.Unlock()
i.registerSuccess()
2019-11-06 07:03:42 +00:00
i.Event("GOT_NODEINFO_URL")
2019-11-03 18:00:01 +00:00
return nil
}
2019-11-03 18:00:01 +00:00
log.Debug().
Str("hostname", i.hostname).
Str("item-rel", item.Rel).
Str("item-href", item.Href).
2019-11-06 07:03:42 +00:00
Msg("nodeinfo entry")
}
log.Error().
2019-11-03 18:00:01 +00:00
Str("hostname", i.hostname).
Msg("incomplete nodeinfo")
i.registerError()
2019-11-06 07:03:42 +00:00
i.Event("WEIRD_NODE_RESPONSE")
2019-11-03 18:00:01 +00:00
return errors.New("incomplete nodeinfo")
}
2019-11-06 07:03:42 +00:00
func (i *instance) fetchNodeInfo() error {
2019-11-03 18:00:01 +00:00
err := i.fetchNodeInfoURL()
2019-11-03 10:56:50 +00:00
2019-11-03 18:00:01 +00:00
if err != nil {
return err
2019-10-24 11:56:44 +00:00
}
var c = &http.Client{
Timeout: instanceNodeinfoTimeout,
}
//FIXME make sure the nodeinfourl is on the same domain as the instance
//hostname
2019-11-06 00:46:52 +00:00
i.Lock()
url := i.nodeInfoURL
2019-11-06 00:46:52 +00:00
i.Unlock()
2019-11-03 10:56:50 +00:00
2019-11-06 07:03:42 +00:00
i.Event("BEGIN_NODEINFO_FETCH")
2019-11-03 10:56:50 +00:00
resp, err := c.Get(url)
if err != nil {
2019-11-03 18:00:01 +00:00
log.Debug().
Str("hostname", i.hostname).
Err(err).
Msgf("unable to fetch nodeinfo data")
i.registerError()
2019-11-06 07:03:42 +00:00
i.Event("EARLY_FETCH_ERROR")
2019-11-03 18:00:01 +00:00
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Error().
2019-11-03 18:00:01 +00:00
Str("hostname", i.hostname).
Err(err).
Msgf("unable to read nodeinfo data")
i.registerError()
2019-11-06 07:03:42 +00:00
i.Event("EARLY_FETCH_ERROR")
2019-11-03 18:00:01 +00:00
return err
}
ni := new(nodeInfoVersionTwoSchema)
err = json.Unmarshal(body, &ni)
if err != nil {
log.Error().
2019-11-03 18:00:01 +00:00
Str("hostname", i.hostname).
Err(err).
Msgf("unable to parse nodeinfo")
i.registerError()
2019-11-06 07:03:42 +00:00
i.Event("WEIRD_NODE_RESPONSE")
2019-11-03 18:00:01 +00:00
return err
}
2019-11-03 18:00:01 +00:00
log.Debug().
Str("serverVersion", ni.Software.Version).
Str("software", ni.Software.Name).
2019-11-03 18:00:01 +00:00
Str("hostname", i.hostname).
Str("nodeInfoURL", i.nodeInfoURL).
Msg("received nodeinfo from instance")
i.Lock()
2019-11-05 23:32:09 +00:00
i.serverVersionString = ni.Software.Version
i.serverImplementationString = ni.Software.Name
ni.Software.Name = strings.ToLower(ni.Software.Name)
if ni.Software.Name == "pleroma" {
2019-11-03 18:00:01 +00:00
log.Debug().
Str("hostname", i.hostname).
Str("software", ni.Software.Name).
Msg("detected server software")
i.identified = true
i.implementation = implPleroma
2019-11-05 23:32:09 +00:00
i.Unlock()
i.registerSuccess()
2019-11-06 07:03:42 +00:00
i.Event("GOT_NODEINFO")
2019-11-03 18:00:01 +00:00
return nil
} else if ni.Software.Name == "mastodon" {
2019-11-06 00:46:52 +00:00
log.Debug().
Str("hostname", i.hostname).
Str("software", ni.Software.Name).
Msg("detected server software")
i.identified = true
i.implementation = implMastodon
2019-11-05 23:32:09 +00:00
i.Unlock()
i.registerSuccess()
2019-11-06 07:03:42 +00:00
i.Event("GOT_NODEINFO")
2019-11-03 18:00:01 +00:00
return nil
} else {
log.Error().
2019-11-03 18:00:01 +00:00
Str("hostname", i.hostname).
Str("software", ni.Software.Name).
2019-11-03 18:00:01 +00:00
Msg("FIXME unknown server implementation")
2019-11-05 23:32:09 +00:00
i.Unlock()
i.registerError()
2019-11-06 07:03:42 +00:00
i.Event("WEIRD_NODE_RESPONSE")
return errors.New("unknown server implementation")
}
2019-10-24 11:56:44 +00:00
}
2019-11-03 13:17:00 +00:00
/*
2019-10-24 11:56:44 +00:00
func (i *Instance) fetchRecentToots() ([]byte, error) {
2019-11-03 10:56:50 +00:00
i.Lock()
impl := i.impl
i.Unlock()
if impl == Mastodon {
2019-10-24 11:56:44 +00:00
return i.fetchRecentTootsJsonFromMastodon()
} else if impl == Pleroma {
return i.fetchRecentTootsJsonFromPleroma()
2019-10-24 11:56:44 +00:00
} else {
2019-11-03 10:56:50 +00:00
panic("unimplemented")
2019-10-24 11:56:44 +00:00
}
}
2019-11-03 13:17:00 +00:00
*/
2019-10-24 11:56:44 +00:00
2019-11-03 13:17:00 +00:00
/*
2019-11-06 07:03:42 +00:00
func (i *PleromaBackend) fetchRecentToots() ([]byte, error) {
2019-11-03 18:00:01 +00:00
//url :=
//fmt.Sprintf("https://%s/api/statuses/public_and_external_timeline.json?count=100",
//i.hostname)
2019-10-24 11:56:44 +00:00
return nil, nil
}
2019-10-24 10:38:16 +00:00
2019-11-06 07:03:42 +00:00
func (i *MastodonBackend) fetchRecentTootsJsonFromMastodon() ([]byte, error) {
2019-11-03 18:00:01 +00:00
//url :=
//fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true",
//i.hostname)
2019-10-24 11:56:44 +00:00
return nil, nil
}
2019-11-03 13:17:00 +00:00
*/