Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.
 
 
steem-block-db/blockfetcher.go

217 rindas
6.0 KiB

package main
import "encoding/json"
import "sync"
import "time"
import log "github.com/sirupsen/logrus"
// FetchedBlock is the outcome of one attempt to fetch a single block's
// operations (see fetchBlockOpsFromNetwork).
type FetchedBlock struct {
// blockNumber is the number of the block this result refers to.
blockNumber BlockNumber
// error is nil when the fetch succeeded; otherwise it points at the error
// returned by the API call or by JSON encoding.
error *error
// blockData points at the JSON-encoded operations of the block; it is nil
// when the fetch failed.
blockData *[]byte
}
// BlockFetcher fetches a set of blocks using several worker goroutines and
// collects the results. All three maps are guarded by lock.
type BlockFetcher struct {
// api is the client used to request the operations of a block.
api *SteemAPI
// desiredFetcherThreads is the number of fetcher goroutines started by fetch.
desiredFetcherThreads uint
// wantBlocks is the set of block numbers that still need a result; an entry
// is deleted once a result for that block has been received.
wantBlocks map[BlockNumber]bool
// fetchingBlocks is the set of block numbers a worker has picked up and for
// which no result has been received yet.
fetchingBlocks map[BlockNumber]bool
// fetchedBlocks holds the received results, keyed by block number.
fetchedBlocks map[BlockNumber]*FetchedBlock
// lock serializes access to the maps above.
lock *sync.Mutex
// workChannel carries block numbers to the fetcher goroutines (unbuffered).
workChannel chan BlockNumber
// resultsChannel carries successful results back to fetch (unbuffered).
resultsChannel chan *FetchedBlock
}
// BlockFetcherConfig carries the settings NewBlockFetcher needs to build a
// BlockFetcher.
type BlockFetcherConfig struct {
// api is the client the fetcher uses to request block operations.
api *SteemAPI
// desiredFetcherThreads is the number of worker goroutines to run.
desiredFetcherThreads uint
// startBlock is the first block number to fetch (inclusive).
startBlock BlockNumber
// endBlock is the last block number to fetch (inclusive).
endBlock BlockNumber
}
// NewBlockFetcher allocates a BlockFetcher and initializes it from config.
func NewBlockFetcher(config *BlockFetcherConfig) *BlockFetcher {
	fetcher := &BlockFetcher{}
	fetcher.init(config)
	return fetcher
}
// Done reports whether no wanted blocks remain, i.e. a result has been
// received for every block this fetcher was asked for.
func (bf *BlockFetcher) Done() bool {
	bf.lock.Lock()
	remaining := len(bf.wantBlocks)
	bf.lock.Unlock()
	return remaining == 0
}
// init sets up a freshly allocated BlockFetcher from config: it creates the
// mutex, the unbuffered work and results channels and the bookkeeping maps,
// and marks every block in [config.startBlock, config.endBlock] as wanted.
//
// An inverted range (endBlock < startBlock) yields an empty want set.
func (bf *BlockFetcher) init(config *BlockFetcherConfig) {
	bf.lock = &sync.Mutex{}
	bf.api = config.api
	bf.desiredFetcherThreads = config.desiredFetcherThreads
	bf.workChannel = make(chan BlockNumber)
	bf.resultsChannel = make(chan *FetchedBlock)
	bf.wantBlocks = make(map[BlockNumber]bool)
	// Allocate fetchingBlocks up front so no method ever sees a nil map.
	bf.fetchingBlocks = make(map[BlockNumber]bool)
	bf.fetchedBlocks = make(map[BlockNumber]*FetchedBlock)

	first, last := uint(config.startBlock), uint(config.endBlock)
	if first <= last {
		// Terminate on equality rather than with "n <= last" so the loop
		// cannot wrap around when last is the maximum uint value.
		for n := first; ; n++ {
			bf.wantBlocks[BlockNumber(n)] = true
			if n == last {
				break
			}
		}
	}
	log.Debugf("wantblocks[] is now %v", bf.wantBlocks)
}
// removeWantBlock deletes blockNum from the set of wanted blocks while
// holding the lock. It panics if blockNum is not currently wanted.
func (bf *BlockFetcher) removeWantBlock(blockNum BlockNumber) {
	bf.lock.Lock()
	defer bf.lock.Unlock()

	if wanted := bf.wantBlocks[blockNum]; !wanted {
		log.Panicf("shouldn't happen")
	}
	delete(bf.wantBlocks, blockNum)
}
// removeFetchingBlock deletes blockNum from the set of in-flight blocks while
// holding the lock. It panics if blockNum is not currently marked in flight.
func (bf *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) {
	bf.lock.Lock()
	defer bf.lock.Unlock()

	if inFlight := bf.fetchingBlocks[blockNum]; !inFlight {
		log.Panicf("shouldn't happen")
	}
	delete(bf.fetchingBlocks, blockNum)
}
// addFetchingBlock marks blockNum as in flight while holding the lock. It
// panics if blockNum is already marked, since one block must never be handed
// to two workers at once.
func (bf *BlockFetcher) addFetchingBlock(blockNum BlockNumber) {
	bf.lock.Lock()
	defer bf.lock.Unlock()

	// Make sure the map exists before it is touched; it is created lazily
	// here when nothing allocated it earlier.
	if bf.fetchingBlocks == nil {
		bf.fetchingBlocks = make(map[BlockNumber]bool)
	}
	if bf.fetchingBlocks[blockNum] {
		log.Panicf("shouldn't happen")
	}
	bf.fetchingBlocks[blockNum] = true
}
// fetcher is the body of one worker goroutine. It takes block numbers from
// workChannel until that channel is closed, fetches each block with up to
// three attempts (one second apart) and sends the first successful result on
// resultsChannel.
//
// When every attempt fails, no result is sent and the block stays in
// wantBlocks; it is only removed from the in-flight set so the bookkeeping
// matches reality. index is used for log messages only.
func (bf *BlockFetcher) fetcher(index uint) {
	const maxTries = 3

	log.Debugf("fetcher thread %d starting", index)
	for blockNum := range bf.workChannel {
		log.Debugf("fetcher %d beginning fetch for block %d", index, blockNum)
		bf.addFetchingBlock(blockNum)

		var fetched *FetchedBlock
		for attempt := 1; attempt <= maxTries; attempt++ {
			r := bf.fetchBlockOpsFromNetwork(blockNum)
			if r.error == nil {
				fetched = r
				break
			}
			// r.error is a *error: dereference it so the log shows the
			// message instead of a pointer address.
			log.Infof("error fetching block %d, retry %d (%v)", blockNum, attempt, *r.error)
			if attempt < maxTries {
				// wait a sec and try again (no point sleeping after the last try)
				time.Sleep(1 * time.Second)
			}
		}

		if fetched == nil {
			// All attempts failed: this worker is no longer fetching the
			// block, so drop it from the in-flight set.
			bf.removeFetchingBlock(blockNum)
			log.Infof("giving up on block %d after %d tries", blockNum, maxTries)
			continue
		}
		bf.resultsChannel <- fetched
	}
	log.Debugf("fetcher thread %d ending", index)
}
// sendWork queues block number b for the workers without blocking the
// caller: a dedicated goroutine waits until a worker is ready to receive it
// from the unbuffered work channel.
func (bf *BlockFetcher) sendWork(b BlockNumber) {
	go func(num BlockNumber) {
		// goroutines are cheap, so each one simply blocks on the send
		log.Debugf("waiting to send blockNum %d into the work channel", num)
		bf.workChannel <- num
		log.Debugf("sent blockNum %d into the work channel", num)
	}(b)
}
// fetch runs one fetch batch: it starts desiredFetcherThreads worker
// goroutines, dispatches every wanted block to them and collects results
// until either all wanted blocks have a result or 30 seconds have passed.
//
// It returns a pointer to a slice holding copies of all results received so
// far. On the normal path the work channel is closed so the workers exit.
//
// NOTE(review): on timeout the work channel is deliberately left open,
// because sendWork goroutines may still be blocked sending on it and closing
// it would make them panic. From what is visible here, those goroutines and
// any worker blocked sending on resultsChannel are never released after a
// timeout — confirm and add a cancellation signal if that matters.
func (bf *BlockFetcher) fetch() *[]FetchedBlock {
	const batchTimeout = 30 * time.Second

	log.Debugf("blockfetcher beginning fetch")
	deadline := time.NewTimer(batchTimeout)
	defer deadline.Stop()

	for i := uint(1); i < bf.desiredFetcherThreads+uint(1); i++ {
		go bf.fetcher(i)
	}

	bf.lock.Lock()
	for blockNum := range bf.wantBlocks {
		bf.sendWork(blockNum)
	}
	bf.lock.Unlock()

	// The results channel is unbuffered, so it has to be drained here or the
	// workers would block when handing back results. Blocking on the channel
	// and the deadline together avoids polling with short sleeps.
	for !bf.Done() {
		select {
		case result := <-bf.resultsChannel:
			bf.receiveResult(result)
		case <-deadline.C:
			// fetch took too long, return what we have
			log.Infof("30s fetcher batch timeout reached, but not done fetching")
			return bf.fetchedSnapshot()
		}
	}

	log.Infof("blockfetcher %+v considers itself Done()", bf)
	// Every wanted block has a result, so all work items were consumed and
	// no sender is pending: closing the channel safely shuts down the workers.
	close(bf.workChannel)
	return bf.fetchedSnapshot()
}

