package manager import ( "sync" "time" "git.eeqj.de/sneak/feta/instance" "git.eeqj.de/sneak/feta/seeds" "git.eeqj.de/sneak/feta/toot" "github.com/rs/zerolog/log" "github.com/spf13/viper" ) // conform for storing toots type DatabaseStorage interface { ListInstances() ([]*instance.Instance, error) //StoreInstances([]*instance.Instance) error SaveInstance(*instance.Instance) error } // InstanceManager is the main data structure for the goroutine that manages // the list of all known instances, fed by the locator type InstanceManager struct { mu sync.Mutex db DatabaseStorage instances map[string]*instance.Instance newInstanceNotifications chan string tootDestination chan *toot.Toot startup time.Time hostAdderSemaphore chan bool nextDBSave time.Time } // New returns a new InstanceManager for use by the Process func New(db DatabaseStorage) *InstanceManager { im := new(InstanceManager) im.db = db im.hostAdderSemaphore = make(chan bool, viper.GetInt("HostDiscoveryParallelism")) im.instances = make(map[string]*instance.Instance) im.RestoreFromDB() return im } func (im *InstanceManager) RestoreFromDB() { newil, err := im.db.ListInstances() if err != nil { log.Panic(). Err(err). Msg("cannot get instance list from db") } im.lock() defer im.unlock() count := 0 for _, x := range newil { x.SetTootDestination(im.tootDestination) im.instances[x.Hostname] = x count = count + 1 } log.Info(). Int("count", count). Msg("restored instances from database") } func (im *InstanceManager) SaveToDB() { for _, x := range im.ListInstances() { err := im.db.SaveInstance(x) if err != nil { log.Panic(). Err(err). Msg("cannot write to db") } } } // SetTootDestination provides the instancemanager with a channel to the // ingester that it can give to its instances func (im *InstanceManager) SetTootDestination(td chan *toot.Toot) { im.tootDestination = td } func (im *InstanceManager) lock() { im.mu.Lock() } func (im *InstanceManager) unlock() { im.mu.Unlock() } // SetInstanceNotificationChannel is how the Process tells the // InstanceManager about the channel from the InstanceLocator so that the // InstanceLocator can provide it/us (the InstanceManager) with new // instance.Hostnames. We (the manager) deduplicate the list ourselves. func (im *InstanceManager) SetInstanceNotificationChannel(via chan string) { im.lock() defer im.unlock() im.newInstanceNotifications = via } func (im *InstanceManager) receiveSeedInstanceHostnames() { for _, x := range seeds.SeedInstances { go func(tmp string) { im.addInstanceByHostname(tmp) }(x) } } // Manage is the main entrypoint of the InstanceManager, designed to be // called once in its own goroutine. func (im *InstanceManager) Manage() { log.Info().Msg("InstanceManager starting") go func() { im.receiveNewInstanceHostnames() }() im.startup = time.Now() x := im.startup go func() { im.receiveSeedInstanceHostnames() }() for { log.Info().Msg("InstanceManager tick") im.managerLoop() time.Sleep(1 * time.Second) if time.Now().After(x.Add(viper.GetDuration("LogReportInterval"))) { x = time.Now() im.logInstanceReport() } if im.nextDBSave.Before(time.Now()) { im.nextDBSave = time.Now().Add(time.Second * 60) im.SaveToDB() } } } func (im *InstanceManager) managerLoop() { im.lock() il := make([]*instance.Instance, 0) for _, v := range im.instances { il = append(il, v) } im.unlock() // FIXME is this a bug outside of the mutex above? for _, v := range il { go func(i *instance.Instance) { i.Tick() }(v) } } func (im *InstanceManager) hostnameExists(newhn string) bool { im.lock() defer im.unlock() for k := range im.instances { if newhn == k { return true } } return false } func (im *InstanceManager) addInstanceByHostname(newhn string) { if im.hostnameExists(newhn) { // ignore adding new if we already know about it return } // this blocks on the channel size, limiting concurrency im.hostAdderSemaphore <- true i := instance.New(func(x *instance.Instance) { x.Hostname = string(newhn) // set hostname x.SetTootDestination(im.tootDestination) // copy ingester input channel from manager to instance }) // we do node detection under the adderSemaphore to avoid thundering // on startup i.DetectNodeTypeIfNecessary() // pop an item from the buffered channel <-im.hostAdderSemaphore // lock the map to insert im.lock() im.instances[newhn] = i im.unlock() } func (im *InstanceManager) receiveNewInstanceHostnames() { var newhn string for { newhn = <-im.newInstanceNotifications // receive them fast out of the channel, let the adding function lock to add // them one at a time, using a bunch of blocked goroutines as our // modification queue go im.addInstanceByHostname(newhn) } } func (im *InstanceManager) logInstanceReport() { r := im.instanceSummaryReport() sublogger := log.With().Logger() for k, v := range r { sublogger = sublogger.With().Uint(k, v).Logger() } sublogger.Info(). Msg("instance report") } // ListInstances dumps a slice of all Instances the InstanceManager knows // about func (im *InstanceManager) ListInstances() []*instance.Instance { var out []*instance.Instance im.lock() defer im.unlock() for _, v := range im.instances { out = append(out, v) } return out } func (im *InstanceManager) instanceSummaryReport() map[string]uint { r := make(map[string]uint) for _, v := range im.ListInstances() { v.Lock() r[v.Status()]++ v.Unlock() } return r }