feta/ingester/ingester.go
sneak 06df947186
Some checks failed
continuous-integration/drone/push Build is failing
major overhaul, including:
* builds with echo now instead of gin
* beginning of web UI
* factor out util functions
2020-03-30 16:05:53 -07:00

65 lines
1.5 KiB
Go

package ingester
import (
"time"
"git.eeqj.de/sneak/feta/storage"
"git.eeqj.de/sneak/feta/toot"
"github.com/rs/zerolog/log"
)
// TootIngester is the data structure for the ingester process that is
// responsible for storing the discovered toots. Toots arrive on the inbound
// channel and are written to the storage backend when the backend reports
// that they do not already exist.
type TootIngester struct {
// inbound is the delivery channel from which the ingester reads toots.
inbound chan *toot.Toot
// recentlySeen holds memos of toots already encountered. Nothing in the
// visible code reads or writes it yet (see the FIXME in storeToot).
recentlySeen []*seenTootMemo
// storageBackend is where toots are persisted. It stays nil until
// SetStorageBackend is called, and storeToot panics while it is nil.
storageBackend storage.TootStorageBackend
}
// seenTootMemo pairs a toot hash with a timestamp. It is the element type
// of TootIngester.recentlySeen.
type seenTootMemo struct {
// lastSeen is presumably the time the toot was most recently observed —
// TODO confirm once the dedupe logic is written.
lastSeen time.Time
// tootHash presumably identifies the toot; how the hash is derived is not
// shown in this file — TODO confirm.
tootHash string
}
// NewTootIngester allocates a TootIngester whose inbound delivery channel
// is buffered to hold up to 10000 toots. No storage backend is attached
// yet; call SetStorageBackend before any toots are processed.
func NewTootIngester() *TootIngester {
	return &TootIngester{
		inbound: make(chan *toot.Toot, 10000),
	}
}
// SetStorageBackend installs be as the TootStorageBackend that the ingester
// uses to check for and persist toots, replacing any backend set earlier.
func (ti *TootIngester) SetStorageBackend(be storage.TootStorageBackend) {
	ti.storageBackend = be
}
// GetDeliveryChannel hands back the ingester's inbound channel. Producers
// send toot pointers on it; once Ingest has been called, each received toot
// is passed on to be stored if the backend does not already have it.
func (ti *TootIngester) GetDeliveryChannel() chan *toot.Toot {
	return ti.inbound
}
// Ingest logs a startup message and launches the background goroutine that
// drains the inbound channel. It returns immediately and does not wait for
// that goroutine.
func (ti *TootIngester) Ingest() {
	log.Info().Msg("TootIngester starting")
	go ti.readFromInboundChannel()
}
// readFromInboundChannel receives toots from the inbound channel and hands
// each one to storeToot on its own goroutine, so a slow store does not
// stall the receive loop.
//
// Ranging over the channel makes the loop end once the channel is closed
// and drained. A bare receive inside `for {}` would instead yield nil
// immediately and forever on a closed channel, spinning and spawning a
// goroutine per iteration. While the channel stays open the behavior is
// the same as a plain blocking receive.
func (ti *TootIngester) readFromInboundChannel() {
	for nt := range ti.inbound {
		go ti.storeToot(nt)
	}
}
// storeToot persists t through the configured storage backend unless the
// backend reports that the toot already exists. It panics when no backend
// has been configured.
func (ti *TootIngester) storeToot(t *toot.Toot) {
	// FIXME first check for dupes in recentlySeen
	backend := ti.storageBackend
	if backend == nil {
		panic("no storage backend")
	}
	if backend.TootExists(t) {
		return
	}
	backend.StoreToot(t)
}