// Package instance tracks a single remote server: it discovers which
// software the server runs via nodeinfo and fetches its public timeline
// through the Mastodon-compatible API, pushing the resulting toots into a
// channel supplied by the caller.
package instance

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"strings"
	"sync"
	"time"

	//import "github.com/gin-gonic/gin"
	"git.eeqj.de/sneak/feta/jsonapis"
	"git.eeqj.de/sneak/feta/storage"
	"git.eeqj.de/sneak/feta/toot"
	"github.com/looplab/fsm"
	"github.com/rs/zerolog/log"
)

// nodeInfoSchemaVersionTwoName is the "rel" value that identifies the
// nodeinfo 2.0 document among the links returned by /.well-known/nodeinfo.
const nodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0"

const (
	// instanceNodeinfoTimeout bounds both nodeinfo HTTP requests.
	instanceNodeinfoTimeout = time.Second * 50
	// instanceHTTPTimeout bounds the public timeline HTTP request.
	instanceHTTPTimeout = time.Second * 120
	// instanceSpiderInterval is the delay before the next fetch after a
	// successful one.
	instanceSpiderInterval = time.Second * 120
	// instanceErrorInterval is the delay applied before any fetch attempt
	// and kept in place when that attempt fails.
	instanceErrorInterval = time.Second * 60 * 30
)

type instanceImplementation int

// Hostname is a special type for holding the hostname of an
// instance (string)
type Hostname string

const (
	implUnknown instanceImplementation = iota
	implMastodon
	implPleroma
)

// Instance stores all the information we know about an instance
type Instance struct {
	// structLock guards the plain data fields below; see Lock and Unlock.
	structLock      sync.Mutex
	tootDestination chan *toot.Toot
	ErrorCount      uint
	SuccessCount    uint
	highestID       int
	Hostname        string
	Identified      bool
	fetching        bool
	implementation  instanceImplementation
	storageBackend  *storage.TootStorageBackend
	NextFetch       time.Time
	nodeInfoURL     string

	ServerVersionString        string
	ServerImplementationString string

	// fetchingLock serializes calls to Fetch.
	fetchingLock sync.Mutex
	// fsm holds the instance's lifecycle state; fsmLock guards access to it.
	fsm     *fsm.FSM
	fsmLock sync.Mutex
}

// New returns a new instance, argument is a function that operates on the
// new instance
//
// The instance starts in STATUS_UNKNOWN with its next fetch time set one
// second in the future. Options are applied in order after the state
// machine has been created.
func New(options ...func(i *Instance)) *Instance {
	i := new(Instance)
	i.setNextFetchAfter(1 * time.Second)

	i.fsm = fsm.NewFSM(
		"STATUS_UNKNOWN",
		fsm.Events{
			{Name: "BEGIN_NODEINFO_URL_FETCH", Src: []string{"STATUS_UNKNOWN"}, Dst: "FETCHING_NODEINFO_URL"},
			{Name: "GOT_NODEINFO_URL", Src: []string{"FETCHING_NODEINFO_URL"}, Dst: "PRE_NODEINFO_FETCH"},
			{Name: "BEGIN_NODEINFO_FETCH", Src: []string{"PRE_NODEINFO_FETCH"}, Dst: "FETCHING_NODEINFO"},
			{Name: "GOT_NODEINFO", Src: []string{"FETCHING_NODEINFO"}, Dst: "READY_FOR_TOOTFETCH"},
			{Name: "FETCH_TIME_REACHED", Src: []string{"READY_FOR_TOOTFETCH"}, Dst: "READY_AND_DUE_FETCH"},
			{Name: "BEGIN_TOOT_FETCH", Src: []string{"READY_AND_DUE_FETCH"}, Dst: "FETCHING"},
			{Name: "WEIRD_NODE_RESPONSE", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "WEIRD_NODE"},
			{Name: "EARLY_FETCH_ERROR", Src: []string{"FETCHING_NODEINFO_URL", "PRE_NODEINFO_FETCH", "FETCHING_NODEINFO"}, Dst: "EARLY_ERROR"},
			{Name: "TOOT_FETCH_ERROR", Src: []string{"FETCHING"}, Dst: "TOOT_FETCH_ERROR"},
			{Name: "TOOTS_FETCHED", Src: []string{"FETCHING"}, Dst: "READY_FOR_TOOTFETCH"},
		},
		fsm.Callbacks{
			"enter_state": func(e *fsm.Event) { i.fsmEnterState(e) },
		},
	)

	for _, opt := range options {
		opt(i)
	}
	return i
}

