// Package ingester receives discovered toots over a channel and hands them
// to a storage backend for persistence.
package ingester
import "time"
import "github.com/rs/zerolog/log"
import "github.com/sneak/feta/toot"
import "github.com/sneak/feta/storage"
// TootIngester is the data structure for the ingester process that is
// responsible for storing the discovered toots
type TootIngester struct {
inbound chan *toot.Toot
recentlySeen []*seenTootMemo
2019-12-19 13:20:23 +00:00
storageBackend storage.TootStorageBackend
}
type seenTootMemo struct {
lastSeen time.Time
2019-12-19 13:20:23 +00:00
tootHash toot.Hash
}
// NewTootIngester returns a fresh TootIngester for your use
func NewTootIngester() *TootIngester {
ti := new(TootIngester)
2019-12-19 13:20:23 +00:00
ti.inbound = make(chan *toot.Toot, 10000)
return ti
}
// SetStorageBackend installs be as the backend that toots are handed to
// for persistence. It replaces any backend set previously.
func (ti *TootIngester) SetStorageBackend(be storage.TootStorageBackend) {
	ti.storageBackend = be
}
// GetDeliveryChannel exposes the ingester's inbound channel. Callers send
// toot pointers on it; the ingester reads them from this same channel.
func (ti *TootIngester) GetDeliveryChannel() chan *toot.Toot {
	delivery := ti.inbound
	return delivery
}
// Ingest is the main entrypoint for the TootIngester goroutine
func (ti *TootIngester) Ingest() {
log.Info().Msg("TootIngester starting")
2019-12-19 13:20:23 +00:00
go ti.readFromInboundChannel()
}
func (ti *TootIngester) readFromInboundChannel() {
for {
2019-12-19 13:20:23 +00:00
nt := <-ti.inbound
go ti.storeToot(nt)
}
}
func (ti *TootIngester) storeToot(t *toot.Toot) {
// FIXME first check for dupes in recentlySeen
2019-12-19 14:24:26 +00:00
if ti.storageBackend == nil {
panic("no storage backend")
}
2019-12-19 13:20:23 +00:00
ti.storageBackend.StoreToot(*t)
}