feta/manager/manager.go

package manager

import (
	"sync"
	"time"

	"git.eeqj.de/sneak/feta/instance"
	"git.eeqj.de/sneak/feta/seeds"
	"git.eeqj.de/sneak/feta/toot"
	"github.com/rs/zerolog/log"
	"github.com/spf13/viper"
)

// DatabaseStorage is the interface a storage backend must conform to so
// that the InstanceManager can persist and restore instances.
type DatabaseStorage interface {
	ListInstances() ([]*instance.Instance, error)
	//StoreInstances([]*instance.Instance) error
	SaveInstance(*instance.Instance) error
}
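
// What follows is an illustrative, in-memory DatabaseStorage sketch added
// only to show the shape of the interface contract; it is not the storage
// backend feta actually uses, and the memoryStorage name is hypothetical.
type memoryStorage struct {
	mu        sync.Mutex
	instances map[string]*instance.Instance
}

// ListInstances returns every instance currently held in memory.
func (m *memoryStorage) ListInstances() ([]*instance.Instance, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	out := make([]*instance.Instance, 0, len(m.instances))
	for _, i := range m.instances {
		out = append(out, i)
	}
	return out, nil
}

// SaveInstance stores (or overwrites) an instance keyed by hostname.
func (m *memoryStorage) SaveInstance(i *instance.Instance) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.instances == nil {
		m.instances = make(map[string]*instance.Instance)
	}
	m.instances[i.Hostname] = i
	return nil
}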

// InstanceManager is the main data structure for the goroutine that manages
// the list of all known instances, fed by the locator.
type InstanceManager struct {
	mu                       sync.Mutex
	db                       DatabaseStorage
	instances                map[string]*instance.Instance
	newInstanceNotifications chan string
	tootDestination          chan *toot.Toot
	startup                  time.Time
	hostAdderSemaphore       chan bool
	nextDBSave               time.Time
}

// New returns a new InstanceManager for use by the Process.
func New(db DatabaseStorage) *InstanceManager {
	im := new(InstanceManager)
	im.db = db
	im.hostAdderSemaphore = make(chan bool, viper.GetInt("HostDiscoveryParallelism"))
	im.instances = make(map[string]*instance.Instance)
	im.RestoreFromDB()
	return im
}

// RestoreFromDB loads all previously known instances from storage into the
// in-memory map. Restored instances receive whatever tootDestination is
// currently set on the manager, so SetTootDestination should be wired up
// before toots are expected to flow.
func (im *InstanceManager) RestoreFromDB() {
	newil, err := im.db.ListInstances()
	if err != nil {
		log.Panic().
			Err(err).
			Msg("cannot get instance list from db")
	}
	im.lock()
	defer im.unlock()
	count := 0
	for _, x := range newil {
		x.SetTootDestination(im.tootDestination)
		im.instances[x.Hostname] = x
		count++
	}
	log.Info().
		Int("count", count).
		Msg("restored instances from database")
}

// SaveToDB writes every known instance back to storage.
func (im *InstanceManager) SaveToDB() {
	for _, x := range im.ListInstances() {
		err := im.db.SaveInstance(x)
		if err != nil {
			log.Panic().
				Err(err).
				Msg("cannot write to db")
		}
	}
}

// SetTootDestination provides the InstanceManager with the channel to the
// ingester, which it hands to each of its instances.
func (im *InstanceManager) SetTootDestination(td chan *toot.Toot) {
	im.tootDestination = td
}

func (im *InstanceManager) lock() {
	im.mu.Lock()
}

func (im *InstanceManager) unlock() {
	im.mu.Unlock()
}

// SetInstanceNotificationChannel is how the Process hands the
// InstanceManager the channel from the InstanceLocator, over which the
// locator reports newly discovered instance hostnames. The manager
// deduplicates those hostnames itself.
func (im *InstanceManager) SetInstanceNotificationChannel(via chan string) {
	im.lock()
	defer im.unlock()
	im.newInstanceNotifications = via
}

// receiveSeedInstanceHostnames feeds the hostnames from the seeds package
// into the same add path used for discovered hostnames.
func (im *InstanceManager) receiveSeedInstanceHostnames() {
	for _, x := range seeds.SeedInstances {
		go func(tmp string) {
			im.addInstanceByHostname(tmp)
		}(x)
	}
}

// Manage is the main entrypoint of the InstanceManager, designed to be
// called once in its own goroutine.
func (im *InstanceManager) Manage() {
	log.Info().Msg("InstanceManager starting")

	go im.receiveNewInstanceHostnames()

	im.startup = time.Now()
	lastReport := im.startup

	go im.receiveSeedInstanceHostnames()

	for {
		log.Info().Msg("InstanceManager tick")
		im.managerLoop()
		time.Sleep(1 * time.Second)
		if time.Now().After(lastReport.Add(viper.GetDuration("LogReportInterval"))) {
			lastReport = time.Now()
			im.logInstanceReport()
		}
		if im.nextDBSave.Before(time.Now()) {
			im.nextDBSave = time.Now().Add(time.Second * 60)
			im.SaveToDB()
		}
	}
}
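
// exampleWiring is a hypothetical sketch, not called anywhere in feta, of
// how a supervising Process might assemble the manager: construct it with a
// storage backend, hand it the ingester and locator channels, and then run
// Manage in its own goroutine. The channel sizes here are assumptions for
// illustration only, and viper configuration (HostDiscoveryParallelism,
// LogReportInterval) is assumed to already be loaded, as the real Process
// would do before calling New.
func exampleWiring(db DatabaseStorage) {
	// channel the instances use to hand toots to the ingester
	tootChan := make(chan *toot.Toot, 128)
	// channel the locator uses to report newly discovered hostnames
	hostnameChan := make(chan string, 128)

	im := New(db)
	im.SetTootDestination(tootChan)
	im.SetInstanceNotificationChannel(hostnameChan)
	go im.Manage()
}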

func (im *InstanceManager) managerLoop() {
	im.lock()
	il := make([]*instance.Instance, 0)
	for _, v := range im.instances {
		il = append(il, v)
	}
	im.unlock()
	// FIXME is this a bug outside of the mutex above?
	for _, v := range il {
		go func(i *instance.Instance) {
			i.Tick()
		}(v)
	}
}

func (im *InstanceManager) hostnameExists(newhn string) bool {
	im.lock()
	defer im.unlock()
	_, present := im.instances[newhn]
	return present
}

func (im *InstanceManager) addInstanceByHostname(newhn string) {
	if im.hostnameExists(newhn) {
		// ignore adding new if we already know about it
		return
	}
	// this blocks on the channel size, limiting concurrency
	im.hostAdderSemaphore <- true
	i := instance.New(func(x *instance.Instance) {
		x.Hostname = newhn
		// copy the ingester input channel from the manager to the instance
		x.SetTootDestination(im.tootDestination)
	})
	// we do node detection under the adderSemaphore to avoid thundering
	// on startup
	i.DetectNodeTypeIfNecessary()
	// pop an item from the buffered channel
	<-im.hostAdderSemaphore
	// lock the map to insert
	im.lock()
	im.instances[newhn] = i
	im.unlock()
}

func (im *InstanceManager) receiveNewInstanceHostnames() {
	for {
		newhn := <-im.newInstanceNotifications
		// receive them fast out of the channel, let the adding function
		// lock to add them one at a time, using a bunch of blocked
		// goroutines as our modification queue
		go im.addInstanceByHostname(newhn)
	}
}

func (im *InstanceManager) logInstanceReport() {
	r := im.instanceSummaryReport()
	sublogger := log.With().Logger()
	for k, v := range r {
		sublogger = sublogger.With().Uint(k, v).Logger()
	}
	sublogger.Info().
		Msg("instance report")
}

// ListInstances returns a slice of all Instances the InstanceManager knows
// about.
func (im *InstanceManager) ListInstances() []*instance.Instance {
	var out []*instance.Instance
	im.lock()
	defer im.unlock()
	for _, v := range im.instances {
		out = append(out, v)
	}
	return out
}

// instanceSummaryReport counts known instances by their status string.
func (im *InstanceManager) instanceSummaryReport() map[string]uint {
	r := make(map[string]uint)
	for _, v := range im.ListInstances() {
		v.Lock()
		r[v.Status()]++
		v.Unlock()
	}
	return r
}