// Package manager keeps track of every known instance and drives their
// periodic processing.
package manager

import (
	"runtime"
	"sync"
	"time"

	//import "github.com/gin-gonic/gin"
	"github.com/rs/zerolog/log"
	"github.com/sneak/feta/instance"
	"github.com/sneak/feta/seeds"
	"github.com/sneak/feta/toot"
)

// hostDiscoveryParallelism is the capacity of the semaphore that bounds how
// many hostnames may be going through node-type detection at the same time.
const hostDiscoveryParallelism = 5

// LogReportInterval defines how long between logging internal
// stats/reporting for user supervision
var LogReportInterval = time.Second * 10

// InstanceManager is the main data structure for the goroutine that manages
// the list of all known instances, fed by the locator
type InstanceManager struct {
	mu                       sync.Mutex // guards instances and newInstanceNotifications
	instances                map[instance.Hostname]*instance.Instance
	newInstanceNotifications chan instance.Hostname
	tootDestination          chan *toot.Toot
	startup                  time.Time
	hostAdderSemaphore       chan bool // buffered; one slot per in-flight detection
}

// New returns a new InstanceManager for use by the Process
func New() *InstanceManager {
	return &InstanceManager{
		hostAdderSemaphore: make(chan bool, hostDiscoveryParallelism),
		instances:          make(map[instance.Hostname]*instance.Instance),
	}
}

// SetTootDestination provides the instancemanager with a channel to the
// ingester that it can give to its instances.
//
// The field is written here and read in addInstanceByHostname without any
// locking, so this should be called before Manage is started.
func (im *InstanceManager) SetTootDestination(td chan *toot.Toot) {
	im.tootDestination = td
}

// logCaller emits msg at debug level, annotated with the file, line and
// function name of a frame further up the call stack.
//
// runtime.Callers is asked to skip three frames (runtime.Callers itself,
// logCaller, and logCaller's immediate caller), so the reported location is
// the function that called whatever helper invoked logCaller.
//
// If the frame cannot be resolved, a diagnostic line is logged, msg is
// logged without location fields, and the function returns. Previously
// execution fell through to a nil *runtime.Func and panicked.
func (im *InstanceManager) logCaller(msg string) {
	fpcs := make([]uintptr, 1)

	if runtime.Callers(3, fpcs) == 0 {
		log.Debug().Msg("MSG: NO CALLER")
		log.Debug().Msg(msg)
		return
	}

	// fpcs[0] is a return address; subtracting one lands inside the call
	// instruction so the lookup reports the calling line.
	pc := fpcs[0] - 1
	caller := runtime.FuncForPC(pc)
	if caller == nil {
		log.Debug().Msg("MSG CALLER WAS NIL")
		log.Debug().Msg(msg)
		return
	}

	filename, line := caller.FileLine(pc)
	log.Debug().
		Str("filename", filename).
		Int("linenum", line).
		Str("function", caller.Name()).
		Msg(msg)
}

// lock acquires the manager mutex.
func (im *InstanceManager) lock() {
	im.mu.Lock()
}

// unlock releases the manager mutex.
func (im *InstanceManager) unlock() {
	im.mu.Unlock()
}

// SetInstanceNotificationChannel is how the Process tells the
// InstanceManager about the channel from the InstanceLocator so that the
// InstanceLocator can provide it/us (the InstanceManager) with new
// instance.Hostnames. We (the manager) deduplicate the list ourselves.
func (im *InstanceManager) SetInstanceNotificationChannel(via chan instance.Hostname) {
	im.lock()
	defer im.unlock()
	im.newInstanceNotifications = via
}

// receiveSeedInstanceHostnames starts one goroutine per entry in
// seeds.SeedInstances, each of which tries to add that hostname. The
// goroutines are throttled inside addInstanceByHostname by the semaphore.
func (im *InstanceManager) receiveSeedInstanceHostnames() {
	for _, seed := range seeds.SeedInstances {
		go im.addInstanceByHostname(instance.Hostname(seed))
	}
}

// Manage is the main entrypoint of the InstanceManager, designed to be
// called once in its own goroutine. It never returns: it starts the hostname
// receiver and the seed loader, then ticks all instances roughly once per
// second and logs a summary report every LogReportInterval.
func (im *InstanceManager) Manage() {
	log.Info().Msg("InstanceManager starting")

	go im.receiveNewInstanceHostnames()

	im.startup = time.Now()
	lastReport := im.startup

	go im.receiveSeedInstanceHostnames()

	for {
		log.Info().Msg("InstanceManager tick")
		im.managerLoop()
		time.Sleep(1 * time.Second)

		if time.Since(lastReport) > LogReportInterval {
			lastReport = time.Now()
			im.logInstanceReport()
		}
	}
}

