steem-block-db/blockfetcher.go

173 lines
4.6 KiB
Go

package main
import "sync"
import log "github.com/sirupsen/logrus"
type FetchedBlock struct {
blockNumber BlockNumber
error *error
blockData *[]byte
}
type BlockFetcher struct {
api *SteemAPI
desiredFetcherThreads uint
wantBlocks map[BlockNumber]bool
fetchingBlocks map[BlockNumber]bool
fetchedBlocks *[]FetchedBlock
lock *sync.Mutex
workChannel chan BlockNumber
resultsChannel chan *FetchedBlock
}
type BlockFetcherConfig struct {
api *SteemAPI
desiredFetcherThreads uint
startBlock BlockNumber
endBlock BlockNumber
}
func NewBlockFetcher(config *BlockFetcherConfig) *BlockFetcher {
self := new(BlockFetcher)
self.init(config)
return self
}
func (self *BlockFetcher) Done() bool {
self.lock.Lock()
defer self.lock.Unlock()
return len(self.wantBlocks) == 0
}
func (self *BlockFetcher) init(config *BlockFetcherConfig) {
self.lock = &sync.Mutex{}
self.api = config.api
self.desiredFetcherThreads = config.desiredFetcherThreads
self.workChannel = make(chan BlockNumber)
self.resultsChannel = make(chan *FetchedBlock)
diff := int(uint(config.endBlock) - uint(config.startBlock))
log.Debugf("diff is %d", diff)
for i := 0; i <= diff; i++ {
self.wantBlocks[BlockNumber(uint(config.startBlock)+uint(i))] = true
}
log.Debugf("wantblocks[] is now %v", self.wantBlocks)
}
func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) {
if self.wantBlocks[blockNum] == false {
log.Panicf("shouldn't happen")
}
self.lock.Lock()
defer self.lock.Unlock()
delete(self.wantBlocks, blockNum)
}
func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) {
if self.fetchingBlocks[blockNum] == false {
log.Panicf("shouldn't happen")
}
self.lock.Lock()
defer self.lock.Unlock()
delete(self.fetchingBlocks, blockNum)
}
func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) {
if self.fetchingBlocks[blockNum] == true {
log.Panicf("shouldn't happen")
}
self.lock.Lock()
defer self.lock.Unlock()
if self.fetchingBlocks[blockNum] == false {
self.fetchingBlocks[blockNum] = true
}
}
func (self *BlockFetcher) fetcher(index int) {
log.Debugf("fetcher thread %d starting", index)
WorkLoop:
for blockNum := range self.workChannel {
log.Debugf("fetcher %d beginning fetch for block %d", index, blockNum)
self.addFetchingBlock(blockNum)
tries := 3
for i := 0; i < tries; i++ {
r := self.fetchBlockOpsFromNetwork(blockNum)
if r.error == nil {
// it worked, return result
self.resultsChannel <- r
continue WorkLoop
} else {
// wait a sec and try again
time.Sleep(1 * time.Second)
}
}
}
log.Debugf("fetcher thread %d ending", index)
}
func (self *BlockFetcher) fetch() *[]FetchedBlock {
for i := 1; i < self.desiredFetcherThreads+1; i++ {
go self.fetcher(i)
}
for blockNum, _ := range self.wantBlocks {
// yay cheap goroutines, let them block on the unbuffered channel
go func() {
log.Debugf("waiting to send blockNum %d into the work channel", blockNum)
self.workChannel <- blockNum
log.Debugf("sent blockNum %d into the work channel", blockNum)
}()
}
// now we have to start reading from the unbuffered resultsChannel
// otherwise the workers will block when returning results
select {
case result := <-self.resultsChannel:
self.receiveResult(result)
default:
if self.Done() == true {
// if we get here, it's because workList is now empty and there
// are no more results in the results channel.
close(self.workChannel) // shut down the workers
result := self.fetchedBlocks
self = nil //this BlockFetcher is now finished.
return result
}
time.Sleep(10 * time.Millisecond)
//FIXME(sneak) we maybe need to handle a case here where wantBlocks never
//empties but workers need to be re-dispatched..
}
}
func (self *BlockFetcher) receiveResult(r *FetchedBlock) {
log.Debugf("got result for blocknum %d", r.blockNumber)
self.removeFetchingBlock(r.blockNumber)
self.lock.Lock()
self.fetchedBlocks = append(self.fetchedBlocks, r)
self.lock.Unlock()
self.removeWantBlock(r.blockNumber)
}
func (self *BlockFetcher) fetchBlockOpsFromNetwork(blockNum BlockNumber) *FetchedBlock {
result := &FetchedBlock{
blockNumber: blockNum,
error: nil,
blockData: nil,
}
r, err := self.api.GetOpsInBlock(blockNum)
if err != nil {
result.error = err
return result
}
bytes, err := json.Marshal(r)
if err != nil {
result.error = err
return result
}
count := len(*r)
log.Infof("got %d operations for block %d", count, blockNum)
result.blockData = &bytes
result.error = nil // make sure this is nil if it worked
return result
}