You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

114 lines
3.1KB

  1. package main
  2. import "sync"
  3. import "time"
  4. import log "github.com/sirupsen/logrus"
  5. //import "encoding/json"
  6. type App struct {
  7. api SteemAPIShape
  8. config *appconfig
  9. db SteemDataStorer
  10. lock *sync.Mutex
  11. }
  12. type appconfig struct {
  13. logLevel log.Level
  14. apiUrl string
  15. redisUrl string
  16. desiredFetcherThreads uint
  17. }
  18. func NewApp(config *appconfig) *App {
  19. self := new(App)
  20. self.init(config)
  21. return self
  22. }
  23. func (self *App) init(config *appconfig) {
  24. log.SetLevel(config.logLevel)
  25. log.SetFormatter(&log.TextFormatter{
  26. FullTimestamp: true,
  27. })
  28. log.SetFormatter(&log.JSONFormatter{})
  29. //log.SetReportCaller(true)
  30. self.api = NewSteemAPI(config.apiUrl)
  31. self.db = NewSteemDataStore(config.redisUrl)
  32. self.lock = &sync.Mutex{}
  33. self.config = config
  34. }
  35. func (self *App) main() {
  36. log.Infof("steem block data fetcher starting up...")
  37. self.mainloop()
  38. }
  39. func (self *App) mainloop() {
  40. log.Infof("using %d fetching threads", self.config.desiredFetcherThreads)
  41. batchSize := uint(3000)
  42. //batchSize := uint(10)
  43. var start BlockNumber
  44. var end BlockNumber
  45. for {
  46. self.db.SetCurrentNetworkBlockHeight(self.fetchCurrentBlockHeight())
  47. if self.db.CurrentNetworkBlockHeight() == self.db.CurrentLocalBlockHeight() {
  48. //we are synced
  49. time.Sleep(1 * time.Second)
  50. continue
  51. }
  52. log.Infof("current network block height = %d", self.db.CurrentNetworkBlockHeight())
  53. log.Infof("current local block height = %d", self.db.CurrentLocalBlockHeight())
  54. // we are not synced
  55. // how far behind are we?
  56. countMustFetch := uint(self.db.CurrentNetworkBlockHeight() - self.db.CurrentLocalBlockHeight())
  57. log.Infof("we are %d blocks behind", countMustFetch)
  58. start = self.db.CurrentLocalBlockHeight() + 1
  59. log.Infof("beginning fetch with start block %d", start)
  60. if countMustFetch <= batchSize {
  61. end = self.db.CurrentNetworkBlockHeight()
  62. log.Infof("fetch is within our batch size (%d), end block for fetch batch is %d", batchSize, end)
  63. } else {
  64. end = BlockNumber(uint(start) + uint(batchSize) - uint(1))
  65. log.Infof("fetch is too large for batch size (%d), end block for fetch batch is %d", batchSize, end)
  66. }
  67. bf := NewBlockFetcher(&BlockFetcherConfig{
  68. api: self.api.(*SteemAPI),
  69. desiredFetcherThreads: self.config.desiredFetcherThreads,
  70. startBlock: start,
  71. endBlock: end,
  72. })
  73. blocks := bf.fetch()
  74. log.Infof("blockfetcher has returned")
  75. self.pushBlocks(blocks)
  76. }
  77. }
  78. func (self *App) pushBlocks(newBlocks *[]FetchedBlock) {
  79. counter := 0
  80. for _, newBlock := range *newBlocks {
  81. counter += 1
  82. err := self.db.StoreBlockOps(newBlock.blockNumber, newBlock.blockData)
  83. if err != nil {
  84. log.Panic(err)
  85. }
  86. }
  87. log.Infof("pushed %d new blocks to db", counter)
  88. self.db.UpdateCurrentLocalBlockHeight()
  89. }
  90. func (self *App) fetchCurrentBlockHeight() BlockNumber {
  91. r, err := self.api.GetDynamicGlobalProperties()
  92. if err != nil {
  93. log.Panicf("can't fetch global properties, bailing. err: %s", err)
  94. }
  95. if r.LastIrreversibleBlockNum < 100 {
  96. log.Panicf("can't fetch global properties, bailing")
  97. }
  98. return r.LastIrreversibleBlockNum
  99. }