114 lignes
3.1 KiB
Go
114 lignes
3.1 KiB
Go
package main
|
|
|
|
import "sync"
|
|
import "time"
|
|
import log "github.com/sirupsen/logrus"
|
|
|
|
//import "encoding/json"
|
|
|
|
type App struct {
|
|
api SteemAPIShape
|
|
config *appconfig
|
|
db SteemDataStorer
|
|
lock *sync.Mutex
|
|
}
|
|
|
|
type appconfig struct {
|
|
logLevel log.Level
|
|
apiUrl string
|
|
redisUrl string
|
|
desiredFetcherThreads uint
|
|
}
|
|
|
|
func NewApp(config *appconfig) *App {
|
|
self := new(App)
|
|
self.init(config)
|
|
return self
|
|
}
|
|
|
|
func (self *App) init(config *appconfig) {
|
|
log.SetLevel(config.logLevel)
|
|
log.SetFormatter(&log.TextFormatter{
|
|
FullTimestamp: true,
|
|
})
|
|
log.SetFormatter(&log.JSONFormatter{})
|
|
//log.SetReportCaller(true)
|
|
self.api = NewSteemAPI(config.apiUrl)
|
|
self.db = NewSteemDataStore(config.redisUrl)
|
|
self.lock = &sync.Mutex{}
|
|
self.config = config
|
|
}
|
|
|
|
func (self *App) main() {
|
|
log.Infof("steem block data fetcher starting up...")
|
|
self.mainloop()
|
|
}
|
|
|
|
func (self *App) mainloop() {
|
|
log.Infof("using %d fetching threads", self.config.desiredFetcherThreads)
|
|
batchSize := uint(3000)
|
|
//batchSize := uint(10)
|
|
var start BlockNumber
|
|
var end BlockNumber
|
|
for {
|
|
self.db.SetCurrentNetworkBlockHeight(self.fetchCurrentBlockHeight())
|
|
if self.db.CurrentNetworkBlockHeight() == self.db.CurrentLocalBlockHeight() {
|
|
//we are synced
|
|
time.Sleep(1 * time.Second)
|
|
continue
|
|
}
|
|
log.Infof("current network block height = %d", self.db.CurrentNetworkBlockHeight())
|
|
log.Infof("current local block height = %d", self.db.CurrentLocalBlockHeight())
|
|
// we are not synced
|
|
|
|
// how far behind are we?
|
|
countMustFetch := uint(self.db.CurrentNetworkBlockHeight() - self.db.CurrentLocalBlockHeight())
|
|
log.Infof("we are %d blocks behind", countMustFetch)
|
|
start = self.db.CurrentLocalBlockHeight() + 1
|
|
log.Infof("beginning fetch with start block %d", start)
|
|
if countMustFetch <= batchSize {
|
|
end = self.db.CurrentNetworkBlockHeight()
|
|
log.Infof("fetch is within our batch size (%d), end block for fetch batch is %d", batchSize, end)
|
|
} else {
|
|
end = BlockNumber(uint(start) + uint(batchSize) - uint(1))
|
|
log.Infof("fetch is too large for batch size (%d), end block for fetch batch is %d", batchSize, end)
|
|
}
|
|
|
|
bf := NewBlockFetcher(&BlockFetcherConfig{
|
|
api: self.api.(*SteemAPI),
|
|
desiredFetcherThreads: self.config.desiredFetcherThreads,
|
|
startBlock: start,
|
|
endBlock: end,
|
|
})
|
|
|
|
blocks := bf.fetch()
|
|
log.Infof("blockfetcher has returned")
|
|
self.pushBlocks(blocks)
|
|
}
|
|
}
|
|
|
|
func (self *App) pushBlocks(newBlocks *[]FetchedBlock) {
|
|
counter := 0
|
|
for _, newBlock := range *newBlocks {
|
|
counter += 1
|
|
err := self.db.StoreBlockOps(newBlock.blockNumber, newBlock.blockData)
|
|
if err != nil {
|
|
log.Panic(err)
|
|
}
|
|
}
|
|
log.Infof("pushed %d new blocks to db", counter)
|
|
self.db.UpdateCurrentLocalBlockHeight()
|
|
}
|
|
|
|
func (self *App) fetchCurrentBlockHeight() BlockNumber {
|
|
|
|
r, err := self.api.GetDynamicGlobalProperties()
|
|
if err != nil {
|
|
log.Panicf("can't fetch global properties, bailing. err: %s", err)
|
|
}
|
|
if r.LastIrreversibleBlockNum < 100 {
|
|
log.Panicf("can't fetch global properties, bailing")
|
|
}
|
|
return r.LastIrreversibleBlockNum
|
|
}
|