package main import "sync" import log "github.com/sirupsen/logrus" type FetchedBlock struct { blockNumber BlockNumber error *error blockData *[]byte } type BlockFetcher struct { api *SteemAPI desiredFetcherThreads uint wantBlocks map[BlockNumber]bool fetchingBlocks map[BlockNumber]bool fetchedBlocks *[]FetchedBlock lock *sync.Mutex workChannel chan BlockNumber resultsChannel chan *FetchedBlock } type BlockFetcherConfig struct { api *SteemAPI desiredFetcherThreads uint startBlock BlockNumber endBlock BlockNumber } func NewBlockFetcher(config *BlockFetcherConfig) *BlockFetcher { self := new(BlockFetcher) self.init(config) return self } func (self *BlockFetcher) Done() bool { self.lock.Lock() defer self.lock.Unlock() return len(self.wantBlocks) == 0 } func (self *BlockFetcher) init(config *BlockFetcherConfig) { self.lock = &sync.Mutex{} self.api = config.api self.desiredFetcherThreads = config.desiredFetcherThreads self.workChannel = make(chan BlockNumber) self.resultsChannel = make(chan *FetchedBlock) diff := int(uint(config.endBlock) - uint(config.startBlock)) log.Debugf("diff is %d", diff) for i := 0; i <= diff; i++ { self.wantBlocks[BlockNumber(uint(config.startBlock)+uint(i))] = true } log.Debugf("wantblocks[] is now %v", self.wantBlocks) } func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) { if self.wantBlocks[blockNum] == false { log.Panicf("shouldn't happen") } self.lock.Lock() defer self.lock.Unlock() delete(self.wantBlocks, blockNum) } func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) { if self.fetchingBlocks[blockNum] == false { log.Panicf("shouldn't happen") } self.lock.Lock() defer self.lock.Unlock() delete(self.fetchingBlocks, blockNum) } func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) { if self.fetchingBlocks[blockNum] == true { log.Panicf("shouldn't happen") } self.lock.Lock() defer self.lock.Unlock() if self.fetchingBlocks[blockNum] == false { self.fetchingBlocks[blockNum] = true } } func (self *BlockFetcher) fetcher(index int) { log.Debugf("fetcher thread %d starting", index) WorkLoop: for blockNum := range self.workChannel { log.Debugf("fetcher %d beginning fetch for block %d", index, blockNum) self.addFetchingBlock(blockNum) tries := 3 for i := 0; i < tries; i++ { r := self.fetchBlockOpsFromNetwork(blockNum) if r.error == nil { // it worked, return result self.resultsChannel <- r continue WorkLoop } else { // wait a sec and try again time.Sleep(1 * time.Second) } } } log.Debugf("fetcher thread %d ending", index) } func (self *BlockFetcher) fetch() *[]FetchedBlock { for i := 1; i < self.desiredFetcherThreads+1; i++ { go self.fetcher(i) } for blockNum, _ := range self.wantBlocks { // yay cheap goroutines, let them block on the unbuffered channel go func() { log.Debugf("waiting to send blockNum %d into the work channel", blockNum) self.workChannel <- blockNum log.Debugf("sent blockNum %d into the work channel", blockNum) }() } // now we have to start reading from the unbuffered resultsChannel // otherwise the workers will block when returning results select { case result := <-self.resultsChannel: self.receiveResult(result) default: if self.Done() == true { // if we get here, it's because workList is now empty and there // are no more results in the results channel. close(self.workChannel) // shut down the workers result := self.fetchedBlocks self = nil //this BlockFetcher is now finished. return result } time.Sleep(10 * time.Millisecond) //FIXME(sneak) we maybe need to handle a case here where wantBlocks never //empties but workers need to be re-dispatched.. } } func (self *BlockFetcher) receiveResult(r *FetchedBlock) { log.Debugf("got result for blocknum %d", r.blockNumber) self.removeFetchingBlock(r.blockNumber) self.lock.Lock() self.fetchedBlocks = append(self.fetchedBlocks, r) self.lock.Unlock() self.removeWantBlock(r.blockNumber) } func (self *BlockFetcher) fetchBlockOpsFromNetwork(blockNum BlockNumber) *FetchedBlock { result := &FetchedBlock{ blockNumber: blockNum, error: nil, blockData: nil, } r, err := self.api.GetOpsInBlock(blockNum) if err != nil { result.error = err return result } bytes, err := json.Marshal(r) if err != nil { result.error = err return result } count := len(*r) log.Infof("got %d operations for block %d", count, blockNum) result.blockData = &bytes result.error = nil // make sure this is nil if it worked return result }