// Status returns the instance's state in the FSM
func (i *Instance) Status() string {
	i.fsmLock.Lock()
	defer i.fsmLock.Unlock()
	return i.fsm.Current()
}

// SetTootDestination takes a channel from the manager that all toots
// fetched from this instance should be pushed into. The instance is not
// responsible for deduplication, it should shove all toots on every fetch
// into the channel.
func (i *Instance) SetTootDestination(d chan *toot.Toot) {
	i.tootDestination = d
}

// Event is the method that alters the FSM
//
// The event is applied while holding fsmLock. An error from the state
// machine does not propagate to the caller; it is logged at debug level so
// that rejected transitions remain visible.
func (i *Instance) Event(eventname string) {
	i.fsmLock.Lock()
	defer i.fsmLock.Unlock()
	if err := i.fsm.Event(eventname); err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Str("event", eventname).
			Err(err).
			Msg("fsm event was not applied")
	}
}

// fsmEnterState is the "enter_state" callback registered in New; it logs
// the destination state of every transition.
func (i *Instance) fsmEnterState(e *fsm.Event) {
	log.Debug().
		Str("hostname", i.Hostname).
		Str("state", e.Dst).
		Msg("instance changed state")
}

// Lock locks the instance's mutex for reading/writing from the structure
func (i *Instance) Lock() {
	i.structLock.Lock()
}

// Unlock unlocks the instance's mutex for reading/writing from the structure
func (i *Instance) Unlock() {
	i.structLock.Unlock()
}

// bumpFetch pushes the next fetch time to one spider interval from now.
func (i *Instance) bumpFetch() {
	i.setNextFetchAfter(instanceSpiderInterval)
}

// setNextFetchAfter sets the next fetch time to now plus d, under the
// struct lock.
func (i *Instance) setNextFetchAfter(d time.Duration) {
	i.Lock()
	defer i.Unlock()
	i.NextFetch = time.Now().Add(d)
}

// Fetch prepares an instance for fetching. Bad name, fix it.
// FIXME(sneak)
//
// It identifies the server software if that has not happened yet. The next
// fetch time is first set to the error interval and only shortened to the
// spider interval once identification has succeeded. Concurrent calls are
// serialized by fetchingLock.
func (i *Instance) Fetch() {
	i.fetchingLock.Lock()
	defer i.fetchingLock.Unlock()

	i.setNextFetchAfter(instanceErrorInterval)

	if err := i.DetectNodeTypeIfNecessary(); err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to fetch instance metadata")
		return
	}

	i.setNextFetchAfter(instanceSpiderInterval)
	log.Info().
		Str("hostname", i.Hostname).
		Msg("instance now ready for fetch")
}

// FIXME rename this function
//
// dueForFetch reports whether the FSM is in READY_AND_DUE_FETCH.
func (i *Instance) dueForFetch() bool {
	// this just checks FSM state, the ticker must update it and do time
	// calcs
	return i.Status() == "READY_AND_DUE_FETCH"
}

// isNowPastFetchTime reports whether the current time is after NextFetch.
// NextFetch is read under the struct lock because setNextFetchAfter writes
// it under that same lock.
func (i *Instance) isNowPastFetchTime() bool {
	i.Lock()
	defer i.Unlock()
	return time.Now().After(i.NextFetch)
}

// Tick is responsible for pushing idle instance records between states.
// The instances will transition between states when doing stuff (e.g.
// investigating, fetching, et c) as well.
//
// NOTE(review): the states WEIRD_NODE, EARLY_ERROR and TOOT_FETCH_ERROR have
// no outgoing events in the table built by New and are not handled here, so
// an instance that enters one of them stays there. Confirm whether that is
// intended.
func (i *Instance) Tick() {
	// Read the state once so every comparison sees the same snapshot.
	switch i.Status() {
	case "READY_FOR_TOOTFETCH":
		if i.isNowPastFetchTime() {
			i.Event("FETCH_TIME_REACHED")
		}
	case "STATUS_UNKNOWN":
		i.Fetch()
	case "READY_AND_DUE_FETCH":
		// fetchRecentToots logs and records its own failures, so the
		// returned error is deliberately not handled a second time.
		_ = i.fetchRecentToots()
	}
}

