package instance

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"strings"
	"sync"
	"time"

	"git.eeqj.de/sneak/feta/jsonapis"
	"git.eeqj.de/sneak/feta/toot"
	"github.com/google/uuid"
	"github.com/looplab/fsm"
	"github.com/rs/zerolog/log"
)

// nodeInfoSchemaVersionTwoName is the "rel" value that a link in the
// /.well-known/nodeinfo response must carry for us to follow it.
const nodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0"

const (
	// instanceNodeinfoTimeout bounds each nodeinfo HTTP request.
	instanceNodeinfoTimeout = 2 * time.Minute
	// instanceHTTPTimeout bounds each toot-timeline HTTP request.
	instanceHTTPTimeout = 2 * time.Minute
	// instanceSpiderInterval is the delay before the next fetch after a
	// successful toot fetch.
	instanceSpiderInterval = 2 * time.Minute
	// instanceErrorInterval is the default delay before the next fetch
	// after an error.
	instanceErrorInterval = 1 * time.Hour
	// instancePersistentErrorInterval is the delay used once an instance
	// has accumulated more than three consecutive errors.
	instancePersistentErrorInterval = 24 * time.Hour
	// zeroInterval schedules the next fetch for "now".
	zeroInterval = 0 * time.Second
)

// Instance stores all the information we know about an instance
type Instance struct {
	Disabled                   bool
	ErrorCount                 uint
	ConsecutiveErrorCount      uint
	FSM                        *fsm.FSM
	Fetching                   bool
	HighestID                  uint
	Hostname                   string
	Identified                 bool
	Implementation             string
	InitialFSMState            string
	NextFetch                  time.Time
	LastError                  string
	NodeInfoURL                string
	ServerImplementationString string
	ServerVersionString        string
	SuccessCount               uint
	UUID                       uuid.UUID
	// fetchingLock serializes calls to Fetch.
	fetchingLock sync.Mutex
	// fsmLock guards access to FSM in Status and Event.
	fsmLock sync.Mutex
	// structLock guards the plain fields above; see Lock and Unlock.
	structLock sync.Mutex
	// tootDestination receives every toot fetched from this instance.
	tootDestination chan *toot.Toot
}

// New returns a new instance, argument is a function that operates on the
// new instance
//
// The instance starts with a fresh UUID, a first fetch scheduled one second
// from now and an initial FSM state of STATUS_UNKNOWN; the options run after
// those defaults are set and may override them.
func New(options ...func(i *Instance)) *Instance {
	i := new(Instance)
	i.UUID = uuid.New()
	i.setNextFetchAfter(1 * time.Second)
	i.InitialFSMState = "STATUS_UNKNOWN"

	for _, opt := range options {
		opt(i)
	}

	// An instance configured to start in FETCHING is rewound to
	// READY_FOR_TOOTFETCH instead (presumably because a freshly
	// constructed instance has no fetch in flight).
	if i.InitialFSMState == "FETCHING" {
		i.InitialFSMState = "READY_FOR_TOOTFETCH"
	}

	i.FSM = fsm.NewFSM(
		i.InitialFSMState,
		fsm.Events{
			{Name: "BEGIN_NODEINFO_URL_FETCH", Src: []string{"STATUS_UNKNOWN"}, Dst: "FETCHING_NODEINFO_URL"},
			{Name: "GOT_NODEINFO_URL", Src: []string{"FETCHING_NODEINFO_URL"}, Dst: "PRE_NODEINFO_FETCH"},
			{Name: "BEGIN_NODEINFO_FETCH", Src: []string{"PRE_NODEINFO_FETCH"}, Dst: "FETCHING_NODEINFO"},
			{Name: "GOT_NODEINFO", Src: []string{"FETCHING_NODEINFO"}, Dst: "READY_FOR_TOOTFETCH"},
			{Name: "FETCH_TIME_REACHED", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "READY_AND_DUE_FETCH"},
			{Name: "BEGIN_TOOT_FETCH", Src: []string{"READY_AND_DUE_FETCH"}, Dst: "FETCHING"},
			{Name: "WEIRD_NODE_RESPONSE", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "WEIRD_NODE"},
			{Name: "EARLY_FETCH_ERROR", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "EARLY_ERROR"},
			{Name: "TOOT_FETCH_ERROR", Src: []string{"FETCHING"}, Dst: "TOOT_FETCH_ERROR"},
			{Name: "TOOTS_FETCHED", Src: []string{"FETCHING"}, Dst: "READY_FOR_TOOTFETCH"},
			{Name: "DISABLEMENT", Src: []string{"WEIRD_NODE", "EARLY_ERROR", "TOOT_FETCH_ERROR"}, Dst: "DISABLED"},
		},
		fsm.Callbacks{
			"enter_state": func(e *fsm.Event) { i.fsmEnterState(e) },
		},
	)
	return i
}

