From b3f672b84af5ec099c75fb61a2d31e60149aee70 Mon Sep 17 00:00:00 2001 From: sneak Date: Fri, 27 Mar 2020 18:17:52 -0700 Subject: [PATCH] builds again --- database/dbmodel.go | 77 ++++++++++++++++++++++++++++++++++++++------ database/manager.go | 2 +- instance/instance.go | 57 ++++++++++++++++---------------- locator/locator.go | 13 +++----- manager/manager.go | 66 ++++++++++++++++++++++++++----------- process/feta.go | 24 ++++---------- 6 files changed, 154 insertions(+), 85 deletions(-) diff --git a/database/dbmodel.go b/database/dbmodel.go index ed9651d..e2a29d0 100644 --- a/database/dbmodel.go +++ b/database/dbmodel.go @@ -6,23 +6,18 @@ import ( "git.eeqj.de/sneak/feta/instance" "github.com/google/uuid" "github.com/jinzhu/gorm" + "github.com/rs/zerolog/log" _ "github.com/jinzhu/gorm/dialects/sqlite" ) -// NB that when you add a model below you must add it to this list! - -func (m *Manager) doMigrations() { - m.db.AutoMigrate(&apinstance{}) -} - -type apinstance struct { +type APInstance struct { gorm.Model - ID uuid.UUID `gorm:"type:uuid;primary_key;"` + UUID uuid.UUID `gorm:"type:uuid;primary_key;"` ErrorCount uint SuccessCount uint - HighestID int - Hostname string + HighestID uint + Hostname string `gorm:"type:varchar(100);unique_index"` Identified bool Fetching bool Disabled bool @@ -31,6 +26,68 @@ type apinstance struct { NodeInfoURL string ServerVersionString string ServerImplementationString string + FSMState string +} + +// NB that when you add a model below you must add it to this list! +func (m *Manager) doMigrations() { + log.Info().Msg("doing database migrations if required") + m.db.AutoMigrate(&APInstance{}) +} + +func (m *Manager) SaveInstance(i *instance.Instance) error { + i.Lock() + defer i.Unlock() + var x APInstance + if m.db.Where("UUID = ?", i.UUID).First(&x).RecordNotFound() { + log.Info(). + Str("hostname", i.Hostname). + Msg("instance not in db, inserting") + // item does not exist in db yet, must insert + ni := APInstance{ + UUID: i.UUID, + Disabled: i.Disabled, + ErrorCount: i.ErrorCount, + FSMState: i.Status(), + Fetching: i.Fetching, + HighestID: i.HighestID, + Hostname: i.Hostname, + Identified: i.Identified, + Implementation: string(i.Implementation), + NextFetch: i.NextFetch, + NodeInfoURL: i.NodeInfoURL, + ServerImplementationString: i.ServerImplementationString, + ServerVersionString: i.ServerVersionString, + SuccessCount: i.SuccessCount, + } + r := m.db.Create(&ni) + return r.Error + } else { + log.Info(). + Str("hostname", i.Hostname). + Str("id", i.UUID.String()). + Msg("instance found in db, updating") + // exists in db, update db + var ei APInstance + // EI EI uh-oh + m.db.Where("UUID = ?", i.UUID).First(&ei) + ei.Disabled = i.Disabled + ei.ErrorCount = i.ErrorCount + ei.FSMState = i.Status() + ei.Fetching = i.Fetching + ei.HighestID = i.HighestID + ei.Hostname = i.Hostname + ei.Identified = i.Identified + ei.Implementation = string(i.Implementation) + ei.NextFetch = i.NextFetch + ei.NodeInfoURL = i.NodeInfoURL + ei.ServerImplementationString = i.ServerImplementationString + ei.ServerVersionString = i.ServerVersionString + ei.SuccessCount = i.SuccessCount + + r := m.db.Save(&ei) + return r.Error + } } func (m *Manager) ListInstances() ([]*instance.Instance, error) { diff --git a/database/manager.go b/database/manager.go index 1ec2fa2..4e8ea74 100644 --- a/database/manager.go +++ b/database/manager.go @@ -23,11 +23,11 @@ func New() *Manager { } func (m *Manager) init() { + m.open() m.db.LogMode(false) if viper.GetBool("Debug") { m.db.LogMode(true) } - m.open() } func mkdirp(p string) error { diff --git a/instance/instance.go b/instance/instance.go index aee444c..71f116b 100644 --- a/instance/instance.go +++ b/instance/instance.go @@ -28,10 +28,6 @@ const instanceErrorInterval = time.Second * 60 * 30 type instanceImplementation int -// Hostname is a special type for holding the hostname of an -// instance (string) -type Hostname string - const ( implUnknown instanceImplementation = iota implMastodon @@ -40,34 +36,35 @@ const ( // Instance stores all the information we know about an instance type Instance struct { - Identifier uuid.UUID + Disabled bool + ErrorCount uint + FSM *fsm.FSM + Fetching bool + HighestID uint + Hostname string + UUID uuid.UUID + Identified bool + Implementation instanceImplementation + NextFetch time.Time + NodeInfoURL string + ServerImplementationString string + ServerVersionString string + SuccessCount uint + fetchingLock sync.Mutex + fsmLock sync.Mutex + storageBackend *storage.TootStorageBackend structLock sync.Mutex tootDestination chan *toot.Toot - ErrorCount uint - SuccessCount uint - highestID int - Hostname string - Identified bool - fetching bool - disabled bool - implementation instanceImplementation - storageBackend *storage.TootStorageBackend - NextFetch time.Time - nodeInfoURL string - ServerVersionString string - ServerImplementationString string - fetchingLock sync.Mutex - fsm *fsm.FSM - fsmLock sync.Mutex } // New returns a new instance, argument is a function that operates on the // new instance func New(options ...func(i *Instance)) *Instance { i := new(Instance) + i.UUID = uuid.New() i.setNextFetchAfter(1 * time.Second) - i.fsm = fsm.NewFSM( + i.FSM = fsm.NewFSM( "STATUS_UNKNOWN", fsm.Events{ {Name: "BEGIN_NODEINFO_URL_FETCH", Src: []string{"STATUS_UNKNOWN"}, Dst: "FETCHING_NODEINFO_URL"}, @@ -96,7 +93,7 @@ func New(options ...func(i *Instance)) *Instance { func (i *Instance) Status() string { i.fsmLock.Lock() defer i.fsmLock.Unlock() - return i.fsm.Current() + return i.FSM.Current() } // SetTootDestination takes a channel from the manager that all toots @@ -111,7 +108,7 @@ func (i *Instance) SetTootDestination(d chan *toot.Toot) { func (i *Instance) Event(eventname string) { i.fsmLock.Lock() defer i.fsmLock.Unlock() - i.fsm.Event(eventname) + i.FSM.Event(eventname) } func (i *Instance) fsmEnterState(e *fsm.Event) { @@ -198,7 +195,7 @@ func (i *Instance) Tick() { func (i *Instance) nodeIdentified() bool { i.Lock() defer i.Unlock() - if i.implementation > implUnknown { + if i.Implementation > implUnknown { return true } return false @@ -288,7 +285,7 @@ func (i *Instance) fetchNodeInfoURL() error { Msg("success fetching url for nodeinfo") i.Lock() - i.nodeInfoURL = item.Href + i.NodeInfoURL = item.Href i.Unlock() i.registerSuccess() i.Event("GOT_NODEINFO_URL") @@ -323,7 +320,7 @@ func (i *Instance) fetchNodeInfo() error { //FIXME make sure the nodeinfourl is on the same domain as the instance //hostname i.Lock() - url := i.nodeInfoURL + url := i.NodeInfoURL i.Unlock() i.Event("BEGIN_NODEINFO_FETCH") @@ -368,7 +365,7 @@ func (i *Instance) fetchNodeInfo() error { Str("serverVersion", ni.Software.Version). Str("software", ni.Software.Name). Str("hostname", i.Hostname). - Str("nodeInfoURL", i.nodeInfoURL). + Str("nodeInfoURL", i.NodeInfoURL). Msg("received nodeinfo from instance") i.Lock() @@ -382,7 +379,7 @@ func (i *Instance) fetchNodeInfo() error { Str("software", ni.Software.Name). Msg("detected server software") i.Identified = true - i.implementation = implPleroma + i.Implementation = implPleroma i.Unlock() i.registerSuccess() i.Event("GOT_NODEINFO") @@ -393,7 +390,7 @@ func (i *Instance) fetchNodeInfo() error { Str("software", ni.Software.Name). Msg("detected server software") i.Identified = true - i.implementation = implMastodon + i.Implementation = implMastodon i.Unlock() i.registerSuccess() i.Event("GOT_NODEINFO") diff --git a/locator/locator.go b/locator/locator.go index c00bfa2..7363c8e 100644 --- a/locator/locator.go +++ b/locator/locator.go @@ -7,15 +7,12 @@ import ( "sync" "time" - "git.eeqj.de/sneak/feta/instance" "git.eeqj.de/sneak/feta/jsonapis" "github.com/rs/zerolog/log" "github.com/spf13/viper" "golang.org/x/sync/semaphore" ) -//import "git.eeqj.de/sneak/feta" - // IndexAPITimeout is the timeout for fetching json instance lists // from the listing servers const IndexAPITimeout = time.Second * 60 * 3 @@ -39,7 +36,7 @@ const pleromaIndexURL = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api type InstanceLocator struct { pleromaIndexNextRefresh *time.Time mastodonIndexNextRefresh *time.Time - reportInstanceVia chan instance.Hostname + reportInstanceVia chan string mu sync.Mutex } @@ -62,13 +59,13 @@ func (il *InstanceLocator) unlock() { // SetInstanceNotificationChannel is the way the instanceLocator returns // newly discovered instances back to the manager for query/addition -func (il *InstanceLocator) SetInstanceNotificationChannel(via chan instance.Hostname) { +func (il *InstanceLocator) SetInstanceNotificationChannel(via chan string) { il.lock() defer il.unlock() il.reportInstanceVia = via } -func (il *InstanceLocator) addInstance(hostname instance.Hostname) { +func (il *InstanceLocator) addInstance(hostname string) { // receiver (InstanceManager) is responsible for de-duping against its // map, we just locate and spray, it manages il.reportInstanceVia <- hostname @@ -201,7 +198,7 @@ func (il *InstanceLocator) locateMastodon() { Msg("received hosts from mastodon index") for k := range hosts { - il.addInstance(instance.Hostname(k)) + il.addInstance(k) } } @@ -269,7 +266,7 @@ func (il *InstanceLocator) locatePleroma() { Msg("received hosts from pleroma index") for k := range hosts { - il.addInstance(instance.Hostname(k)) + il.addInstance(k) } } diff --git a/manager/manager.go b/manager/manager.go index e37183a..922b3f6 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -14,7 +14,8 @@ import ( // conform for storing toots type DatabaseStorage interface { ListInstances() ([]*instance.Instance, error) - StoreInstances([]*instance.Instance) error + //StoreInstances([]*instance.Instance) error + SaveInstance(*instance.Instance) error } // InstanceManager is the main data structure for the goroutine that manages @@ -22,21 +23,47 @@ type DatabaseStorage interface { type InstanceManager struct { mu sync.Mutex db DatabaseStorage - instances map[instance.Hostname]*instance.Instance - newInstanceNotifications chan instance.Hostname + instances map[string]*instance.Instance + newInstanceNotifications chan string tootDestination chan *toot.Toot startup time.Time - hostDiscoveryParallelism int hostAdderSemaphore chan bool + nextDBSave time.Time } // New returns a new InstanceManager for use by the Process -func New() *InstanceManager { - i := new(InstanceManager) - i.hostDiscoveryParallelism = viper.GetInt("HostDiscoveryParallelism") - i.hostAdderSemaphore = make(chan bool, i.hostDiscoveryParallelism) - i.instances = make(map[instance.Hostname]*instance.Instance) - return i +func New(db DatabaseStorage) *InstanceManager { + im := new(InstanceManager) + im.db = db + im.hostAdderSemaphore = make(chan bool, viper.GetInt("HostDiscoveryParallelism")) + im.instances = make(map[string]*instance.Instance) + im.RestoreFromDB() + return im +} + +func (im *InstanceManager) RestoreFromDB() { + newil, err := im.db.ListInstances() + if err != nil { + log.Panic(). + Err(err). + Msg("cannot get instance list from db") + } + im.lock() + defer im.unlock() + for _, x := range newil { + im.instances[x.Hostname] = x + } +} + +func (im *InstanceManager) SaveToDB() { + for _, x := range im.ListInstances() { + err := im.db.SaveInstance(x) + if err != nil { + log.Panic(). + Err(err). + Msg("cannot write to db") + } + } } // SetTootDestination provides the instancemanager with a channel to the @@ -57,7 +84,7 @@ func (im *InstanceManager) unlock() { // InstanceManager about the channel from the InstanceLocator so that the // InstanceLocator can provide it/us (the InstanceManager) with new // instance.Hostnames. We (the manager) deduplicate the list ourselves. -func (im *InstanceManager) SetInstanceNotificationChannel(via chan instance.Hostname) { +func (im *InstanceManager) SetInstanceNotificationChannel(via chan string) { im.lock() defer im.unlock() im.newInstanceNotifications = via @@ -65,9 +92,9 @@ func (im *InstanceManager) SetInstanceNotificationChannel(via chan instance.Host func (im *InstanceManager) receiveSeedInstanceHostnames() { for _, x := range seeds.SeedInstances { - go func(tmp instance.Hostname) { + go func(tmp string) { im.addInstanceByHostname(tmp) - }(instance.Hostname(x)) + }(x) } } @@ -94,6 +121,11 @@ func (im *InstanceManager) Manage() { x = time.Now() im.logInstanceReport() } + + if im.nextDBSave.Before(time.Now()) { + im.nextDBSave = time.Now().Add(time.Second * 60) + im.SaveToDB() + } } } @@ -113,7 +145,7 @@ func (im *InstanceManager) managerLoop() { } } -func (im *InstanceManager) hostnameExists(newhn instance.Hostname) bool { +func (im *InstanceManager) hostnameExists(newhn string) bool { im.lock() defer im.unlock() for k := range im.instances { @@ -124,7 +156,7 @@ func (im *InstanceManager) hostnameExists(newhn instance.Hostname) bool { return false } -func (im *InstanceManager) addInstanceByHostname(newhn instance.Hostname) { +func (im *InstanceManager) addInstanceByHostname(newhn string) { if im.hostnameExists(newhn) { // ignore adding new if we already know about it return @@ -152,7 +184,7 @@ func (im *InstanceManager) addInstanceByHostname(newhn instance.Hostname) { } func (im *InstanceManager) receiveNewInstanceHostnames() { - var newhn instance.Hostname + var newhn string for { newhn = <-im.newInstanceNotifications // receive them fast out of the channel, let the adding function lock to add @@ -178,7 +210,6 @@ func (im *InstanceManager) logInstanceReport() { // ListInstances dumps a slice of all Instances the InstanceManager knows // about func (im *InstanceManager) ListInstances() []*instance.Instance { - // FIXME make this pull from db var out []*instance.Instance im.lock() defer im.unlock() @@ -189,7 +220,6 @@ func (im *InstanceManager) ListInstances() []*instance.Instance { } func (im *InstanceManager) instanceSummaryReport() map[string]uint { - // FIXME make this pull from db r := make(map[string]uint) for _, v := range im.ListInstances() { v.Lock() diff --git a/process/feta.go b/process/feta.go index 9cfc607..8464661 100644 --- a/process/feta.go +++ b/process/feta.go @@ -6,7 +6,6 @@ import ( "git.eeqj.de/sneak/feta/database" "git.eeqj.de/sneak/feta/ingester" - "git.eeqj.de/sneak/feta/instance" "git.eeqj.de/sneak/feta/locator" "git.eeqj.de/sneak/feta/manager" "git.eeqj.de/sneak/feta/storage" @@ -82,7 +81,6 @@ func (f *Feta) identify() { } func (f *Feta) setupDatabase() { - f.dbm = database.New() } func (f *Feta) setupLogging() { @@ -118,20 +116,6 @@ func (f *Feta) uptime() time.Duration { return time.Since(f.startup) } -/* -func (f *Feta) setupDatabase() { - var err error - f.db, err = gorm.Open("sqlite3", "feta.sqlite") - - - if err != nil { - panic(err) - } - - //f.databaseMigrations() -} -*/ - func (f *Feta) runForever() int { f.startup = time.Now() @@ -139,10 +123,14 @@ func (f *Feta) runForever() int { // FIXME move this channel creation into the manager's constructor // and add getters/setters on the manager/locator - newInstanceHostnameNotifications := make(chan instance.Hostname) + newInstanceHostnameNotifications := make(chan string) + + f.dbm = database.New() f.locator = locator.New() - f.manager = manager.New() + + f.manager = manager.New(f.dbm) + f.ingester = ingester.NewTootIngester() home := os.Getenv("HOME")