|
- package main
-
- import "encoding/json"
- import "sync"
- import "time"
- import log "github.com/sirupsen/logrus"
-
- type FetchedBlock struct { // FetchedBlock is the outcome of one network fetch attempt for a single block.
- blockNumber BlockNumber // the block this result belongs to
- error *error // nil on success; otherwise points at the API or JSON-encoding error
- blockData *[]byte // JSON encoding of the value returned by GetOpsInBlock; nil when error is set
- }
-
- type BlockFetcher struct { // BlockFetcher fetches a fixed set of blocks using a pool of fetcher goroutines.
- api *SteemAPI // client whose GetOpsInBlock is called for every block
- desiredFetcherThreads uint // number of fetcher goroutines started by fetch()
- wantBlocks map[BlockNumber]bool // blocks for which no result has been received yet; Done() is true once this is empty
- fetchingBlocks map[BlockNumber]bool // blocks picked up by a fetcher goroutine whose result has not been received yet
- fetchedBlocks map[BlockNumber]*FetchedBlock // results received so far, keyed by block number
- lock *sync.Mutex // guards wantBlocks, fetchingBlocks and fetchedBlocks
- workChannel chan BlockNumber // unbuffered; carries block numbers to the fetcher goroutines
- resultsChannel chan *FetchedBlock // unbuffered; carries successful results from the fetcher goroutines back to fetch()
- }
-
- type BlockFetcherConfig struct { // BlockFetcherConfig holds the parameters NewBlockFetcher needs to build a BlockFetcher.
- api *SteemAPI // API client handed to the BlockFetcher
- desiredFetcherThreads uint // how many fetcher goroutines the BlockFetcher should run
- startBlock BlockNumber // first block to fetch (inclusive)
- endBlock BlockNumber // last block to fetch (inclusive)
- }
-
- // NewBlockFetcher allocates a BlockFetcher and initializes it from config.
- // No goroutines are started here; the work begins when fetch is called.
- func NewBlockFetcher(config *BlockFetcherConfig) *BlockFetcher {
- 	fetcher := &BlockFetcher{}
- 	fetcher.init(config)
- 	return fetcher
- }
-
- // Done reports whether a result has been received for every wanted block,
- // i.e. whether the wantBlocks set is empty.
- func (self *BlockFetcher) Done() bool {
- 	self.lock.Lock()
- 	remaining := len(self.wantBlocks)
- 	self.lock.Unlock()
- 	return remaining == 0
- }
-
- // init prepares a freshly allocated BlockFetcher from config: it creates the
- // mutex, the two unbuffered channels and all three bookkeeping maps, then
- // marks every block from config.startBlock through config.endBlock
- // (inclusive) as wanted. When endBlock is below startBlock no block is
- // wanted and Done() is true immediately.
- func (self *BlockFetcher) init(config *BlockFetcherConfig) {
- 	self.lock = &sync.Mutex{}
- 	self.api = config.api
- 	self.desiredFetcherThreads = config.desiredFetcherThreads
- 	self.workChannel = make(chan BlockNumber)
- 	self.resultsChannel = make(chan *FetchedBlock)
- 	self.wantBlocks = make(map[BlockNumber]bool)
- 	// Created up front like the other maps so no method ever sees a nil map.
- 	self.fetchingBlocks = make(map[BlockNumber]bool)
- 	self.fetchedBlocks = make(map[BlockNumber]*FetchedBlock)
-
- 	first, last := uint(config.startBlock), uint(config.endBlock)
- 	// Explicit guard: a reversed range must not depend on unsigned wraparound.
- 	if last >= first {
- 		// Break on equality instead of looping on n <= last, so that
- 		// last == max uint cannot overflow n into an endless loop.
- 		for n := first; ; n++ {
- 			self.wantBlocks[BlockNumber(n)] = true
- 			if n == last {
- 				break
- 			}
- 		}
- 	}
-
- 	log.Debugf("wantblocks[] is now %v", self.wantBlocks)
- }
-
- // removeWantBlock drops blockNum from the set of still-wanted blocks.
- // It panics (via log.Panicf) when blockNum is not currently wanted.
- func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) {
- 	self.lock.Lock()
- 	defer self.lock.Unlock()
-
- 	if wanted := self.wantBlocks[blockNum]; !wanted {
- 		log.Panicf("shouldn't happen")
- 	}
- 	delete(self.wantBlocks, blockNum)
- }
-
- // removeFetchingBlock drops blockNum from the set of in-flight blocks.
- // It panics (via log.Panicf) when blockNum is not marked as being fetched.
- func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) {
- 	self.lock.Lock()
- 	defer self.lock.Unlock()
-
- 	if inFlight := self.fetchingBlocks[blockNum]; !inFlight {
- 		log.Panicf("shouldn't happen")
- 	}
- 	delete(self.fetchingBlocks, blockNum)
- }
-
- // addFetchingBlock marks blockNum as in flight, creating the fetchingBlocks
- // map on first use. It panics (via log.Panicf) when blockNum is already
- // marked, since a block must not be picked up twice.
- func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) {
- 	self.lock.Lock()
- 	defer self.lock.Unlock()
-
- 	if self.fetchingBlocks == nil {
- 		self.fetchingBlocks = make(map[BlockNumber]bool)
- 	}
- 	if self.fetchingBlocks[blockNum] {
- 		log.Panicf("shouldn't happen")
- 	}
- 	self.fetchingBlocks[blockNum] = true
- }
-
- func (self *BlockFetcher) fetcher(index uint) {
- log.Debugf("fetcher thread %d starting", index)
-
- WorkLoop:
- for blockNum := range self.workChannel {
- log.Debugf("fetcher %d beginning fetch for block %d", index, blockNum)
- self.addFetchingBlock(blockNum)
- tries := 3
- for i := 0; i < tries; i++ {
- r := self.fetchBlockOpsFromNetwork(blockNum)
- if r.error == nil {
- // it worked, return result
- self.resultsChannel <- r
- continue WorkLoop
- } else {
- // wait a sec and try again
- time.Sleep(1 * time.Second)
- log.Infof("error fetching block %d, retry %d (%v)", blockNum, i+1, r.error)
- }
- }
- }
- log.Debugf("fetcher thread %d ending", index)
- }
-
- // sendWork queues block b for the fetcher goroutines without blocking the
- // caller: a dedicated goroutine waits on the unbuffered workChannel until a
- // fetcher is ready to receive b.
- func (self *BlockFetcher) sendWork(b BlockNumber) {
- 	enqueue := func() {
- 		// yay cheap goroutines, let them block on the unbuffered channel
- 		log.Debugf("waiting to send blockNum %d into the work channel", b)
- 		self.workChannel <- b
- 		log.Debugf("sent blockNum %d into the work channel", b)
- 	}
- 	go enqueue()
- }
-
- // fetch runs the whole batch: it starts desiredFetcherThreads fetcher
- // goroutines, queues every wanted block, and then collects results until
- // either every block has been received or 30 seconds have passed.
- //
- // It returns a pointer to a slice holding a copy of every result received so
- // far, in no particular order. The slice is nil when nothing was received.
- //
- // On normal completion workChannel is closed, which ends the fetcher
- // goroutines. On timeout the channel is deliberately left open, because
- // sendWork goroutines may still be blocked sending on it and closing it
- // would make them panic.
- // NOTE(review): this means fetcher and sendWork goroutines can outlive a
- // timed-out fetch.
- func (self *BlockFetcher) fetch() *[]FetchedBlock {
- 	log.Debugf("blockfetcher beginning fetch")
- 	timeout := time.NewTimer(30 * time.Second)
- 	defer timeout.Stop()
-
- 	for i := uint(0); i < self.desiredFetcherThreads; i++ {
- 		go self.fetcher(i + 1) // fetchers are numbered from 1 in the logs
- 	}
-
- 	self.lock.Lock()
- 	for blockNum := range self.wantBlocks {
- 		self.sendWork(blockNum)
- 	}
- 	self.lock.Unlock()
-
- 	// resultsChannel is unbuffered, so we must keep reading from it or the
- 	// workers will block when returning results. Blocking in select (rather
- 	// than polling with a sleep) wakes us as soon as a result or the
- 	// deadline arrives, and enforces the deadline even while results keep
- 	// flowing.
- 	for !self.Done() {
- 		select {
- 		case result := <-self.resultsChannel:
- 			self.receiveResult(result)
- 		case <-timeout.C:
- 			// fetch took too long, return what we have
- 			log.Infof("30s fetcher batch timeout reached, but not done fetching")
- 			return self.collectFetched()
- 		}
- 	}
-
- 	// wantBlocks is empty, so every block was handed to a worker and its
- 	// result received: nothing is left to send on workChannel.
- 	log.Infof("blockfetcher %+v considers itself Done()", self)
- 	close(self.workChannel) // shut down the workers
- 	return self.collectFetched()
- }
-
- // collectFetched returns a pointer to a slice holding a copy of every result
- // currently stored in fetchedBlocks.
- func (self *BlockFetcher) collectFetched() *[]FetchedBlock {
- 	self.lock.Lock()
- 	defer self.lock.Unlock()
-
- 	var final []FetchedBlock
- 	for _, value := range self.fetchedBlocks {
- 		final = append(final, *value)
- 	}
- 	return &final
- }
-
- // receiveResult records one result delivered by a fetcher goroutine: the
- // block leaves the in-flight set, the result is stored in fetchedBlocks, and
- // the block is finally removed from the wanted set.
- func (self *BlockFetcher) receiveResult(r *FetchedBlock) {
- 	num := r.blockNumber
- 	log.Debugf("got result for blocknum %d", num)
-
- 	self.removeFetchingBlock(num)
-
- 	self.lock.Lock()
- 	self.fetchedBlocks[num] = r
- 	self.lock.Unlock()
-
- 	self.removeWantBlock(num)
- }
-
- // fetchBlockOpsFromNetwork makes a single attempt to fetch blockNum through
- // the API and JSON-encode the response. It always returns a non-nil
- // FetchedBlock: on success blockData points at the encoded bytes and error
- // is nil; on an API or encoding failure error points at the failure and
- // blockData is nil.
- func (self *BlockFetcher) fetchBlockOpsFromNetwork(blockNum BlockNumber) *FetchedBlock {
- 	// error and blockData start out nil (zero values).
- 	result := &FetchedBlock{blockNumber: blockNum}
-
- 	ops, err := self.api.GetOpsInBlock(blockNum)
- 	if err != nil {
- 		result.error = &err
- 		return result
- 	}
-
- 	encoded, err := json.Marshal(ops)
- 	if err != nil {
- 		result.error = &err
- 		return result
- 	}
-
- 	log.Infof("got %d operations for block %d fetched from network", len(*ops), blockNum)
- 	result.blockData = &encoded
- 	return result
- }
|