package manager import ( "sync" "time" "git.eeqj.de/sneak/feta/instance" "git.eeqj.de/sneak/feta/seeds" "git.eeqj.de/sneak/feta/toot" "github.com/rs/zerolog/log" "github.com/spf13/viper" ) // conform for storing toots type DatabaseStorage interface { ListInstances() ([]*instance.Instance, error) StoreInstances([]*instance.Instance) error } // InstanceManager is the main data structure for the goroutine that manages // the list of all known instances, fed by the locator type InstanceManager struct { mu sync.Mutex db DatabaseStorage instances map[instance.Hostname]*instance.Instance newInstanceNotifications chan instance.Hostname tootDestination chan *toot.Toot startup time.Time hostDiscoveryParallelism int hostAdderSemaphore chan bool } // New returns a new InstanceManager for use by the Process func New() *InstanceManager { i := new(InstanceManager) i.hostDiscoveryParallelism = viper.GetInt("HostDiscoveryParallelism") i.hostAdderSemaphore = make(chan bool, i.hostDiscoveryParallelism) i.instances = make(map[instance.Hostname]*instance.Instance) return i } // SetTootDestination provides the instancemanager with a channel to the // ingester that it can give to its instances func (im *InstanceManager) SetTootDestination(td chan *toot.Toot) { im.tootDestination = td } func (im *InstanceManager) lock() { im.mu.Lock() } func (im *InstanceManager) unlock() { im.mu.Unlock() } // SetInstanceNotificationChannel is how the Process tells the // InstanceManager about the channel from the InstanceLocator so that the // InstanceLocator can provide it/us (the InstanceManager) with new // instance.Hostnames. We (the manager) deduplicate the list ourselves. func (im *InstanceManager) SetInstanceNotificationChannel(via chan instance.Hostname) { im.lock() defer im.unlock() im.newInstanceNotifications = via } func (im *InstanceManager) receiveSeedInstanceHostnames() { for _, x := range seeds.SeedInstances { go func(tmp instance.Hostname) { im.addInstanceByHostname(tmp) }(instance.Hostname(x)) } } // Manage is the main entrypoint of the InstanceManager, designed to be // called once in its own goroutine. func (im *InstanceManager) Manage() { log.Info().Msg("InstanceManager starting") go func() { im.receiveNewInstanceHostnames() }() im.startup = time.Now() x := im.startup go func() { im.receiveSeedInstanceHostnames() }() for { log.Info().Msg("InstanceManager tick") im.managerLoop() time.Sleep(1 * time.Second) if time.Now().After(x.Add(viper.GetDuration("LogReportInterval"))) { x = time.Now() im.logInstanceReport() } } } func (im *InstanceManager) managerLoop() { im.lock() il := make([]*instance.Instance, 0) for _, v := range im.instances { il = append(il, v) } im.unlock() // FIXME is this a bug outside of the mutex above? for _, v := range il { go func(i *instance.Instance) { i.Tick() }(v) } } func (im *InstanceManager) hostnameExists(newhn instance.Hostname) bool { im.lock() defer im.unlock() for k := range im.instances { if newhn == k { return true } } return false } func (im *InstanceManager) addInstanceByHostname(newhn instance.Hostname) { if im.hostnameExists(newhn) { // ignore adding new if we already know about it return } // this blocks on the channel size, limiting concurrency im.hostAdderSemaphore <- true i := instance.New(func(x *instance.Instance) { x.Hostname = string(newhn) // set hostname x.SetTootDestination(im.tootDestination) // copy ingester input channel from manager to instance }) // we do node detection under the adderSemaphore to avoid thundering // on startup i.DetectNodeTypeIfNecessary() // pop an item from the buffered channel <-im.hostAdderSemaphore // lock the map to insert im.lock() im.instances[newhn] = i im.unlock() } func (im *InstanceManager) receiveNewInstanceHostnames() { var newhn instance.Hostname for { newhn = <-im.newInstanceNotifications // receive them fast out of the channel, let the adding function lock to add // them one at a time, using a bunch of blocked goroutines as our // modification queue go im.addInstanceByHostname(newhn) } } func (im *InstanceManager) logInstanceReport() { r := im.instanceSummaryReport() sublogger := log.With().Logger() for k, v := range r { sublogger = sublogger.With().Uint(k, v).Logger() } sublogger.Info(). Msg("instance report") } // ListInstances dumps a slice of all Instances the InstanceManager knows // about func (im *InstanceManager) ListInstances() []*instance.Instance { // FIXME make this pull from db var out []*instance.Instance im.lock() defer im.unlock() for _, v := range im.instances { out = append(out, v) } return out } func (im *InstanceManager) instanceSummaryReport() map[string]uint { // FIXME make this pull from db r := make(map[string]uint) for _, v := range im.ListInstances() { v.Lock() r[v.Status()]++ v.Unlock() } return r }