diff --git a/app.go b/app.go index a379d70..121f590 100644 --- a/app.go +++ b/app.go @@ -1,19 +1,14 @@ package main -type FetchedBlock struct { - blockNumber BlockNumber - data []byte -} +import "sync" +import log "github.com/sirupsen/logrus" +import "encoding/json" type App struct { datastore SteemDataStorer api *SteemAPI currentNetworkBlockHeight BlockNumber currentLocalBlockHeight BlockNumber - desiredFetcherThreads uint - wantBlocks []BlockNumber - fetchingBlocks []BlockNumber - fetchedBlocks *[]FetchedBlock lock *sync.Mutex } @@ -33,7 +28,6 @@ func (self *App) init(config *appconfig) { log.SetLevel(config.logLevel) self.api = NewSteemAPI(config.apiUrl) self.datastore = NewSteemDataStore(config.redisUrl) - self.desiredFetcherThreads = config.fetcherThreads self.lock = &sync.Mutex{} } @@ -49,9 +43,10 @@ func (self *App) updateCurrentBlockHeight() { func (self *App) main() { log.Infof("steem block data fetcher starting up...") - self.mainloop() + //self.mainloop() } +/* func (self *App) numFetchers() uint { self.lock.Lock() defer self.lock.Unlock() @@ -141,20 +136,3 @@ func (self *App) fetchCurrentBlockHeight() BlockNumber { } return r.LastIrreversibleBlockNum } - -func (self *App) fetchBlockOps(blockNum BlockNumber) *[]byte { - r, err := self.api.GetOpsInBlock(blockNum) - if err != nil { - // just retry on error - // sloppy, but works - return self.fetchBlockOps(blockNum) - } - bytes, err := json.Marshal(r) - if err != nil { - panic(err) - } - count := len(*r) - log.Infof("got %d operations for block %d", count, blockNum) - return &bytes - //self.datastore.writeBlockOps(blockNum, bytes) -} diff --git a/blockfetcher.go b/blockfetcher.go new file mode 100644 index 0000000..9f52770 --- /dev/null +++ b/blockfetcher.go @@ -0,0 +1,172 @@ +package main + +import "sync" +import log "github.com/sirupsen/logrus" + +type FetchedBlock struct { + blockNumber BlockNumber + error *error + blockData *[]byte +} + +type BlockFetcher struct { + api *SteemAPI + desiredFetcherThreads uint + wantBlocks map[BlockNumber]bool + fetchingBlocks map[BlockNumber]bool + fetchedBlocks *[]FetchedBlock + lock *sync.Mutex + workChannel chan BlockNumber + resultsChannel chan *FetchedBlock +} + +type BlockFetcherConfig struct { + api *SteemAPI + desiredFetcherThreads uint + startBlock BlockNumber + endBlock BlockNumber +} + +func NewBlockFetcher(config *BlockFetcherConfig) *BlockFetcher { + self := new(BlockFetcher) + self.init(config) + return self +} + +func (self *BlockFetcher) Done() bool { + self.lock.Lock() + defer self.lock.Unlock() + return len(self.wantBlocks) == 0 +} + +func (self *BlockFetcher) init(config *BlockFetcherConfig) { + self.lock = &sync.Mutex{} + self.api = config.api + self.desiredFetcherThreads = config.desiredFetcherThreads + + self.workChannel = make(chan BlockNumber) + self.resultsChannel = make(chan *FetchedBlock) + diff := int(uint(config.endBlock) - uint(config.startBlock)) + log.Debugf("diff is %d", diff) + for i := 0; i <= diff; i++ { + self.wantBlocks[BlockNumber(uint(config.startBlock)+uint(i))] = true + } + log.Debugf("wantblocks[] is now %v", self.wantBlocks) +} + +func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) { + if self.wantBlocks[blockNum] == false { + log.Panicf("shouldn't happen") + } + self.lock.Lock() + defer self.lock.Unlock() + delete(self.wantBlocks, blockNum) +} + +func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) { + if self.fetchingBlocks[blockNum] == false { + log.Panicf("shouldn't happen") + } + self.lock.Lock() + defer self.lock.Unlock() + delete(self.fetchingBlocks, blockNum) +} + +func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) { + if self.fetchingBlocks[blockNum] == true { + log.Panicf("shouldn't happen") + } + self.lock.Lock() + defer self.lock.Unlock() + if self.fetchingBlocks[blockNum] == false { + self.fetchingBlocks[blockNum] = true + } +} + +func (self *BlockFetcher) fetcher(index int) { + log.Debugf("fetcher thread %d starting", index) + +WorkLoop: + for blockNum := range self.workChannel { + log.Debugf("fetcher %d beginning fetch for block %d", index, blockNum) + self.addFetchingBlock(blockNum) + tries := 3 + for i := 0; i < tries; i++ { + r := self.fetchBlockOpsFromNetwork(blockNum) + if r.error == nil { + // it worked, return result + self.resultsChannel <- r + continue WorkLoop + } else { + // wait a sec and try again + time.Sleep(1 * time.Second) + } + } + } + log.Debugf("fetcher thread %d ending", index) +} + +func (self *BlockFetcher) fetch() *[]FetchedBlock { + for i := 1; i < self.desiredFetcherThreads+1; i++ { + go self.fetcher(i) + } + for blockNum, _ := range self.wantBlocks { + // yay cheap goroutines, let them block on the unbuffered channel + go func() { + log.Debugf("waiting to send blockNum %d into the work channel", blockNum) + self.workChannel <- blockNum + log.Debugf("sent blockNum %d into the work channel", blockNum) + }() + } + + // now we have to start reading from the unbuffered resultsChannel + // otherwise the workers will block when returning results + select { + case result := <-self.resultsChannel: + self.receiveResult(result) + default: + if self.Done() == true { + // if we get here, it's because workList is now empty and there + // are no more results in the results channel. + close(self.workChannel) // shut down the workers + result := self.fetchedBlocks + self = nil //this BlockFetcher is now finished. + return result + } + time.Sleep(10 * time.Millisecond) + //FIXME(sneak) we maybe need to handle a case here where wantBlocks never + //empties but workers need to be re-dispatched.. + } +} + +func (self *BlockFetcher) receiveResult(r *FetchedBlock) { + log.Debugf("got result for blocknum %d", r.blockNumber) + self.removeFetchingBlock(r.blockNumber) + self.lock.Lock() + self.fetchedBlocks = append(self.fetchedBlocks, r) + self.lock.Unlock() + self.removeWantBlock(r.blockNumber) +} + +func (self *BlockFetcher) fetchBlockOpsFromNetwork(blockNum BlockNumber) *FetchedBlock { + result := &FetchedBlock{ + blockNumber: blockNum, + error: nil, + blockData: nil, + } + r, err := self.api.GetOpsInBlock(blockNum) + if err != nil { + result.error = err + return result + } + bytes, err := json.Marshal(r) + if err != nil { + result.error = err + return result + } + count := len(*r) + log.Infof("got %d operations for block %d", count, blockNum) + result.blockData = &bytes + result.error = nil // make sure this is nil if it worked + return result +} diff --git a/main.go b/main.go index 8145ee9..8ea0f29 100644 --- a/main.go +++ b/main.go @@ -2,10 +2,6 @@ package main //import "github.com/spf13/viper" //import "encoding/json" -//import "fmt" -import "sync" -import "time" -import "encoding/json" import log "github.com/sirupsen/logrus" const steemAPIURL = "https://api.steemit.com" @@ -15,6 +11,19 @@ const redisUrl = "localhost:6379" //const steemAPIURL = "http://las2.local:8090" func main() { + log.SetLevel(log.DebugLevel) + var x *BlockFetcher + x = NewBlockFetcher(&BlockFetcherConfig{ + api: nil, + desiredFetcherThreads: 40, + startBlock: 10000, + endBlock: 10005, + }) + _ = x +} + +/* +func mainx() { app := NewApp(&appconfig{ logLevel: log.DebugLevel, apiUrl: steemAPIURL, @@ -23,3 +32,5 @@ func main() { }) app.main() } + +*/ diff --git a/steemapi.go b/steemapi.go index f97959d..48b2f63 100644 --- a/steemapi.go +++ b/steemapi.go @@ -1,7 +1,8 @@ package main import "encoding/json" -import log "github.com/sirupsen/logrus" + +//import log "github.com/sirupsen/logrus" type SteemAPI struct { url string @@ -27,46 +28,29 @@ func NewSteemAPI(url string, options ...func(s *SteemAPI)) *SteemAPI { } func (self *SteemAPI) GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesResponse, error) { - var resp DynamicGlobalProperties - raw, err := self.rpc.Call("get_dynamic_global_properties", EmptyParamsRaw) - if err != nil { return nil, err } - json.Unmarshal(raw, &resp) - return &resp, nil } func (self *SteemAPI) GetOpsInBlock(blockNum BlockNumber) (GetOpsInBlockResponse, error) { - - // first fetch virtual ops - vOpsParams := &GetOpsInBlockRequestParams{BlockNum: blockNum, VirtualOps: true} - vop, err := vOpsParams.MarshalJSON() - vOpsResponse, err := self.rpc.Call("condenser_api.get_ops_in_block", vop) - if err != nil { - return nil, err - } - - var result []OperationObject - err = json.Unmarshal(vOpsResponse, &result) - if err != nil { - return nil, err - } - - // result is now populated with vops, now get real ops + // i was mistaken, i thought the second param == true meant only + // virtualops, and == false meant only non-virtualops. turns out the + // arg should be named "excludenonvirtualops", as setting it to false + // returns both real ops *and* virtual ops in a single call. not sure if + // this was always the case, but it is as of 20181101 against + // api.steemit.com. realOpsParams := &GetOpsInBlockRequestParams{BlockNum: blockNum, VirtualOps: false} rop, err := realOpsParams.MarshalJSON() realOpsResponse, err := self.rpc.Call("condenser_api.get_ops_in_block", rop) - var secondResult []OperationObject - - err = json.Unmarshal(realOpsResponse, &secondResult) + var result []OperationObject + err = json.Unmarshal(realOpsResponse, &result) if err != nil { return nil, err } - result = append(result, secondResult...) return &result, nil }