// Status returns the instance's state in the FSM
func (i *Instance) Status() string {
	i.fsmLock.Lock()
	defer i.fsmLock.Unlock()
	return i.FSM.Current()
}

// SetTootDestination takes a channel from the manager that all toots
// fetched from this instance should be pushed into. The instance is not
// responsible for deduplication, it should shove all toots on every fetch
// into the channel.
func (i *Instance) SetTootDestination(d chan *toot.Toot) {
	i.tootDestination = d
}

// Event is the method that alters the FSM
//
// The event is fired while holding fsmLock. An event the FSM rejects (for
// example one that is not valid in the current state) leaves the state
// untouched; the rejection is logged at debug level rather than returned,
// so the signature stays unchanged for existing callers.
func (i *Instance) Event(eventname string) {
	i.fsmLock.Lock()
	defer i.fsmLock.Unlock()
	if err := i.FSM.Event(eventname); err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Str("event", eventname).
			Err(err).
			Msg("fsm event rejected")
	}
}

// fsmEnterState is the FSM "enter_state" callback; it only logs the state
// that was just entered.
func (i *Instance) fsmEnterState(e *fsm.Event) {
	log.Debug().
		Str("hostname", i.Hostname).
		Str("state", e.Dst).
		Msg("instance changed state")
}

// Lock locks the instance's mutex for reading/writing from the structure
func (i *Instance) Lock() {
	i.structLock.Lock()
}

// Unlock unlocks the instance's mutex for reading/writing from the structure
func (i *Instance) Unlock() {
	i.structLock.Unlock()
}

// bumpFetchError reschedules (or disables) the instance based on its current
// consecutive error count. It does not increment any counter itself; see
// registerError for that.
func (i *Instance) bumpFetchError() {
	i.Lock()
	probablyDead := i.ConsecutiveErrorCount > 3
	shouldDisable := i.ConsecutiveErrorCount > 6
	if shouldDisable {
		i.Disabled = true
	}
	i.Unlock()

	switch {
	case shouldDisable:
		// more than six consecutive errors: give up on this instance.
		// Event is called after releasing the struct lock.
		i.Event("DISABLEMENT")
	case probablyDead:
		// more than three consecutive errors: only try once per day
		i.setNextFetchAfter(instancePersistentErrorInterval)
	default:
		// otherwise give them 1h
		i.setNextFetchAfter(instanceErrorInterval)
	}
}

// bumpFetchSuccess schedules the next fetch one spider interval from now.
func (i *Instance) bumpFetchSuccess() {
	i.setNextFetchAfter(instanceSpiderInterval)
}

// scheduleFetchImmediate makes the instance due for a fetch right away.
func (i *Instance) scheduleFetchImmediate() {
	i.setNextFetchAfter(zeroInterval)
}

// setNextFetchAfter sets NextFetch to now+d while holding the struct lock.
func (i *Instance) setNextFetchAfter(d time.Duration) {
	i.Lock()
	defer i.Unlock()
	i.NextFetch = time.Now().Add(d)
}

// Fetch prepares an instance for fetching. Bad name, fix it.
// FIXME(sneak)
//
// It identifies the server software if that has not happened yet and, on
// success, schedules a toot fetch for right now. Calls are serialized by
// fetchingLock.
func (i *Instance) Fetch() {
	i.fetchingLock.Lock()
	defer i.fetchingLock.Unlock()

	// Apply the error schedule up front; it is only replaced with an
	// immediate fetch below if detection succeeds.
	i.bumpFetchError()

	err := i.DetectNodeTypeIfNecessary()
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to fetch instance metadata")
		return
	}

	i.scheduleFetchImmediate()
	log.Info().
		Str("hostname", i.Hostname).
		Msg("instance now ready for fetch")
}

// FIXME rename this function
//
// dueForFetch reports whether the FSM is in READY_AND_DUE_FETCH.
func (i *Instance) dueForFetch() bool {
	// this just checks FSM state, the ticker must update it and do time
	// calcs
	return i.Status() == "READY_AND_DUE_FETCH"
}

// isNowPastFetchTime reports whether NextFetch lies in the past. NextFetch
// is read under the struct lock because setNextFetchAfter writes it under
// that same lock.
func (i *Instance) isNowPastFetchTime() bool {
	i.Lock()
	defer i.Unlock()
	return time.Now().After(i.NextFetch)
}

