201 lines
5.5 KiB
Go
201 lines
5.5 KiB
Go
package main
|
|
|
|
import "encoding/json"
|
|
import "sync"
|
|
import "time"
|
|
import log "github.com/sirupsen/logrus"
|
|
|
|
type FetchedBlock struct {
|
|
blockNumber BlockNumber
|
|
error *error
|
|
blockData *[]byte
|
|
}
|
|
|
|
type BlockFetcher struct {
|
|
api *SteemAPI
|
|
desiredFetcherThreads uint
|
|
wantBlocks map[BlockNumber]bool
|
|
fetchingBlocks map[BlockNumber]bool
|
|
fetchedBlocks map[BlockNumber]*FetchedBlock
|
|
lock *sync.Mutex
|
|
workChannel chan BlockNumber
|
|
resultsChannel chan *FetchedBlock
|
|
}
|
|
|
|
type BlockFetcherConfig struct {
|
|
api *SteemAPI
|
|
desiredFetcherThreads uint
|
|
startBlock BlockNumber
|
|
endBlock BlockNumber
|
|
}
|
|
|
|
func NewBlockFetcher(config *BlockFetcherConfig) *BlockFetcher {
|
|
self := new(BlockFetcher)
|
|
self.init(config)
|
|
return self
|
|
}
|
|
|
|
func (self *BlockFetcher) Done() bool {
|
|
self.lock.Lock()
|
|
defer self.lock.Unlock()
|
|
return len(self.wantBlocks) == 0
|
|
}
|
|
|
|
func (self *BlockFetcher) init(config *BlockFetcherConfig) {
|
|
self.lock = &sync.Mutex{}
|
|
self.api = config.api
|
|
self.desiredFetcherThreads = config.desiredFetcherThreads
|
|
self.workChannel = make(chan BlockNumber)
|
|
self.resultsChannel = make(chan *FetchedBlock)
|
|
self.wantBlocks = make(map[BlockNumber]bool)
|
|
self.fetchedBlocks = make(map[BlockNumber]*FetchedBlock)
|
|
|
|
diff := int(uint(config.endBlock) - uint(config.startBlock))
|
|
for i := 0; i <= diff; i++ {
|
|
self.wantBlocks[BlockNumber(uint(config.startBlock)+uint(i))] = true
|
|
}
|
|
|
|
log.Debugf("wantblocks[] is now %v", self.wantBlocks)
|
|
}
|
|
|
|
func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) {
|
|
self.lock.Lock()
|
|
defer self.lock.Unlock()
|
|
if self.wantBlocks[blockNum] == false {
|
|
log.Panicf("shouldn't happen")
|
|
}
|
|
delete(self.wantBlocks, blockNum)
|
|
}
|
|
|
|
func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) {
|
|
self.lock.Lock()
|
|
defer self.lock.Unlock()
|
|
if self.fetchingBlocks[blockNum] == false {
|
|
log.Panicf("shouldn't happen")
|
|
}
|
|
delete(self.fetchingBlocks, blockNum)
|
|
}
|
|
|
|
func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) {
|
|
self.lock.Lock()
|
|
defer self.lock.Unlock()
|
|
if self.fetchingBlocks[blockNum] == true {
|
|
log.Panicf("shouldn't happen")
|
|
}
|
|
if self.fetchingBlocks == nil {
|
|
self.fetchingBlocks = make(map[BlockNumber]bool)
|
|
}
|
|
if self.fetchingBlocks[blockNum] == false {
|
|
self.fetchingBlocks[blockNum] = true
|
|
}
|
|
}
|
|
|
|
func (self *BlockFetcher) fetcher(index uint) {
|
|
log.Debugf("fetcher thread %d starting", index)
|
|
|
|
WorkLoop:
|
|
for blockNum := range self.workChannel {
|
|
log.Debugf("fetcher %d beginning fetch for block %d", index, blockNum)
|
|
self.addFetchingBlock(blockNum)
|
|
tries := 3
|
|
for i := 0; i < tries; i++ {
|
|
r := self.fetchBlockOpsFromNetwork(blockNum)
|
|
if r.error == nil {
|
|
// it worked, return result
|
|
self.resultsChannel <- r
|
|
continue WorkLoop
|
|
} else {
|
|
// wait a sec and try again
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
}
|
|
}
|
|
log.Debugf("fetcher thread %d ending", index)
|
|
}
|
|
|
|
func (self *BlockFetcher) sendWork(b BlockNumber) {
|
|
go func() {
|
|
// yay cheap goroutines, let them block on the unbuffered channel
|
|
log.Debugf("waiting to send blockNum %d into the work channel", b)
|
|
self.workChannel <- b
|
|
log.Debugf("sent blockNum %d into the work channel", b)
|
|
}()
|
|
}
|
|
|
|
func (self *BlockFetcher) fetch() *[]FetchedBlock {
|
|
log.Debugf("blockfetcher beginning fetch")
|
|
for i := uint(1); i < self.desiredFetcherThreads+uint(1); i++ {
|
|
go self.fetcher(i)
|
|
}
|
|
|
|
self.lock.Lock()
|
|
for blockNum := range self.wantBlocks {
|
|
self.sendWork(blockNum)
|
|
}
|
|
self.lock.Unlock()
|
|
|
|
// now we have to start reading from the unbuffered resultsChannel
|
|
// otherwise the workers will block when returning results
|
|
for {
|
|
select {
|
|
case result := <-self.resultsChannel:
|
|
self.receiveResult(result)
|
|
default:
|
|
if self.Done() == true {
|
|
log.Infof("blockfetcher %+v considers itself Done()", self)
|
|
// if we get here, it's because workList is now empty and there
|
|
// are no more results in the results channel.
|
|
close(self.workChannel) // shut down the workers
|
|
var final []FetchedBlock
|
|
self.lock.Lock()
|
|
for _, value := range self.fetchedBlocks {
|
|
final = append(final, *value)
|
|
}
|
|
self.lock.Unlock()
|
|
self = nil //this BlockFetcher is now finished.
|
|
return &final
|
|
}
|
|
// in this case we are not done but got nothing from the result
|
|
// channel so just wait a little bit to get more results and
|
|
// check the channel again
|
|
time.Sleep(10 * time.Millisecond)
|
|
//FIXME(sneak) we maybe need to handle a case here where wantBlocks never
|
|
//empties but workers need to be re-dispatched..
|
|
}
|
|
}
|
|
log.Panicf("this shouldn't happen")
|
|
return nil //shouldn't happen, return should happen from above
|
|
}
|
|
|
|
func (self *BlockFetcher) receiveResult(r *FetchedBlock) {
|
|
log.Debugf("got result for blocknum %d", r.blockNumber)
|
|
self.removeFetchingBlock(r.blockNumber)
|
|
self.lock.Lock()
|
|
self.fetchedBlocks[r.blockNumber] = r
|
|
self.lock.Unlock()
|
|
self.removeWantBlock(r.blockNumber)
|
|
}
|
|
|
|
func (self *BlockFetcher) fetchBlockOpsFromNetwork(blockNum BlockNumber) *FetchedBlock {
|
|
result := &FetchedBlock{
|
|
blockNumber: blockNum,
|
|
error: nil,
|
|
blockData: nil,
|
|
}
|
|
r, err := self.api.GetOpsInBlock(blockNum)
|
|
if err != nil {
|
|
result.error = &err
|
|
return result
|
|
}
|
|
bytes, err := json.Marshal(r)
|
|
if err != nil {
|
|
result.error = &err
|
|
return result
|
|
}
|
|
count := len(*r)
|
|
log.Infof("got %d operations for block %d fetched from network", count, blockNum)
|
|
result.blockData = &bytes
|
|
result.error = nil // make sure this is nil if it worked
|
|
return result
|
|
}
|