You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

218 lines
6.0KB

  1. package main
  2. import "encoding/json"
  3. import "sync"
  4. import "time"
  5. import log "github.com/sirupsen/logrus"
  6. type FetchedBlock struct {
  7. blockNumber BlockNumber
  8. error *error
  9. blockData *[]byte
  10. }
  11. type BlockFetcher struct {
  12. api *SteemAPI
  13. desiredFetcherThreads uint
  14. wantBlocks map[BlockNumber]bool
  15. fetchingBlocks map[BlockNumber]bool
  16. fetchedBlocks map[BlockNumber]*FetchedBlock
  17. lock *sync.Mutex
  18. workChannel chan BlockNumber
  19. resultsChannel chan *FetchedBlock
  20. }
  21. type BlockFetcherConfig struct {
  22. api *SteemAPI
  23. desiredFetcherThreads uint
  24. startBlock BlockNumber
  25. endBlock BlockNumber
  26. }
  27. func NewBlockFetcher(config *BlockFetcherConfig) *BlockFetcher {
  28. self := new(BlockFetcher)
  29. self.init(config)
  30. return self
  31. }
  32. func (self *BlockFetcher) Done() bool {
  33. self.lock.Lock()
  34. defer self.lock.Unlock()
  35. return len(self.wantBlocks) == 0
  36. }
  37. func (self *BlockFetcher) init(config *BlockFetcherConfig) {
  38. self.lock = &sync.Mutex{}
  39. self.api = config.api
  40. self.desiredFetcherThreads = config.desiredFetcherThreads
  41. self.workChannel = make(chan BlockNumber)
  42. self.resultsChannel = make(chan *FetchedBlock)
  43. self.wantBlocks = make(map[BlockNumber]bool)
  44. self.fetchedBlocks = make(map[BlockNumber]*FetchedBlock)
  45. diff := int(uint(config.endBlock) - uint(config.startBlock))
  46. for i := 0; i <= diff; i++ {
  47. self.wantBlocks[BlockNumber(uint(config.startBlock)+uint(i))] = true
  48. }
  49. log.Debugf("wantblocks[] is now %v", self.wantBlocks)
  50. }
  51. func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) {
  52. self.lock.Lock()
  53. defer self.lock.Unlock()
  54. if self.wantBlocks[blockNum] == false {
  55. log.Panicf("shouldn't happen")
  56. }
  57. delete(self.wantBlocks, blockNum)
  58. }
  59. func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) {
  60. self.lock.Lock()
  61. defer self.lock.Unlock()
  62. if self.fetchingBlocks[blockNum] == false {
  63. log.Panicf("shouldn't happen")
  64. }
  65. delete(self.fetchingBlocks, blockNum)
  66. }
  67. func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) {
  68. self.lock.Lock()
  69. defer self.lock.Unlock()
  70. if self.fetchingBlocks[blockNum] == true {
  71. log.Panicf("shouldn't happen")
  72. }
  73. if self.fetchingBlocks == nil {
  74. self.fetchingBlocks = make(map[BlockNumber]bool)
  75. }
  76. if self.fetchingBlocks[blockNum] == false {
  77. self.fetchingBlocks[blockNum] = true
  78. }
  79. }
  80. func (self *BlockFetcher) fetcher(index uint) {
  81. log.Debugf("fetcher thread %d starting", index)
  82. WorkLoop:
  83. for blockNum := range self.workChannel {
  84. log.Debugf("fetcher %d beginning fetch for block %d", index, blockNum)
  85. self.addFetchingBlock(blockNum)
  86. tries := 3
  87. for i := 0; i < tries; i++ {
  88. r := self.fetchBlockOpsFromNetwork(blockNum)
  89. if r.error == nil {
  90. // it worked, return result
  91. self.resultsChannel <- r
  92. continue WorkLoop
  93. } else {
  94. // wait a sec and try again
  95. time.Sleep(1 * time.Second)
  96. log.Infof("error fetching block %d, retry %d (%v)", blockNum, i+1, r.error)
  97. }
  98. }
  99. }
  100. log.Debugf("fetcher thread %d ending", index)
  101. }
  102. func (self *BlockFetcher) sendWork(b BlockNumber) {
  103. go func() {
  104. // yay cheap goroutines, let them block on the unbuffered channel
  105. log.Debugf("waiting to send blockNum %d into the work channel", b)
  106. self.workChannel <- b
  107. log.Debugf("sent blockNum %d into the work channel", b)
  108. }()
  109. }
  110. func (self *BlockFetcher) fetch() *[]FetchedBlock {
  111. log.Debugf("blockfetcher beginning fetch")
  112. startTime := time.Now()
  113. for i := uint(1); i < self.desiredFetcherThreads+uint(1); i++ {
  114. go self.fetcher(i)
  115. }
  116. self.lock.Lock()
  117. for blockNum := range self.wantBlocks {
  118. self.sendWork(blockNum)
  119. }
  120. self.lock.Unlock()
  121. // now we have to start reading from the unbuffered resultsChannel
  122. // otherwise the workers will block when returning results
  123. for {
  124. select {
  125. case result := <-self.resultsChannel:
  126. self.receiveResult(result)
  127. default:
  128. if startTime.Add(30 * time.Second).Before(time.Now()) {
  129. // fetch took too long, return anyway
  130. log.Infof("30s fetcher batch timeout reached, but not done fetching")
  131. // return what we have
  132. var final []FetchedBlock
  133. self.lock.Lock()
  134. for _, value := range self.fetchedBlocks {
  135. final = append(final, *value)
  136. }
  137. self.lock.Unlock()
  138. self = nil //this BlockFetcher is now finished.
  139. return &final
  140. }
  141. if self.Done() == true {
  142. log.Infof("blockfetcher %+v considers itself Done()", self)
  143. // if we get here, it's because workList is now empty and there
  144. // are no more results in the results channel.
  145. close(self.workChannel) // shut down the workers
  146. var final []FetchedBlock
  147. self.lock.Lock()
  148. for _, value := range self.fetchedBlocks {
  149. final = append(final, *value)
  150. }
  151. self.lock.Unlock()
  152. self = nil //this BlockFetcher is now finished.
  153. return &final
  154. }
  155. // in this case we are not done but got nothing from the result
  156. // channel so just wait a little bit to get more results and
  157. // check the channel again
  158. time.Sleep(10 * time.Millisecond)
  159. //FIXME(sneak) we maybe need to handle a case here where wantBlocks never
  160. //empties but workers need to be re-dispatched..
  161. }
  162. }
  163. log.Panicf("this shouldn't happen")
  164. return nil //shouldn't happen, return should happen from above
  165. }
  166. func (self *BlockFetcher) receiveResult(r *FetchedBlock) {
  167. log.Debugf("got result for blocknum %d", r.blockNumber)
  168. self.removeFetchingBlock(r.blockNumber)
  169. self.lock.Lock()
  170. self.fetchedBlocks[r.blockNumber] = r
  171. self.lock.Unlock()
  172. self.removeWantBlock(r.blockNumber)
  173. }
  174. func (self *BlockFetcher) fetchBlockOpsFromNetwork(blockNum BlockNumber) *FetchedBlock {
  175. result := &FetchedBlock{
  176. blockNumber: blockNum,
  177. error: nil,
  178. blockData: nil,
  179. }
  180. r, err := self.api.GetOpsInBlock(blockNum)
  181. if err != nil {
  182. result.error = &err
  183. return result
  184. }
  185. bytes, err := json.Marshal(r)
  186. if err != nil {
  187. result.error = &err
  188. return result
  189. }
  190. count := len(*r)
  191. log.Infof("got %d operations for block %d fetched from network", count, blockNum)
  192. result.blockData = &bytes
  193. result.error = nil // make sure this is nil if it worked
  194. return result
  195. }