package main

import (
	"encoding/json"
	"sync"
	"time"

	log "github.com/sirupsen/logrus"
)

// FetchedBlock is the outcome of fetching one block: either blockData holds
// the JSON-encoded operations of the block, or error points at the failure.
type FetchedBlock struct {
	blockNumber BlockNumber
	error       *error
	blockData   *[]byte
}

// BlockFetcher fetches a fixed range of blocks concurrently.
//
// wantBlocks holds the blocks not yet received, fetchingBlocks the blocks a
// worker is currently handling, and fetchedBlocks the results received so
// far; all three maps are guarded by lock. Work is handed to the workers over
// the unbuffered workChannel and comes back over the unbuffered
// resultsChannel. done is closed when fetch returns and tells every goroutine
// started by the fetcher to exit. A BlockFetcher is single-use.
type BlockFetcher struct {
	api                   *SteemAPI
	desiredFetcherThreads uint
	wantBlocks            map[BlockNumber]bool
	fetchingBlocks        map[BlockNumber]bool
	fetchedBlocks         map[BlockNumber]*FetchedBlock
	lock                  *sync.Mutex
	workChannel           chan BlockNumber
	resultsChannel        chan *FetchedBlock
	done                  chan struct{}
}

// BlockFetcherConfig carries the parameters for NewBlockFetcher. The block
// range is inclusive on both ends.
type BlockFetcherConfig struct {
	api                   *SteemAPI
	desiredFetcherThreads uint
	startBlock            BlockNumber
	endBlock              BlockNumber
}

// NewBlockFetcher returns a BlockFetcher initialised from config.
func NewBlockFetcher(config *BlockFetcherConfig) *BlockFetcher {
	self := new(BlockFetcher)
	self.init(config)
	return self
}

// Done reports whether every wanted block has been received.
func (self *BlockFetcher) Done() bool {
	self.lock.Lock()
	defer self.lock.Unlock()
	return len(self.wantBlocks) == 0
}

// init allocates the fetcher's channels and maps and fills wantBlocks with
// every block number in [config.startBlock, config.endBlock].
func (self *BlockFetcher) init(config *BlockFetcherConfig) {
	self.lock = &sync.Mutex{}
	self.api = config.api
	self.desiredFetcherThreads = config.desiredFetcherThreads
	self.workChannel = make(chan BlockNumber)
	self.resultsChannel = make(chan *FetchedBlock)
	self.done = make(chan struct{})
	self.wantBlocks = make(map[BlockNumber]bool)
	self.fetchingBlocks = make(map[BlockNumber]bool)
	self.fetchedBlocks = make(map[BlockNumber]*FetchedBlock)

	// An inverted range (end < start) yields no wanted blocks. Comparing
	// explicitly avoids depending on unsigned wrap-around for that result.
	start, end := uint(config.startBlock), uint(config.endBlock)
	if end >= start {
		// Break on equality rather than looping while n <= end, so the
		// counter cannot overflow when end is the largest uint.
		for n := start; ; n++ {
			self.wantBlocks[BlockNumber(n)] = true
			if n == end {
				break
			}
		}
	}
	log.Debugf("wantblocks[] is now %v", self.wantBlocks)
}

// removeWantBlock drops blockNum from the wanted set. It panics if the block
// was not wanted.
func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) {
	self.lock.Lock()
	defer self.lock.Unlock()
	if !self.wantBlocks[blockNum] {
		log.Panicf("shouldn't happen")
	}
	delete(self.wantBlocks, blockNum)
}

// removeFetchingBlock drops blockNum from the in-flight set. It panics if the
// block was not marked as in flight.
func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) {
	self.lock.Lock()
	defer self.lock.Unlock()
	if !self.fetchingBlocks[blockNum] {
		log.Panicf("shouldn't happen")
	}
	delete(self.fetchingBlocks, blockNum)
}

// addFetchingBlock marks blockNum as in flight. It panics if the block is
// already marked.
func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) {
	self.lock.Lock()
	defer self.lock.Unlock()
	if self.fetchingBlocks == nil {
		self.fetchingBlocks = make(map[BlockNumber]bool)
	}
	if self.fetchingBlocks[blockNum] {
		log.Panicf("shouldn't happen")
	}
	self.fetchingBlocks[blockNum] = true
}

// fetcher is the body of one worker goroutine. It takes block numbers from
// workChannel, fetches each one with retries, and delivers successful results
// on resultsChannel. It returns once done is closed.
func (self *BlockFetcher) fetcher(index uint) {
	log.Debugf("fetcher thread %d starting", index)
	defer log.Debugf("fetcher thread %d ending", index)

	for {
		select {
		case <-self.done:
			return
		case blockNum := <-self.workChannel:
			log.Debugf("fetcher %d beginning fetch for block %d", index, blockNum)
			self.addFetchingBlock(blockNum)

			r := self.fetchWithRetries(blockNum)
			if r == nil {
				// No result will be delivered for this block, so clear
				// the in-flight marker here; receiveResult never will.
				self.removeFetchingBlock(blockNum)
				continue
			}

			// resultsChannel is unbuffered: once fetch has returned nobody
			// reads it, so the send must be abandonable.
			select {
			case self.resultsChannel <- r:
			case <-self.done:
				return
			}
		}
	}
}