// Tick is responsible for pushing idle instance records between states.
// The instances will transition between states when doing stuff (e.g.
// investigating, fetching, et c) as well.
func (i *Instance) Tick() {
	// Read the state once so every branch decides on the same snapshot.
	switch i.Status() {
	case "READY_FOR_TOOTFETCH":
		if i.isNowPastFetchTime() {
			i.Event("FETCH_TIME_REACHED")
		}
	case "STATUS_UNKNOWN":
		i.Fetch()
	case "READY_AND_DUE_FETCH":
		// fetchRecentToots logs and records its own failures, so the
		// returned error is intentionally dropped here.
		_ = i.fetchRecentToots()
	}
}

// nodeIdentified reports whether Implementation has been set.
func (i *Instance) nodeIdentified() bool {
	i.Lock()
	defer i.Unlock()
	return i.Implementation != ""
}

// DetectNodeTypeIfNecessary does some network requests if the node is as
// yet unidentified. No-op otherwise.
func (i *Instance) DetectNodeTypeIfNecessary() error {
	if !i.nodeIdentified() {
		return i.fetchNodeInfo()
	}
	return nil
}

// registerError increments both the total and the consecutive error counts.
func (i *Instance) registerError() {
	i.Lock()
	defer i.Unlock()
	i.ErrorCount++
	i.ConsecutiveErrorCount++
}

// registerSuccess increments the success count and resets the consecutive
// error count.
func (i *Instance) registerSuccess() {
	i.Lock()
	defer i.Unlock()
	i.SuccessCount++
	i.ConsecutiveErrorCount = 0
}

// Up returns true if the success count is >0
func (i *Instance) Up() bool {
	i.Lock()
	defer i.Unlock()
	return i.SuccessCount > 0
}

// fetchNodeInfoURL requests https://<hostname>/.well-known/nodeinfo and
// stores the href of the NodeInfo 2.0 link in NodeInfoURL.
//
// Transport and read failures fire EARLY_FETCH_ERROR; an unparseable
// document or one without a 2.0 link fires WEIRD_NODE_RESPONSE. Every
// failure is also counted via registerError and returned.
func (i *Instance) fetchNodeInfoURL() error {
	url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.Hostname)
	var c = &http.Client{
		Timeout: instanceNodeinfoTimeout,
	}

	log.Debug().
		Str("url", url).
		Str("hostname", i.Hostname).
		Msg("fetching nodeinfo reference URL")

	i.Event("BEGIN_NODEINFO_URL_FETCH")
	resp, err := c.Get(url)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to fetch nodeinfo, node is down?")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to read nodeinfo")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}

	// Decode into the struct itself (nir is already a pointer). Passing
	// &nir would let a JSON "null" body reset nir to nil and crash the
	// loop below.
	nir := new(jsonapis.NodeInfoWellKnownResponse)
	if err := json.Unmarshal(body, nir); err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to parse nodeinfo, node is weird")
		i.registerError()
		i.Event("WEIRD_NODE_RESPONSE")
		return err
	}

	for _, item := range nir.Links {
		if item.Rel == nodeInfoSchemaVersionTwoName {
			log.Debug().
				Str("hostname", i.Hostname).
				Str("nodeinfourl", item.Href).
				Msg("success fetching url for nodeinfo")

			i.Lock()
			i.NodeInfoURL = item.Href
			i.Unlock()
			i.registerSuccess()
			i.Event("GOT_NODEINFO_URL")
			return nil
		}
		log.Debug().
			Str("hostname", i.Hostname).
			Str("item-rel", item.Rel).
			Str("item-href", item.Href).
			Msg("nodeinfo entry")
	}

	log.Error().
		Str("hostname", i.Hostname).
		Msg("incomplete nodeinfo")
	i.registerError()
	i.Event("WEIRD_NODE_RESPONSE")
	return errors.New("incomplete nodeinfo")
}

