// File: feta/ingester/ingester.go (Go, 60 lines, 1.5 KiB)

package ingester
import "time"
import "github.com/rs/zerolog/log"
import "git.eeqj.de/sneak/feta/toot"
import "git.eeqj.de/sneak/feta/storage"
// TootIngester holds the state of the ingester process, which receives
// discovered toots and hands them to a storage backend.
type TootIngester struct {
	// inbound is the channel on which toots are delivered to the ingester.
	inbound chan *toot.Toot
	// recentlySeen is earmarked for duplicate detection (see the FIXME in
	// storeToot); nothing in this file reads or writes it yet.
	recentlySeen []*seenTootMemo
	// storageBackend persists toots. It is nil until SetStorageBackend is
	// called.
	storageBackend storage.TootStorageBackend
}
// seenTootMemo pairs a toot hash with a timestamp. It is the element type of
// TootIngester.recentlySeen and is not otherwise used in this file.
type seenTootMemo struct {
	// lastSeen is presumably the time the toot was most recently observed;
	// confirm once the dedupe logic is implemented.
	lastSeen time.Time
	// tootHash is the hash value associated with the toot.
	tootHash toot.Hash
}
// NewTootIngester allocates a TootIngester whose delivery channel is buffered
// to hold up to 10000 pending toots. No storage backend is attached; call
// SetStorageBackend before any toot is stored.
func NewTootIngester() *TootIngester {
	// Capacity of the inbound channel buffer.
	const inboundBufferSize = 10000

	return &TootIngester{
		inbound: make(chan *toot.Toot, inboundBufferSize),
	}
}
// SetStorageBackend installs be as the backend that stored toots are handed
// to. It overwrites any previously configured backend and performs no
// locking of its own.
func (ti *TootIngester) SetStorageBackend(be storage.TootStorageBackend) {
	ti.storageBackend = be
}
// GetDeliveryChannel exposes the ingester's inbound channel. Toot pointers
// sent on it are picked up by the reader goroutine started by Ingest and
// passed on for storage. Deduplication is planned but not yet implemented.
func (ti *TootIngester) GetDeliveryChannel() chan *toot.Toot {
	deliveries := ti.inbound
	return deliveries
}
// Ingest logs a startup message and launches the background goroutine that
// drains the inbound channel. It returns immediately rather than blocking.
func (ti *TootIngester) Ingest() {
	log.Info().Msg("TootIngester starting")

	// The reader runs on its own goroutine for as long as it keeps
	// receiving from the inbound channel.
	go ti.readFromInboundChannel()
}
// readFromInboundChannel drains the inbound channel, handing each toot to
// storeToot on its own goroutine. It returns once the channel has been
// closed and emptied.
func (ti *TootIngester) readFromInboundChannel() {
	// Ranging over the channel ends the loop when the channel is closed.
	// A bare receive inside an endless loop would instead yield nil
	// forever, spinning the CPU and spawning a goroutine per iteration.
	for nt := range ti.inbound {
		if nt == nil {
			// storeToot dereferences the pointer, so a nil entry must
			// not be forwarded; there is nothing to store anyway.
			continue
		}
		go ti.storeToot(nt)
	}
}
// storeToot hands a single toot to the configured storage backend.
//
// A missing storage backend is treated as a programming error and panics.
// A nil toot is ignored, since there is nothing to store.
func (ti *TootIngester) storeToot(t *toot.Toot) {
	// FIXME first check for dupes in recentlySeen
	if ti.storageBackend == nil {
		panic("no storage backend")
	}
	if t == nil {
		// Dereferencing a nil toot below would panic inside a goroutine
		// and take the whole process down.
		return
	}
	// The backend receives a copy of the toot, not the pointer.
	// NOTE(review): any result returned by StoreToot is not inspected
	// here; confirm whether it reports an error that should be handled.
	ti.storageBackend.StoreToot(*t)
}