60 lines
1.5 KiB
Go
60 lines
1.5 KiB
Go
package ingester
|
|
|
|
import "time"
|
|
import "github.com/rs/zerolog/log"
|
|
import "git.eeqj.de/sneak/feta/toot"
|
|
import "git.eeqj.de/sneak/feta/storage"
|
|
|
|
// TootIngester is the data structure for the ingester process that is
|
|
// responsible for storing the discovered toots
|
|
type TootIngester struct {
|
|
inbound chan *toot.Toot
|
|
recentlySeen []*seenTootMemo
|
|
storageBackend storage.TootStorageBackend
|
|
}
|
|
|
|
type seenTootMemo struct {
|
|
lastSeen time.Time
|
|
tootHash toot.Hash
|
|
}
|
|
|
|
// NewTootIngester returns a fresh TootIngester for your use
|
|
func NewTootIngester() *TootIngester {
|
|
ti := new(TootIngester)
|
|
ti.inbound = make(chan *toot.Toot, 10000)
|
|
return ti
|
|
}
|
|
|
|
// SetStorageBackend takes a type conforming to TootStorageBackend for
|
|
// persisting toots somewhere/somehow
|
|
func (ti *TootIngester) SetStorageBackend(be storage.TootStorageBackend) {
|
|
ti.storageBackend = be
|
|
}
|
|
|
|
// GetDeliveryChannel returns a channel that receives pointers to toots
|
|
// which the ingester will dedupe and store
|
|
func (ti *TootIngester) GetDeliveryChannel() chan *toot.Toot {
|
|
return ti.inbound
|
|
}
|
|
|
|
// Ingest is the main entrypoint for the TootIngester goroutine
|
|
func (ti *TootIngester) Ingest() {
|
|
log.Info().Msg("TootIngester starting")
|
|
go ti.readFromInboundChannel()
|
|
}
|
|
|
|
func (ti *TootIngester) readFromInboundChannel() {
|
|
for {
|
|
nt := <-ti.inbound
|
|
go ti.storeToot(nt)
|
|
}
|
|
}
|
|
|
|
func (ti *TootIngester) storeToot(t *toot.Toot) {
|
|
// FIXME first check for dupes in recentlySeen
|
|
if ti.storageBackend == nil {
|
|
panic("no storage backend")
|
|
}
|
|
ti.storageBackend.StoreToot(*t)
|
|
}
|