feta/manager.go

191 lines
4.1 KiB
Go
Raw Normal View History

2019-11-05 02:53:11 +00:00
package feta
2019-11-03 13:17:00 +00:00
import "sync"
import "time"
2019-11-03 18:00:01 +00:00
import "runtime"
2019-11-03 13:17:00 +00:00
2019-11-05 23:32:09 +00:00
//import "github.com/gin-gonic/gin"
2019-11-03 18:00:01 +00:00
import "github.com/rs/zerolog/log"
2019-11-03 13:17:00 +00:00
// hostDiscoveryParallelism caps how many addInstanceByHostname goroutines
// may run node detection at once (enforced via hostAdderSemaphore).
const hostDiscoveryParallelism = 20
2019-11-09 06:13:19 +00:00
// instanceBackend is a placeholder for the storage/transport abstraction an
// instance will eventually talk through; no methods are defined yet.
type instanceBackend interface {
	// FIXME: define the backend contract.
}
// InstanceManager is the main data structure for the goroutine that manages
// the list of all known instances, fed by the locator
type InstanceManager struct {
	// mu guards instances and the channel fields below.
	mu sync.Mutex
	// instances maps each known hostname to its tracked instance.
	instances map[InstanceHostname]*instance
	// newInstanceNotifications delivers hostnames discovered by the locator.
	newInstanceNotifications chan InstanceHostname
	// tootDestination is handed to each new instance so discovered toots
	// funnel to a single consumer.
	tootDestination chan *toot
	// startup records when manage() began, for report scheduling.
	startup time.Time
	// hostAdderSemaphore is a buffered channel used as a counting
	// semaphore limiting concurrent host additions.
	hostAdderSemaphore chan bool
}
func newInstanceManager() *InstanceManager {
2019-11-03 13:17:00 +00:00
i := new(InstanceManager)
i.hostAdderSemaphore = make(chan bool, hostDiscoveryParallelism)
2019-11-06 07:03:42 +00:00
i.instances = make(map[InstanceHostname]*instance)
2019-11-03 13:17:00 +00:00
return i
}
2019-12-14 16:34:13 +00:00
// setTootDestination wires the channel on which toots discovered by any
// instance are delivered. Takes the manager mutex for consistency with
// setInstanceNotificationChannel, since addInstanceByHostname goroutines
// read this field concurrently.
func (im *InstanceManager) setTootDestination(td chan *toot) {
	im.lock()
	defer im.unlock()
	im.tootDestination = td
}
func (im *InstanceManager) logCaller(msg string) {
2019-11-03 18:00:01 +00:00
fpcs := make([]uintptr, 1)
// Skip 2 levels to get the caller
n := runtime.Callers(3, fpcs)
if n == 0 {
log.Debug().Msg("MSG: NO CALLER")
}
caller := runtime.FuncForPC(fpcs[0] - 1)
if caller == nil {
log.Debug().Msg("MSG CALLER WAS NIL")
}
// Print the file name and line number
filename, line := caller.FileLine(fpcs[0] - 1)
function := caller.Name()
log.Debug().
Str("filename", filename).
Int("linenum", line).
Str("function", function).
Msg(msg)
}
// lock acquires the manager mutex guarding the instances map and channel
// fields.
func (im *InstanceManager) lock() {
	im.mu.Lock()
}
// unlock releases the manager mutex taken by lock().
func (im *InstanceManager) unlock() {
	im.mu.Unlock()
}
2019-12-14 16:34:13 +00:00
// setInstanceNotificationChannel records the channel on which the locator
// delivers newly discovered instance hostnames; receiveNewInstanceHostnames
// later drains it. Guarded by the manager mutex.
func (im *InstanceManager) setInstanceNotificationChannel(via chan InstanceHostname) {
	im.lock()
	defer im.unlock()
	im.newInstanceNotifications = via
}
func (im *InstanceManager) manage() {
2019-11-03 18:00:01 +00:00
log.Info().Msg("InstanceManager starting")
2019-11-05 23:32:09 +00:00
go func() {
im.receiveNewInstanceHostnames()
2019-11-05 23:32:09 +00:00
}()
im.startup = time.Now()
x := im.startup
2019-11-03 13:17:00 +00:00
for {
2019-11-03 18:00:01 +00:00
log.Info().Msg("InstanceManager tick")
im.managerLoop()
2019-11-03 13:17:00 +00:00
time.Sleep(1 * time.Second)
if time.Now().After(x.Add(LogReportInterval)) {
2019-11-06 07:03:42 +00:00
x = time.Now()
im.logInstanceReport()
2019-11-06 07:03:42 +00:00
}
2019-11-03 13:17:00 +00:00
}
}
2019-11-03 18:00:01 +00:00
func (im *InstanceManager) managerLoop() {
im.lock()
2019-11-06 07:03:42 +00:00
il := make([]*instance, 0)
for _, v := range im.instances {
2019-11-05 23:32:09 +00:00
il = append(il, v)
2019-11-04 17:07:04 +00:00
}
im.unlock()
2019-11-05 23:32:09 +00:00
2019-12-14 15:49:35 +00:00
// FIXME is this a bug outside of the mutex above?
2019-11-05 23:32:09 +00:00
for _, v := range il {
2019-11-06 07:03:42 +00:00
go func(i *instance) {
i.Tick()
}(v)
2019-11-05 23:32:09 +00:00
}
2019-11-04 17:07:04 +00:00
}
func (im *InstanceManager) hostnameExists(newhn InstanceHostname) bool {
im.lock()
defer im.unlock()
for k := range im.instances {
2019-11-03 18:00:01 +00:00
if newhn == k {
return true
}
}
return false
}
// addInstanceByHostname creates, node-detects, and registers an instance for
// newhn, bounded by the hostAdderSemaphore so at most
// hostDiscoveryParallelism additions run detection at once.
//
// NOTE(review): the existence check and the map insert are not atomic — two
// goroutines racing on the same hostname can both pass the check, and the
// later insert silently replaces the earlier instance. TODO confirm this is
// acceptable.
func (im *InstanceManager) addInstanceByHostname(newhn InstanceHostname) {
	if im.hostnameExists(newhn) {
		// ignore adding new if we already know about it
		return
	}

	// this blocks on the channel size, limiting concurrency
	im.hostAdderSemaphore <- true

	i := newInstance(func(x *instance) {
		x.hostname = string(newhn)
		x.setTootDestination(im.tootDestination)
	})
	// we do node detection under the addLock to avoid thundering
	// on startup
	i.detectNodeTypeIfNecessary()

	// pop an item from the buffered channel
	<-im.hostAdderSemaphore

	// lock the map to insert
	im.lock()
	im.instances[newhn] = i
	im.unlock()
}
func (im *InstanceManager) receiveNewInstanceHostnames() {
2019-11-03 18:00:01 +00:00
var newhn InstanceHostname
for {
newhn = <-im.newInstanceNotifications
2019-11-05 23:32:09 +00:00
// receive them fast out of the channel, let the adding function lock to add
// them one at a time, using a bunch of blocked goroutines as our
2019-12-14 15:49:35 +00:00
// modification queue
go im.addInstanceByHostname(newhn)
2019-11-03 18:00:01 +00:00
}
}
func (im *InstanceManager) logInstanceReport() {
r := im.instanceSummaryReport()
2019-11-03 18:00:01 +00:00
2019-11-06 07:03:42 +00:00
sublogger := log.With().Logger()
2019-11-03 18:00:01 +00:00
2019-11-06 07:03:42 +00:00
for k, v := range r {
sublogger = sublogger.With().Uint(k, v).Logger()
}
2019-11-03 18:00:01 +00:00
2019-11-06 07:03:42 +00:00
sublogger.Info().
Msg("instance report")
2019-11-03 18:00:01 +00:00
}
func (im *InstanceManager) listInstances() []*instance {
2019-11-06 07:03:42 +00:00
var out []*instance
im.lock()
defer im.unlock()
for _, v := range im.instances {
2019-11-04 17:07:04 +00:00
out = append(out, v)
}
return out
}
2019-11-06 07:03:42 +00:00
func (im *InstanceManager) instanceSummaryReport() map[string]uint {
r := make(map[string]uint)
for _, v := range im.listInstances() {
v.Lock()
r[v.Status()]++
v.Unlock()
2019-11-03 18:00:01 +00:00
}
return r
}