From df7f53e7d626dd9b621bf96f0fa0717aee594861 Mon Sep 17 00:00:00 2001 From: Jeffrey Paul Date: Wed, 31 Oct 2018 04:05:25 -0700 Subject: [PATCH] tweaks --- db.go | 17 ++++++++++------- main.go | 23 ++++++++++++++++------- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/db.go b/db.go index 03b9307..f0601da 100644 --- a/db.go +++ b/db.go @@ -6,7 +6,8 @@ import log "github.com/sirupsen/logrus" import "fmt" import "strconv" -const appPrefix = "steem-block-fetcher" +//steem block fetcher +const appPrefix = "sbf" type SteemDataStorer interface { SetCurrentBlockHeight() error @@ -28,7 +29,7 @@ func NewSteemDataStore(dir string) *SteemDataStore { } func (self *SteemDataStore) ForceSetCurrentBlockHeight(blockNum BlockNumber) error { - keyname := fmt.Sprintf("%s:global:CurrentBlockHeight", appPrefix) + keyname := fmt.Sprintf("%s.global.CurrentBlockHeight", appPrefix) value := fmt.Sprintf("%d", blockNum) return self.kv.Put(&keyname, &value) } @@ -37,7 +38,7 @@ func (self *SteemDataStore) ForceSetCurrentBlockHeight(blockNum BlockNumber) err // and updates the memo in the db func (self *SteemDataStore) SetCurrentBlockHeight() error { nextVal := self.FindHighestContiguousBlockInDb(self.CurrentBlockHeight()) - keyname := fmt.Sprintf("%s:global:CurrentBlockHeight", appPrefix) + keyname := fmt.Sprintf("%s.global.CurrentBlockHeight", appPrefix) value := fmt.Sprintf("%d", nextVal) log.Infof("updating our current highest block in db to %d", nextVal) return self.kv.Put(&keyname, &value) @@ -51,7 +52,7 @@ func (self *SteemDataStore) FindHighestContiguousBlockInDb(from BlockNumber) Blo for { try = BlockNumber(uint64(last) + 1) - keyname = fmt.Sprintf("%s:BlockOps:%d", appPrefix, try) + keyname = fmt.Sprintf("%s.ops_in_block.%d", appPrefix, try) exists, _ := self.kv.Exists(&keyname) if exists == false { log.Debugf("cannot find block %d in db, highest found is %d", try, last) @@ -63,17 +64,19 @@ func (self *SteemDataStore) FindHighestContiguousBlockInDb(from BlockNumber) Blo } func (self *SteemDataStore) StoreBlockOps(blockNum BlockNumber, blockOps *[]byte) error { - keyname := fmt.Sprintf("%s:BlockOps:%d", appPrefix, blockNum) + keyname := fmt.Sprintf("%s.ops_in_block.%d", appPrefix, blockNum) value := string(*blockOps) return self.kv.Put(&keyname, &value) } func (self *SteemDataStore) HaveOpsForBlock(blockNum BlockNumber) bool { - panic("unimplemented") + keyname := fmt.Sprintf("%s.ops_in_block.%d", appPrefix, blockNum) + exists, _ := self.kv.Exists(&keyname) + return exists } func (self *SteemDataStore) CurrentBlockHeight() BlockNumber { - keyname := fmt.Sprintf("%s:global:CurrentBlockHeight", appPrefix) + keyname := fmt.Sprintf("%s.global.CurrentBlockHeight", appPrefix) val, err := self.kv.Get(&keyname) if err != nil { // assume this is key not found, initialize key to default diff --git a/main.go b/main.go index ffbd187..de48bff 100644 --- a/main.go +++ b/main.go @@ -45,7 +45,7 @@ func NewApp(config *appconfig) *App { func (self *App) init(config *appconfig) { self.api = NewSteemAPI(steemAPIURL) self.datastore = NewSteemDataStore("./d") - self.desiredFetchingThreads = 20 + self.desiredFetchingThreads = 40 self.currentFetchingThreads = 0 self.lock = &sync.Mutex{} } @@ -84,12 +84,16 @@ func (self *App) decrFetchers() { } func (self *App) spawnNewFetcher(blockNum BlockNumber) { - log.Infof("spawning fetcher for block %d", blockNum) + log.Debugf("spawning fetcher for block %d", blockNum) go func() { + // this is so hacky, make a queue like a grownup would you + time.Sleep(100 * time.Millisecond) + if self.datastore.HaveOpsForBlock(blockNum) { + log.Infof("already have ops for block %d, not re-fetching", blockNum) + return + } self.incrFetchers() self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum)) - self.datastore.SetCurrentBlockHeight() - time.Sleep(250 * time.Millisecond) self.decrFetchers() }() @@ -113,10 +117,11 @@ func (self *App) mainloop() { log.Infof("using %d fetching threads", self.desiredFetchingThreads) for { self.updateCurrentBlockHeight() + log.Infof("current number of active fetchers: %d", self.numFetchers()) + time.Sleep(1500 * time.Millisecond) + self.datastore.SetCurrentBlockHeight() localHeight := self.datastore.CurrentBlockHeight() log.Infof("our highest fetched block height is %d", localHeight) - log.Infof("current number of active fetchers: %d", self.numFetchers()) - time.Sleep(1 * time.Second) if localHeight < self.currentNetworkBlockHeight { // we need to fetch some blocks from the network avail := self.desiredFetchingThreads - self.numFetchers() @@ -148,12 +153,16 @@ func (self *App) fetchCurrentBlockHeight() BlockNumber { func (self *App) fetchBlockOps(blockNum BlockNumber) *[]byte { r, err := self.api.GetOpsInBlock(blockNum) if err != nil { - panic(err) + // just retry on error + // sloppy, but works + return self.fetchBlockOps(blockNum) } bytes, err := json.Marshal(r) if err != nil { panic(err) } + count := len(*r) + log.Infof("got %d operations for block %d", count, blockNum) return &bytes //self.datastore.writeBlockOps(blockNum, bytes) }