// fetchWithRetries fetches blockNum up to three times, pausing one second
// after each failed attempt. It returns the successful result, or nil when
// every attempt failed or the fetcher was shut down during a pause.
func (self *BlockFetcher) fetchWithRetries(blockNum BlockNumber) *FetchedBlock {
	const (
		tries      = 3
		retryDelay = 1 * time.Second
	)
	for i := 0; i < tries; i++ {
		r := self.fetchBlockOpsFromNetwork(blockNum)
		if r.error == nil {
			return r
		}
		// r.error is a *error; dereference it so the message is logged
		// rather than the pointer's address.
		log.Infof("error fetching block %d, retry %d (%v)", blockNum, i+1, *r.error)
		select {
		case <-time.After(retryDelay):
		case <-self.done:
			return nil
		}
	}
	log.Warnf("giving up on block %d after %d tries", blockNum, tries)
	return nil
}

// sendWork queues b for the workers without blocking the caller: a goroutine
// waits on the unbuffered workChannel until a worker takes the block or the
// fetcher shuts down.
func (self *BlockFetcher) sendWork(b BlockNumber) {
	go func() {
		log.Debugf("waiting to send blockNum %d into the work channel", b)
		select {
		case self.workChannel <- b:
			log.Debugf("sent blockNum %d into the work channel", b)
		case <-self.done:
		}
	}()
}

// fetch starts the workers, dispatches every wanted block, and gathers
// results until all wanted blocks have arrived or 30 seconds have elapsed.
// It returns the blocks fetched so far in either case.
//
// fetch must be called at most once per BlockFetcher: it closes done on
// return, which stops the workers and any senders still pending.
func (self *BlockFetcher) fetch() *[]FetchedBlock {
	const batchTimeout = 30 * time.Second

	log.Debugf("blockfetcher beginning fetch")
	deadline := time.NewTimer(batchTimeout)
	defer deadline.Stop()
	defer close(self.done)

	for i := uint(0); i < self.desiredFetcherThreads; i++ {
		go self.fetcher(i + 1)
	}

	self.lock.Lock()
	for blockNum := range self.wantBlocks {
		self.sendWork(blockNum)
	}
	self.lock.Unlock()

	// wantBlocks only shrinks inside receiveResult, which runs on this
	// goroutine, so blocking here until a result or the deadline arrives
	// cannot miss a transition to Done.
	for !self.Done() {
		select {
		case result := <-self.resultsChannel:
			self.receiveResult(result)
		case <-deadline.C:
			// fetch took too long, return what we have
			log.Infof("30s fetcher batch timeout reached, but not done fetching")
			return self.collectFetched()
		}
	}

	log.Infof("blockfetcher %+v considers itself Done()", self)
	return self.collectFetched()
}

// collectFetched returns a copy of every result received so far.
func (self *BlockFetcher) collectFetched() *[]FetchedBlock {
	self.lock.Lock()
	defer self.lock.Unlock()
	var final []FetchedBlock
	for _, value := range self.fetchedBlocks {
		final = append(final, *value)
	}
	return &final
}

// receiveResult records r and removes its block from both the in-flight and
// the wanted sets.
func (self *BlockFetcher) receiveResult(r *FetchedBlock) {
	log.Debugf("got result for blocknum %d", r.blockNumber)
	self.removeFetchingBlock(r.blockNumber)
	self.lock.Lock()
	self.fetchedBlocks[r.blockNumber] = r
	self.lock.Unlock()
	self.removeWantBlock(r.blockNumber)
}

// fetchBlockOpsFromNetwork asks the API for the operations in blockNum and
// JSON-encodes them. The returned FetchedBlock has a nil error field on
// success and a nil blockData field on failure.
func (self *BlockFetcher) fetchBlockOpsFromNetwork(blockNum BlockNumber) *FetchedBlock {
	result := &FetchedBlock{
		blockNumber: blockNum,
		error:       nil,
		blockData:   nil,
	}

	r, err := self.api.GetOpsInBlock(blockNum)
	if err != nil {
		result.error = &err
		return result
	}

	data, err := json.Marshal(r)
	if err != nil {
		result.error = &err
		return result
	}

	count := len(*r)
	log.Infof("got %d operations for block %d fetched from network", count, blockNum)
	result.blockData = &data
	result.error = nil // make sure this is nil if it worked
	return result
}