// nodeIdentified reports whether the server implementation has been
// determined (anything other than implUnknown).
func (i *Instance) nodeIdentified() bool {
	i.Lock()
	defer i.Unlock()
	return i.implementation > implUnknown
}

// DetectNodeTypeIfNecessary does some network requests if the node is as
// yet unidentified. No-op otherwise.
func (i *Instance) DetectNodeTypeIfNecessary() error {
	if i.nodeIdentified() {
		return nil
	}
	return i.fetchNodeInfo()
}

// registerError increments the error counter under the struct lock.
func (i *Instance) registerError() {
	i.Lock()
	defer i.Unlock()
	i.ErrorCount++
}

// registerSuccess increments the success counter under the struct lock.
func (i *Instance) registerSuccess() {
	i.Lock()
	defer i.Unlock()
	i.SuccessCount++
}

// Up returns true if the success count is >0
func (i *Instance) Up() bool {
	i.Lock()
	defer i.Unlock()
	return i.SuccessCount > 0
}

// fetchNodeInfoURL requests https://<hostname>/.well-known/nodeinfo and
// stores the href of the nodeinfo 2.0 link in nodeInfoURL.
//
// Transport and read failures fire EARLY_FETCH_ERROR; an unparsable body or
// a document without a 2.0 link fires WEIRD_NODE_RESPONSE. Every failure
// path increments the error counter and returns a non-nil error.
func (i *Instance) fetchNodeInfoURL() error {
	url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.Hostname)
	var c = &http.Client{
		Timeout: instanceNodeinfoTimeout,
	}

	log.Debug().
		Str("url", url).
		Str("hostname", i.Hostname).
		Msg("fetching nodeinfo reference URL")

	i.Event("BEGIN_NODEINFO_URL_FETCH")
	resp, err := c.Get(url)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to fetch nodeinfo, node is down?")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to read nodeinfo")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}

	// Decode into the struct pointer itself, not a pointer to it: with a
	// pointer-to-pointer target a JSON "null" body would set nir to nil and
	// the loop below would dereference a nil pointer.
	nir := new(jsonapis.NodeInfoWellKnownResponse)
	if err = json.Unmarshal(body, nir); err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to parse nodeinfo, node is weird")
		i.registerError()
		i.Event("WEIRD_NODE_RESPONSE")
		return err
	}

	for _, item := range nir.Links {
		if item.Rel == nodeInfoSchemaVersionTwoName {
			log.Debug().
				Str("hostname", i.Hostname).
				Str("nodeinfourl", item.Href).
				Msg("success fetching url for nodeinfo")

			i.Lock()
			i.nodeInfoURL = item.Href
			i.Unlock()
			i.registerSuccess()
			i.Event("GOT_NODEINFO_URL")
			return nil
		}
		log.Debug().
			Str("hostname", i.Hostname).
			Str("item-rel", item.Rel).
			Str("item-href", item.Href).
			Msg("nodeinfo entry")
	}

	log.Error().
		Str("hostname", i.Hostname).
		Msg("incomplete nodeinfo")
	i.registerError()
	i.Event("WEIRD_NODE_RESPONSE")
	return errors.New("incomplete nodeinfo")
}

