From 33b759d15f79ed281a1f6914373091506ffe2d67 Mon Sep 17 00:00:00 2001 From: Jeffrey Paul Date: Wed, 31 Oct 2018 20:35:30 -0700 Subject: [PATCH] it is now broken --- app.go | 160 +++++++++++++++++++++++++++++++++++++++++++++++++++ db.go | 25 +++----- kvstore.go | 4 +- main.go | 163 ++++------------------------------------------------ steemapi.go | 9 +-- types.go | 13 +++-- 6 files changed, 189 insertions(+), 185 deletions(-) create mode 100644 app.go diff --git a/app.go b/app.go new file mode 100644 index 0000000..a379d70 --- /dev/null +++ b/app.go @@ -0,0 +1,160 @@ +package main + +type FetchedBlock struct { + blockNumber BlockNumber + data []byte +} + +type App struct { + datastore SteemDataStorer + api *SteemAPI + currentNetworkBlockHeight BlockNumber + currentLocalBlockHeight BlockNumber + desiredFetcherThreads uint + wantBlocks []BlockNumber + fetchingBlocks []BlockNumber + fetchedBlocks *[]FetchedBlock + lock *sync.Mutex +} + +type appconfig struct { + logLevel log.Level + apiUrl string + redisUrl string +} + +func NewApp(config *appconfig) *App { + self := new(App) + self.init(config) + return self +} + +func (self *App) init(config *appconfig) { + log.SetLevel(config.logLevel) + self.api = NewSteemAPI(config.apiUrl) + self.datastore = NewSteemDataStore(config.redisUrl) + self.desiredFetcherThreads = config.fetcherThreads + self.lock = &sync.Mutex{} +} + +func (self *App) updateCurrentBlockHeight() { + h := self.fetchCurrentBlockHeight() + if h > self.currentNetworkBlockHeight { + self.lock.Lock() + defer self.lock.Unlock() + self.currentNetworkBlockHeight = h + log.Infof("current block height is now %d", self.currentNetworkBlockHeight) + } +} + +func (self *App) main() { + log.Infof("steem block data fetcher starting up...") + self.mainloop() +} + +func (self *App) numFetchers() uint { + self.lock.Lock() + defer self.lock.Unlock() + return uint(len(*self.fetchingBlocks)) +} + +func (self *App) spawnNewFetcher(blockNum BlockNumber) { + log.Debugf("spawning fetcher for block %d", blockNum) + go func() { + // this is so hacky, make a queue like a grownup would you + time.Sleep(100 * time.Millisecond) + if self.datastore.HaveOpsForBlock(blockNum) { + log.Infof("already have ops for block %d, not re-fetching", blockNum) + return + } + //self.incrFetchers() + self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum)) + //self.decrFetchers() + + }() +} + +func (self *App) storeBlockOps(blockNum BlockNumber, blockOps *[]byte) { + self.datastore.StoreBlockOps(blockNum, blockOps) +} + +// note that parallelFetchAndStoreBlocks does not respect the +// limitation on number of desired fetchers, that is for the caller +func (self *App) parallelFetchAndStoreBlocks(start BlockNumber, end BlockNumber) { + var diff = uint64(end) - uint64(start) + var i uint64 + for i = 0; i < diff; i++ { + self.spawnNewFetcher(BlockNumber(uint64(start) + i)) + } +} + +func (self *App) populateFetchers() { +} + +func (self *App) mainloop() { + + log.Infof("using %d fetching threads", self.desiredFetchingThreads) + + self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight() + + for { + self.spawnMoreFetchers() + self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight() + time.Sleep(1000 * time.Millisecond) + } +} + +func (self *App) spawnMoreFetchers() { +} + +/* + self.updateCurrentBlockHeight() + log.Infof("current number of active fetchers: %d", self.numFetchers()) + time.Sleep(1500 * time.Millisecond) + self.datastore.SetCurrentBlockHeight() + localHeight := self.datastore.CurrentBlockHeight() + log.Infof("our highest fetched block height is %d", localHeight) + if localHeight < self.currentNetworkBlockHeight { + // we need to fetch some blocks from the network + avail := self.desiredFetchingThreads - self.numFetchers() + diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight) + log.Infof("we need to fetch %d blocks", diff) + if uint64(diff) > uint64(avail) { + self.parallelFetchAndStoreBlocks(BlockNumber(uint64(localHeight)+1), BlockNumber(uint64(localHeight)+1+uint64(avail))) + } else { + // just spawn fetchers for the blocks we don't have + // spawning will update the number of running fetchers + self.parallelFetchAndStoreBlocks(localHeight+1, self.currentNetworkBlockHeight) + } + } + //needFetchers := self.desiredFetchingThreads - self.numFetchers() + +*/ + +func (self *App) fetchCurrentBlockHeight() BlockNumber { + r, err := self.api.GetDynamicGlobalProperties() + if err != nil { + log.Panicf("can't fetch global properties, bailing. err: %s", err) + } + if r.LastIrreversibleBlockNum < 100 { + log.Panicf("can't fetch global properties, bailing") + } + return r.LastIrreversibleBlockNum +} + +func (self *App) fetchBlockOps(blockNum BlockNumber) *[]byte { + r, err := self.api.GetOpsInBlock(blockNum) + if err != nil { + // just retry on error + // sloppy, but works + return self.fetchBlockOps(blockNum) + } + bytes, err := json.Marshal(r) + if err != nil { + panic(err) + } + count := len(*r) + log.Infof("got %d operations for block %d", count, blockNum) + return &bytes + //self.datastore.writeBlockOps(blockNum, bytes) +} diff --git a/db.go b/db.go index f0601da..3efbbf7 100644 --- a/db.go +++ b/db.go @@ -10,8 +10,7 @@ import "strconv" const appPrefix = "sbf" type SteemDataStorer interface { - SetCurrentBlockHeight() error - ForceSetCurrentBlockHeight(BlockNumber) error + SetCurrentBlockHeight(BlockNumber) error CurrentBlockHeight() BlockNumber HaveOpsForBlock(BlockNumber) bool StoreBlockOps(BlockNumber, *[]byte) error @@ -22,28 +21,18 @@ type SteemDataStore struct { kv KVStorer } -func NewSteemDataStore(dir string) *SteemDataStore { +func NewSteemDataStore(hostname string) SteemDataStorer { self := new(SteemDataStore) - self.kv = NewRedisKVStore() + self.kv = NewRedisKVStore(hostname) return self } -func (self *SteemDataStore) ForceSetCurrentBlockHeight(blockNum BlockNumber) error { - keyname := fmt.Sprintf("%s.global.CurrentBlockHeight", appPrefix) +func (self *SteemDataStore) SetCurrentBlockHeight(blockNum BlockNumber) error { + keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix) value := fmt.Sprintf("%d", blockNum) return self.kv.Put(&keyname, &value) } -// this function searches for the highest contiguously stored blocknum -// and updates the memo in the db -func (self *SteemDataStore) SetCurrentBlockHeight() error { - nextVal := self.FindHighestContiguousBlockInDb(self.CurrentBlockHeight()) - keyname := fmt.Sprintf("%s.global.CurrentBlockHeight", appPrefix) - value := fmt.Sprintf("%d", nextVal) - log.Infof("updating our current highest block in db to %d", nextVal) - return self.kv.Put(&keyname, &value) -} - func (self *SteemDataStore) FindHighestContiguousBlockInDb(from BlockNumber) BlockNumber { last := from @@ -76,11 +65,11 @@ func (self *SteemDataStore) HaveOpsForBlock(blockNum BlockNumber) bool { } func (self *SteemDataStore) CurrentBlockHeight() BlockNumber { - keyname := fmt.Sprintf("%s.global.CurrentBlockHeight", appPrefix) + keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix) val, err := self.kv.Get(&keyname) if err != nil { // assume this is key not found, initialize key to default - self.ForceSetCurrentBlockHeight(0) + self.SetCurrentBlockHeight(0) // retry return self.CurrentBlockHeight() } diff --git a/kvstore.go b/kvstore.go index f1e5cb8..8c94e84 100644 --- a/kvstore.go +++ b/kvstore.go @@ -22,9 +22,9 @@ type RedisKVStore struct { rc *redis.Client } -func NewRedisKVStore() *RedisKVStore { +func NewRedisKVStore(hostname string) *RedisKVStore { rkvs := new(RedisKVStore) - rkvs.Open("localhost:6379") //FIXME(sneak) use viper + rkvs.Open(hostname) //FIXME(sneak) use viper return rkvs } diff --git a/main.go b/main.go index de48bff..8145ee9 100644 --- a/main.go +++ b/main.go @@ -9,160 +9,17 @@ import "encoding/json" import log "github.com/sirupsen/logrus" const steemAPIURL = "https://api.steemit.com" +const redisUrl = "localhost:6379" + +//const steemAPIURL = "http://10.100.202.175:8090" +//const steemAPIURL = "http://las2.local:8090" func main() { - processinit() - app := NewApp(&appconfig{}) + app := NewApp(&appconfig{ + logLevel: log.DebugLevel, + apiUrl: steemAPIURL, + redisUrl: redisUrl, + fetcherThreads: 40, + }) app.main() } - -func processinit() { - //FIXME(sneak) use viper to set loglevel - //log.SetLevel(log.DebugLevel) - log.SetLevel(log.InfoLevel) -} - -type BlockNumber uint64 - -type App struct { - datastore SteemDataStorer - api *SteemAPI - currentNetworkBlockHeight BlockNumber - currentLocalBlockHeight BlockNumber - currentFetchingThreads uint - desiredFetchingThreads uint - lock *sync.Mutex -} - -type appconfig map[string]string - -func NewApp(config *appconfig) *App { - self := new(App) - self.init(config) - return self -} - -func (self *App) init(config *appconfig) { - self.api = NewSteemAPI(steemAPIURL) - self.datastore = NewSteemDataStore("./d") - self.desiredFetchingThreads = 40 - self.currentFetchingThreads = 0 - self.lock = &sync.Mutex{} -} - -func (self *App) updateCurrentBlockHeight() { - h := self.fetchCurrentBlockHeight() - if h > self.currentNetworkBlockHeight { - self.lock.Lock() - defer self.lock.Unlock() - self.currentNetworkBlockHeight = h - log.Infof("current block height is now %d", self.currentNetworkBlockHeight) - } -} - -func (self *App) main() { - log.Infof("steem block data fetcher starting up...") - self.mainloop() -} - -func (self *App) numFetchers() uint { - self.lock.Lock() - defer self.lock.Unlock() - return self.currentFetchingThreads -} - -func (self *App) incrFetchers() { - self.lock.Lock() - defer self.lock.Unlock() - self.currentFetchingThreads += 1 -} - -func (self *App) decrFetchers() { - self.lock.Lock() - defer self.lock.Unlock() - self.currentFetchingThreads -= 1 -} - -func (self *App) spawnNewFetcher(blockNum BlockNumber) { - log.Debugf("spawning fetcher for block %d", blockNum) - go func() { - // this is so hacky, make a queue like a grownup would you - time.Sleep(100 * time.Millisecond) - if self.datastore.HaveOpsForBlock(blockNum) { - log.Infof("already have ops for block %d, not re-fetching", blockNum) - return - } - self.incrFetchers() - self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum)) - self.decrFetchers() - - }() -} - -func (self *App) storeBlockOps(blockNum BlockNumber, blockOps *[]byte) { - self.datastore.StoreBlockOps(blockNum, blockOps) -} - -// note that parallelFetchAndStoreBlocks does not respect the -// limitation on number of desired fetchers, that is for the caller -func (self *App) parallelFetchAndStoreBlocks(start BlockNumber, end BlockNumber) { - var diff = uint64(end) - uint64(start) - var i uint64 - for i = 0; i < diff; i++ { - self.spawnNewFetcher(BlockNumber(uint64(start) + i)) - } -} - -func (self *App) mainloop() { - log.Infof("using %d fetching threads", self.desiredFetchingThreads) - for { - self.updateCurrentBlockHeight() - log.Infof("current number of active fetchers: %d", self.numFetchers()) - time.Sleep(1500 * time.Millisecond) - self.datastore.SetCurrentBlockHeight() - localHeight := self.datastore.CurrentBlockHeight() - log.Infof("our highest fetched block height is %d", localHeight) - if localHeight < self.currentNetworkBlockHeight { - // we need to fetch some blocks from the network - avail := self.desiredFetchingThreads - self.numFetchers() - diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight) - log.Infof("we need to fetch %d blocks", diff) - if uint64(diff) > uint64(avail) { - self.parallelFetchAndStoreBlocks(BlockNumber(uint64(localHeight)+1), BlockNumber(uint64(localHeight)+1+uint64(avail))) - } else { - // just spawn fetchers for the blocks we don't have - // spawning will update the number of running fetchers - self.parallelFetchAndStoreBlocks(localHeight+1, self.currentNetworkBlockHeight) - } - } - //needFetchers := self.desiredFetchingThreads - self.numFetchers() - } -} - -func (self *App) fetchCurrentBlockHeight() BlockNumber { - r, err := self.api.GetDynamicGlobalProperties() - if err != nil { - panic("can't fetch global properties, bailing") - } - if r.LastIrreversibleBlockNum < 100 { - panic("can't fetch global properties, bailing") - } - return r.LastIrreversibleBlockNum -} - -func (self *App) fetchBlockOps(blockNum BlockNumber) *[]byte { - r, err := self.api.GetOpsInBlock(blockNum) - if err != nil { - // just retry on error - // sloppy, but works - return self.fetchBlockOps(blockNum) - } - bytes, err := json.Marshal(r) - if err != nil { - panic(err) - } - count := len(*r) - log.Infof("got %d operations for block %d", count, blockNum) - return &bytes - //self.datastore.writeBlockOps(blockNum, bytes) -} diff --git a/steemapi.go b/steemapi.go index f385132..f97959d 100644 --- a/steemapi.go +++ b/steemapi.go @@ -1,14 +1,11 @@ package main -import ( - "encoding/json" - log "github.com/sirupsen/logrus" -) +import "encoding/json" +import log "github.com/sirupsen/logrus" type SteemAPI struct { url string rpc *JSONRPC - log *log.Logger } var EmptyParams = []string{} @@ -22,8 +19,6 @@ func NewSteemAPI(url string, options ...func(s *SteemAPI)) *SteemAPI { rpc: rpc, } - self.log = log.New() - for _, option := range options { option(self) } diff --git a/types.go b/types.go index 91888a3..aa024bb 100644 --- a/types.go +++ b/types.go @@ -3,6 +3,8 @@ package main import "encoding/json" import "github.com/joeshaw/iso8601" +type BlockNumber uint64 + type OperationObject struct { BlockNumber BlockNumber `json:"block"` OpInTx int `json:"op_in_trx"` @@ -18,6 +20,11 @@ type GetOpsInBlockRequestParams struct { VirtualOps bool } +func (r *GetOpsInBlockRequestParams) MarshalJSON() ([]byte, error) { + arr := []interface{}{r.BlockNum, r.VirtualOps} + return json.Marshal(arr) +} + type DynamicGlobalProperties struct { ConfidentialSbdSupply string `json:"confidential_sbd_supply"` ConfidentialSupply string `json:"confidential_supply"` @@ -51,9 +58,5 @@ type DynamicGlobalProperties struct { } type GetOpsInBlockResponse *[]OperationObject -type GetDynamicGlobalPropertiesResponse *DynamicGlobalProperties -func (r *GetOpsInBlockRequestParams) MarshalJSON() ([]byte, error) { - arr := []interface{}{r.BlockNum, r.VirtualOps} - return json.Marshal(arr) -} +type GetDynamicGlobalPropertiesResponse *DynamicGlobalProperties