You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
steem-block-db/app.go

113 lines
3.1 KiB

package main
import "sync"
import "time"
import log "github.com/sirupsen/logrus"
//import "encoding/json"
// App holds the long-lived dependencies of the block fetcher process: the
// Steem API client, the data store, and the configuration they were built
// from. Instances are created with NewApp, which populates every field via
// init.
type App struct {
// api is the Steem API client; init assigns it from NewSteemAPI.
api SteemAPIShape
// config is the configuration passed to NewApp, kept for later reads
// (mainloop reads desiredFetcherThreads from it).
config *appconfig
// db is the data store; init assigns it from NewSteemDataStore.
db SteemDataStorer
// lock is allocated in init. NOTE(review): none of the methods visible in
// this file lock it - confirm whether it is used elsewhere.
lock *sync.Mutex
}
// appconfig carries the settings NewApp needs to build an App.
type appconfig struct {
// logLevel is passed to logrus' SetLevel during App initialization.
logLevel log.Level
// apiUrl is passed to NewSteemAPI to construct the API client.
apiUrl string
// redisUrl is passed to NewSteemDataStore to construct the data store.
redisUrl string
// desiredFetcherThreads is forwarded to each BlockFetcherConfig created by
// the main loop.
desiredFetcherThreads uint
}
// NewApp allocates an App and initializes it from config by calling init.
// The returned App keeps a reference to config.
func NewApp(config *appconfig) *App {
	app := &App{}
	app.init(config)
	return app
}
// init configures logging and populates every field of the App from config.
//
// The log level and formatter are set through logrus' package-level
// functions, so they apply to the shared standard logger rather than to this
// App alone. Log output is JSON-formatted.
func (a *App) init(config *appconfig) {
	log.SetLevel(config.logLevel)
	// Only one formatter can be active at a time; JSON is the one in effect,
	// so it is the only one installed.
	log.SetFormatter(&log.JSONFormatter{})
	//log.SetReportCaller(true)

	a.api = NewSteemAPI(config.apiUrl)
	a.db = NewSteemDataStore(config.redisUrl)
	a.lock = &sync.Mutex{}
	a.config = config
}
// main logs a startup message and then hands control to mainloop.
func (a *App) main() {
	log.Info("steem block data fetcher starting up...")
	a.mainloop()
}
// mainloop keeps the local data store caught up with the network.
//
// Each iteration records the current network block height in the data store,
// compares it with the local block height, and, when the local store is
// behind, fetches the next batch of at most batchSize blocks with a
// BlockFetcher and hands them to pushBlocks. When there is nothing to fetch
// it sleeps for one second before checking again.
//
// The loop has no exit condition: it only stops if something it calls panics.
func (a *App) mainloop() {
	log.Infof("using %d fetching threads", a.config.desiredFetcherThreads)

	// Upper bound on the number of blocks requested from one fetcher run.
	const batchSize = uint(3000)
	//const batchSize = uint(10)

	for {
		a.db.SetCurrentNetworkBlockHeight(a.fetchCurrentBlockHeight())

		// Snapshot both heights once so that every comparison, computation
		// and log line in this iteration is based on the same pair of values.
		networkHeight := a.db.CurrentNetworkBlockHeight()
		localHeight := a.db.CurrentLocalBlockHeight()

		if networkHeight == localHeight {
			// we are synced
			time.Sleep(1 * time.Second)
			continue
		}
		if networkHeight < localHeight {
			// The reported network height is behind what we already hold.
			// Subtracting here would wrap around to a huge "blocks behind"
			// count and request blocks past the network height, so wait for
			// the network height to catch up instead.
			log.Warnf("network block height %d is behind local block height %d, waiting", networkHeight, localHeight)
			time.Sleep(1 * time.Second)
			continue
		}

		log.Infof("current network block height = %d", networkHeight)
		log.Infof("current local block height = %d", localHeight)

		// we are not synced
		// how far behind are we? (networkHeight > localHeight is guaranteed
		// by the guards above, so this subtraction cannot wrap)
		countMustFetch := uint(networkHeight - localHeight)
		log.Infof("we are %d blocks behind", countMustFetch)

		start := localHeight + 1
		log.Infof("beginning fetch with start block %d", start)

		var end BlockNumber
		if countMustFetch <= batchSize {
			end = networkHeight
			log.Infof("fetch is within our batch size (%d), end block for fetch batch is %d", batchSize, end)
		} else {
			// start..end inclusive covers exactly batchSize blocks.
			end = start + BlockNumber(batchSize) - 1
			log.Infof("fetch is too large for batch size (%d), end block for fetch batch is %d", batchSize, end)
		}

		bf := NewBlockFetcher(&BlockFetcherConfig{
			// NOTE(review): this assertion panics if api is ever backed by
			// an implementation other than *SteemAPI.
			api:                   a.api.(*SteemAPI),
			desiredFetcherThreads: a.config.desiredFetcherThreads,
			startBlock:            start,
			endBlock:              end,
		})
		blocks := bf.fetch()
		log.Infof("blockfetcher has returned")
		a.pushBlocks(blocks)
	}
}
// pushBlocks stores every fetched block in the data store, in slice order,
// and then asks the data store to update its current local block height.
//
// A failure to store any block is fatal: the error is passed to log.Panic.
// newBlocks must be non-nil, since it is dereferenced unconditionally.
func (a *App) pushBlocks(newBlocks *[]FetchedBlock) {
	blocks := *newBlocks
	for i := range blocks {
		blk := &blocks[i]
		if err := a.db.StoreBlockOps(blk.blockNumber, blk.blockData); err != nil {
			log.Panic(err)
		}
	}
	// Reaching this point means every block was stored, so the number pushed
	// equals the slice length.
	log.Infof("pushed %d new blocks to db", len(blocks))
	a.db.UpdateCurrentLocalBlockHeight()
}
// fetchCurrentBlockHeight asks the API for the dynamic global properties and
// returns the last irreversible block number they report.
//
// It panics (via log.Panicf) if the API call fails, or if the reported block
// number is below minPlausibleBlockNum.
func (a *App) fetchCurrentBlockHeight() BlockNumber {
	// Presumably a sanity floor to reject bogus API responses - the exact
	// value is inherited from the original code.
	const minPlausibleBlockNum = 100

	props, err := a.api.GetDynamicGlobalProperties()
	if err != nil {
		log.Panicf("can't fetch global properties, bailing. err: %s", err)
	}
	if props.LastIrreversibleBlockNum < minPlausibleBlockNum {
		// The fetch itself succeeded; report the rejected value so this case
		// is distinguishable from an API failure.
		log.Panicf("implausible last irreversible block number %d (expected at least %d), bailing",
			props.LastIrreversibleBlockNum, minPlausibleBlockNum)
	}
	return props.LastIrreversibleBlockNum
}