package main import "sync" import "time" import log "github.com/sirupsen/logrus" //import "encoding/json" type App struct { api SteemAPIShape config *appconfig db SteemDataStorer lock *sync.Mutex } type appconfig struct { logLevel log.Level apiUrl string redisUrl string desiredFetcherThreads uint } func NewApp(config *appconfig) *App { self := new(App) self.init(config) return self } func (self *App) init(config *appconfig) { log.SetLevel(config.logLevel) log.SetFormatter(&log.TextFormatter{ FullTimestamp: true, }) log.SetFormatter(&log.JSONFormatter{}) //log.SetReportCaller(true) self.api = NewSteemAPI(config.apiUrl) self.db = NewSteemDataStore(config.redisUrl) self.lock = &sync.Mutex{} self.config = config } func (self *App) main() { log.Infof("steem block data fetcher starting up...") self.mainloop() } func (self *App) mainloop() { log.Infof("using %d fetching threads", self.config.desiredFetcherThreads) batchSize := uint(3000) //batchSize := uint(10) var start BlockNumber var end BlockNumber for { self.db.SetCurrentNetworkBlockHeight(self.fetchCurrentBlockHeight()) if self.db.CurrentNetworkBlockHeight() == self.db.CurrentLocalBlockHeight() { //we are synced time.Sleep(1 * time.Second) continue } log.Infof("current network block height = %d", self.db.CurrentNetworkBlockHeight()) log.Infof("current local block height = %d", self.db.CurrentLocalBlockHeight()) // we are not synced // how far behind are we? countMustFetch := uint(self.db.CurrentNetworkBlockHeight() - self.db.CurrentLocalBlockHeight()) log.Infof("we are %d blocks behind", countMustFetch) start = self.db.CurrentLocalBlockHeight() + 1 log.Infof("beginning fetch with start block %d", start) if countMustFetch <= batchSize { end = self.db.CurrentNetworkBlockHeight() log.Infof("fetch is within our batch size (%d), end block for fetch batch is %d", batchSize, end) } else { end = BlockNumber(uint(start) + uint(batchSize) - uint(1)) log.Infof("fetch is too large for batch size (%d), end block for fetch batch is %d", batchSize, end) } bf := NewBlockFetcher(&BlockFetcherConfig{ api: self.api.(*SteemAPI), desiredFetcherThreads: self.config.desiredFetcherThreads, startBlock: start, endBlock: end, }) blocks := bf.fetch() log.Infof("blockfetcher has returned") self.pushBlocks(blocks) } } func (self *App) pushBlocks(newBlocks *[]FetchedBlock) { counter := 0 for _, newBlock := range *newBlocks { counter += 1 err := self.db.StoreBlockOps(newBlock.blockNumber, newBlock.blockData) if err != nil { log.Panic(err) } } log.Infof("pushed %d new blocks to db", counter) self.db.UpdateCurrentLocalBlockHeight() } func (self *App) fetchCurrentBlockHeight() BlockNumber { r, err := self.api.GetDynamicGlobalProperties() if err != nil { log.Panicf("can't fetch global properties, bailing. err: %s", err) } if r.LastIrreversibleBlockNum < 100 { log.Panicf("can't fetch global properties, bailing") } return r.LastIrreversibleBlockNum }