latest refactoring
This commit is contained in:
		
							parent
							
								
									58954151b5
								
							
						
					
					
						commit
						3b543fe5a5
					
				
							
								
								
									
										2
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								Makefile
									
									
									
									
									
								
							| @ -25,7 +25,7 @@ ifneq ($(UNAME_S),Darwin) | ||||
| 	GOFLAGS = -ldflags "-linkmode external -extldflags -static $(GOLDFLAGS)" | ||||
| endif | ||||
| 
 | ||||
| default: rundebug | ||||
| default: run | ||||
| 
 | ||||
| rundebug: build | ||||
| 	DEBUG=1 ./$(FN) | ||||
|  | ||||
							
								
								
									
										5
									
								
								api.go
									
									
									
									
									
								
							
							
						
						
									
										5
									
								
								api.go
									
									
									
									
									
								
							| @ -48,7 +48,9 @@ func (a *TootArchiverAPIServer) getRouter() *gin.Engine { | ||||
| 	}) | ||||
| 
 | ||||
| 	r.GET("/", func(c *gin.Context) { | ||||
| 		ir := a.archiver.locator.instanceReport() | ||||
| 		ir := a.archiver.manager.instanceReport() | ||||
| 
 | ||||
| 		il := a.archiver.manager.instanceListForApi() | ||||
| 		c.JSON(200, gin.H{ | ||||
| 			// FIXME(sneak) add more stuff here
 | ||||
| 			"status": "ok", | ||||
| @ -59,6 +61,7 @@ func (a *TootArchiverAPIServer) getRouter() *gin.Engine { | ||||
| 				"up":         ir.up, | ||||
| 				"identified": ir.identified, | ||||
| 			}, | ||||
| 			"instanceList": il, | ||||
| 		}) | ||||
| 	}) | ||||
| 
 | ||||
|  | ||||
							
								
								
									
										181
									
								
								instance.go
									
									
									
									
									
								
							
							
						
						
									
										181
									
								
								instance.go
									
									
									
									
									
								
							| @ -7,6 +7,7 @@ import "net/http" | ||||
| import "strings" | ||||
| import "sync" | ||||
| import "time" | ||||
| import "errors" | ||||
| 
 | ||||
| import "github.com/rs/zerolog/log" | ||||
| 
 | ||||
| @ -32,90 +33,123 @@ const ( | ||||
| 	InstanceStatusNone InstanceStatus = iota | ||||
| 	InstanceStatusUnknown | ||||
| 	InstanceStatusAlive | ||||
| 	InstanceStatusIdentified | ||||
| 	InstanceStatusFailure | ||||
| ) | ||||
| 
 | ||||
| type Instance struct { | ||||
| 	sync.Mutex | ||||
| 	sync.RWMutex | ||||
| 	errorCount    uint | ||||
| 	successCount  uint | ||||
| 	highestId     int | ||||
| 	hostName      string | ||||
| 	hostname      string | ||||
| 	identified    bool | ||||
| 	fetching      bool | ||||
| 	impl          InstanceImplementation | ||||
| 	backend       *InstanceBackend | ||||
| 	status        InstanceStatus | ||||
| 	nextCheck     *time.Time | ||||
| 	nextFetch     time.Time | ||||
| 	nodeInfoUrl   string | ||||
| 	serverVersion string | ||||
| } | ||||
| 
 | ||||