// fetchNodeInfo resolves the nodeinfo URL, downloads the NodeInfo 2.0
// document and records the server's software name and version.
//
// Only "pleroma" and "mastodon" (compared case-insensitively) are accepted
// as implementations; anything else fires WEIRD_NODE_RESPONSE and returns an
// error. The raw name and version strings are stored in either case.
func (i *Instance) fetchNodeInfo() error {
	err := i.fetchNodeInfoURL()
	if err != nil {
		return err
	}

	var c = &http.Client{
		Timeout: instanceNodeinfoTimeout,
	}

	//FIXME make sure the nodeinfourl is on the same domain as the instance
	//hostname
	i.Lock()
	url := i.NodeInfoURL
	i.Unlock()

	i.Event("BEGIN_NODEINFO_FETCH")
	resp, err := c.Get(url)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to fetch nodeinfo data")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Error().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to read nodeinfo data")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}

	// Decode into the struct itself (ni is already a pointer). Passing &ni
	// would let a JSON "null" body reset ni to nil and crash the field
	// accesses below.
	ni := new(jsonapis.NodeInfoVersionTwoSchema)
	if err := json.Unmarshal(body, ni); err != nil {
		log.Error().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to parse nodeinfo")
		i.registerError()
		i.Event("WEIRD_NODE_RESPONSE")
		return err
	}

	// Log the local copy of the URL rather than re-reading the shared
	// field without the struct lock.
	log.Debug().
		Str("serverVersion", ni.Software.Version).
		Str("software", ni.Software.Name).
		Str("hostname", i.Hostname).
		Str("nodeInfoURL", url).
		Msg("received nodeinfo from instance")

	software := strings.ToLower(ni.Software.Name)
	implementation := ""
	switch software {
	case "pleroma", "mastodon":
		implementation = software
	}

	i.Lock()
	i.ServerVersionString = ni.Software.Version
	i.ServerImplementationString = ni.Software.Name
	if implementation != "" {
		i.Identified = true
		i.Implementation = implementation
	}
	i.Unlock()

	if implementation == "" {
		log.Error().
			Str("hostname", i.Hostname).
			Str("software", software).
			Msg("FIXME unknown server implementation")
		i.registerError()
		i.Event("WEIRD_NODE_RESPONSE")
		return errors.New("unknown server implementation")
	}

	log.Debug().
		Str("hostname", i.Hostname).
		Str("software", software).
		Msg("detected server software")
	i.registerSuccess()
	i.Event("GOT_NODEINFO")
	return nil
}

// fetchRecentToots downloads the instance's local public timeline (up to 40
// entries), parses it and hands the toots to the ingester channel from a
// separate goroutine.
//
// Any failure is counted via registerError, fires TOOT_FETCH_ERROR and is
// returned.
func (i *Instance) fetchRecentToots() error {
	// it turns out pleroma supports the mastodon api so we'll just use that
	// for everything for now

	// FIXME would be nice to support non-https
	url := fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true",
		i.Hostname)

	// FIXME support broken/expired certs
	var c = &http.Client{
		Timeout: instanceHTTPTimeout,
	}

	i.Event("BEGIN_TOOT_FETCH")

	// we set the interval now to the error interval regardless here as a
	// safety against bugs to avoid fetching too frequently by logic
	// bug. if the fetch is successful, we will conditionally re-update the
	// next fetch to now+successInterval.
	i.setNextFetchAfter(instanceErrorInterval)

	resp, err := c.Get(url)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to fetch recent toots")
		i.registerError()
		i.Event("TOOT_FETCH_ERROR")
		return err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to read recent toots from response")
		i.registerError()
		i.Event("TOOT_FETCH_ERROR")
		return err
	}

	tc, err := toot.NewTootCollectionFromMastodonAPIResponse(body, i.Hostname)
	if err != nil {
		log.Error().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to parse recent toot list")
		i.registerError()
		i.Event("TOOT_FETCH_ERROR")
		return err
	}

	log.Info().
		Str("hostname", i.Hostname).
		Int("tootCount", len(tc)).
		Msg("got and parsed toots")
	i.registerSuccess()
	i.Event("TOOTS_FETCHED")
	i.bumpFetchSuccess()

	// this should go fast as either the channel is buffered bigly or the
	// ingester receives fast and does its own buffering, but run it in its
	// own goroutine anyway because why not
	go i.sendTootsToIngester(tc)
	return nil
}

// sendTootsToIngester pushes every toot in tc into the destination channel,
// blocking on each send until the receiver takes it.
//
// If no destination has been set the batch is dropped with an error log: a
// send on a nil channel would block forever and strand this goroutine.
func (i *Instance) sendTootsToIngester(tc []*toot.Toot) {
	if len(tc) == 0 {
		return
	}
	dest := i.tootDestination
	if dest == nil {
		log.Error().
			Str("hostname", i.Hostname).
			Int("tootCount", len(tc)).
			Msg("no toot destination set, dropping fetched toots")
		return
	}
	for _, item := range tc {
		dest <- item
	}
}