Comparer les révisions

..

Pas de révisions en commun. "2ecd833726039b24455c54b0488071a01994522e" et "84b19fb14e555d9c630ca4ff9e639f50b42651fa" ont des historiques entièrement différents.

15 fichiers modifiés avec 171 ajouts et 362 suppressions

1
.gitignore externe
Voir le fichier

@ -2,4 +2,3 @@ feta
output/ output/
feta.sqlite feta.sqlite
.lintsetup .lintsetup
out

Voir le fichier

@ -2,20 +2,9 @@
archives the fediverse archives the fediverse
# todo
* scan toots for mentions and feed to locator
* put toots in a separate db file
* test with a real database
* save instances to store more often
* verify instances load properly on startup
* do some simple in-memory dedupe for toot storage
* make some templates using pongo2 and a simple website
* update APIs
# status # status
[![Build Status](https://drone.datavi.be/api/badges/sneak/feta/status.svg)](https://drone.datavi.be/sneak/feta) [![CircleCI](https://circleci.com/gh/sneak/feta.svg?style=svg)](https://circleci.com/gh/sneak/feta)
# ethics statement # ethics statement

41
database/dbmodel.go Fichier normal
Voir le fichier

@ -0,0 +1,41 @@
package database
import (
"time"
"git.eeqj.de/sneak/feta/instance"
"github.com/google/uuid"
"github.com/jinzhu/gorm"
_ "github.com/jinzhu/gorm/dialects/sqlite"
)
// NB that when you add a model below you must add it to this list!
func (m *Manager) doMigrations() {
m.db.AutoMigrate(&apinstance{})
}
type apinstance struct {
gorm.Model
ID uuid.UUID `gorm:"type:uuid;primary_key;"`
ErrorCount uint
SuccessCount uint
HighestID int
Hostname string
Identified bool
Fetching bool
Disabled bool
Implementation string
NextFetch time.Time
NodeInfoURL string
ServerVersionString string
ServerImplementationString string
}
func (m *Manager) ListInstances() ([]*instance.Instance, error) {
output := make([]*instance.Instance, 0)
// FIXME have this produce a list of Instance
return output, nil
}

Voir le fichier

@ -1,92 +0,0 @@
package database
import (
"git.eeqj.de/sneak/feta/instance"
"github.com/rs/zerolog/log"
_ "github.com/jinzhu/gorm/dialects/sqlite"
)
func (m *Manager) SaveInstance(i *instance.Instance) error {
i.Lock()
defer i.Unlock()
var x APInstance
if m.db.Where("UUID = ?", i.UUID).First(&x).RecordNotFound() {
log.Info().
Str("hostname", i.Hostname).
Msg("instance not in db, inserting")
// item does not exist in db yet, must insert
ni := APInstance{
UUID: i.UUID,
Disabled: i.Disabled,
ErrorCount: i.ErrorCount,
FSMState: i.Status(),
Fetching: i.Fetching,
HighestID: i.HighestID,
Hostname: i.Hostname,
Identified: i.Identified,
Implementation: i.Implementation,
NextFetch: i.NextFetch,
NodeInfoURL: i.NodeInfoURL,
ServerImplementationString: i.ServerImplementationString,
ServerVersionString: i.ServerVersionString,
SuccessCount: i.SuccessCount,
}
r := m.db.Create(&ni)
return r.Error
} else {
log.Info().
Str("hostname", i.Hostname).
Str("id", i.UUID.String()).
Msg("instance found in db, updating")
// exists in db, update db
var ei APInstance
// EI EI uh-oh
m.db.Where("UUID = ?", i.UUID).First(&ei)
ei.Disabled = i.Disabled
ei.ErrorCount = i.ErrorCount
ei.FSMState = i.Status()
ei.Fetching = i.Fetching
ei.HighestID = i.HighestID
ei.Hostname = i.Hostname
ei.Identified = i.Identified
ei.Implementation = string(i.Implementation)
ei.NextFetch = i.NextFetch
ei.NodeInfoURL = i.NodeInfoURL
ei.ServerImplementationString = i.ServerImplementationString
ei.ServerVersionString = i.ServerVersionString
ei.SuccessCount = i.SuccessCount
r := m.db.Save(&ei)
return r.Error
}
}
func (m *Manager) ListInstances() ([]*instance.Instance, error) {
output := make([]*instance.Instance, 0)
var results []APInstance
m.db.Find(&results)
for _, i := range results {
newinst := instance.New(func(x *instance.Instance) {
x.UUID = i.UUID
x.Disabled = i.Disabled
x.ErrorCount = i.ErrorCount
x.InitialFSMState = i.FSMState
x.Fetching = i.Fetching
x.HighestID = i.HighestID
x.Hostname = i.Hostname
x.Identified = i.Identified
x.Implementation = i.Implementation
x.NextFetch = i.NextFetch
x.NodeInfoURL = i.NodeInfoURL
x.ServerImplementationString = i.ServerImplementationString
x.ServerVersionString = i.ServerVersionString
x.SuccessCount = i.SuccessCount
})
output = append(output, newinst)
}
return output, nil
}

Voir le fichier

@ -23,11 +23,11 @@ func New() *Manager {
} }
func (m *Manager) init() { func (m *Manager) init() {
m.open()
m.db.LogMode(false) m.db.LogMode(false)
if viper.GetBool("Debug") { if viper.GetBool("Debug") {
m.db.LogMode(true) m.db.LogMode(true)
} }
m.open()
} }
func mkdirp(p string) error { func mkdirp(p string) error {

Voir le fichier

@ -1,49 +0,0 @@
package database
import (
"time"
"github.com/google/uuid"
"github.com/jinzhu/gorm"
"github.com/rs/zerolog/log"
_ "github.com/jinzhu/gorm/dialects/sqlite"
)
type StoredToot struct {
gorm.Model
UUID uuid.UUID `gorm:"type:uuid;primary_key;"`
//Original string `sql:"type:text"`
Original []byte
Hash string `gorm:"unique_index"`
ServerCreated time.Time
Acct string
Content []byte
URL string
Hostname string
}
type APInstance struct {
gorm.Model
UUID uuid.UUID `gorm:"type:uuid;primary_key;"`
ErrorCount uint
SuccessCount uint
HighestID uint
Hostname string `gorm:"type:varchar(100);unique_index"`
Identified bool
Fetching bool
Disabled bool
Implementation string
NextFetch time.Time
NodeInfoURL string
ServerVersionString string
ServerImplementationString string
FSMState string
}
// NB that when you add a model below you must add it to this list!
func (m *Manager) doMigrations() {
log.Info().Msg("doing database migrations if required")
m.db.AutoMigrate(&APInstance{})
m.db.AutoMigrate(&StoredToot{})
}

Voir le fichier

@ -1,47 +0,0 @@
package database
import (
"fmt"
"strings"
"git.eeqj.de/sneak/feta/toot"
"github.com/google/uuid"
_ "github.com/jinzhu/gorm/dialects/sqlite"
)
func (m *Manager) TootExists(t *toot.Toot) bool {
var try StoredToot
if m.db.Where("Hash = ?", t.GetHash()).First(&try).RecordNotFound() {
return false
} else {
return true
}
}
func (m *Manager) StoreToot(t *toot.Toot) error {
nt := new(StoredToot)
nt.UUID = uuid.New()
nt.ServerCreated = t.Parsed.CreatedAt
nt.Original = t.Original
// FIXME normalize this, check for @ and append hostname if none
nt.Acct = fmt.Sprintf("%s@%s", t.Parsed.Account.Acct, strings.ToLower(t.FromHost))
nt.URL = t.Parsed.URL
nt.Content = t.Parsed.Content
nt.Hostname = strings.ToLower(t.FromHost)
nt.Hash = t.GetHash()
r := m.db.Create(&nt)
//panic(fmt.Sprintf("%+v", t))
return r.Error
}
func (m *Manager) StoreToots(tc []*toot.Toot) error {
for _, item := range tc {
err := m.StoreToot(item)
if err != nil {
return err
}
}
return nil
}

Voir le fichier

@ -1,12 +1,9 @@
package ingester package ingester
import ( import "time"
"time" import "github.com/rs/zerolog/log"
import "git.eeqj.de/sneak/feta/toot"
"git.eeqj.de/sneak/feta/storage" import "git.eeqj.de/sneak/feta/storage"
"git.eeqj.de/sneak/feta/toot"
"github.com/rs/zerolog/log"
)
// TootIngester is the data structure for the ingester process that is // TootIngester is the data structure for the ingester process that is
// responsible for storing the discovered toots // responsible for storing the discovered toots
@ -18,7 +15,7 @@ type TootIngester struct {
type seenTootMemo struct { type seenTootMemo struct {
lastSeen time.Time lastSeen time.Time
tootHash string tootHash toot.Hash
} }
// NewTootIngester returns a fresh TootIngester for your use // NewTootIngester returns a fresh TootIngester for your use
@ -58,5 +55,5 @@ func (ti *TootIngester) storeToot(t *toot.Toot) {
if ti.storageBackend == nil { if ti.storageBackend == nil {
panic("no storage backend") panic("no storage backend")
} }
ti.storageBackend.StoreToot(t) ti.storageBackend.StoreToot(*t)
} }

Voir le fichier

@ -11,6 +11,7 @@ import (
"time" "time"
"git.eeqj.de/sneak/feta/jsonapis" "git.eeqj.de/sneak/feta/jsonapis"
"git.eeqj.de/sneak/feta/storage"
"git.eeqj.de/sneak/feta/toot" "git.eeqj.de/sneak/feta/toot"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/looplab/fsm" "github.com/looplab/fsm"
@ -25,43 +26,49 @@ const instanceHTTPTimeout = time.Second * 120
const instanceSpiderInterval = time.Second * 120 const instanceSpiderInterval = time.Second * 120
const instanceErrorInterval = time.Second * 60 * 30 const instanceErrorInterval = time.Second * 60 * 30
type instanceImplementation int
// Hostname is a special type for holding the hostname of an
// instance (string)
type Hostname string
const (
implUnknown instanceImplementation = iota
implMastodon
implPleroma
)
// Instance stores all the information we know about an instance // Instance stores all the information we know about an instance
type Instance struct { type Instance struct {
Disabled bool Identifier uuid.UUID
ErrorCount uint
FSM *fsm.FSM
Fetching bool
HighestID uint
Hostname string
Identified bool
Implementation string
InitialFSMState string
NextFetch time.Time
NodeInfoURL string
ServerImplementationString string
ServerVersionString string
SuccessCount uint
UUID uuid.UUID
fetchingLock sync.Mutex
fsmLock sync.Mutex
structLock sync.Mutex structLock sync.Mutex
tootDestination chan *toot.Toot tootDestination chan *toot.Toot
ErrorCount uint
SuccessCount uint
highestID int
Hostname string
Identified bool
fetching bool
disabled bool
implementation instanceImplementation
storageBackend *storage.TootStorageBackend
NextFetch time.Time
nodeInfoURL string
ServerVersionString string
ServerImplementationString string
fetchingLock sync.Mutex
fsm *fsm.FSM
fsmLock sync.Mutex
} }
// New returns a new instance, argument is a function that operates on the // New returns a new instance, argument is a function that operates on the
// new instance // new instance
func New(options ...func(i *Instance)) *Instance { func New(options ...func(i *Instance)) *Instance {
i := new(Instance) i := new(Instance)
i.UUID = uuid.New()
i.setNextFetchAfter(1 * time.Second) i.setNextFetchAfter(1 * time.Second)
i.InitialFSMState = "STATUS_UNKNOWN"
for _, opt := range options { i.fsm = fsm.NewFSM(
opt(i) "STATUS_UNKNOWN",
}
i.FSM = fsm.NewFSM(
i.InitialFSMState,
fsm.Events{ fsm.Events{
{Name: "BEGIN_NODEINFO_URL_FETCH", Src: []string{"STATUS_UNKNOWN"}, Dst: "FETCHING_NODEINFO_URL"}, {Name: "BEGIN_NODEINFO_URL_FETCH", Src: []string{"STATUS_UNKNOWN"}, Dst: "FETCHING_NODEINFO_URL"},
{Name: "GOT_NODEINFO_URL", Src: []string{"FETCHING_NODEINFO_URL"}, Dst: "PRE_NODEINFO_FETCH"}, {Name: "GOT_NODEINFO_URL", Src: []string{"FETCHING_NODEINFO_URL"}, Dst: "PRE_NODEINFO_FETCH"},
@ -78,6 +85,10 @@ func New(options ...func(i *Instance)) *Instance {
"enter_state": func(e *fsm.Event) { i.fsmEnterState(e) }, "enter_state": func(e *fsm.Event) { i.fsmEnterState(e) },
}, },
) )
for _, opt := range options {
opt(i)
}
return i return i
} }
@ -85,7 +96,7 @@ func New(options ...func(i *Instance)) *Instance {
func (i *Instance) Status() string { func (i *Instance) Status() string {
i.fsmLock.Lock() i.fsmLock.Lock()
defer i.fsmLock.Unlock() defer i.fsmLock.Unlock()
return i.FSM.Current() return i.fsm.Current()
} }
// SetTootDestination takes a channel from the manager that all toots // SetTootDestination takes a channel from the manager that all toots
@ -100,7 +111,7 @@ func (i *Instance) SetTootDestination(d chan *toot.Toot) {
func (i *Instance) Event(eventname string) { func (i *Instance) Event(eventname string) {
i.fsmLock.Lock() i.fsmLock.Lock()
defer i.fsmLock.Unlock() defer i.fsmLock.Unlock()
i.FSM.Event(eventname) i.fsm.Event(eventname)
} }
func (i *Instance) fsmEnterState(e *fsm.Event) { func (i *Instance) fsmEnterState(e *fsm.Event) {
@ -187,7 +198,7 @@ func (i *Instance) Tick() {
func (i *Instance) nodeIdentified() bool { func (i *Instance) nodeIdentified() bool {
i.Lock() i.Lock()
defer i.Unlock() defer i.Unlock()
if i.Implementation != "" { if i.implementation > implUnknown {
return true return true
} }
return false return false
@ -277,7 +288,7 @@ func (i *Instance) fetchNodeInfoURL() error {
Msg("success fetching url for nodeinfo") Msg("success fetching url for nodeinfo")
i.Lock() i.Lock()
i.NodeInfoURL = item.Href i.nodeInfoURL = item.Href
i.Unlock() i.Unlock()
i.registerSuccess() i.registerSuccess()
i.Event("GOT_NODEINFO_URL") i.Event("GOT_NODEINFO_URL")
@ -312,7 +323,7 @@ func (i *Instance) fetchNodeInfo() error {
//FIXME make sure the nodeinfourl is on the same domain as the instance //FIXME make sure the nodeinfourl is on the same domain as the instance
//hostname //hostname
i.Lock() i.Lock()
url := i.NodeInfoURL url := i.nodeInfoURL
i.Unlock() i.Unlock()
i.Event("BEGIN_NODEINFO_FETCH") i.Event("BEGIN_NODEINFO_FETCH")
@ -357,7 +368,7 @@ func (i *Instance) fetchNodeInfo() error {
Str("serverVersion", ni.Software.Version). Str("serverVersion", ni.Software.Version).
Str("software", ni.Software.Name). Str("software", ni.Software.Name).
Str("hostname", i.Hostname). Str("hostname", i.Hostname).
Str("nodeInfoURL", i.NodeInfoURL). Str("nodeInfoURL", i.nodeInfoURL).
Msg("received nodeinfo from instance") Msg("received nodeinfo from instance")
i.Lock() i.Lock()
@ -371,7 +382,7 @@ func (i *Instance) fetchNodeInfo() error {
Str("software", ni.Software.Name). Str("software", ni.Software.Name).
Msg("detected server software") Msg("detected server software")
i.Identified = true i.Identified = true
i.Implementation = "pleroma" i.implementation = implPleroma
i.Unlock() i.Unlock()
i.registerSuccess() i.registerSuccess()
i.Event("GOT_NODEINFO") i.Event("GOT_NODEINFO")
@ -382,7 +393,7 @@ func (i *Instance) fetchNodeInfo() error {
Str("software", ni.Software.Name). Str("software", ni.Software.Name).
Msg("detected server software") Msg("detected server software")
i.Identified = true i.Identified = true
i.Implementation = "mastodon" i.implementation = implMastodon
i.Unlock() i.Unlock()
i.registerSuccess() i.registerSuccess()
i.Event("GOT_NODEINFO") i.Event("GOT_NODEINFO")

Voir le fichier

@ -7,12 +7,15 @@ import (
"sync" "sync"
"time" "time"
"git.eeqj.de/sneak/feta/instance"
"git.eeqj.de/sneak/feta/jsonapis" "git.eeqj.de/sneak/feta/jsonapis"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/spf13/viper" "github.com/spf13/viper"
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
) )
//import "git.eeqj.de/sneak/feta"
// IndexAPITimeout is the timeout for fetching json instance lists // IndexAPITimeout is the timeout for fetching json instance lists
// from the listing servers // from the listing servers
const IndexAPITimeout = time.Second * 60 * 3 const IndexAPITimeout = time.Second * 60 * 3
@ -36,7 +39,7 @@ const pleromaIndexURL = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api
type InstanceLocator struct { type InstanceLocator struct {
pleromaIndexNextRefresh *time.Time pleromaIndexNextRefresh *time.Time
mastodonIndexNextRefresh *time.Time mastodonIndexNextRefresh *time.Time
reportInstanceVia chan string reportInstanceVia chan instance.Hostname
mu sync.Mutex mu sync.Mutex
} }
@ -59,13 +62,13 @@ func (il *InstanceLocator) unlock() {
// SetInstanceNotificationChannel is the way the instanceLocator returns // SetInstanceNotificationChannel is the way the instanceLocator returns
// newly discovered instances back to the manager for query/addition // newly discovered instances back to the manager for query/addition
func (il *InstanceLocator) SetInstanceNotificationChannel(via chan string) { func (il *InstanceLocator) SetInstanceNotificationChannel(via chan instance.Hostname) {
il.lock() il.lock()
defer il.unlock() defer il.unlock()
il.reportInstanceVia = via il.reportInstanceVia = via
} }
func (il *InstanceLocator) addInstance(hostname string) { func (il *InstanceLocator) addInstance(hostname instance.Hostname) {
// receiver (InstanceManager) is responsible for de-duping against its // receiver (InstanceManager) is responsible for de-duping against its
// map, we just locate and spray, it manages // map, we just locate and spray, it manages
il.reportInstanceVia <- hostname il.reportInstanceVia <- hostname
@ -198,7 +201,7 @@ func (il *InstanceLocator) locateMastodon() {
Msg("received hosts from mastodon index") Msg("received hosts from mastodon index")
for k := range hosts { for k := range hosts {
il.addInstance(k) il.addInstance(instance.Hostname(k))
} }
} }
@ -266,7 +269,7 @@ func (il *InstanceLocator) locatePleroma() {
Msg("received hosts from pleroma index") Msg("received hosts from pleroma index")
for k := range hosts { for k := range hosts {
il.addInstance(k) il.addInstance(instance.Hostname(k))
} }
} }

Voir le fichier

@ -14,8 +14,7 @@ import (
// conform for storing toots // conform for storing toots
type DatabaseStorage interface { type DatabaseStorage interface {
ListInstances() ([]*instance.Instance, error) ListInstances() ([]*instance.Instance, error)
//StoreInstances([]*instance.Instance) error StoreInstances([]*instance.Instance) error
SaveInstance(*instance.Instance) error
} }
// InstanceManager is the main data structure for the goroutine that manages // InstanceManager is the main data structure for the goroutine that manages
@ -23,53 +22,21 @@ type DatabaseStorage interface {
type InstanceManager struct { type InstanceManager struct {
mu sync.Mutex mu sync.Mutex
db DatabaseStorage db DatabaseStorage
instances map[string]*instance.Instance instances map[instance.Hostname]*instance.Instance
newInstanceNotifications chan string newInstanceNotifications chan instance.Hostname
tootDestination chan *toot.Toot tootDestination chan *toot.Toot
startup time.Time startup time.Time
hostDiscoveryParallelism int
hostAdderSemaphore chan bool hostAdderSemaphore chan bool
nextDBSave time.Time
} }
// New returns a new InstanceManager for use by the Process // New returns a new InstanceManager for use by the Process
func New(db DatabaseStorage) *InstanceManager { func New() *InstanceManager {
im := new(InstanceManager) i := new(InstanceManager)
im.db = db i.hostDiscoveryParallelism = viper.GetInt("HostDiscoveryParallelism")
im.hostAdderSemaphore = make(chan bool, viper.GetInt("HostDiscoveryParallelism")) i.hostAdderSemaphore = make(chan bool, i.hostDiscoveryParallelism)
im.instances = make(map[string]*instance.Instance) i.instances = make(map[instance.Hostname]*instance.Instance)
im.RestoreFromDB() return i
return im
}
func (im *InstanceManager) RestoreFromDB() {
newil, err := im.db.ListInstances()
if err != nil {
log.Panic().
Err(err).
Msg("cannot get instance list from db")
}
im.lock()
defer im.unlock()
count := 0
for _, x := range newil {
x.SetTootDestination(im.tootDestination)
im.instances[x.Hostname] = x
count = count + 1
}
log.Info().
Int("count", count).
Msg("restored instances from database")
}
func (im *InstanceManager) SaveToDB() {
for _, x := range im.ListInstances() {
err := im.db.SaveInstance(x)
if err != nil {
log.Panic().
Err(err).
Msg("cannot write to db")
}
}
} }
// SetTootDestination provides the instancemanager with a channel to the // SetTootDestination provides the instancemanager with a channel to the
@ -90,7 +57,7 @@ func (im *InstanceManager) unlock() {
// InstanceManager about the channel from the InstanceLocator so that the // InstanceManager about the channel from the InstanceLocator so that the
// InstanceLocator can provide it/us (the InstanceManager) with new // InstanceLocator can provide it/us (the InstanceManager) with new
// instance.Hostnames. We (the manager) deduplicate the list ourselves. // instance.Hostnames. We (the manager) deduplicate the list ourselves.
func (im *InstanceManager) SetInstanceNotificationChannel(via chan string) { func (im *InstanceManager) SetInstanceNotificationChannel(via chan instance.Hostname) {
im.lock() im.lock()
defer im.unlock() defer im.unlock()
im.newInstanceNotifications = via im.newInstanceNotifications = via
@ -98,9 +65,9 @@ func (im *InstanceManager) SetInstanceNotificationChannel(via chan string) {
func (im *InstanceManager) receiveSeedInstanceHostnames() { func (im *InstanceManager) receiveSeedInstanceHostnames() {
for _, x := range seeds.SeedInstances { for _, x := range seeds.SeedInstances {
go func(tmp string) { go func(tmp instance.Hostname) {
im.addInstanceByHostname(tmp) im.addInstanceByHostname(tmp)
}(x) }(instance.Hostname(x))
} }
} }
@ -127,11 +94,6 @@ func (im *InstanceManager) Manage() {
x = time.Now() x = time.Now()
im.logInstanceReport() im.logInstanceReport()
} }
if im.nextDBSave.Before(time.Now()) {
im.nextDBSave = time.Now().Add(time.Second * 60)
im.SaveToDB()
}
} }
} }
@ -151,7 +113,7 @@ func (im *InstanceManager) managerLoop() {
} }
} }
func (im *InstanceManager) hostnameExists(newhn string) bool { func (im *InstanceManager) hostnameExists(newhn instance.Hostname) bool {
im.lock() im.lock()
defer im.unlock() defer im.unlock()
for k := range im.instances { for k := range im.instances {
@ -162,7 +124,7 @@ func (im *InstanceManager) hostnameExists(newhn string) bool {
return false return false
} }
func (im *InstanceManager) addInstanceByHostname(newhn string) { func (im *InstanceManager) addInstanceByHostname(newhn instance.Hostname) {
if im.hostnameExists(newhn) { if im.hostnameExists(newhn) {
// ignore adding new if we already know about it // ignore adding new if we already know about it
return return
@ -190,7 +152,7 @@ func (im *InstanceManager) addInstanceByHostname(newhn string) {
} }
func (im *InstanceManager) receiveNewInstanceHostnames() { func (im *InstanceManager) receiveNewInstanceHostnames() {
var newhn string var newhn instance.Hostname
for { for {
newhn = <-im.newInstanceNotifications newhn = <-im.newInstanceNotifications
// receive them fast out of the channel, let the adding function lock to add // receive them fast out of the channel, let the adding function lock to add
@ -216,6 +178,7 @@ func (im *InstanceManager) logInstanceReport() {
// ListInstances dumps a slice of all Instances the InstanceManager knows // ListInstances dumps a slice of all Instances the InstanceManager knows
// about // about
func (im *InstanceManager) ListInstances() []*instance.Instance { func (im *InstanceManager) ListInstances() []*instance.Instance {
// FIXME make this pull from db
var out []*instance.Instance var out []*instance.Instance
im.lock() im.lock()
defer im.unlock() defer im.unlock()
@ -226,6 +189,7 @@ func (im *InstanceManager) ListInstances() []*instance.Instance {
} }
func (im *InstanceManager) instanceSummaryReport() map[string]uint { func (im *InstanceManager) instanceSummaryReport() map[string]uint {
// FIXME make this pull from db
r := make(map[string]uint) r := make(map[string]uint)
for _, v := range im.ListInstances() { for _, v := range im.ListInstances() {
v.Lock() v.Lock()

Voir le fichier

@ -6,6 +6,7 @@ import (
"git.eeqj.de/sneak/feta/database" "git.eeqj.de/sneak/feta/database"
"git.eeqj.de/sneak/feta/ingester" "git.eeqj.de/sneak/feta/ingester"
"git.eeqj.de/sneak/feta/instance"
"git.eeqj.de/sneak/feta/locator" "git.eeqj.de/sneak/feta/locator"
"git.eeqj.de/sneak/feta/manager" "git.eeqj.de/sneak/feta/manager"
"git.eeqj.de/sneak/feta/storage" "git.eeqj.de/sneak/feta/storage"
@ -52,8 +53,6 @@ func (f *Feta) configure() {
viper.AutomaticEnv() viper.AutomaticEnv()
viper.SetDefault("Debug", false) viper.SetDefault("Debug", false)
viper.SetDefault("TootsToDisk", false)
viper.SetDefault("TootsToDB", true)
viper.SetDefault("HostDiscoveryParallelism", 5) viper.SetDefault("HostDiscoveryParallelism", 5)
viper.SetDefault("FSStorageLocation", os.ExpandEnv("$HOME/Library/ApplicationSupport/feta/tootarchive.d")) viper.SetDefault("FSStorageLocation", os.ExpandEnv("$HOME/Library/ApplicationSupport/feta/tootarchive.d"))
viper.SetDefault("DBStorageLocation", os.ExpandEnv("$HOME/Library/ApplicationSupport/feta/feta.state.db")) viper.SetDefault("DBStorageLocation", os.ExpandEnv("$HOME/Library/ApplicationSupport/feta/feta.state.db"))
@ -83,6 +82,7 @@ func (f *Feta) identify() {
} }
func (f *Feta) setupDatabase() { func (f *Feta) setupDatabase() {
f.dbm = database.New()
} }
func (f *Feta) setupLogging() { func (f *Feta) setupLogging() {
@ -118,6 +118,20 @@ func (f *Feta) uptime() time.Duration {
return time.Since(f.startup) return time.Since(f.startup)
} }
/*
func (f *Feta) setupDatabase() {
var err error
f.db, err = gorm.Open("sqlite3", "feta.sqlite")
if err != nil {
panic(err)
}
//f.databaseMigrations()
}
*/
func (f *Feta) runForever() int { func (f *Feta) runForever() int {
f.startup = time.Now() f.startup = time.Now()
@ -125,14 +139,10 @@ func (f *Feta) runForever() int {
// FIXME move this channel creation into the manager's constructor // FIXME move this channel creation into the manager's constructor
// and add getters/setters on the manager/locator // and add getters/setters on the manager/locator
newInstanceHostnameNotifications := make(chan string) newInstanceHostnameNotifications := make(chan instance.Hostname)
f.dbm = database.New()
f.locator = locator.New() f.locator = locator.New()
f.manager = manager.New()
f.manager = manager.New(f.dbm)
f.ingester = ingester.NewTootIngester() f.ingester = ingester.NewTootIngester()
home := os.Getenv("HOME") home := os.Getenv("HOME")
@ -140,14 +150,8 @@ func (f *Feta) runForever() int {
panic("can't find home directory") panic("can't find home directory")
} }
if viper.GetBool("TootsToDB") { diskBackend := storage.NewTootFSStorage(home + "/.local/feta")
f.ingester.SetStorageBackend(f.dbm)
} else if viper.GetBool("TootsToDisk") {
diskBackend := storage.NewTootFSStorage(viper.GetString("FSStorageLocation"))
f.ingester.SetStorageBackend(diskBackend) f.ingester.SetStorageBackend(diskBackend)
} else {
log.Info().Msg("toots will not be saved to disk")
}
f.api = new(Server) f.api = new(Server)
f.api.SetFeta(f) // api needs to get to us to access data f.api.SetFeta(f) // api needs to get to us to access data

Voir le fichier

@ -1,13 +0,0 @@
package storage
import (
"git.eeqj.de/sneak/feta/toot"
)
// TootStorageBackend is the interface to which storage backends must
// conform for storing toots
type TootStorageBackend interface {
TootExists(t *toot.Toot) bool
StoreToot(t *toot.Toot) error
StoreToots(tc []*toot.Toot) error
}

Voir le fichier

@ -11,6 +11,18 @@ import (
"git.eeqj.de/sneak/feta/toot" "git.eeqj.de/sneak/feta/toot"
) )
// TootStorageBackend is the interface to which storage backends must
// conform for storing toots
type TootStorageBackend interface {
TootExists(t toot.Toot) bool
StoreToot(t toot.Toot) error
StoreToots(tc []*toot.Toot) error
}
type TootDBStorage struct {
db string
}
// TootFSStorage is a TootStorageBackend that writes to the local // TootFSStorage is a TootStorageBackend that writes to the local
// filesystem. // filesystem.
type TootFSStorage struct { type TootFSStorage struct {
@ -29,7 +41,7 @@ func NewTootFSStorage(root string) *TootFSStorage {
func (ts *TootFSStorage) StoreToots(tc []*toot.Toot) error { func (ts *TootFSStorage) StoreToots(tc []*toot.Toot) error {
var returnErrors []string var returnErrors []string
for _, item := range tc { for _, item := range tc {
err := ts.StoreToot(item) err := ts.StoreToot(*item)
if err != nil { if err != nil {
returnErrors = append(returnErrors, err.Error()) returnErrors = append(returnErrors, err.Error())
continue continue
@ -44,7 +56,7 @@ func (ts *TootFSStorage) StoreToots(tc []*toot.Toot) error {
// TootExists checks to see if we have already written a toot to disk or // TootExists checks to see if we have already written a toot to disk or
// not. Note that the ingester de-dupes with a table in memory so that this // not. Note that the ingester de-dupes with a table in memory so that this
// will only really get used on app restarts // will only really get used on app restarts
func (ts *TootFSStorage) TootExists(t *toot.Toot) bool { func (ts *TootFSStorage) TootExists(t toot.Toot) bool {
path := t.DiskStoragePath() path := t.DiskStoragePath()
full := ts.root + "/" + path full := ts.root + "/" + path
_, err := os.Stat(full) _, err := os.Stat(full)
@ -55,7 +67,7 @@ func (ts *TootFSStorage) TootExists(t *toot.Toot) bool {
} }
// StoreToot writes a single toot to disk // StoreToot writes a single toot to disk
func (ts *TootFSStorage) StoreToot(t *toot.Toot) error { func (ts *TootFSStorage) StoreToot(t toot.Toot) error {
path := t.DiskStoragePath() path := t.DiskStoragePath()
full := ts.root + "/" + path full := ts.root + "/" + path
dir := filepath.Dir(full) dir := filepath.Dir(full)
@ -70,7 +82,7 @@ func (ts *TootFSStorage) StoreToot(t *toot.Toot) error {
// toots in ram forever until the computer fills up and catches fire and explodes // toots in ram forever until the computer fills up and catches fire and explodes
type TootMemoryStorage struct { type TootMemoryStorage struct {
sync.Mutex sync.Mutex
toots map[string]*toot.Toot toots map[toot.Hash]toot.Toot
//maxSize uint // FIXME support eviction //maxSize uint // FIXME support eviction
} }
@ -78,12 +90,12 @@ type TootMemoryStorage struct {
// ram forever // ram forever
func NewTootMemoryStorage() *TootMemoryStorage { func NewTootMemoryStorage() *TootMemoryStorage {
ts := new(TootMemoryStorage) ts := new(TootMemoryStorage)
ts.toots = make(map[string]*toot.Toot) ts.toots = make(map[toot.Hash]toot.Toot)
return ts return ts
} }
// StoreToot saves a single toot into an in-memory hashtable // StoreToot saves a single toot into an in-memory hashtable
func (ts *TootMemoryStorage) StoreToot(t *toot.Toot) { func (ts *TootMemoryStorage) StoreToot(t toot.Toot) {
if ts.TootExists(t) { if ts.TootExists(t) {
return return
} }
@ -94,7 +106,7 @@ func (ts *TootMemoryStorage) StoreToot(t *toot.Toot) {
} }
// TootExists checks to see if we have a toot in memory already // TootExists checks to see if we have a toot in memory already
func (ts *TootMemoryStorage) TootExists(t *toot.Toot) bool { func (ts *TootMemoryStorage) TootExists(t toot.Toot) bool {
ts.Lock() ts.Lock()
defer ts.Unlock() defer ts.Unlock()
if _, ok := ts.toots[t.Hash]; ok { //this syntax is so gross if _, ok := ts.toots[t.Hash]; ok { //this syntax is so gross

Voir le fichier

@ -1,30 +1,27 @@
package toot package toot
import ( import "fmt"
"encoding/json" import "encoding/json"
"errors" import "errors"
"fmt" import "strings"
"strings" import "git.eeqj.de/sneak/feta/jsonapis"
"git.eeqj.de/sneak/feta/jsonapis" //import "github.com/davecgh/go-spew/spew"
"github.com/rs/zerolog/log" import "github.com/rs/zerolog/log"
//import "github.com/davecgh/go-spew/spew" //import "encoding/hex"
import mh "github.com/multiformats/go-multihash"
//import "encoding/hex" import mhopts "github.com/multiformats/go-multihash/opts"
mh "github.com/multiformats/go-multihash"
mhopts "github.com/multiformats/go-multihash/opts"
)
// Hash is a type for storing a string-based base58 multihash of a // Hash is a type for storing a string-based base58 multihash of a
// toot's identity // toot's identity
type Hash string
// Toot is an object we use internally for storing a discovered toot // Toot is an object we use internally for storing a discovered toot
type Toot struct { type Toot struct {
Original []byte Original []byte
Parsed *jsonapis.APISerializedToot Parsed *jsonapis.APISerializedToot
Hash string Hash Hash
FromHost string FromHost string
} }
@ -114,14 +111,7 @@ func (t *Toot) identityHashInput() string {
) )
} }
func (t *Toot) GetHash() string {
if t.Hash == "" {
t.calcHash()
}
return t.Hash
}
func (t *Toot) calcHash() { func (t *Toot) calcHash() {
hi := t.identityHashInput() hi := t.identityHashInput()
t.Hash = string(t.multiHash([]byte(hi))) t.Hash = Hash(t.multiHash([]byte(hi)))
} }