// managerLoop snapshots the current set of instances while holding the lock
// and then starts one goroutine per instance to run its Tick method.
func (im *InstanceManager) managerLoop() {
	snapshot := im.ListInstances()

	// FIXME is this a bug outside of the mutex above?
	// NOTE(review): only the map is protected by the manager mutex; the
	// Tick calls run on the copied pointers without it. Whether that is
	// safe depends on Instance's own synchronization - confirm.
	for _, inst := range snapshot {
		go inst.Tick()
	}
}

// hostnameExists reports whether newhn is already a key in the instance map.
func (im *InstanceManager) hostnameExists(newhn instance.Hostname) bool {
	im.lock()
	defer im.unlock()

	_, known := im.instances[newhn]
	return known
}

// addInstanceByHostname creates an Instance for newhn, runs node-type
// detection on it and stores it in the instance map. Hostnames that are
// already known are ignored.
//
// Detection runs while holding a slot of hostAdderSemaphore, so at most
// hostDiscoveryParallelism detections are in flight at once; callers beyond
// that block until a slot is free.
func (im *InstanceManager) addInstanceByHostname(newhn instance.Hostname) {
	if im.hostnameExists(newhn) {
		// ignore adding new if we already know about it
		return
	}

	// this blocks on the channel size, limiting concurrency
	im.hostAdderSemaphore <- true

	inst := instance.New(func(x *instance.Instance) {
		x.Hostname = string(newhn)               // set hostname
		x.SetTootDestination(im.tootDestination) // copy ingester input channel from manager to instance
	})

	// we do node detection under the adderSemaphore to avoid thundering
	// on startup
	inst.DetectNodeTypeIfNecessary()

	// pop an item from the buffered channel
	<-im.hostAdderSemaphore

	// lock the map to insert
	im.lock()
	defer im.unlock()

	// The check at the top and this insert are not atomic: another goroutine
	// may have added the same hostname while detection was running. Keep the
	// entry that is already registered rather than silently replacing it.
	if _, known := im.instances[newhn]; known {
		return
	}
	im.instances[newhn] = inst
}

// receiveNewInstanceHostnames drains the notification channel and hands each
// hostname to addInstanceByHostname in its own goroutine. It returns when the
// channel is closed. If no channel has been set, the receive blocks forever.
func (im *InstanceManager) receiveNewInstanceHostnames() {
	for {
		// Read the field under the same lock the setter uses.
		im.lock()
		notifications := im.newInstanceNotifications
		im.unlock()

		newhn, ok := <-notifications
		if !ok {
			// A closed channel yields zero values forever; stop instead of
			// spinning and spawning goroutines for the empty hostname.
			return
		}

		// receive them fast out of the channel, let the adding function lock to add
		// them one at a time, using a bunch of blocked goroutines as our
		// modification queue
		go im.addInstanceByHostname(newhn)
	}
}

// logInstanceReport logs a single info-level line carrying one unsigned
// field per instance status, as produced by instanceSummaryReport.
func (im *InstanceManager) logInstanceReport() {
	fields := log.With()
	for status, count := range im.instanceSummaryReport() {
		fields = fields.Uint(status, count)
	}

	reporter := fields.Logger()
	reporter.Info().Msg("instance report")
}

// ListInstances dumps a slice of all Instances the InstanceManager knows
// about. The result is nil when no instances are known.
func (im *InstanceManager) ListInstances() []*instance.Instance {
	im.lock()
	defer im.unlock()

	if len(im.instances) == 0 {
		return nil
	}

	out := make([]*instance.Instance, 0, len(im.instances))
	for _, inst := range im.instances {
		out = append(out, inst)
	}
	return out
}

// instanceSummaryReport counts the known instances by the value their
// Status method returns. Each instance is locked while its status is read.
func (im *InstanceManager) instanceSummaryReport() map[string]uint {
	counts := make(map[string]uint)
	for _, inst := range im.ListInstances() {
		inst.Lock()
		counts[inst.Status()]++
		inst.Unlock()
	}
	return counts
}