Compare commits

..

2 Commits

Author SHA1 Message Date
2ecd833726 now actually does something
Some checks failed
continuous-integration/drone/push Build is failing
2020-03-27 19:57:58 -07:00
b3f672b84a builds again 2020-03-27 18:17:52 -07:00
15 changed files with 357 additions and 166 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@ feta
output/ output/
feta.sqlite feta.sqlite
.lintsetup .lintsetup
out

View File

@ -2,9 +2,20 @@
archives the fediverse archives the fediverse
# todo
* scan toots for mentions and feed to locator
* put toots in a separate db file
* test with a real database
* save instances to store more often
* verify instances load properly on startup
* do some simple in-memory dedupe for toot storage
* make some templates using pongo2 and a simple website
* update APIs
# status # status
[![CircleCI](https://circleci.com/gh/sneak/feta.svg?style=svg)](https://circleci.com/gh/sneak/feta) [![Build Status](https://drone.datavi.be/api/badges/sneak/feta/status.svg)](https://drone.datavi.be/sneak/feta)
# ethics statement # ethics statement

View File

@ -1,41 +0,0 @@
package database
import (
"time"
"git.eeqj.de/sneak/feta/instance"
"github.com/google/uuid"
"github.com/jinzhu/gorm"
_ "github.com/jinzhu/gorm/dialects/sqlite"
)
// NB that when you add a model below you must add it to this list!

// doMigrations asks gorm to auto-migrate the schema for the models this
// package persists (only apinstance here). Nothing returned by AutoMigrate
// is inspected, so a failed migration is not reported by this method.
func (m *Manager) doMigrations() {
m.db.AutoMigrate(&apinstance{})
}
// apinstance is the gorm model holding the persisted fields of a
// fediverse instance. It is the only model passed to AutoMigrate in this
// file.
type apinstance struct {
gorm.Model
// ID is tagged as a uuid-typed primary key.
// NOTE(review): gorm.Model is embedded as well and presumably declares
// its own ID field — confirm which one gorm actually uses.
ID uuid.UUID `gorm:"type:uuid;primary_key;"`
// Counters for failed and successful operations against the instance.
ErrorCount uint
SuccessCount uint
// HighestID presumably tracks the newest toot id seen — TODO confirm.
HighestID int
Hostname string
// State flags.
Identified bool
Fetching bool
Disabled bool
// Implementation names the detected server software.
Implementation string
// NextFetch is the time of the next scheduled fetch.
NextFetch time.Time
NodeInfoURL string
ServerVersionString string
ServerImplementationString string
}
// ListInstances is a stub: it does not query the database yet and always
// yields an empty, non-nil slice together with a nil error.
func (m *Manager) ListInstances() ([]*instance.Instance, error) {
	// FIXME have this produce a list of Instance
	return []*instance.Instance{}, nil
}

92
database/imconnector.go Normal file
View File

@ -0,0 +1,92 @@
package database
import (
"git.eeqj.de/sneak/feta/instance"
"github.com/rs/zerolog/log"
_ "github.com/jinzhu/gorm/dialects/sqlite"
)
// SaveInstance writes the current state of i to the database, inserting a
// new APInstance row when no row with i's UUID exists and updating the
// existing row otherwise.
//
// The instance is locked for the duration of the call so that the copied
// fields form a consistent snapshot. Any database error — from the lookup,
// the insert or the update — is returned to the caller.
func (m *Manager) SaveInstance(i *instance.Instance) error {
	i.Lock()
	defer i.Unlock()

	// Look the row up exactly once; the fetched record is reused for the
	// update path instead of being queried a second time.
	var rec APInstance
	lookup := m.db.Where("UUID = ?", i.UUID).First(&rec)
	isNew := lookup.RecordNotFound()
	if !isNew && lookup.Error != nil {
		// A real query failure must not be mistaken for "row exists".
		return lookup.Error
	}

	if isNew {
		// Start from a clean record so nothing from the failed lookup leaks
		// into the inserted row.
		rec = APInstance{UUID: i.UUID}
	}

	// Copy the persisted fields; shared by the insert and update paths so
	// the two can never drift apart.
	rec.Disabled = i.Disabled
	rec.ErrorCount = i.ErrorCount
	rec.FSMState = i.Status()
	rec.Fetching = i.Fetching
	rec.HighestID = i.HighestID
	rec.Hostname = i.Hostname
	rec.Identified = i.Identified
	rec.Implementation = i.Implementation
	rec.NextFetch = i.NextFetch
	rec.NodeInfoURL = i.NodeInfoURL
	rec.ServerImplementationString = i.ServerImplementationString
	rec.ServerVersionString = i.ServerVersionString
	rec.SuccessCount = i.SuccessCount

	if isNew {
		log.Info().
			Str("hostname", i.Hostname).
			Msg("instance not in db, inserting")
		return m.db.Create(&rec).Error
	}

	log.Info().
		Str("hostname", i.Hostname).
		Str("id", i.UUID.String()).
		Msg("instance found in db, updating")
	return m.db.Save(&rec).Error
}
// ListInstances loads every APInstance row from the database and rebuilds
// an *instance.Instance for each of them via instance.New.
//
// The stored FSM state is handed over as InitialFSMState. If the query
// fails, the error is returned and the slice is nil; on success the slice
// is non-nil (possibly empty) and the error is nil.
func (m *Manager) ListInstances() ([]*instance.Instance, error) {
	var rows []APInstance
	if err := m.db.Find(&rows).Error; err != nil {
		return nil, err
	}

	output := make([]*instance.Instance, 0, len(rows))
	for idx := range rows {
		// Index instead of ranging by value: avoids copying the whole row
		// and gives the option closure its own per-iteration pointer.
		row := &rows[idx]
		restored := instance.New(func(x *instance.Instance) {
			x.UUID = row.UUID
			x.Disabled = row.Disabled
			x.ErrorCount = row.ErrorCount
			x.InitialFSMState = row.FSMState
			x.Fetching = row.Fetching
			x.HighestID = row.HighestID
			x.Hostname = row.Hostname
			x.Identified = row.Identified
			x.Implementation = row.Implementation
			x.NextFetch = row.NextFetch
			x.NodeInfoURL = row.NodeInfoURL
			x.ServerImplementationString = row.ServerImplementationString
			x.ServerVersionString = row.ServerVersionString
			x.SuccessCount = row.SuccessCount
		})
		output = append(output, restored)
	}
	return output, nil
}

View File

@ -23,11 +23,11 @@ func New() *Manager {
} }
func (m *Manager) init() { func (m *Manager) init() {
m.open()
m.db.LogMode(false) m.db.LogMode(false)
if viper.GetBool("Debug") { if viper.GetBool("Debug") {
m.db.LogMode(true) m.db.LogMode(true)
} }
m.open()
} }
func mkdirp(p string) error { func mkdirp(p string) error {

49
database/model.go Normal file
View File

@ -0,0 +1,49 @@
package database
import (
"time"
"github.com/google/uuid"
"github.com/jinzhu/gorm"
"github.com/rs/zerolog/log"
_ "github.com/jinzhu/gorm/dialects/sqlite"
)
// StoredToot is the gorm model used to persist a single toot. It is
// registered for auto-migration by doMigrations.
type StoredToot struct {
gorm.Model
// UUID is tagged as a uuid-typed primary key.
// NOTE(review): gorm.Model is embedded too and presumably contributes
// its own primary key — confirm the resulting key is intended.
UUID uuid.UUID `gorm:"type:uuid;primary_key;"`
//Original string `sql:"type:text"`
// Original holds the toot's raw bytes as received.
Original []byte
// Hash is the toot's identity hash; the tag requests a unique index on it.
Hash string `gorm:"unique_index"`
// ServerCreated is the creation time taken from the parsed toot.
ServerCreated time.Time
// Acct is the author's account string.
Acct string
Content []byte
URL string
// Hostname is the host the toot was fetched from.
Hostname string
}
// APInstance is the gorm model holding the persisted fields of an
// instance. It is registered for auto-migration by doMigrations.
type APInstance struct {
gorm.Model
// UUID is tagged as a uuid-typed primary key.
// NOTE(review): gorm.Model is embedded too and presumably contributes
// its own primary key — confirm the resulting key is intended.
UUID uuid.UUID `gorm:"type:uuid;primary_key;"`
// Counters for failed and successful operations against the instance.
ErrorCount uint
SuccessCount uint
// HighestID presumably tracks the newest toot id seen — TODO confirm.
HighestID uint
// Hostname is capped at 100 characters and carries a unique index.
Hostname string `gorm:"type:varchar(100);unique_index"`
// State flags.
Identified bool
Fetching bool
Disabled bool
// Implementation names the detected server software.
Implementation string
// NextFetch is the time of the next scheduled fetch.
NextFetch time.Time
NodeInfoURL string
ServerVersionString string
ServerImplementationString string
// FSMState is the name of the instance's state machine state when saved.
FSMState string
}
// doMigrations brings the database schema up to date for every model this
// package persists. A failed migration is logged instead of being silently
// dropped; the remaining models are still attempted.
//
// NB: whenever you add a model to this package you must add it to the
// models list in this function!
func (m *Manager) doMigrations() {
	log.Info().Msg("doing database migrations if required")

	models := []interface{}{
		&APInstance{},
		&StoredToot{},
	}
	for _, model := range models {
		if err := m.db.AutoMigrate(model).Error; err != nil {
			log.Error().
				Err(err).
				Msg("database migration failed")
		}
	}
}

View File

@ -0,0 +1,47 @@
package database
import (
"fmt"
"strings"
"git.eeqj.de/sneak/feta/toot"
"github.com/google/uuid"
_ "github.com/jinzhu/gorm/dialects/sqlite"
)
// TootExists reports whether a StoredToot with the same hash as t is
// already in the database. Only a "record not found" result yields false;
// any other outcome of the lookup yields true.
func (m *Manager) TootExists(t *toot.Toot) bool {
	var found StoredToot
	missing := m.db.Where("Hash = ?", t.GetHash()).First(&found).RecordNotFound()
	return !missing
}
// StoreToot inserts a single toot into the database as a StoredToot row
// with a freshly generated UUID.
//
// It returns an error when t (or its parsed form) is missing, or when the
// insert itself fails.
func (m *Manager) StoreToot(t *toot.Toot) error {
	if t == nil || t.Parsed == nil {
		return fmt.Errorf("cannot store toot: no parsed toot supplied")
	}

	host := strings.ToLower(t.FromHost)
	nt := &StoredToot{
		UUID:          uuid.New(),
		ServerCreated: t.Parsed.CreatedAt,
		Original:      t.Original,
		Acct:          normalizeAcct(t.Parsed.Account.Acct, host),
		URL:           t.Parsed.URL,
		Content:       t.Parsed.Content,
		Hostname:      host,
		Hash:          t.GetHash(),
	}

	// nt is already a pointer; hand it to gorm directly rather than as a
	// pointer to a pointer.
	return m.db.Create(nt).Error
}

// normalizeAcct returns acct in fully qualified user@host form. An acct
// that already carries a host part keeps it (lower-cased); a bare local
// acct gets the lower-cased fetching host appended.
func normalizeAcct(acct, host string) string {
	if at := strings.LastIndex(acct, "@"); at > 0 {
		return acct[:at+1] + strings.ToLower(acct[at+1:])
	}
	return fmt.Sprintf("%s@%s", acct, strings.ToLower(host))
}
// StoreToots stores the given toots one after another through StoreToot.
// It stops at the first failure and returns that error; toots stored
// before the failure remain stored. It returns nil when all succeed.
func (m *Manager) StoreToots(tc []*toot.Toot) error {
	for idx := 0; idx < len(tc); idx++ {
		if err := m.StoreToot(tc[idx]); err != nil {
			return err
		}
	}
	return nil
}

View File

@ -1,9 +1,12 @@
package ingester package ingester
import "time" import (
import "github.com/rs/zerolog/log" "time"
import "git.eeqj.de/sneak/feta/toot"
import "git.eeqj.de/sneak/feta/storage" "git.eeqj.de/sneak/feta/storage"
"git.eeqj.de/sneak/feta/toot"
"github.com/rs/zerolog/log"
)
// TootIngester is the data structure for the ingester process that is // TootIngester is the data structure for the ingester process that is
// responsible for storing the discovered toots // responsible for storing the discovered toots
@ -15,7 +18,7 @@ type TootIngester struct {
type seenTootMemo struct { type seenTootMemo struct {
lastSeen time.Time lastSeen time.Time
tootHash toot.Hash tootHash string
} }
// NewTootIngester returns a fresh TootIngester for your use // NewTootIngester returns a fresh TootIngester for your use
@ -55,5 +58,5 @@ func (ti *TootIngester) storeToot(t *toot.Toot) {
if ti.storageBackend == nil { if ti.storageBackend == nil {
panic("no storage backend") panic("no storage backend")
} }
ti.storageBackend.StoreToot(*t) ti.storageBackend.StoreToot(t)
} }

View File

@ -11,7 +11,6 @@ import (
"time" "time"
"git.eeqj.de/sneak/feta/jsonapis" "git.eeqj.de/sneak/feta/jsonapis"
"git.eeqj.de/sneak/feta/storage"
"git.eeqj.de/sneak/feta/toot" "git.eeqj.de/sneak/feta/toot"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/looplab/fsm" "github.com/looplab/fsm"
@ -26,49 +25,43 @@ const instanceHTTPTimeout = time.Second * 120
const instanceSpiderInterval = time.Second * 120 const instanceSpiderInterval = time.Second * 120
const instanceErrorInterval = time.Second * 60 * 30 const instanceErrorInterval = time.Second * 60 * 30
type instanceImplementation int
// Hostname is a special type for holding the hostname of an
// instance (string)
type Hostname string
const (
implUnknown instanceImplementation = iota
implMastodon
implPleroma
)
// Instance stores all the information we know about an instance // Instance stores all the information we know about an instance
type Instance struct { type Instance struct {
Identifier uuid.UUID Disabled bool
structLock sync.Mutex
tootDestination chan *toot.Toot
ErrorCount uint ErrorCount uint
SuccessCount uint FSM *fsm.FSM
highestID int Fetching bool
HighestID uint
Hostname string Hostname string
Identified bool Identified bool
fetching bool Implementation string
disabled bool InitialFSMState string
implementation instanceImplementation
storageBackend *storage.TootStorageBackend
NextFetch time.Time NextFetch time.Time
nodeInfoURL string NodeInfoURL string
ServerVersionString string
ServerImplementationString string ServerImplementationString string
ServerVersionString string
SuccessCount uint
UUID uuid.UUID
fetchingLock sync.Mutex fetchingLock sync.Mutex
fsm *fsm.FSM
fsmLock sync.Mutex fsmLock sync.Mutex
structLock sync.Mutex
tootDestination chan *toot.Toot
} }
// New returns a new instance, argument is a function that operates on the // New returns a new instance, argument is a function that operates on the
// new instance // new instance
func New(options ...func(i *Instance)) *Instance { func New(options ...func(i *Instance)) *Instance {
i := new(Instance) i := new(Instance)
i.UUID = uuid.New()
i.setNextFetchAfter(1 * time.Second) i.setNextFetchAfter(1 * time.Second)
i.InitialFSMState = "STATUS_UNKNOWN"
i.fsm = fsm.NewFSM( for _, opt := range options {
"STATUS_UNKNOWN", opt(i)
}
i.FSM = fsm.NewFSM(
i.InitialFSMState,
fsm.Events{ fsm.Events{
{Name: "BEGIN_NODEINFO_URL_FETCH", Src: []string{"STATUS_UNKNOWN"}, Dst: "FETCHING_NODEINFO_URL"}, {Name: "BEGIN_NODEINFO_URL_FETCH", Src: []string{"STATUS_UNKNOWN"}, Dst: "FETCHING_NODEINFO_URL"},
{Name: "GOT_NODEINFO_URL", Src: []string{"FETCHING_NODEINFO_URL"}, Dst: "PRE_NODEINFO_FETCH"}, {Name: "GOT_NODEINFO_URL", Src: []string{"FETCHING_NODEINFO_URL"}, Dst: "PRE_NODEINFO_FETCH"},
@ -85,10 +78,6 @@ func New(options ...func(i *Instance)) *Instance {
"enter_state": func(e *fsm.Event) { i.fsmEnterState(e) }, "enter_state": func(e *fsm.Event) { i.fsmEnterState(e) },
}, },
) )
for _, opt := range options {
opt(i)
}
return i return i
} }
@ -96,7 +85,7 @@ func New(options ...func(i *Instance)) *Instance {
func (i *Instance) Status() string { func (i *Instance) Status() string {
i.fsmLock.Lock() i.fsmLock.Lock()
defer i.fsmLock.Unlock() defer i.fsmLock.Unlock()
return i.fsm.Current() return i.FSM.Current()
} }
// SetTootDestination takes a channel from the manager that all toots // SetTootDestination takes a channel from the manager that all toots
@ -111,7 +100,7 @@ func (i *Instance) SetTootDestination(d chan *toot.Toot) {
func (i *Instance) Event(eventname string) { func (i *Instance) Event(eventname string) {
i.fsmLock.Lock() i.fsmLock.Lock()
defer i.fsmLock.Unlock() defer i.fsmLock.Unlock()
i.fsm.Event(eventname) i.FSM.Event(eventname)
} }
func (i *Instance) fsmEnterState(e *fsm.Event) { func (i *Instance) fsmEnterState(e *fsm.Event) {
@ -198,7 +187,7 @@ func (i *Instance) Tick() {
func (i *Instance) nodeIdentified() bool { func (i *Instance) nodeIdentified() bool {
i.Lock() i.Lock()
defer i.Unlock() defer i.Unlock()
if i.implementation > implUnknown { if i.Implementation != "" {
return true return true
} }
return false return false
@ -288,7 +277,7 @@ func (i *Instance) fetchNodeInfoURL() error {
Msg("success fetching url for nodeinfo") Msg("success fetching url for nodeinfo")
i.Lock() i.Lock()
i.nodeInfoURL = item.Href i.NodeInfoURL = item.Href
i.Unlock() i.Unlock()
i.registerSuccess() i.registerSuccess()
i.Event("GOT_NODEINFO_URL") i.Event("GOT_NODEINFO_URL")
@ -323,7 +312,7 @@ func (i *Instance) fetchNodeInfo() error {
//FIXME make sure the nodeinfourl is on the same domain as the instance //FIXME make sure the nodeinfourl is on the same domain as the instance
//hostname //hostname
i.Lock() i.Lock()
url := i.nodeInfoURL url := i.NodeInfoURL
i.Unlock() i.Unlock()
i.Event("BEGIN_NODEINFO_FETCH") i.Event("BEGIN_NODEINFO_FETCH")
@ -368,7 +357,7 @@ func (i *Instance) fetchNodeInfo() error {
Str("serverVersion", ni.Software.Version). Str("serverVersion", ni.Software.Version).
Str("software", ni.Software.Name). Str("software", ni.Software.Name).
Str("hostname", i.Hostname). Str("hostname", i.Hostname).
Str("nodeInfoURL", i.nodeInfoURL). Str("nodeInfoURL", i.NodeInfoURL).
Msg("received nodeinfo from instance") Msg("received nodeinfo from instance")
i.Lock() i.Lock()
@ -382,7 +371,7 @@ func (i *Instance) fetchNodeInfo() error {
Str("software", ni.Software.Name). Str("software", ni.Software.Name).
Msg("detected server software") Msg("detected server software")
i.Identified = true i.Identified = true
i.implementation = implPleroma i.Implementation = "pleroma"
i.Unlock() i.Unlock()
i.registerSuccess() i.registerSuccess()
i.Event("GOT_NODEINFO") i.Event("GOT_NODEINFO")
@ -393,7 +382,7 @@ func (i *Instance) fetchNodeInfo() error {
Str("software", ni.Software.Name). Str("software", ni.Software.Name).
Msg("detected server software") Msg("detected server software")
i.Identified = true i.Identified = true
i.implementation = implMastodon i.Implementation = "mastodon"
i.Unlock() i.Unlock()
i.registerSuccess() i.registerSuccess()
i.Event("GOT_NODEINFO") i.Event("GOT_NODEINFO")

View File

@ -7,15 +7,12 @@ import (
"sync" "sync"
"time" "time"
"git.eeqj.de/sneak/feta/instance"
"git.eeqj.de/sneak/feta/jsonapis" "git.eeqj.de/sneak/feta/jsonapis"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/spf13/viper" "github.com/spf13/viper"
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
) )
//import "git.eeqj.de/sneak/feta"
// IndexAPITimeout is the timeout for fetching json instance lists // IndexAPITimeout is the timeout for fetching json instance lists
// from the listing servers // from the listing servers
const IndexAPITimeout = time.Second * 60 * 3 const IndexAPITimeout = time.Second * 60 * 3
@ -39,7 +36,7 @@ const pleromaIndexURL = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api
type InstanceLocator struct { type InstanceLocator struct {
pleromaIndexNextRefresh *time.Time pleromaIndexNextRefresh *time.Time
mastodonIndexNextRefresh *time.Time mastodonIndexNextRefresh *time.Time
reportInstanceVia chan instance.Hostname reportInstanceVia chan string
mu sync.Mutex mu sync.Mutex
} }
@ -62,13 +59,13 @@ func (il *InstanceLocator) unlock() {
// SetInstanceNotificationChannel is the way the instanceLocator returns // SetInstanceNotificationChannel is the way the instanceLocator returns
// newly discovered instances back to the manager for query/addition // newly discovered instances back to the manager for query/addition
func (il *InstanceLocator) SetInstanceNotificationChannel(via chan instance.Hostname) { func (il *InstanceLocator) SetInstanceNotificationChannel(via chan string) {
il.lock() il.lock()
defer il.unlock() defer il.unlock()
il.reportInstanceVia = via il.reportInstanceVia = via
} }
func (il *InstanceLocator) addInstance(hostname instance.Hostname) { func (il *InstanceLocator) addInstance(hostname string) {
// receiver (InstanceManager) is responsible for de-duping against its // receiver (InstanceManager) is responsible for de-duping against its
// map, we just locate and spray, it manages // map, we just locate and spray, it manages
il.reportInstanceVia <- hostname il.reportInstanceVia <- hostname
@ -201,7 +198,7 @@ func (il *InstanceLocator) locateMastodon() {
Msg("received hosts from mastodon index") Msg("received hosts from mastodon index")
for k := range hosts { for k := range hosts {
il.addInstance(instance.Hostname(k)) il.addInstance(k)
} }
} }
@ -269,7 +266,7 @@ func (il *InstanceLocator) locatePleroma() {
Msg("received hosts from pleroma index") Msg("received hosts from pleroma index")
for k := range hosts { for k := range hosts {
il.addInstance(instance.Hostname(k)) il.addInstance(k)
} }
} }

View File

@ -14,7 +14,8 @@ import (
// conform for storing toots // conform for storing toots
type DatabaseStorage interface { type DatabaseStorage interface {
ListInstances() ([]*instance.Instance, error) ListInstances() ([]*instance.Instance, error)
StoreInstances([]*instance.Instance) error //StoreInstances([]*instance.Instance) error
SaveInstance(*instance.Instance) error
} }
// InstanceManager is the main data structure for the goroutine that manages // InstanceManager is the main data structure for the goroutine that manages
@ -22,21 +23,53 @@ type DatabaseStorage interface {
type InstanceManager struct { type InstanceManager struct {
mu sync.Mutex mu sync.Mutex
db DatabaseStorage db DatabaseStorage
instances map[instance.Hostname]*instance.Instance instances map[string]*instance.Instance
newInstanceNotifications chan instance.Hostname newInstanceNotifications chan string
tootDestination chan *toot.Toot tootDestination chan *toot.Toot
startup time.Time startup time.Time
hostDiscoveryParallelism int
hostAdderSemaphore chan bool hostAdderSemaphore chan bool
nextDBSave time.Time
} }
// New returns a new InstanceManager for use by the Process // New returns a new InstanceManager for use by the Process
func New() *InstanceManager { func New(db DatabaseStorage) *InstanceManager {
i := new(InstanceManager) im := new(InstanceManager)
i.hostDiscoveryParallelism = viper.GetInt("HostDiscoveryParallelism") im.db = db
i.hostAdderSemaphore = make(chan bool, i.hostDiscoveryParallelism) im.hostAdderSemaphore = make(chan bool, viper.GetInt("HostDiscoveryParallelism"))
i.instances = make(map[instance.Hostname]*instance.Instance) im.instances = make(map[string]*instance.Instance)
return i im.RestoreFromDB()
return im
}
// RestoreFromDB loads all instances known to the database backend and
// registers them in the manager's instance map, keyed by hostname. Each
// restored instance is handed the manager's current toot destination
// channel. A failure to list instances is logged at panic level.
//
// NOTE(review): whatever im.tootDestination holds at call time is what
// the restored instances receive — confirm it has been assigned before
// this runs, or that it is propagated again afterwards.
func (im *InstanceManager) RestoreFromDB() {
	restored, err := im.db.ListInstances()
	if err != nil {
		log.Panic().
			Err(err).
			Msg("cannot get instance list from db")
	}

	im.lock()
	defer im.unlock()

	for idx := range restored {
		inst := restored[idx]
		inst.SetTootDestination(im.tootDestination)
		im.instances[inst.Hostname] = inst
	}

	// One map write happens per listed instance, so the count is simply
	// the length of the list.
	log.Info().
		Int("count", len(restored)).
		Msg("restored instances from database")
}
func (im *InstanceManager) SaveToDB() {
for _, x := range im.ListInstances() {
err := im.db.SaveInstance(x)
if err != nil {
log.Panic().
Err(err).
Msg("cannot write to db")
}
}
} }
// SetTootDestination provides the instancemanager with a channel to the // SetTootDestination provides the instancemanager with a channel to the
@ -57,7 +90,7 @@ func (im *InstanceManager) unlock() {
// InstanceManager about the channel from the InstanceLocator so that the // InstanceManager about the channel from the InstanceLocator so that the
// InstanceLocator can provide it/us (the InstanceManager) with new // InstanceLocator can provide it/us (the InstanceManager) with new
// instance.Hostnames. We (the manager) deduplicate the list ourselves. // instance.Hostnames. We (the manager) deduplicate the list ourselves.
func (im *InstanceManager) SetInstanceNotificationChannel(via chan instance.Hostname) { func (im *InstanceManager) SetInstanceNotificationChannel(via chan string) {
im.lock() im.lock()
defer im.unlock() defer im.unlock()
im.newInstanceNotifications = via im.newInstanceNotifications = via
@ -65,9 +98,9 @@ func (im *InstanceManager) SetInstanceNotificationChannel(via chan instance.Host
func (im *InstanceManager) receiveSeedInstanceHostnames() { func (im *InstanceManager) receiveSeedInstanceHostnames() {
for _, x := range seeds.SeedInstances { for _, x := range seeds.SeedInstances {
go func(tmp instance.Hostname) { go func(tmp string) {
im.addInstanceByHostname(tmp) im.addInstanceByHostname(tmp)
}(instance.Hostname(x)) }(x)
} }
} }
@ -94,6 +127,11 @@ func (im *InstanceManager) Manage() {
x = time.Now() x = time.Now()
im.logInstanceReport() im.logInstanceReport()
} }
if im.nextDBSave.Before(time.Now()) {
im.nextDBSave = time.Now().Add(time.Second * 60)
im.SaveToDB()
}
} }
} }
@ -113,7 +151,7 @@ func (im *InstanceManager) managerLoop() {
} }
} }
func (im *InstanceManager) hostnameExists(newhn instance.Hostname) bool { func (im *InstanceManager) hostnameExists(newhn string) bool {
im.lock() im.lock()
defer im.unlock() defer im.unlock()
for k := range im.instances { for k := range im.instances {
@ -124,7 +162,7 @@ func (im *InstanceManager) hostnameExists(newhn instance.Hostname) bool {
return false return false
} }
func (im *InstanceManager) addInstanceByHostname(newhn instance.Hostname) { func (im *InstanceManager) addInstanceByHostname(newhn string) {
if im.hostnameExists(newhn) { if im.hostnameExists(newhn) {
// ignore adding new if we already know about it // ignore adding new if we already know about it
return return
@ -152,7 +190,7 @@ func (im *InstanceManager) addInstanceByHostname(newhn instance.Hostname) {
} }
func (im *InstanceManager) receiveNewInstanceHostnames() { func (im *InstanceManager) receiveNewInstanceHostnames() {
var newhn instance.Hostname var newhn string
for { for {
newhn = <-im.newInstanceNotifications newhn = <-im.newInstanceNotifications
// receive them fast out of the channel, let the adding function lock to add // receive them fast out of the channel, let the adding function lock to add
@ -178,7 +216,6 @@ func (im *InstanceManager) logInstanceReport() {
// ListInstances dumps a slice of all Instances the InstanceManager knows // ListInstances dumps a slice of all Instances the InstanceManager knows
// about // about
func (im *InstanceManager) ListInstances() []*instance.Instance { func (im *InstanceManager) ListInstances() []*instance.Instance {
// FIXME make this pull from db
var out []*instance.Instance var out []*instance.Instance
im.lock() im.lock()
defer im.unlock() defer im.unlock()
@ -189,7 +226,6 @@ func (im *InstanceManager) ListInstances() []*instance.Instance {
} }
func (im *InstanceManager) instanceSummaryReport() map[string]uint { func (im *InstanceManager) instanceSummaryReport() map[string]uint {
// FIXME make this pull from db
r := make(map[string]uint) r := make(map[string]uint)
for _, v := range im.ListInstances() { for _, v := range im.ListInstances() {
v.Lock() v.Lock()

View File

@ -6,7 +6,6 @@ import (
"git.eeqj.de/sneak/feta/database" "git.eeqj.de/sneak/feta/database"
"git.eeqj.de/sneak/feta/ingester" "git.eeqj.de/sneak/feta/ingester"
"git.eeqj.de/sneak/feta/instance"
"git.eeqj.de/sneak/feta/locator" "git.eeqj.de/sneak/feta/locator"
"git.eeqj.de/sneak/feta/manager" "git.eeqj.de/sneak/feta/manager"
"git.eeqj.de/sneak/feta/storage" "git.eeqj.de/sneak/feta/storage"
@ -53,6 +52,8 @@ func (f *Feta) configure() {
viper.AutomaticEnv() viper.AutomaticEnv()
viper.SetDefault("Debug", false) viper.SetDefault("Debug", false)
viper.SetDefault("TootsToDisk", false)
viper.SetDefault("TootsToDB", true)
viper.SetDefault("HostDiscoveryParallelism", 5) viper.SetDefault("HostDiscoveryParallelism", 5)
viper.SetDefault("FSStorageLocation", os.ExpandEnv("$HOME/Library/ApplicationSupport/feta/tootarchive.d")) viper.SetDefault("FSStorageLocation", os.ExpandEnv("$HOME/Library/ApplicationSupport/feta/tootarchive.d"))
viper.SetDefault("DBStorageLocation", os.ExpandEnv("$HOME/Library/ApplicationSupport/feta/feta.state.db")) viper.SetDefault("DBStorageLocation", os.ExpandEnv("$HOME/Library/ApplicationSupport/feta/feta.state.db"))
@ -82,7 +83,6 @@ func (f *Feta) identify() {
} }
func (f *Feta) setupDatabase() { func (f *Feta) setupDatabase() {
f.dbm = database.New()
} }
func (f *Feta) setupLogging() { func (f *Feta) setupLogging() {
@ -118,20 +118,6 @@ func (f *Feta) uptime() time.Duration {
return time.Since(f.startup) return time.Since(f.startup)
} }
/*
func (f *Feta) setupDatabase() {
var err error
f.db, err = gorm.Open("sqlite3", "feta.sqlite")
if err != nil {
panic(err)
}
//f.databaseMigrations()
}
*/
func (f *Feta) runForever() int { func (f *Feta) runForever() int {
f.startup = time.Now() f.startup = time.Now()
@ -139,10 +125,14 @@ func (f *Feta) runForever() int {
// FIXME move this channel creation into the manager's constructor // FIXME move this channel creation into the manager's constructor
// and add getters/setters on the manager/locator // and add getters/setters on the manager/locator
newInstanceHostnameNotifications := make(chan instance.Hostname) newInstanceHostnameNotifications := make(chan string)
f.dbm = database.New()
f.locator = locator.New() f.locator = locator.New()
f.manager = manager.New()
f.manager = manager.New(f.dbm)
f.ingester = ingester.NewTootIngester() f.ingester = ingester.NewTootIngester()
home := os.Getenv("HOME") home := os.Getenv("HOME")
@ -150,8 +140,14 @@ func (f *Feta) runForever() int {
panic("can't find home directory") panic("can't find home directory")
} }
diskBackend := storage.NewTootFSStorage(home + "/.local/feta") if viper.GetBool("TootsToDB") {
f.ingester.SetStorageBackend(f.dbm)
} else if viper.GetBool("TootsToDisk") {
diskBackend := storage.NewTootFSStorage(viper.GetString("FSStorageLocation"))
f.ingester.SetStorageBackend(diskBackend) f.ingester.SetStorageBackend(diskBackend)
} else {
log.Info().Msg("toots will not be saved to disk")
}
f.api = new(Server) f.api = new(Server)
f.api.SetFeta(f) // api needs to get to us to access data f.api.SetFeta(f) // api needs to get to us to access data

View File

@ -11,18 +11,6 @@ import (
"git.eeqj.de/sneak/feta/toot" "git.eeqj.de/sneak/feta/toot"
) )
// TootStorageBackend is the interface to which storage backends must
// conform for storing toots
type TootStorageBackend interface {
TootExists(t toot.Toot) bool
StoreToot(t toot.Toot) error
StoreToots(tc []*toot.Toot) error
}
type TootDBStorage struct {
db string
}
// TootFSStorage is a TootStorageBackend that writes to the local // TootFSStorage is a TootStorageBackend that writes to the local
// filesystem. // filesystem.
type TootFSStorage struct { type TootFSStorage struct {
@ -41,7 +29,7 @@ func NewTootFSStorage(root string) *TootFSStorage {
func (ts *TootFSStorage) StoreToots(tc []*toot.Toot) error { func (ts *TootFSStorage) StoreToots(tc []*toot.Toot) error {
var returnErrors []string var returnErrors []string
for _, item := range tc { for _, item := range tc {
err := ts.StoreToot(*item) err := ts.StoreToot(item)
if err != nil { if err != nil {
returnErrors = append(returnErrors, err.Error()) returnErrors = append(returnErrors, err.Error())
continue continue
@ -56,7 +44,7 @@ func (ts *TootFSStorage) StoreToots(tc []*toot.Toot) error {
// TootExists checks to see if we have already written a toot to disk or // TootExists checks to see if we have already written a toot to disk or
// not. Note that the ingester de-dupes with a table in memory so that this // not. Note that the ingester de-dupes with a table in memory so that this
// will only really get used on app restarts // will only really get used on app restarts
func (ts *TootFSStorage) TootExists(t toot.Toot) bool { func (ts *TootFSStorage) TootExists(t *toot.Toot) bool {
path := t.DiskStoragePath() path := t.DiskStoragePath()
full := ts.root + "/" + path full := ts.root + "/" + path
_, err := os.Stat(full) _, err := os.Stat(full)
@ -67,7 +55,7 @@ func (ts *TootFSStorage) TootExists(t toot.Toot) bool {
} }
// StoreToot writes a single toot to disk // StoreToot writes a single toot to disk
func (ts *TootFSStorage) StoreToot(t toot.Toot) error { func (ts *TootFSStorage) StoreToot(t *toot.Toot) error {
path := t.DiskStoragePath() path := t.DiskStoragePath()
full := ts.root + "/" + path full := ts.root + "/" + path
dir := filepath.Dir(full) dir := filepath.Dir(full)
@ -82,7 +70,7 @@ func (ts *TootFSStorage) StoreToot(t toot.Toot) error {
// toots in ram forever until the computer fills up and catches fire and explodes // toots in ram forever until the computer fills up and catches fire and explodes
type TootMemoryStorage struct { type TootMemoryStorage struct {
sync.Mutex sync.Mutex
toots map[toot.Hash]toot.Toot toots map[string]*toot.Toot
//maxSize uint // FIXME support eviction //maxSize uint // FIXME support eviction
} }
@ -90,12 +78,12 @@ type TootMemoryStorage struct {
// ram forever // ram forever
func NewTootMemoryStorage() *TootMemoryStorage { func NewTootMemoryStorage() *TootMemoryStorage {
ts := new(TootMemoryStorage) ts := new(TootMemoryStorage)
ts.toots = make(map[toot.Hash]toot.Toot) ts.toots = make(map[string]*toot.Toot)
return ts return ts
} }
// StoreToot saves a single toot into an in-memory hashtable // StoreToot saves a single toot into an in-memory hashtable
func (ts *TootMemoryStorage) StoreToot(t toot.Toot) { func (ts *TootMemoryStorage) StoreToot(t *toot.Toot) {
if ts.TootExists(t) { if ts.TootExists(t) {
return return
} }
@ -106,7 +94,7 @@ func (ts *TootMemoryStorage) StoreToot(t toot.Toot) {
} }
// TootExists checks to see if we have a toot in memory already // TootExists checks to see if we have a toot in memory already
func (ts *TootMemoryStorage) TootExists(t toot.Toot) bool { func (ts *TootMemoryStorage) TootExists(t *toot.Toot) bool {
ts.Lock() ts.Lock()
defer ts.Unlock() defer ts.Unlock()
if _, ok := ts.toots[t.Hash]; ok { //this syntax is so gross if _, ok := ts.toots[t.Hash]; ok { //this syntax is so gross

13
storage/interface.go Normal file
View File

@ -0,0 +1,13 @@
package storage
import (
"git.eeqj.de/sneak/feta/toot"
)
// TootStorageBackend is the interface to which storage backends must
// conform for storing toots
type TootStorageBackend interface {
// TootExists reports whether the backend already holds toot t.
TootExists(t *toot.Toot) bool
// StoreToot persists a single toot and returns any storage error.
StoreToot(t *toot.Toot) error
// StoreToots persists a batch of toots and returns any storage error.
StoreToots(tc []*toot.Toot) error
}

View File

@ -1,27 +1,30 @@
package toot package toot
import "fmt" import (
import "encoding/json" "encoding/json"
import "errors" "errors"
import "strings" "fmt"
import "git.eeqj.de/sneak/feta/jsonapis" "strings"
"git.eeqj.de/sneak/feta/jsonapis"
"github.com/rs/zerolog/log"
//import "github.com/davecgh/go-spew/spew" //import "github.com/davecgh/go-spew/spew"
import "github.com/rs/zerolog/log"
//import "encoding/hex" //import "encoding/hex"
import mh "github.com/multiformats/go-multihash" mh "github.com/multiformats/go-multihash"
import mhopts "github.com/multiformats/go-multihash/opts"
mhopts "github.com/multiformats/go-multihash/opts"
)
// Hash is a type for storing a string-based base58 multihash of a // Hash is a type for storing a string-based base58 multihash of a
// toot's identity // toot's identity
type Hash string
// Toot is an object we use internally for storing a discovered toot // Toot is an object we use internally for storing a discovered toot
type Toot struct { type Toot struct {
Original []byte Original []byte
Parsed *jsonapis.APISerializedToot Parsed *jsonapis.APISerializedToot
Hash Hash Hash string
FromHost string FromHost string
} }
@ -111,7 +114,14 @@ func (t *Toot) identityHashInput() string {
) )
} }
// GetHash returns the toot's identity hash. The hash is computed through
// calcHash the first time it is needed (while the Hash field is still
// empty) and the stored value is returned from then on.
func (t *Toot) GetHash() string {
	if t.Hash != "" {
		return t.Hash
	}
	t.calcHash()
	return t.Hash
}
func (t *Toot) calcHash() { func (t *Toot) calcHash() {
hi := t.identityHashInput() hi := t.identityHashInput()
t.Hash = Hash(t.multiHash([]byte(hi))) t.Hash = string(t.multiHash([]byte(hi)))
} }