Compare commits
	
		
			2 Commits
		
	
	
		
			84b19fb14e
			...
			2ecd833726
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 2ecd833726 | |||
| b3f672b84a | 
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @ -2,3 +2,4 @@ feta | ||||
| output/ | ||||
| feta.sqlite | ||||
| .lintsetup | ||||
| out | ||||
|  | ||||
							
								
								
									
										13
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								README.md
									
									
									
									
									
								
							| @ -2,9 +2,20 @@ | ||||
| 
 | ||||
| archives the fediverse | ||||
| 
 | ||||
| # todo | ||||
| 
 | ||||
| * scan toots for mentions and feed to locator | ||||
| * put toots in a separate db file | ||||
| * test with a real database | ||||
| * save instances to store more often | ||||
| * verify instances load properly on startup | ||||
| * do some simple in-memory dedupe for toot storage | ||||
| * make some templates using pongo2 and a simple website | ||||
| * update APIs | ||||
| 
 | ||||
| # status | ||||
| 
 | ||||
| [](https://circleci.com/gh/sneak/feta) | ||||
| [](https://drone.datavi.be/sneak/feta) | ||||
| 
 | ||||
| # ethics statement | ||||
| 
 | ||||
|  | ||||
| @ -1,41 +0,0 @@ | ||||
| package database | ||||
| 
 | ||||
| import ( | ||||
| 	"time" | ||||
| 
 | ||||
| 	"git.eeqj.de/sneak/feta/instance" | ||||
| 	"github.com/google/uuid" | ||||
| 	"github.com/jinzhu/gorm" | ||||
| 
 | ||||
| 	_ "github.com/jinzhu/gorm/dialects/sqlite" | ||||
| ) | ||||
| 
 | ||||
| // NB that when you add a model below you must add it to this list!
 | ||||
| 
 | ||||
| func (m *Manager) doMigrations() { | ||||
| 	m.db.AutoMigrate(&apinstance{}) | ||||
| } | ||||
| 
 | ||||
| type apinstance struct { | ||||
| 	gorm.Model | ||||
| 	ID                         uuid.UUID `gorm:"type:uuid;primary_key;"` | ||||
| 	ErrorCount                 uint | ||||
| 	SuccessCount               uint | ||||
| 	HighestID                  int | ||||
| 	Hostname                   string | ||||
| 	Identified                 bool | ||||
| 	Fetching                   bool | ||||
| 	Disabled                   bool | ||||
| 	Implementation             string | ||||
| 	NextFetch                  time.Time | ||||
| 	NodeInfoURL                string | ||||
| 	ServerVersionString        string | ||||
| 	ServerImplementationString string | ||||
| } | ||||
| 
 | ||||
| func (m *Manager) ListInstances() ([]*instance.Instance, error) { | ||||
| 	output := make([]*instance.Instance, 0) | ||||
| 	// FIXME have this produce a list of Instance
 | ||||
| 
 | ||||
| 	return output, nil | ||||
| } | ||||
							
								
								
									
										92
									
								
								database/imconnector.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										92
									
								
								database/imconnector.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,92 @@ | ||||
| package database | ||||
| 
 | ||||
| import ( | ||||
| 	"git.eeqj.de/sneak/feta/instance" | ||||
| 	"github.com/rs/zerolog/log" | ||||
| 
 | ||||
| 	_ "github.com/jinzhu/gorm/dialects/sqlite" | ||||
| ) | ||||
| 
 | ||||
| func (m *Manager) SaveInstance(i *instance.Instance) error { | ||||
| 	i.Lock() | ||||
| 	defer i.Unlock() | ||||
| 	var x APInstance | ||||
| 	if m.db.Where("UUID = ?", i.UUID).First(&x).RecordNotFound() { | ||||
| 		log.Info(). | ||||
| 			Str("hostname", i.Hostname). | ||||
| 			Msg("instance not in db, inserting") | ||||
| 		// item does not exist in db yet, must insert
 | ||||
| 		ni := APInstance{ | ||||
| 			UUID:                       i.UUID, | ||||
| 			Disabled:                   i.Disabled, | ||||
| 			ErrorCount:                 i.ErrorCount, | ||||
| 			FSMState:                   i.Status(), | ||||
| 			Fetching:                   i.Fetching, | ||||
| 			HighestID:                  i.HighestID, | ||||
| 			Hostname:                   i.Hostname, | ||||
| 			Identified:                 i.Identified, | ||||
| 			Implementation:             i.Implementation, | ||||
| 			NextFetch:                  i.NextFetch, | ||||
| 			NodeInfoURL:                i.NodeInfoURL, | ||||
| 			ServerImplementationString: i.ServerImplementationString, | ||||
| 			ServerVersionString:        i.ServerVersionString, | ||||
| 			SuccessCount:               i.SuccessCount, | ||||
| 		} | ||||
| 		r := m.db.Create(&ni) | ||||
| 		return r.Error | ||||
| 	} else { | ||||
| 		log.Info(). | ||||
| 			Str("hostname", i.Hostname). | ||||
| 			Str("id", i.UUID.String()). | ||||
| 			Msg("instance found in db, updating") | ||||
| 		// exists in db, update db
 | ||||
| 		var ei APInstance | ||||
| 		// EI EI uh-oh
 | ||||
| 		m.db.Where("UUID = ?", i.UUID).First(&ei) | ||||
| 		ei.Disabled = i.Disabled | ||||
| 		ei.ErrorCount = i.ErrorCount | ||||
| 		ei.FSMState = i.Status() | ||||
| 		ei.Fetching = i.Fetching | ||||
| 		ei.HighestID = i.HighestID | ||||
| 		ei.Hostname = i.Hostname | ||||
| 		ei.Identified = i.Identified | ||||
| 		ei.Implementation = string(i.Implementation) | ||||
| 		ei.NextFetch = i.NextFetch | ||||
| 		ei.NodeInfoURL = i.NodeInfoURL | ||||
| 		ei.ServerImplementationString = i.ServerImplementationString | ||||
| 		ei.ServerVersionString = i.ServerVersionString | ||||
| 		ei.SuccessCount = i.SuccessCount | ||||
| 
 | ||||
| 		r := m.db.Save(&ei) | ||||
| 		return r.Error | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (m *Manager) ListInstances() ([]*instance.Instance, error) { | ||||
| 	output := make([]*instance.Instance, 0) | ||||
| 
 | ||||
| 	var results []APInstance | ||||
| 	m.db.Find(&results) | ||||
| 
 | ||||
| 	for _, i := range results { | ||||
| 		newinst := instance.New(func(x *instance.Instance) { | ||||
| 			x.UUID = i.UUID | ||||
| 			x.Disabled = i.Disabled | ||||
| 			x.ErrorCount = i.ErrorCount | ||||
| 			x.InitialFSMState = i.FSMState | ||||
| 			x.Fetching = i.Fetching | ||||
| 			x.HighestID = i.HighestID | ||||
| 			x.Hostname = i.Hostname | ||||
| 			x.Identified = i.Identified | ||||
| 			x.Implementation = i.Implementation | ||||
| 			x.NextFetch = i.NextFetch | ||||
| 			x.NodeInfoURL = i.NodeInfoURL | ||||
| 			x.ServerImplementationString = i.ServerImplementationString | ||||
| 			x.ServerVersionString = i.ServerVersionString | ||||
| 			x.SuccessCount = i.SuccessCount | ||||
| 		}) | ||||
| 		output = append(output, newinst) | ||||
| 	} | ||||
| 
 | ||||
| 	return output, nil | ||||
| } | ||||
| @ -23,11 +23,11 @@ func New() *Manager { | ||||
| } | ||||
| 
 | ||||
| func (m *Manager) init() { | ||||
| 	m.open() | ||||
| 	m.db.LogMode(false) | ||||
| 	if viper.GetBool("Debug") { | ||||
| 		m.db.LogMode(true) | ||||
| 	} | ||||
| 	m.open() | ||||
| } | ||||
| 
 | ||||
| func mkdirp(p string) error { | ||||
|  | ||||
							
								
								
									
										49
									
								
								database/model.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										49
									
								
								database/model.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,49 @@ | ||||
| package database | ||||
| 
 | ||||
| import ( | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/google/uuid" | ||||
| 	"github.com/jinzhu/gorm" | ||||
| 	"github.com/rs/zerolog/log" | ||||
| 
 | ||||
| 	_ "github.com/jinzhu/gorm/dialects/sqlite" | ||||
| ) | ||||
| 
 | ||||
| type StoredToot struct { | ||||
| 	gorm.Model | ||||
| 	UUID uuid.UUID `gorm:"type:uuid;primary_key;"` | ||||
| 	//Original      string    `sql:"type:text"`
 | ||||
| 	Original      []byte | ||||
| 	Hash          string `gorm:"unique_index"` | ||||
| 	ServerCreated time.Time | ||||
| 	Acct          string | ||||
| 	Content       []byte | ||||
| 	URL           string | ||||
| 	Hostname      string | ||||
| } | ||||
| 
 | ||||
| type APInstance struct { | ||||
| 	gorm.Model | ||||
| 	UUID                       uuid.UUID `gorm:"type:uuid;primary_key;"` | ||||
| 	ErrorCount                 uint | ||||
| 	SuccessCount               uint | ||||
| 	HighestID                  uint | ||||
| 	Hostname                   string `gorm:"type:varchar(100);unique_index"` | ||||
| 	Identified                 bool | ||||
| 	Fetching                   bool | ||||
| 	Disabled                   bool | ||||
| 	Implementation             string | ||||
| 	NextFetch                  time.Time | ||||
| 	NodeInfoURL                string | ||||
| 	ServerVersionString        string | ||||
| 	ServerImplementationString string | ||||
| 	FSMState                   string | ||||
| } | ||||
| 
 | ||||
| // NB that when you add a model below you must add it to this list!
 | ||||
| func (m *Manager) doMigrations() { | ||||
| 	log.Info().Msg("doing database migrations if required") | ||||
| 	m.db.AutoMigrate(&APInstance{}) | ||||
| 	m.db.AutoMigrate(&StoredToot{}) | ||||
| } | ||||
							
								
								
									
										47
									
								
								database/storageconnector.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										47
									
								
								database/storageconnector.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,47 @@ | ||||
| package database | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"strings" | ||||
| 
 | ||||
| 	"git.eeqj.de/sneak/feta/toot" | ||||
| 
 | ||||
| 	"github.com/google/uuid" | ||||
| 	_ "github.com/jinzhu/gorm/dialects/sqlite" | ||||
| ) | ||||
| 
 | ||||
| func (m *Manager) TootExists(t *toot.Toot) bool { | ||||
| 	var try StoredToot | ||||
| 	if m.db.Where("Hash = ?", t.GetHash()).First(&try).RecordNotFound() { | ||||
| 		return false | ||||
| 	} else { | ||||
| 		return true | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (m *Manager) StoreToot(t *toot.Toot) error { | ||||
| 
 | ||||
| 	nt := new(StoredToot) | ||||
| 	nt.UUID = uuid.New() | ||||
| 	nt.ServerCreated = t.Parsed.CreatedAt | ||||
| 	nt.Original = t.Original | ||||
| 	// FIXME normalize this, check for @ and append hostname if none
 | ||||
| 	nt.Acct = fmt.Sprintf("%s@%s", t.Parsed.Account.Acct, strings.ToLower(t.FromHost)) | ||||
| 	nt.URL = t.Parsed.URL | ||||
| 	nt.Content = t.Parsed.Content | ||||
| 	nt.Hostname = strings.ToLower(t.FromHost) | ||||
| 	nt.Hash = t.GetHash() | ||||
| 	r := m.db.Create(&nt) | ||||
| 	//panic(fmt.Sprintf("%+v", t))
 | ||||
| 	return r.Error | ||||
| } | ||||
| 
 | ||||
| func (m *Manager) StoreToots(tc []*toot.Toot) error { | ||||
| 	for _, item := range tc { | ||||
| 		err := m.StoreToot(item) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| @ -1,9 +1,12 @@ | ||||
| package ingester | ||||
| 
 | ||||
| import "time" | ||||
| import "github.com/rs/zerolog/log" | ||||
| import "git.eeqj.de/sneak/feta/toot" | ||||
| import "git.eeqj.de/sneak/feta/storage" | ||||
| import ( | ||||
| 	"time" | ||||
| 
 | ||||
| 	"git.eeqj.de/sneak/feta/storage" | ||||
| 	"git.eeqj.de/sneak/feta/toot" | ||||
| 	"github.com/rs/zerolog/log" | ||||
| ) | ||||
| 
 | ||||
| // TootIngester is the data structure for the ingester process that is
 | ||||
| // responsible for storing the discovered toots
 | ||||
| @ -15,7 +18,7 @@ type TootIngester struct { | ||||
| 
 | ||||
| type seenTootMemo struct { | ||||
| 	lastSeen time.Time | ||||
| 	tootHash toot.Hash | ||||
| 	tootHash string | ||||
| } | ||||
| 
 | ||||
| // NewTootIngester returns a fresh TootIngester for your use
 | ||||
| @ -55,5 +58,5 @@ func (ti *TootIngester) storeToot(t *toot.Toot) { | ||||
| 	if ti.storageBackend == nil { | ||||
| 		panic("no storage backend") | ||||
| 	} | ||||
| 	ti.storageBackend.StoreToot(*t) | ||||
| 	ti.storageBackend.StoreToot(t) | ||||
| } | ||||
|  | ||||
| @ -11,7 +11,6 @@ import ( | ||||
| 	"time" | ||||
| 
 | ||||
| 	"git.eeqj.de/sneak/feta/jsonapis" | ||||
| 	"git.eeqj.de/sneak/feta/storage" | ||||
| 	"git.eeqj.de/sneak/feta/toot" | ||||
| 	"github.com/google/uuid" | ||||
| 	"github.com/looplab/fsm" | ||||
| @ -26,49 +25,43 @@ const instanceHTTPTimeout = time.Second * 120 | ||||
| const instanceSpiderInterval = time.Second * 120 | ||||
| const instanceErrorInterval = time.Second * 60 * 30 | ||||
| 
 | ||||
| type instanceImplementation int | ||||
| 
 | ||||
| // Hostname is a special type for holding the hostname of an
 | ||||
| // instance (string)
 | ||||
| type Hostname string | ||||
| 
 | ||||
| const ( | ||||
| 	implUnknown instanceImplementation = iota | ||||
| 	implMastodon | ||||
| 	implPleroma | ||||
| ) | ||||
| 
 | ||||
| // Instance stores all the information we know about an instance
 | ||||
| type Instance struct { | ||||
| 	Identifier                 uuid.UUID | ||||
| 	structLock                 sync.Mutex | ||||
| 	tootDestination            chan *toot.Toot | ||||
| 	Disabled                   bool | ||||
| 	ErrorCount                 uint | ||||
| 	SuccessCount               uint | ||||
| 	highestID                  int | ||||
| 	FSM                        *fsm.FSM | ||||
| 	Fetching                   bool | ||||
| 	HighestID                  uint | ||||
| 	Hostname                   string | ||||
| 	Identified                 bool | ||||
| 	fetching                   bool | ||||
| 	disabled                   bool | ||||
| 	implementation             instanceImplementation | ||||
| 	storageBackend             *storage.TootStorageBackend | ||||
| 	Implementation             string | ||||
| 	InitialFSMState            string | ||||
| 	NextFetch                  time.Time | ||||
| 	nodeInfoURL                string | ||||
| 	ServerVersionString        string | ||||
| 	NodeInfoURL                string | ||||
| 	ServerImplementationString string | ||||
| 	ServerVersionString        string | ||||
| 	SuccessCount               uint | ||||
| 	UUID                       uuid.UUID | ||||
| 	fetchingLock               sync.Mutex | ||||
| 	fsm                        *fsm.FSM | ||||
| 	fsmLock                    sync.Mutex | ||||
| 	structLock                 sync.Mutex | ||||
| 	tootDestination            chan *toot.Toot | ||||
| } | ||||
| 
 | ||||
| // New returns a new instance, argument is a function that operates on the
 | ||||
| // new instance
 | ||||
| func New(options ...func(i *Instance)) *Instance { | ||||
| 	i := new(Instance) | ||||
| 	i.UUID = uuid.New() | ||||
| 	i.setNextFetchAfter(1 * time.Second) | ||||
| 	i.InitialFSMState = "STATUS_UNKNOWN" | ||||
| 
 | ||||
| 	i.fsm = fsm.NewFSM( | ||||
| 		"STATUS_UNKNOWN", | ||||
| 	for _, opt := range options { | ||||
| 		opt(i) | ||||
| 	} | ||||
| 
 | ||||
| 	i.FSM = fsm.NewFSM( | ||||
| 		i.InitialFSMState, | ||||
| 		fsm.Events{ | ||||
| 			{Name: "BEGIN_NODEINFO_URL_FETCH", Src: []string{"STATUS_UNKNOWN"}, Dst: "FETCHING_NODEINFO_URL"}, | ||||
| 			{Name: "GOT_NODEINFO_URL", Src: []string{"FETCHING_NODEINFO_URL"}, Dst: "PRE_NODEINFO_FETCH"}, | ||||
| @ -85,10 +78,6 @@ func New(options ...func(i *Instance)) *Instance { | ||||
| 			"enter_state": func(e *fsm.Event) { i.fsmEnterState(e) }, | ||||
| 		}, | ||||
| 	) | ||||
| 
 | ||||
| 	for _, opt := range options { | ||||
| 		opt(i) | ||||
| 	} | ||||
| 	return i | ||||
| } | ||||
| 
 | ||||
| @ -96,7 +85,7 @@ func New(options ...func(i *Instance)) *Instance { | ||||
| func (i *Instance) Status() string { | ||||
| 	i.fsmLock.Lock() | ||||
| 	defer i.fsmLock.Unlock() | ||||
| 	return i.fsm.Current() | ||||
| 	return i.FSM.Current() | ||||
| } | ||||
| 
 | ||||
| // SetTootDestination takes a channel from the manager that all toots
 | ||||
| @ -111,7 +100,7 @@ func (i *Instance) SetTootDestination(d chan *toot.Toot) { | ||||
| func (i *Instance) Event(eventname string) { | ||||
| 	i.fsmLock.Lock() | ||||
| 	defer i.fsmLock.Unlock() | ||||
| 	i.fsm.Event(eventname) | ||||
| 	i.FSM.Event(eventname) | ||||
| } | ||||
| 
 | ||||
| func (i *Instance) fsmEnterState(e *fsm.Event) { | ||||
| @ -198,7 +187,7 @@ func (i *Instance) Tick() { | ||||
| func (i *Instance) nodeIdentified() bool { | ||||
| 	i.Lock() | ||||
| 	defer i.Unlock() | ||||
| 	if i.implementation > implUnknown { | ||||
| 	if i.Implementation != "" { | ||||
| 		return true | ||||
| 	} | ||||
| 	return false | ||||
| @ -288,7 +277,7 @@ func (i *Instance) fetchNodeInfoURL() error { | ||||
| 				Msg("success fetching url for nodeinfo") | ||||
| 
 | ||||
| 			i.Lock() | ||||
| 			i.nodeInfoURL = item.Href | ||||
| 			i.NodeInfoURL = item.Href | ||||
| 			i.Unlock() | ||||
| 			i.registerSuccess() | ||||
| 			i.Event("GOT_NODEINFO_URL") | ||||
| @ -323,7 +312,7 @@ func (i *Instance) fetchNodeInfo() error { | ||||
| 	//FIXME make sure the nodeinfourl is on the same domain as the instance
 | ||||
| 	//hostname
 | ||||
| 	i.Lock() | ||||
| 	url := i.nodeInfoURL | ||||
| 	url := i.NodeInfoURL | ||||
| 	i.Unlock() | ||||
| 
 | ||||
| 	i.Event("BEGIN_NODEINFO_FETCH") | ||||
| @ -368,7 +357,7 @@ func (i *Instance) fetchNodeInfo() error { | ||||
| 		Str("serverVersion", ni.Software.Version). | ||||
| 		Str("software", ni.Software.Name). | ||||
| 		Str("hostname", i.Hostname). | ||||
| 		Str("nodeInfoURL", i.nodeInfoURL). | ||||
| 		Str("nodeInfoURL", i.NodeInfoURL). | ||||
| 		Msg("received nodeinfo from instance") | ||||
| 
 | ||||
| 	i.Lock() | ||||
| @ -382,7 +371,7 @@ func (i *Instance) fetchNodeInfo() error { | ||||
| 			Str("software", ni.Software.Name). | ||||
| 			Msg("detected server software") | ||||
| 		i.Identified = true | ||||
| 		i.implementation = implPleroma | ||||
| 		i.Implementation = "pleroma" | ||||
| 		i.Unlock() | ||||
| 		i.registerSuccess() | ||||
| 		i.Event("GOT_NODEINFO") | ||||
| @ -393,7 +382,7 @@ func (i *Instance) fetchNodeInfo() error { | ||||
| 			Str("software", ni.Software.Name). | ||||
| 			Msg("detected server software") | ||||
| 		i.Identified = true | ||||
| 		i.implementation = implMastodon | ||||
| 		i.Implementation = "mastodon" | ||||
| 		i.Unlock() | ||||
| 		i.registerSuccess() | ||||
| 		i.Event("GOT_NODEINFO") | ||||
|  | ||||
| @ -7,15 +7,12 @@ import ( | ||||
| 	"sync" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"git.eeqj.de/sneak/feta/instance" | ||||
| 	"git.eeqj.de/sneak/feta/jsonapis" | ||||
| 	"github.com/rs/zerolog/log" | ||||
| 	"github.com/spf13/viper" | ||||
| 	"golang.org/x/sync/semaphore" | ||||
| ) | ||||
| 
 | ||||
| //import "git.eeqj.de/sneak/feta"
 | ||||
| 
 | ||||
| // IndexAPITimeout is the timeout for fetching json instance lists
 | ||||
| // from the listing servers
 | ||||
| const IndexAPITimeout = time.Second * 60 * 3 | ||||
| @ -39,7 +36,7 @@ const pleromaIndexURL = "https://distsn.org/cgi-bin/distsn-pleroma-instances-api | ||||
| type InstanceLocator struct { | ||||
| 	pleromaIndexNextRefresh  *time.Time | ||||
| 	mastodonIndexNextRefresh *time.Time | ||||
| 	reportInstanceVia        chan instance.Hostname | ||||
| 	reportInstanceVia        chan string | ||||
| 	mu                       sync.Mutex | ||||
| } | ||||
| 
 | ||||
| @ -62,13 +59,13 @@ func (il *InstanceLocator) unlock() { | ||||
| 
 | ||||
| // SetInstanceNotificationChannel is the way the instanceLocator returns
 | ||||
| // newly discovered instances back to the manager for query/addition
 | ||||
| func (il *InstanceLocator) SetInstanceNotificationChannel(via chan instance.Hostname) { | ||||
| func (il *InstanceLocator) SetInstanceNotificationChannel(via chan string) { | ||||
| 	il.lock() | ||||
| 	defer il.unlock() | ||||
| 	il.reportInstanceVia = via | ||||
| } | ||||
| 
 | ||||
| func (il *InstanceLocator) addInstance(hostname instance.Hostname) { | ||||
| func (il *InstanceLocator) addInstance(hostname string) { | ||||
| 	// receiver (InstanceManager) is responsible for de-duping against its
 | ||||
| 	// map, we just locate and spray, it manages
 | ||||
| 	il.reportInstanceVia <- hostname | ||||
| @ -201,7 +198,7 @@ func (il *InstanceLocator) locateMastodon() { | ||||
| 		Msg("received hosts from mastodon index") | ||||
| 
 | ||||
| 	for k := range hosts { | ||||
| 		il.addInstance(instance.Hostname(k)) | ||||
| 		il.addInstance(k) | ||||
| 	} | ||||
| 
 | ||||
| } | ||||
| @ -269,7 +266,7 @@ func (il *InstanceLocator) locatePleroma() { | ||||
| 		Msg("received hosts from pleroma index") | ||||
| 
 | ||||
| 	for k := range hosts { | ||||
| 		il.addInstance(instance.Hostname(k)) | ||||
| 		il.addInstance(k) | ||||
| 	} | ||||
| 
 | ||||
| } | ||||
|  | ||||
| @ -14,7 +14,8 @@ import ( | ||||
| // conform for storing toots
 | ||||
| type DatabaseStorage interface { | ||||
| 	ListInstances() ([]*instance.Instance, error) | ||||
| 	StoreInstances([]*instance.Instance) error | ||||
| 	//StoreInstances([]*instance.Instance) error
 | ||||
| 	SaveInstance(*instance.Instance) error | ||||
| } | ||||
| 
 | ||||
| // InstanceManager is the main data structure for the goroutine that manages
 | ||||
| @ -22,21 +23,53 @@ type DatabaseStorage interface { | ||||
| type InstanceManager struct { | ||||
| 	mu                       sync.Mutex | ||||
| 	db                       DatabaseStorage | ||||
| 	instances                map[instance.Hostname]*instance.Instance | ||||
| 	newInstanceNotifications chan instance.Hostname | ||||
| 	instances                map[string]*instance.Instance | ||||
| 	newInstanceNotifications chan string | ||||
| 	tootDestination          chan *toot.Toot | ||||
| 	startup                  time.Time | ||||
| 	hostDiscoveryParallelism int | ||||
| 	hostAdderSemaphore       chan bool | ||||
| 	nextDBSave               time.Time | ||||
| } | ||||
| 
 | ||||
| // New returns a new InstanceManager for use by the Process
 | ||||
| func New() *InstanceManager { | ||||
| 	i := new(InstanceManager) | ||||
| 	i.hostDiscoveryParallelism = viper.GetInt("HostDiscoveryParallelism") | ||||
| 	i.hostAdderSemaphore = make(chan bool, i.hostDiscoveryParallelism) | ||||
| 	i.instances = make(map[instance.Hostname]*instance.Instance) | ||||
| 	return i | ||||
| func New(db DatabaseStorage) *InstanceManager { | ||||
| 	im := new(InstanceManager) | ||||
| 	im.db = db | ||||
| 	im.hostAdderSemaphore = make(chan bool, viper.GetInt("HostDiscoveryParallelism")) | ||||
| 	im.instances = make(map[string]*instance.Instance) | ||||
| 	im.RestoreFromDB() | ||||
| 	return im | ||||
| } | ||||
| 
 | ||||
| func (im *InstanceManager) RestoreFromDB() { | ||||
| 	newil, err := im.db.ListInstances() | ||||
| 	if err != nil { | ||||
| 		log.Panic(). | ||||
| 			Err(err). | ||||
| 			Msg("cannot get instance list from db") | ||||
| 	} | ||||
| 	im.lock() | ||||
| 	defer im.unlock() | ||||
| 	count := 0 | ||||
| 	for _, x := range newil { | ||||
| 		x.SetTootDestination(im.tootDestination) | ||||
| 		im.instances[x.Hostname] = x | ||||
| 		count = count + 1 | ||||
| 	} | ||||
| 	log.Info(). | ||||
| 		Int("count", count). | ||||
| 		Msg("restored instances from database") | ||||
| } | ||||
| 
 | ||||
| func (im *InstanceManager) SaveToDB() { | ||||
| 	for _, x := range im.ListInstances() { | ||||
| 		err := im.db.SaveInstance(x) | ||||
| 		if err != nil { | ||||
| 			log.Panic(). | ||||
| 				Err(err). | ||||
| 				Msg("cannot write to db") | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // SetTootDestination provides the instancemanager with a channel to the
 | ||||
| @ -57,7 +90,7 @@ func (im *InstanceManager) unlock() { | ||||
| // InstanceManager about the channel from the InstanceLocator so that the
 | ||||
| // InstanceLocator can provide it/us (the InstanceManager) with new
 | ||||
| // instance.Hostnames.  We (the manager) deduplicate the list ourselves.
 | ||||
| func (im *InstanceManager) SetInstanceNotificationChannel(via chan instance.Hostname) { | ||||
| func (im *InstanceManager) SetInstanceNotificationChannel(via chan string) { | ||||
| 	im.lock() | ||||
| 	defer im.unlock() | ||||
| 	im.newInstanceNotifications = via | ||||
| @ -65,9 +98,9 @@ func (im *InstanceManager) SetInstanceNotificationChannel(via chan instance.Host | ||||
| 
 | ||||
| func (im *InstanceManager) receiveSeedInstanceHostnames() { | ||||
| 	for _, x := range seeds.SeedInstances { | ||||
| 		go func(tmp instance.Hostname) { | ||||
| 		go func(tmp string) { | ||||
| 			im.addInstanceByHostname(tmp) | ||||
| 		}(instance.Hostname(x)) | ||||
| 		}(x) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| @ -94,6 +127,11 @@ func (im *InstanceManager) Manage() { | ||||
| 			x = time.Now() | ||||
| 			im.logInstanceReport() | ||||
| 		} | ||||
| 
 | ||||
| 		if im.nextDBSave.Before(time.Now()) { | ||||
| 			im.nextDBSave = time.Now().Add(time.Second * 60) | ||||
| 			im.SaveToDB() | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| @ -113,7 +151,7 @@ func (im *InstanceManager) managerLoop() { | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (im *InstanceManager) hostnameExists(newhn instance.Hostname) bool { | ||||
| func (im *InstanceManager) hostnameExists(newhn string) bool { | ||||
| 	im.lock() | ||||
| 	defer im.unlock() | ||||
| 	for k := range im.instances { | ||||
| @ -124,7 +162,7 @@ func (im *InstanceManager) hostnameExists(newhn instance.Hostname) bool { | ||||
| 	return false | ||||
| } | ||||
| 
 | ||||
| func (im *InstanceManager) addInstanceByHostname(newhn instance.Hostname) { | ||||
| func (im *InstanceManager) addInstanceByHostname(newhn string) { | ||||
| 	if im.hostnameExists(newhn) { | ||||
| 		// ignore adding new if we already know about it
 | ||||
| 		return | ||||
| @ -152,7 +190,7 @@ func (im *InstanceManager) addInstanceByHostname(newhn instance.Hostname) { | ||||
| } | ||||
| 
 | ||||
| func (im *InstanceManager) receiveNewInstanceHostnames() { | ||||
| 	var newhn instance.Hostname | ||||
| 	var newhn string | ||||
| 	for { | ||||
| 		newhn = <-im.newInstanceNotifications | ||||
| 		// receive them fast out of the channel, let the adding function lock to add
 | ||||
| @ -178,7 +216,6 @@ func (im *InstanceManager) logInstanceReport() { | ||||
| // ListInstances dumps a slice of all Instances the InstanceManager knows
 | ||||
| // about
 | ||||
| func (im *InstanceManager) ListInstances() []*instance.Instance { | ||||
| 	// FIXME make this pull from db
 | ||||
| 	var out []*instance.Instance | ||||
| 	im.lock() | ||||
| 	defer im.unlock() | ||||
| @ -189,7 +226,6 @@ func (im *InstanceManager) ListInstances() []*instance.Instance { | ||||
| } | ||||
| 
 | ||||
| func (im *InstanceManager) instanceSummaryReport() map[string]uint { | ||||
| 	// FIXME make this pull from db
 | ||||
| 	r := make(map[string]uint) | ||||
| 	for _, v := range im.ListInstances() { | ||||
| 		v.Lock() | ||||
|  | ||||
| @ -6,7 +6,6 @@ import ( | ||||
| 
 | ||||
| 	"git.eeqj.de/sneak/feta/database" | ||||
| 	"git.eeqj.de/sneak/feta/ingester" | ||||
| 	"git.eeqj.de/sneak/feta/instance" | ||||
| 	"git.eeqj.de/sneak/feta/locator" | ||||
| 	"git.eeqj.de/sneak/feta/manager" | ||||
| 	"git.eeqj.de/sneak/feta/storage" | ||||
| @ -53,6 +52,8 @@ func (f *Feta) configure() { | ||||
| 	viper.AutomaticEnv() | ||||
| 
 | ||||
| 	viper.SetDefault("Debug", false) | ||||
| 	viper.SetDefault("TootsToDisk", false) | ||||
| 	viper.SetDefault("TootsToDB", true) | ||||
| 	viper.SetDefault("HostDiscoveryParallelism", 5) | ||||
| 	viper.SetDefault("FSStorageLocation", os.ExpandEnv("$HOME/Library/ApplicationSupport/feta/tootarchive.d")) | ||||
| 	viper.SetDefault("DBStorageLocation", os.ExpandEnv("$HOME/Library/ApplicationSupport/feta/feta.state.db")) | ||||
| @ -82,7 +83,6 @@ func (f *Feta) identify() { | ||||
| } | ||||
| 
 | ||||
| func (f *Feta) setupDatabase() { | ||||
| 	f.dbm = database.New() | ||||
| } | ||||
| 
 | ||||
| func (f *Feta) setupLogging() { | ||||
| @ -118,20 +118,6 @@ func (f *Feta) uptime() time.Duration { | ||||
| 	return time.Since(f.startup) | ||||
| } | ||||
| 
 | ||||
| /* | ||||
| func (f *Feta) setupDatabase() { | ||||
| 	var err error | ||||
| 	f.db, err = gorm.Open("sqlite3", "feta.sqlite") | ||||
| 
 | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		panic(err) | ||||
| 	} | ||||
| 
 | ||||
| 	//f.databaseMigrations()
 | ||||
| } | ||||
| */ | ||||
| 
 | ||||
| func (f *Feta) runForever() int { | ||||
| 	f.startup = time.Now() | ||||
| 
 | ||||
| @ -139,10 +125,14 @@ func (f *Feta) runForever() int { | ||||
| 
 | ||||
| 	// FIXME move this channel creation into the manager's constructor
 | ||||
| 	// and add getters/setters on the manager/locator
 | ||||
| 	newInstanceHostnameNotifications := make(chan instance.Hostname) | ||||
| 	newInstanceHostnameNotifications := make(chan string) | ||||
| 
 | ||||
| 	f.dbm = database.New() | ||||
| 
 | ||||
| 	f.locator = locator.New() | ||||
| 	f.manager = manager.New() | ||||
| 
 | ||||
| 	f.manager = manager.New(f.dbm) | ||||
| 
 | ||||
| 	f.ingester = ingester.NewTootIngester() | ||||
| 
 | ||||
| 	home := os.Getenv("HOME") | ||||
| @ -150,8 +140,14 @@ func (f *Feta) runForever() int { | ||||
| 		panic("can't find home directory") | ||||
| 	} | ||||
| 
 | ||||
| 	diskBackend := storage.NewTootFSStorage(home + "/.local/feta") | ||||
| 	f.ingester.SetStorageBackend(diskBackend) | ||||
| 	if viper.GetBool("TootsToDB") { | ||||
| 		f.ingester.SetStorageBackend(f.dbm) | ||||
| 	} else if viper.GetBool("TootsToDisk") { | ||||
| 		diskBackend := storage.NewTootFSStorage(viper.GetString("FSStorageLocation")) | ||||
| 		f.ingester.SetStorageBackend(diskBackend) | ||||
| 	} else { | ||||
| 		log.Info().Msg("toots will not be saved to disk") | ||||
| 	} | ||||
| 
 | ||||
| 	f.api = new(Server) | ||||
| 	f.api.SetFeta(f) // api needs to get to us to access data
 | ||||
|  | ||||
| @ -11,18 +11,6 @@ import ( | ||||
| 	"git.eeqj.de/sneak/feta/toot" | ||||
| ) | ||||
| 
 | ||||
| // TootStorageBackend is the interface to which storage backends must
 | ||||
| // conform for storing toots
 | ||||
| type TootStorageBackend interface { | ||||
| 	TootExists(t toot.Toot) bool | ||||
| 	StoreToot(t toot.Toot) error | ||||
| 	StoreToots(tc []*toot.Toot) error | ||||
| } | ||||
| 
 | ||||
| type TootDBStorage struct { | ||||
| 	db string | ||||
| } | ||||
| 
 | ||||
| // TootFSStorage is a TootStorageBackend that writes to the local
 | ||||
| // filesystem.
 | ||||
| type TootFSStorage struct { | ||||
| @ -41,7 +29,7 @@ func NewTootFSStorage(root string) *TootFSStorage { | ||||
| func (ts *TootFSStorage) StoreToots(tc []*toot.Toot) error { | ||||
| 	var returnErrors []string | ||||
| 	for _, item := range tc { | ||||
| 		err := ts.StoreToot(*item) | ||||
| 		err := ts.StoreToot(item) | ||||
| 		if err != nil { | ||||
| 			returnErrors = append(returnErrors, err.Error()) | ||||
| 			continue | ||||
| @ -56,7 +44,7 @@ func (ts *TootFSStorage) StoreToots(tc []*toot.Toot) error { | ||||
| // TootExists checks to see if we have already written a toot to disk or
 | ||||
| // not.  Note that the ingester de-dupes with a table in memory so that this
 | ||||
| // will only really get used on app restarts
 | ||||
| func (ts *TootFSStorage) TootExists(t toot.Toot) bool { | ||||
| func (ts *TootFSStorage) TootExists(t *toot.Toot) bool { | ||||
| 	path := t.DiskStoragePath() | ||||
| 	full := ts.root + "/" + path | ||||
| 	_, err := os.Stat(full) | ||||
| @ -67,7 +55,7 @@ func (ts *TootFSStorage) TootExists(t toot.Toot) bool { | ||||
| } | ||||
| 
 | ||||
| // StoreToot writes a single toot to disk
 | ||||
| func (ts *TootFSStorage) StoreToot(t toot.Toot) error { | ||||
| func (ts *TootFSStorage) StoreToot(t *toot.Toot) error { | ||||
| 	path := t.DiskStoragePath() | ||||
| 	full := ts.root + "/" + path | ||||
| 	dir := filepath.Dir(full) | ||||
| @ -82,7 +70,7 @@ func (ts *TootFSStorage) StoreToot(t toot.Toot) error { | ||||
| // toots in ram forever until the computer fills up and catches fire and explodes
 | ||||
| type TootMemoryStorage struct { | ||||
| 	sync.Mutex | ||||
| 	toots map[toot.Hash]toot.Toot | ||||
| 	toots map[string]*toot.Toot | ||||
| 	//maxSize uint   // FIXME support eviction
 | ||||
| } | ||||
| 
 | ||||
| @ -90,12 +78,12 @@ type TootMemoryStorage struct { | ||||
| // ram forever
 | ||||
| func NewTootMemoryStorage() *TootMemoryStorage { | ||||
| 	ts := new(TootMemoryStorage) | ||||
| 	ts.toots = make(map[toot.Hash]toot.Toot) | ||||
| 	ts.toots = make(map[string]*toot.Toot) | ||||
| 	return ts | ||||
| } | ||||
| 
 | ||||
| // StoreToot saves a single toot into an in-memory hashtable
 | ||||
| func (ts *TootMemoryStorage) StoreToot(t toot.Toot) { | ||||
| func (ts *TootMemoryStorage) StoreToot(t *toot.Toot) { | ||||
| 	if ts.TootExists(t) { | ||||
| 		return | ||||
| 	} | ||||
| @ -106,7 +94,7 @@ func (ts *TootMemoryStorage) StoreToot(t toot.Toot) { | ||||
| } | ||||
| 
 | ||||
| // TootExists checks to see if we have a toot in memory already
 | ||||
| func (ts *TootMemoryStorage) TootExists(t toot.Toot) bool { | ||||
| func (ts *TootMemoryStorage) TootExists(t *toot.Toot) bool { | ||||
| 	ts.Lock() | ||||
| 	defer ts.Unlock() | ||||
| 	if _, ok := ts.toots[t.Hash]; ok { //this syntax is so gross
 | ||||
							
								
								
									
										13
									
								
								storage/interface.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										13
									
								
								storage/interface.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,13 @@ | ||||
| package storage | ||||
| 
 | ||||
| import ( | ||||
| 	"git.eeqj.de/sneak/feta/toot" | ||||
| ) | ||||
| 
 | ||||
| // TootStorageBackend is the interface to which storage backends must
 | ||||
| // conform for storing toots
 | ||||
| type TootStorageBackend interface { | ||||
| 	TootExists(t *toot.Toot) bool | ||||
| 	StoreToot(t *toot.Toot) error | ||||
| 	StoreToots(tc []*toot.Toot) error | ||||
| } | ||||
							
								
								
									
										36
									
								
								toot/toot.go
									
									
									
									
									
								
							
							
						
						
									
										36
									
								
								toot/toot.go
									
									
									
									
									
								
							| @ -1,27 +1,30 @@ | ||||
| package toot | ||||
| 
 | ||||
| import "fmt" | ||||
| import "encoding/json" | ||||
| import "errors" | ||||
| import "strings" | ||||
| import "git.eeqj.de/sneak/feta/jsonapis" | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"strings" | ||||
| 
 | ||||
| //import "github.com/davecgh/go-spew/spew"
 | ||||
| import "github.com/rs/zerolog/log" | ||||
| 	"git.eeqj.de/sneak/feta/jsonapis" | ||||
| 	"github.com/rs/zerolog/log" | ||||
| 
 | ||||
| //import "encoding/hex"
 | ||||
| import mh "github.com/multiformats/go-multihash" | ||||
| import mhopts "github.com/multiformats/go-multihash/opts" | ||||
| 	//import "github.com/davecgh/go-spew/spew"
 | ||||
| 
 | ||||
| 	//import "encoding/hex"
 | ||||
| 	mh "github.com/multiformats/go-multihash" | ||||
| 
 | ||||
| 	mhopts "github.com/multiformats/go-multihash/opts" | ||||
| ) | ||||
| 
 | ||||
| // Hash is a type for storing a string-based base58 multihash of a
 | ||||
| // toot's identity
 | ||||
| type Hash string | ||||
| 
 | ||||
| // Toot is an object we use internally for storing a discovered toot
 | ||||
| type Toot struct { | ||||
| 	Original []byte | ||||
| 	Parsed   *jsonapis.APISerializedToot | ||||
| 	Hash     Hash | ||||
| 	Hash     string | ||||
| 	FromHost string | ||||
| } | ||||
| 
 | ||||
| @ -111,7 +114,14 @@ func (t *Toot) identityHashInput() string { | ||||
| 	) | ||||
| } | ||||
| 
 | ||||
| func (t *Toot) GetHash() string { | ||||
| 	if t.Hash == "" { | ||||
| 		t.calcHash() | ||||
| 	} | ||||
| 	return t.Hash | ||||
| } | ||||
| 
 | ||||
| func (t *Toot) calcHash() { | ||||
| 	hi := t.identityHashInput() | ||||
| 	t.Hash = Hash(t.multiHash([]byte(hi))) | ||||
| 	t.Hash = string(t.multiHash([]byte(hi))) | ||||
| } | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user