// fetchNodeInfo resolves the nodeinfo URL, downloads the nodeinfo document
// and records the server's software name and version.
//
// Only "pleroma" and "mastodon" (compared case-insensitively) are accepted;
// they mark the instance as identified and fire GOT_NODEINFO. Any other
// software name fires WEIRD_NODE_RESPONSE and returns an error. Transport
// and read failures fire EARLY_FETCH_ERROR.
func (i *Instance) fetchNodeInfo() error {
	if err := i.fetchNodeInfoURL(); err != nil {
		return err
	}

	var c = &http.Client{
		Timeout: instanceNodeinfoTimeout,
	}

	//FIXME make sure the nodeinfourl is on the same domain as the instance
	//hostname
	i.Lock()
	url := i.nodeInfoURL
	i.Unlock()

	i.Event("BEGIN_NODEINFO_FETCH")
	resp, err := c.Get(url)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to fetch nodeinfo data")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Error().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to read nodeinfo data")
		i.registerError()
		i.Event("EARLY_FETCH_ERROR")
		return err
	}

	// Decode into the struct pointer itself so a JSON "null" body leaves a
	// zero-valued struct instead of a nil pointer that would panic below.
	ni := new(jsonapis.NodeInfoVersionTwoSchema)
	if err = json.Unmarshal(body, ni); err != nil {
		log.Error().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to parse nodeinfo")
		i.registerError()
		i.Event("WEIRD_NODE_RESPONSE")
		return err
	}

	// Log the locally captured URL rather than reading the field again
	// without holding the struct lock.
	log.Debug().
		Str("serverVersion", ni.Software.Version).
		Str("software", ni.Software.Name).
		Str("hostname", i.Hostname).
		Str("nodeInfoURL", url).
		Msg("received nodeinfo from instance")

	// The strings are stored exactly as reported, before lowercasing.
	i.Lock()
	i.ServerVersionString = ni.Software.Version
	i.ServerImplementationString = ni.Software.Name
	i.Unlock()

	software := strings.ToLower(ni.Software.Name)

	var impl instanceImplementation
	switch software {
	case "pleroma":
		impl = implPleroma
	case "mastodon":
		impl = implMastodon
	default:
		log.Error().
			Str("hostname", i.Hostname).
			Str("software", software).
			Msg("FIXME unknown server implementation")
		i.registerError()
		i.Event("WEIRD_NODE_RESPONSE")
		return errors.New("unknown server implementation")
	}

	log.Debug().
		Str("hostname", i.Hostname).
		Str("software", software).
		Msg("detected server software")

	i.Lock()
	i.Identified = true
	i.implementation = impl
	i.Unlock()

	i.registerSuccess()
	i.Event("GOT_NODEINFO")
	return nil
}

// fetchRecentToots downloads up to 40 local toots from the instance's
// public timeline and hands them to sendTootsToIngester in a new goroutine.
//
// It fires BEGIN_TOOT_FETCH before the request, then TOOTS_FETCHED on
// success or TOOT_FETCH_ERROR on any failure, and returns the error that
// caused the failure.
func (i *Instance) fetchRecentToots() error {
	// this would have been about a billion times shorter in python

	// it turns out pleroma supports the mastodon api so we'll just use that
	// for everything for now
	url := fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true",
		i.Hostname)

	var c = &http.Client{
		Timeout: instanceHTTPTimeout,
	}

	i.Event("BEGIN_TOOT_FETCH")

	// we set the interval now to the error interval regardless here as a
	// safety against bugs to avoid fetching too frequently by logic
	// bug. if the fetch is successful, we will conditionally re-update the
	// next fetch to now+successInterval.
	i.setNextFetchAfter(instanceErrorInterval)

	resp, err := c.Get(url)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to fetch recent toots")
		i.registerError()
		i.Event("TOOT_FETCH_ERROR")
		return err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Debug().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to read recent toots from response")
		i.registerError()
		i.Event("TOOT_FETCH_ERROR")
		return err
	}

	tc, err := toot.NewTootCollectionFromMastodonAPIResponse(body, i.Hostname)
	if err != nil {
		log.Error().
			Str("hostname", i.Hostname).
			Err(err).
			Msg("unable to parse recent toot list")
		i.registerError()
		i.Event("TOOT_FETCH_ERROR")
		return err
	}

	log.Info().
		Str("hostname", i.Hostname).
		Int("tootCount", len(tc)).
		Msg("got and parsed toots")
	i.registerSuccess()
	i.Event("TOOTS_FETCHED")
	i.setNextFetchAfter(instanceSpiderInterval)

	// this should go fast as either the channel is buffered bigly or the
	// ingester receives fast and does its own buffering, but run it in its
	// own goroutine anyway because why not
	go i.sendTootsToIngester(tc)
	return nil
}

// sendTootsToIngester pushes every toot in tc into the destination channel,
// blocking on each send until it is accepted.
//
// If no destination has been set the toots are dropped with an error log: a
// send on a nil channel blocks forever, which would leak the goroutine that
// fetchRecentToots starts for this call.
func (i *Instance) sendTootsToIngester(tc []*toot.Toot) {
	dest := i.tootDestination
	if dest == nil {
		log.Error().
			Str("hostname", i.Hostname).
			Int("tootCount", len(tc)).
			Msg("no toot destination set, dropping toots")
		return
	}
	for _, item := range tc {
		dest <- item
	}
}