| func NewInstance(hostname string) *Instance { | ||||
| func NewInstance(hostname InstanceHostname) *Instance { | ||||
| 	i := new(Instance) | ||||
| 	i.hostName = hostname | ||||
| 	i.hostname = string(hostname) | ||||
| 	i.status = InstanceStatusUnknown | ||||
| 	t := time.Now().Add(-1 * time.Second) | ||||
| 	i.nextCheck = &t | ||||
| 	i.nextFetch = time.Now().Add(-1 * time.Second) | ||||
| 	// FIXME make checks detect the node type instead of in the constructor
 | ||||
| 	return i | ||||
| } | ||||
| 
 | ||||
| func (i *Instance) setNextCheck(d time.Duration) { | ||||
| func (i *Instance) setNextFetchAfter(d time.Duration) { | ||||
| 	i.Lock() | ||||
| 	defer i.Unlock() | ||||
| 	then := time.Now().Add(d) | ||||
| 	i.nextCheck = &then | ||||
| 	i.nextFetch = time.Now().Add(d) | ||||
| } | ||||
| 
 | ||||
| func (i *Instance) dueForCheck() bool { | ||||
| 	i.Lock() | ||||
| 	defer i.Unlock() | ||||
| 	return i.nextCheck.Before(time.Now()) | ||||
| func (self *Instance) setFetching(f bool) { | ||||
| 	self.Lock() | ||||
| 	defer self.Unlock() | ||||
| 	self.fetching = f | ||||
| } | ||||
| 
 | ||||
| func (i *Instance) detectNodeType() { | ||||
| 	i.Lock() | ||||
| 	if i.impl > Unknown { | ||||
| 		i.Unlock() | ||||
| func (self *Instance) Fetch() { | ||||
| 	self.setFetching(true) | ||||
| 	defer self.setFetching(false) | ||||
| 
 | ||||
| 	err := self.detectNodeTypeIfNecessary() | ||||
| 	if err != nil { | ||||
| 		self.setNextFetchAfter(INSTANCE_ERROR_INTERVAL) | ||||
| 		log.Debug(). | ||||
| 			Str("hostname", self.hostname). | ||||
| 			Err(err). | ||||
| 			Msg("unable to fetch instance metadata") | ||||
| 		return | ||||
| 	} | ||||
| 	i.Unlock() | ||||
| 	i.fetchNodeInfo() | ||||
| 
 | ||||
| 	//self.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL)
 | ||||
| 	log.Info().Msgf("i (%s) should check for toots", self.hostname) | ||||
| } | ||||
| 
 | ||||
| func (self *Instance) dueForFetch() bool { | ||||
| 	self.RLock() | ||||
| 	defer self.RUnlock() | ||||
| 	if self.fetching { | ||||
| 		return false | ||||
| 	} | ||||
| 	return self.nextFetch.Before(time.Now()) | ||||
| } | ||||
| 
 | ||||
| func (i *Instance) detectNodeTypeIfNecessary() error { | ||||
| 	i.RLock() | ||||
| 	if i.impl > Unknown { | ||||
| 		i.RUnlock() | ||||
| 		return nil | ||||
| 	} | ||||
| 	i.RUnlock() | ||||
| 	return i.fetchNodeInfo() | ||||
| } | ||||
| 
 | ||||
| func (i *Instance) registerError() { | ||||
| 	i.setNextCheck(INSTANCE_ERROR_INTERVAL) | ||||
| 	i.setNextFetchAfter(INSTANCE_ERROR_INTERVAL) | ||||
| 	i.Lock() | ||||
| 	defer i.Unlock() | ||||
| 	i.errorCount++ | ||||
| } | ||||
| 
 | ||||
| func (i *Instance) registerSuccess() { | ||||
| 	i.setNextCheck(INSTANCE_SPIDER_INTERVAL) | ||||
| 	i.setNextFetchAfter(INSTANCE_SPIDER_INTERVAL) | ||||
| 	i.Lock() | ||||
| 	defer i.Unlock() | ||||
| 	i.successCount++ | ||||
| } | ||||
| 
 | ||||
| func (i *Instance) fetchNodeInfoURL() { | ||||
| 	url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.hostName) | ||||
| func (i *Instance) Up() bool { | ||||
| 	i.Lock() | ||||
| 	defer i.Unlock() | ||||
| 	return i.successCount > 0 | ||||
| } | ||||
| 
 | ||||
| func (i *Instance) fetchNodeInfoURL() error { | ||||
| 	url := fmt.Sprintf("https://%s/.well-known/nodeinfo", i.hostname) | ||||
| 	var c = &http.Client{ | ||||
| 		Timeout: INSTANCE_HTTP_TIMEOUT, | ||||
| 	} | ||||
| 
 | ||||
| 	log.Debug(). | ||||
| 		Str("url", url). | ||||
| 		Str("hostname", i.hostName). | ||||
| 		Str("hostname", i.hostname). | ||||
| 		Msg("fetching nodeinfo reference URL") | ||||
| 
 | ||||
| 	resp, err := c.Get(url) | ||||
| 	if err != nil { | ||||
| 		log.Debug(). | ||||
| 			Str("hostname", i.hostName). | ||||
| 			Str("hostname", i.hostname). | ||||
| 			Err(err). | ||||
| 			Msg("unable to fetch nodeinfo, node is down?") | ||||
| 		i.registerError() | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	defer resp.Body.Close() | ||||
| @ -123,28 +157,28 @@ func (i *Instance) fetchNodeInfoURL() { | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		log.Debug(). | ||||
| 			Str("hostname", i.hostName). | ||||
| 			Str("hostname", i.hostname). | ||||
| 			Err(err). | ||||
| 			Msg("unable to read nodeinfo") | ||||
| 		i.registerError() | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	nir := new(NodeInfoWellKnownResponse) | ||||
| 	err = json.Unmarshal(body, &nir) | ||||
| 	if err != nil { | ||||
| 		log.Error(). | ||||
| 			Str("hostname", i.hostName). | ||||
| 		log.Debug(). | ||||
| 			Str("hostname", i.hostname). | ||||
| 			Err(err). | ||||
| 			Msg("unable to parse nodeinfo, node is weird") | ||||
| 		i.registerError() | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	for _, item := range nir.Links { | ||||
| 		if item.Rel == NodeInfoSchemaVersionTwoName { | ||||
| 			log.Info(). | ||||
| 				Str("hostname", i.hostName). | ||||
| 			log.Debug(). | ||||
| 				Str("hostname", i.hostname). | ||||
| 				Str("nodeinfourl", item.Href). | ||||
| 				Msg("success fetching url for nodeinfo") | ||||
| 
 | ||||
| @ -152,32 +186,27 @@ func (i *Instance) fetchNodeInfoURL() { | ||||
| 			i.nodeInfoUrl = item.Href | ||||
| 			i.Unlock() | ||||
| 			i.registerSuccess() | ||||
| 			return | ||||
| 			return nil | ||||
| 		} | ||||
| 		log.Debug(). | ||||
| 			Str("hostname", i.hostname). | ||||
| 			Str("item-rel", item.Rel). | ||||
| 			Str("item-href", item.Href). | ||||
| 			Msg("found key in nodeinfo") | ||||
| 	} | ||||
| 
 | ||||
| 	log.Error(). | ||||
| 		Str("hostname", i.hostName). | ||||
| 		Str("hostname", i.hostname). | ||||
| 		Msg("incomplete nodeinfo") | ||||
| 	i.registerError() | ||||
| 	return | ||||
| 	return errors.New("incomplete nodeinfo") | ||||
| } | ||||
| 
 | ||||
| func (i *Instance) fetchNodeInfo() { | ||||
| 	i.fetchNodeInfoURL() | ||||
| func (i *Instance) fetchNodeInfo() error { | ||||
| 	err := i.fetchNodeInfoURL() | ||||
| 
 | ||||
| 	i.Lock() | ||||
| 	failure := false | ||||
| 	if i.nodeInfoUrl == "" { | ||||
| 		log.Error(). | ||||
| 			Str("hostname", i.hostName). | ||||
| 			Msg("unable to fetch nodeinfo as nodeinfo URL cannot be determined") | ||||
| 		failure = true | ||||
| 	} | ||||
| 	i.Unlock() | ||||
| 
 | ||||
| 	if failure == true { | ||||
| 		return | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	var c = &http.Client{ | ||||
| @ -186,19 +215,19 @@ func (i *Instance) fetchNodeInfo() { | ||||
| 
 | ||||
| 	//FIXME make sure the nodeinfourl is on the same domain as the instance
 | ||||
| 	//hostname
 | ||||
| 	i.Lock() | ||||
| 	i.RLock() | ||||
| 	url := i.nodeInfoUrl | ||||
| 	i.Unlock() | ||||
| 	i.RUnlock() | ||||
| 
 | ||||
| 	resp, err := c.Get(url) | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		log.Error(). | ||||
| 			Str("hostname", i.hostName). | ||||
| 		log.Debug(). | ||||
| 			Str("hostname", i.hostname). | ||||
| 			Err(err). | ||||
| 			Msgf("unable to fetch nodeinfo data") | ||||
| 		i.registerError() | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	defer resp.Body.Close() | ||||
| @ -206,28 +235,28 @@ func (i *Instance) fetchNodeInfo() { | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		log.Error(). | ||||
| 			Str("hostname", i.hostName). | ||||
| 			Str("hostname", i.hostname). | ||||
| 			Err(err). | ||||
| 			Msgf("unable to read nodeinfo data") | ||||
| 		i.registerError() | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	ni := new(NodeInfoVersionTwoSchema) | ||||
| 	err = json.Unmarshal(body, &ni) | ||||
| 	if err != nil { | ||||
| 		log.Error(). | ||||
| 			Str("hostname", i.hostName). | ||||
| 			Str("hostname", i.hostname). | ||||
| 			Err(err). | ||||
| 			Msgf("unable to parse nodeinfo") | ||||
| 		i.registerError() | ||||
| 		return | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	log.Info(). | ||||
| 	log.Debug(). | ||||
| 		Str("serverVersion", ni.Software.Version). | ||||
| 		Str("software", ni.Software.Name). | ||||
| 		Str("hostName", i.hostName). | ||||
| 		Str("hostname", i.hostname). | ||||
| 		Str("nodeInfoUrl", i.nodeInfoUrl). | ||||
| 		Msg("received nodeinfo from instance") | ||||
| 
 | ||||
| @ -238,31 +267,29 @@ func (i *Instance) fetchNodeInfo() { | ||||
| 	ni.Software.Name = strings.ToLower(ni.Software.Name) | ||||
| 
 | ||||
| 	if ni.Software.Name == "pleroma" { | ||||
| 		log.Info(). | ||||
| 			Str("hostname", i.hostName). | ||||
| 		log.Debug(). | ||||
| 			Str("hostname", i.hostname). | ||||
| 			Str("software", ni.Software.Name). | ||||
| 			Msg("detected server software") | ||||
| 		i.registerSuccess() | ||||
| 		i.identified = true | ||||
| 		i.impl = Pleroma | ||||
| 		i.status = InstanceStatusAlive | ||||
| 		i.status = InstanceStatusIdentified | ||||
| 		return nil | ||||
| 	} else if ni.Software.Name == "mastodon" { | ||||
| 		log.Info(). | ||||
| 			Str("hostname", i.hostName). | ||||
| 			Str("software", ni.Software.Name). | ||||
| 			Msg("detected server software") | ||||
| 		i.registerSuccess() | ||||
| 		i.identified = true | ||||
| 		i.impl = Mastodon | ||||
| 		i.status = InstanceStatusAlive | ||||
| 		i.status = InstanceStatusIdentified | ||||
| 		return nil | ||||
| 	} else { | ||||
| 		log.Error(). | ||||
| 			Str("hostname", i.hostName). | ||||
| 			Str("hostname", i.hostname). | ||||
| 			Str("software", ni.Software.Name). | ||||
| 			Msg("unknown implementation on server") | ||||
| 			Msg("FIXME unknown server implementation") | ||||
| 		i.registerError() | ||||
| 		return errors.New("FIXME unknown server implementation") | ||||
| 	} | ||||
| 	return | ||||
| } | ||||
| 
 | ||||
| /* | ||||
| @ -283,12 +310,16 @@ func (i *Instance) fetchRecentToots() ([]byte, error) { | ||||
| 
 | ||||
| /* | ||||
| func (self *PleromaBackend) fetchRecentToots() ([]byte, error) { | ||||
| 	//url := fmt.Sprintf("https://%s/api/statuses/public_and_external_timeline.json?count=100", i.hostName)
 | ||||
| 	//url :=
 | ||||
| 	//fmt.Sprintf("https://%s/api/statuses/public_and_external_timeline.json?count=100",
 | ||||
| 	//i.hostname)
 | ||||
| 	return nil, nil | ||||
| } | ||||
| 
 | ||||
| func (self *MastodonBackend) fetchRecentTootsJsonFromMastodon() ([]byte, error) { | ||||
| 	//url := fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true", i.hostName)
 | ||||
| 	//url :=
 | ||||
| 	//fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true",
 | ||||
| 	//i.hostname)
 | ||||
| 	return nil, nil | ||||
| } | ||||
| */ | ||||
|  | ||||
| @ -1,7 +1,6 @@ | ||||
| package main | ||||
| 
 | ||||
| import "encoding/json" | ||||
| import "fmt" | ||||
| import "io/ioutil" | ||||
| import "net/http" | ||||
| import "time" | ||||
| @ -11,6 +10,8 @@ import "github.com/rs/zerolog/log" | ||||
| 
 | ||||
| const INDEX_API_TIMEOUT = time.Second * 60 | ||||
| 
 | ||||
| var USER_AGENT = "https://github.com/sneak/feta indexer bot; sneak@sneak.berlin for feedback" | ||||
| 
 | ||||
| // check with indices only hourly
 | ||||
| var INDEX_CHECK_INTERVAL = time.Second * 60 * 60 | ||||
| 
 | ||||
| @ -26,13 +27,11 @@ type InstanceLocator struct { | ||||
| 	pleromaIndexNextRefresh  *time.Time | ||||
| 	mastodonIndexNextRefresh *time.Time | ||||
| 	reportInstanceVia        chan InstanceHostname | ||||
| 	instances                map[string]*Instance | ||||
| 	sync.Mutex | ||||
| } | ||||
| 
 | ||||
| func NewInstanceLocator() *InstanceLocator { | ||||
| 	i := new(InstanceLocator) | ||||
| 	i.instances = make(map[string]*Instance) | ||||
| 	n := time.Now() | ||||
| 	i.pleromaIndexNextRefresh = &n | ||||
| 	i.mastodonIndexNextRefresh = &n | ||||
| @ -45,20 +44,17 @@ func (self *InstanceLocator) AddInstanceNotificationChannel(via chan InstanceHos | ||||
| 	self.reportInstanceVia = via | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceLocator) addInstance(hostname string) { | ||||
| 	self.Lock() | ||||
| 	defer self.Unlock() | ||||
| 	// only add it if we haven't seen the hostname before
 | ||||
| 	if self.instances[hostname] == nil { | ||||
| 		log.Info().Str("hostname", hostname).Msgf("adding discovered instance") | ||||
| 		self.instances[hostname] = NewInstance(hostname) | ||||
| 	} | ||||
| 
 | ||||
| func (self *InstanceLocator) addInstance(hostname InstanceHostname) { | ||||
| 	// receiver (InstanceManager) is responsible for de-duping against its
 | ||||
| 	// map
 | ||||
| 	self.reportInstanceVia <- hostname | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceLocator) Locate() { | ||||
| 	log.Info().Msg("InstanceLocator starting") | ||||
| 	x := time.Now() | ||||
| 	for { | ||||
| 		log.Info().Msg("InstanceLocator tick") | ||||
| 		if self.pleromaIndexNextRefresh.Before(time.Now()) { | ||||
| 			self.locatePleroma() | ||||
| 		} | ||||
| @ -74,61 +70,24 @@ func (self *InstanceLocator) Locate() { | ||||
| 			log.Debug(). | ||||
| 				Str("nextPleromaIndexFetch", self.pleromaIndexNextRefresh.Format(time.RFC3339)). | ||||
| 				Send() | ||||
| 			self.logInstanceReport() | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceLocator) logInstanceReport() { | ||||
| 	r := self.instanceReport() | ||||
| 	log.Info(). | ||||
| 		Uint("up", r.up). | ||||
| 		Uint("total", r.total). | ||||
| 		Uint("identified", r.identified). | ||||
| 		Msg("instance report") | ||||
| } | ||||
| 
 | ||||
| type InstanceLocatorReport struct { | ||||
| 	up         uint | ||||
| 	identified uint | ||||
| 	total      uint | ||||
| } | ||||
| 
 | ||||
| func (r *InstanceLocatorReport) String() string { | ||||
| 	return fmt.Sprintf("up=%d identified=%d total=%d", r.up, r.identified, r.total) | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceLocator) NumInstances() uint { | ||||
| 	return self.instanceReport().total | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceLocator) instanceReport() *InstanceLocatorReport { | ||||
| 	self.Lock() | ||||
| 	defer self.Unlock() | ||||
| 	r := new(InstanceLocatorReport) | ||||
| 
 | ||||
| 	r.total = uint(len(self.instances)) | ||||
| 
 | ||||
| 	for _, elem := range self.instances { | ||||
| 		if elem.identified == true { | ||||
| 			r.identified = r.identified + 1 | ||||
| 		} | ||||
| 
 | ||||
| 		if elem.status == InstanceStatusAlive { | ||||
| 			r.up = r.up + 1 | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return r | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceLocator) locateMastodon() { | ||||
| 	var c = &http.Client{ | ||||
| 		Timeout: INDEX_API_TIMEOUT, | ||||
| 	} | ||||
| 
 | ||||
| 	resp, err := c.Get(mastodonIndexUrl) | ||||
| 	req, err := http.NewRequest("GET", mastodonIndexUrl, nil) | ||||
| 	if err != nil { | ||||
| 		panic(err) | ||||
| 	} | ||||
| 
 | ||||
| 	req.Header.Set("User-Agent", USER_AGENT) | ||||
| 
 | ||||
| 	resp, err := c.Do(req) | ||||
| 	if err != nil { | ||||
| 		log.Error().Msgf("unable to fetch mastodon instance list: %s", err) | ||||
| 		t := time.Now().Add(INDEX_ERROR_INTERVAL) | ||||
| @ -162,7 +121,7 @@ func (self *InstanceLocator) locateMastodon() { | ||||
| 	} | ||||
| 
 | ||||
| 	for _, instance := range mi.Instances { | ||||
| 		self.addInstance(instance.Name) | ||||
| 		self.addInstance(InstanceHostname(instance.Name)) | ||||
| 	} | ||||
| 
 | ||||
| 	t := time.Now().Add(INDEX_CHECK_INTERVAL) | ||||
| @ -175,7 +134,14 @@ func (self *InstanceLocator) locatePleroma() { | ||||
| 	var c = &http.Client{ | ||||
| 		Timeout: INDEX_API_TIMEOUT, | ||||
| 	} | ||||
| 	resp, err := c.Get(pleromaIndexUrl) | ||||
| 
 | ||||
| 	req, err := http.NewRequest("GET", pleromaIndexUrl, nil) | ||||
| 	if err != nil { | ||||
| 		panic(err) | ||||
| 	} | ||||
| 	req.Header.Set("User-Agent", USER_AGENT) | ||||
| 
 | ||||
| 	resp, err := c.Do(req) | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		log.Error().Msgf("unable to fetch pleroma instance list: %s", err) | ||||
| @ -211,7 +177,7 @@ func (self *InstanceLocator) locatePleroma() { | ||||
| 	} | ||||
| 
 | ||||
| 	for _, instance := range *pi { | ||||
| 		self.addInstance(instance.Domain) | ||||
| 		self.addInstance(InstanceHostname(instance.Domain)) | ||||
| 	} | ||||
| 	t := time.Now().Add(INDEX_CHECK_INTERVAL) | ||||
| 	self.Lock() | ||||
|  | ||||
| @ -2,24 +2,63 @@ package main | ||||
| 
 | ||||
| import "sync" | ||||
| import "time" | ||||
| import "fmt" | ||||
| import "runtime" | ||||
| 
 | ||||
| //import "github.com/rs/zerolog/log"
 | ||||
| import "github.com/rs/zerolog/log" | ||||
| 
 | ||||
| type InstanceBackend interface { | ||||
| 	//FIXME
 | ||||
| } | ||||
| 
 | ||||
| type InstanceManager struct { | ||||
| 	sync.Mutex | ||||
| 	mu                       sync.Mutex | ||||
| 	instances                map[InstanceHostname]*Instance | ||||
| 	newInstanceNotifications chan InstanceHostname | ||||
| 	startup                  time.Time | ||||
| } | ||||
| 
 | ||||
| func NewInstanceManager() *InstanceManager { | ||||
| 	i := new(InstanceManager) | ||||
| 	i.instances = make(map[InstanceHostname]*Instance) | ||||
| 	return i | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceManager) logCaller(msg string) { | ||||
| 	fpcs := make([]uintptr, 1) | ||||
| 	// Skip 2 levels to get the caller
 | ||||
| 	n := runtime.Callers(3, fpcs) | ||||
| 	if n == 0 { | ||||
| 		log.Debug().Msg("MSG: NO CALLER") | ||||
| 	} | ||||
| 
 | ||||
| 	caller := runtime.FuncForPC(fpcs[0] - 1) | ||||
| 	if caller == nil { | ||||
| 		log.Debug().Msg("MSG CALLER WAS NIL") | ||||
| 	} | ||||
| 
 | ||||
| 	// Print the file name and line number
 | ||||
| 	filename, line := caller.FileLine(fpcs[0] - 1) | ||||
| 	function := caller.Name() | ||||
| 
 | ||||
| 	log.Debug(). | ||||
| 		Str("filename", filename). | ||||
| 		Int("linenum", line). | ||||
| 		Str("function", function). | ||||
| 		Msg(msg) | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceManager) Lock() { | ||||
| 	self.logCaller("instancemanager attempting to lock") | ||||
| 	self.mu.Lock() | ||||
| 	self.logCaller("instancemanager locked") | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceManager) Unlock() { | ||||
| 	self.mu.Unlock() | ||||
| 	self.logCaller("instancemanager unlocked") | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceManager) AddInstanceNotificationChannel(via chan InstanceHostname) { | ||||
| 	self.Lock() | ||||
| 	defer self.Unlock() | ||||
| @ -27,8 +66,121 @@ func (self *InstanceManager) AddInstanceNotificationChannel(via chan InstanceHos | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceManager) Manage() { | ||||
| 	self.managerLoop() | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceManager) managerLoop() { | ||||
| 	log.Info().Msg("InstanceManager starting") | ||||
| 	go self.receiveNewInstanceHostnames() | ||||
| 	self.startup = time.Now() | ||||
| 	for { | ||||
| 		// FIXME(sneak)
 | ||||
| 		log.Info().Msg("InstanceManager tick") | ||||
| 		self.Lock() | ||||
| 		for _, v := range self.instances { | ||||
| 			go func() { | ||||
| 				if v.dueForFetch() { | ||||
| 					v.Fetch() | ||||
| 				} | ||||
| 			}() | ||||
| 		} | ||||
| 		self.Unlock() | ||||
| 		time.Sleep(1 * time.Second) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceManager) hostnameExists(newhn InstanceHostname) bool { | ||||
| 	self.Lock() | ||||
| 	for k, _ := range self.instances { | ||||
| 		if newhn == k { | ||||
| 			self.Unlock() | ||||
| 			return true | ||||
| 		} | ||||
| 	} | ||||
| 	self.Unlock() | ||||
| 	return false | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceManager) addInstanceByHostname(newhn InstanceHostname) { | ||||
| 	// only add it if we haven't seen the hostname before
 | ||||
| 	if !self.hostnameExists(newhn) { | ||||
| 		i := NewInstance(newhn) | ||||
| 		self.Lock() | ||||
| 		self.instances[newhn] = i | ||||
| 		self.Unlock() | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceManager) receiveNewInstanceHostnames() { | ||||
| 	var newhn InstanceHostname | ||||
| 	for { | ||||
| 		newhn = <-self.newInstanceNotifications | ||||
| 		self.addInstanceByHostname(newhn) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceManager) logInstanceReport() { | ||||
| 	r := self.instanceReport() | ||||
| 	log.Info(). | ||||
| 		Uint("up", r.up). | ||||
| 		Uint("total", r.total). | ||||
| 		Uint("identified", r.identified). | ||||
| 		Msg("instance report") | ||||
| } | ||||
| 
 | ||||
| type InstanceReport struct { | ||||
| 	up         uint | ||||
| 	identified uint | ||||
| 	total      uint | ||||
| } | ||||
| 
 | ||||
| func (r *InstanceReport) String() string { | ||||
| 	return fmt.Sprintf("up=%d identified=%d total=%d", r.up, r.identified, r.total) | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceManager) NumInstances() uint { | ||||
| 	return self.instanceReport().total | ||||
| } | ||||
| 
 | ||||
| type InstanceListReport []*InstanceDetail | ||||
| 
 | ||||
| type InstanceDetail struct { | ||||
| 	hostname  string | ||||
| 	up        bool | ||||
| 	nextFetch string | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceManager) instanceListForApi() InstanceListReport { | ||||
| 	var output InstanceListReport | ||||
| 	self.Lock() | ||||
| 	defer self.Unlock() | ||||
| 	for _, v := range self.instances { | ||||
| 		id := &InstanceDetail{ | ||||
| 			hostname: v.hostname, | ||||
| 		} | ||||
| 		id.up = v.Up() | ||||
| 		id.nextFetch = string(time.Now().Sub(v.nextFetch)) | ||||
| 		output = append(output, id) | ||||
| 		fmt.Printf("%s", output) | ||||
| 	} | ||||
| 	return output | ||||
| } | ||||
| 
 | ||||
| func (self *InstanceManager) instanceReport() *InstanceReport { | ||||
| 	self.Lock() | ||||
| 	defer self.Unlock() | ||||
| 	r := new(InstanceReport) | ||||
| 
 | ||||
| 	r.total = uint(len(self.instances)) | ||||
| 
 | ||||
| 	for _, elem := range self.instances { | ||||
| 		if elem.identified == true { | ||||
| 			r.identified = r.identified + 1 | ||||
| 		} | ||||
| 
 | ||||
| 		if elem.status == InstanceStatusAlive { | ||||
| 			r.up = r.up + 1 | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return r | ||||
| } | ||||
|  | ||||
							
								
								
									
										6
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										6
									
								
								main.go
									
									
									
									
									
								
							| @ -2,6 +2,7 @@ package main | ||||
| 
 | ||||
| import "os" | ||||
| import "sync" | ||||
| import "time" | ||||
| 
 | ||||
| import "github.com/rs/zerolog" | ||||
| import "github.com/rs/zerolog/log" | ||||
| @ -19,6 +20,11 @@ func app() int { | ||||
| 
 | ||||
| 	identify() | ||||
| 
 | ||||
| 	// always log in UTC
 | ||||
| 	zerolog.TimestampFunc = func() time.Time { | ||||
| 		return time.Now().UTC() | ||||
| 	} | ||||
| 
 | ||||
| 	zerolog.SetGlobalLevel(zerolog.InfoLevel) | ||||
| 	if os.Getenv("DEBUG") != "" { | ||||
| 		zerolog.SetGlobalLevel(zerolog.DebugLevel) | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user