// fetchedSnapshot returns a pointer to a new slice holding a copy of every
// result received so far; the slice is nil when nothing has been received.
func (bf *BlockFetcher) fetchedSnapshot() *[]FetchedBlock {
	bf.lock.Lock()
	defer bf.lock.Unlock()

	var final []FetchedBlock
	if n := len(bf.fetchedBlocks); n > 0 {
		final = make([]FetchedBlock, 0, n)
	}
	for _, value := range bf.fetchedBlocks {
		final = append(final, *value)
	}
	return &final
}
// receiveResult records a result delivered by a worker: the block leaves the
// in-flight set, the result is stored in fetchedBlocks, and the block is
// removed from the wanted set. The two removal helpers panic when the block
// is not in the respective set.
func (bf *BlockFetcher) receiveResult(r *FetchedBlock) {
	num := r.blockNumber
	log.Debugf("got result for blocknum %d", num)

	bf.removeFetchingBlock(num)

	// The helpers take the lock themselves, so hold it only for the store.
	func() {
		bf.lock.Lock()
		defer bf.lock.Unlock()
		bf.fetchedBlocks[num] = r
	}()

	bf.removeWantBlock(num)
}
// fetchBlockOpsFromNetwork requests the operations of blockNum from the API
// and JSON-encodes them. It always returns a non-nil FetchedBlock: on
// success blockData is set and error is nil; if the API call or the encoding
// fails, error points at the failure and blockData is nil.
func (bf *BlockFetcher) fetchBlockOpsFromNetwork(blockNum BlockNumber) *FetchedBlock {
	ops, fetchErr := bf.api.GetOpsInBlock(blockNum)
	if fetchErr != nil {
		return &FetchedBlock{blockNumber: blockNum, error: &fetchErr}
	}

	encoded, marshalErr := json.Marshal(ops)
	if marshalErr != nil {
		return &FetchedBlock{blockNumber: blockNum, error: &marshalErr}
	}

	log.Infof("got %d operations for block %d fetched from network", len(*ops), blockNum)
	return &FetchedBlock{blockNumber: blockNum, blockData: &encoded}
}