diff --git a/Makefile b/Makefile index 483138d..402ca8a 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,19 @@ -default: run +GOFILES := $(shell find . -type f -name '*.go' -not -name '*_test.go') + +default: test + +.PHONY: run build test run: - go run *.go + go run $(GOFILES) + +build: steem-block-db + +steem-block-db: *.go + go build + +test: + go test + +clean: + rm steem-block-db diff --git a/app.go b/app.go index 121f590..89f6223 100644 --- a/app.go +++ b/app.go @@ -1,21 +1,23 @@ package main import "sync" +import "time" import log "github.com/sirupsen/logrus" -import "encoding/json" + +//import "encoding/json" type App struct { - datastore SteemDataStorer - api *SteemAPI - currentNetworkBlockHeight BlockNumber - currentLocalBlockHeight BlockNumber - lock *sync.Mutex + api SteemAPIShape + config *appconfig + db SteemDataStorer + lock *sync.Mutex } type appconfig struct { - logLevel log.Level - apiUrl string - redisUrl string + logLevel log.Level + apiUrl string + redisUrl string + desiredFetcherThreads uint } func NewApp(config *appconfig) *App { @@ -27,106 +29,71 @@ func NewApp(config *appconfig) *App { func (self *App) init(config *appconfig) { log.SetLevel(config.logLevel) self.api = NewSteemAPI(config.apiUrl) - self.datastore = NewSteemDataStore(config.redisUrl) + self.db = NewSteemDataStore(config.redisUrl) self.lock = &sync.Mutex{} -} - -func (self *App) updateCurrentBlockHeight() { - h := self.fetchCurrentBlockHeight() - if h > self.currentNetworkBlockHeight { - self.lock.Lock() - defer self.lock.Unlock() - self.currentNetworkBlockHeight = h - log.Infof("current block height is now %d", self.currentNetworkBlockHeight) - } + self.config = config } func (self *App) main() { log.Infof("steem block data fetcher starting up...") - //self.mainloop() -} - -/* -func (self *App) numFetchers() uint { - self.lock.Lock() - defer self.lock.Unlock() - return uint(len(*self.fetchingBlocks)) -} - -func (self *App) spawnNewFetcher(blockNum BlockNumber) { - log.Debugf("spawning fetcher for block %d", blockNum) - go func() { - // this is so hacky, make a queue like a grownup would you - time.Sleep(100 * time.Millisecond) - if self.datastore.HaveOpsForBlock(blockNum) { - log.Infof("already have ops for block %d, not re-fetching", blockNum) - return - } - //self.incrFetchers() - self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum)) - //self.decrFetchers() - - }() -} - -func (self *App) storeBlockOps(blockNum BlockNumber, blockOps *[]byte) { - self.datastore.StoreBlockOps(blockNum, blockOps) -} - -// note that parallelFetchAndStoreBlocks does not respect the -// limitation on number of desired fetchers, that is for the caller -func (self *App) parallelFetchAndStoreBlocks(start BlockNumber, end BlockNumber) { - var diff = uint64(end) - uint64(start) - var i uint64 - for i = 0; i < diff; i++ { - self.spawnNewFetcher(BlockNumber(uint64(start) + i)) - } -} - -func (self *App) populateFetchers() { + self.mainloop() } func (self *App) mainloop() { - - log.Infof("using %d fetching threads", self.desiredFetchingThreads) - - self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight() - + log.Infof("using %d fetching threads", self.config.desiredFetcherThreads) + // we are going to do batches of 5,000 blocks + batchSize := uint(1000) + var start BlockNumber + var end BlockNumber for { - self.spawnMoreFetchers() - self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight() - time.Sleep(1000 * time.Millisecond) + self.db.SetCurrentNetworkBlockHeight(self.fetchCurrentBlockHeight()) + if self.db.CurrentNetworkBlockHeight() == self.db.CurrentLocalBlockHeight() { + //we are synced + time.Sleep(1 * time.Second) + continue + } + // we are not synced + + // how far behind are we? + countMustFetch := uint(self.db.CurrentNetworkBlockHeight() - self.db.CurrentLocalBlockHeight()) + start = self.db.CurrentLocalBlockHeight() + 1 + log.Infof("beginning fetch with start block %d", start) + if countMustFetch <= batchSize { + end = self.db.CurrentNetworkBlockHeight() + log.Infof("fetch is within our batch size (%d), end block for fetch batch is %d", batchSize, end) + } else { + end = BlockNumber(uint(start) + uint(batchSize) - uint(1)) + log.Infof("fetch is too large for batch size (%d), end block for fetch batch is %d", batchSize, end) + } + + bf := NewBlockFetcher(&BlockFetcherConfig{ + api: self.api.(*SteemAPI), + desiredFetcherThreads: self.config.desiredFetcherThreads, + startBlock: start, + endBlock: end, + }) + + blocks := bf.fetch() + log.Infof("blockfetcher has returned") + self.pushBlocks(blocks) } } -func (self *App) spawnMoreFetchers() { -} - -/* - self.updateCurrentBlockHeight() - log.Infof("current number of active fetchers: %d", self.numFetchers()) - time.Sleep(1500 * time.Millisecond) - self.datastore.SetCurrentBlockHeight() - localHeight := self.datastore.CurrentBlockHeight() - log.Infof("our highest fetched block height is %d", localHeight) - if localHeight < self.currentNetworkBlockHeight { - // we need to fetch some blocks from the network - avail := self.desiredFetchingThreads - self.numFetchers() - diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight) - log.Infof("we need to fetch %d blocks", diff) - if uint64(diff) > uint64(avail) { - self.parallelFetchAndStoreBlocks(BlockNumber(uint64(localHeight)+1), BlockNumber(uint64(localHeight)+1+uint64(avail))) - } else { - // just spawn fetchers for the blocks we don't have - // spawning will update the number of running fetchers - self.parallelFetchAndStoreBlocks(localHeight+1, self.currentNetworkBlockHeight) +func (self *App) pushBlocks(newBlocks *[]FetchedBlock) { + counter := 0 + for _, newBlock := range *newBlocks { + counter += 1 + err := self.db.StoreBlockOps(newBlock.blockNumber, newBlock.blockData) + if err != nil { + log.Panic(err) } } - //needFetchers := self.desiredFetchingThreads - self.numFetchers() - -*/ + log.Infof("pushed %d new blocks to db", counter) + self.db.UpdateCurrentLocalBlockHeight() +} func (self *App) fetchCurrentBlockHeight() BlockNumber { + r, err := self.api.GetDynamicGlobalProperties() if err != nil { log.Panicf("can't fetch global properties, bailing. err: %s", err) diff --git a/blockfetcher.go b/blockfetcher.go index 9f52770..1461fa1 100644 --- a/blockfetcher.go +++ b/blockfetcher.go @@ -1,6 +1,8 @@ package main +import "encoding/json" import "sync" +import "time" import log "github.com/sirupsen/logrus" type FetchedBlock struct { @@ -14,7 +16,7 @@ type BlockFetcher struct { desiredFetcherThreads uint wantBlocks map[BlockNumber]bool fetchingBlocks map[BlockNumber]bool - fetchedBlocks *[]FetchedBlock + fetchedBlocks map[BlockNumber]*FetchedBlock lock *sync.Mutex workChannel chan BlockNumber resultsChannel chan *FetchedBlock @@ -43,47 +45,52 @@ func (self *BlockFetcher) init(config *BlockFetcherConfig) { self.lock = &sync.Mutex{} self.api = config.api self.desiredFetcherThreads = config.desiredFetcherThreads - self.workChannel = make(chan BlockNumber) self.resultsChannel = make(chan *FetchedBlock) + self.wantBlocks = make(map[BlockNumber]bool) + self.fetchedBlocks = make(map[BlockNumber]*FetchedBlock) + diff := int(uint(config.endBlock) - uint(config.startBlock)) - log.Debugf("diff is %d", diff) for i := 0; i <= diff; i++ { self.wantBlocks[BlockNumber(uint(config.startBlock)+uint(i))] = true } + log.Debugf("wantblocks[] is now %v", self.wantBlocks) } func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) { + self.lock.Lock() + defer self.lock.Unlock() if self.wantBlocks[blockNum] == false { log.Panicf("shouldn't happen") } - self.lock.Lock() - defer self.lock.Unlock() delete(self.wantBlocks, blockNum) } func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) { + self.lock.Lock() + defer self.lock.Unlock() if self.fetchingBlocks[blockNum] == false { log.Panicf("shouldn't happen") } - self.lock.Lock() - defer self.lock.Unlock() delete(self.fetchingBlocks, blockNum) } func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) { + self.lock.Lock() + defer self.lock.Unlock() if self.fetchingBlocks[blockNum] == true { log.Panicf("shouldn't happen") } - self.lock.Lock() - defer self.lock.Unlock() + if self.fetchingBlocks == nil { + self.fetchingBlocks = make(map[BlockNumber]bool) + } if self.fetchingBlocks[blockNum] == false { self.fetchingBlocks[blockNum] = true } } -func (self *BlockFetcher) fetcher(index int) { +func (self *BlockFetcher) fetcher(index uint) { log.Debugf("fetcher thread %d starting", index) WorkLoop: @@ -106,44 +113,65 @@ WorkLoop: log.Debugf("fetcher thread %d ending", index) } +func (self *BlockFetcher) sendWork(b BlockNumber) { + go func() { + // yay cheap goroutines, let them block on the unbuffered channel + log.Debugf("waiting to send blockNum %d into the work channel", b) + self.workChannel <- b + log.Debugf("sent blockNum %d into the work channel", b) + }() +} + func (self *BlockFetcher) fetch() *[]FetchedBlock { - for i := 1; i < self.desiredFetcherThreads+1; i++ { + log.Debugf("blockfetcher beginning fetch") + for i := uint(1); i < self.desiredFetcherThreads+uint(1); i++ { go self.fetcher(i) } - for blockNum, _ := range self.wantBlocks { - // yay cheap goroutines, let them block on the unbuffered channel - go func() { - log.Debugf("waiting to send blockNum %d into the work channel", blockNum) - self.workChannel <- blockNum - log.Debugf("sent blockNum %d into the work channel", blockNum) - }() + + self.lock.Lock() + for blockNum := range self.wantBlocks { + self.sendWork(blockNum) } + self.lock.Unlock() // now we have to start reading from the unbuffered resultsChannel // otherwise the workers will block when returning results - select { - case result := <-self.resultsChannel: - self.receiveResult(result) - default: - if self.Done() == true { - // if we get here, it's because workList is now empty and there - // are no more results in the results channel. - close(self.workChannel) // shut down the workers - result := self.fetchedBlocks - self = nil //this BlockFetcher is now finished. - return result + for { + select { + case result := <-self.resultsChannel: + self.receiveResult(result) + default: + if self.Done() == true { + log.Infof("blockfetcher %+v considers itself Done()", self) + // if we get here, it's because workList is now empty and there + // are no more results in the results channel. + close(self.workChannel) // shut down the workers + var final []FetchedBlock + self.lock.Lock() + for _, value := range self.fetchedBlocks { + final = append(final, *value) + } + self.lock.Unlock() + self = nil //this BlockFetcher is now finished. + return &final + } + // in this case we are not done but got nothing from the result + // channel so just wait a little bit to get more results and + // check the channel again + time.Sleep(10 * time.Millisecond) + //FIXME(sneak) we maybe need to handle a case here where wantBlocks never + //empties but workers need to be re-dispatched.. } - time.Sleep(10 * time.Millisecond) - //FIXME(sneak) we maybe need to handle a case here where wantBlocks never - //empties but workers need to be re-dispatched.. } + log.Panicf("this shouldn't happen") + return nil //shouldn't happen, return should happen from above } func (self *BlockFetcher) receiveResult(r *FetchedBlock) { log.Debugf("got result for blocknum %d", r.blockNumber) self.removeFetchingBlock(r.blockNumber) self.lock.Lock() - self.fetchedBlocks = append(self.fetchedBlocks, r) + self.fetchedBlocks[r.blockNumber] = r self.lock.Unlock() self.removeWantBlock(r.blockNumber) } @@ -156,16 +184,16 @@ func (self *BlockFetcher) fetchBlockOpsFromNetwork(blockNum BlockNumber) *Fetche } r, err := self.api.GetOpsInBlock(blockNum) if err != nil { - result.error = err + result.error = &err return result } bytes, err := json.Marshal(r) if err != nil { - result.error = err + result.error = &err return result } count := len(*r) - log.Infof("got %d operations for block %d", count, blockNum) + log.Infof("got %d operations for block %d fetched from network", count, blockNum) result.blockData = &bytes result.error = nil // make sure this is nil if it worked return result diff --git a/blockfetcher_test.go b/blockfetcher_test.go new file mode 100644 index 0000000..5ef0b1f --- /dev/null +++ b/blockfetcher_test.go @@ -0,0 +1,53 @@ +package main + +import "testing" +import log "github.com/sirupsen/logrus" + +/* +import "github.com/stretchr/testify/mock" + +type MockedSteemAPI struct { + mock.Mock +} + +func (m *MockedSteemAPI) DoSomething(number int) (bool, error) { + args := m.Called(number) + return args.Bool(0), args.Error(1) +} + +*/ + +func TestBlockfetcherInit(t *testing.T) { + + log.SetLevel(log.DebugLevel) + + bf := NewBlockFetcher(&BlockFetcherConfig{ + api: nil, + desiredFetcherThreads: 1, + startBlock: 10000, + endBlock: 10005, + }) + + if bf == nil { + t.Errorf("could not instantiate blockfetcher") + } + +} + +//can't actually fetch yet until we mock the api +/* +func TestBlockfetcherFetch(t *testing.T) { + + log.SetLevel(log.DebugLevel) + + bf := NewBlockFetcher(&BlockFetcherConfig{ + api: nil, + desiredFetcherThreads: 1, + startBlock: 10000, + endBlock: 10005, + }) + + bf.fetch() +} + +*/ diff --git a/db.go b/db.go index 3efbbf7..8f979a6 100644 --- a/db.go +++ b/db.go @@ -10,8 +10,11 @@ import "strconv" const appPrefix = "sbf" type SteemDataStorer interface { - SetCurrentBlockHeight(BlockNumber) error - CurrentBlockHeight() BlockNumber + SetCurrentNetworkBlockHeight(BlockNumber) error + SetCurrentLocalBlockHeight(BlockNumber) error + UpdateCurrentLocalBlockHeight() + CurrentLocalBlockHeight() BlockNumber + CurrentNetworkBlockHeight() BlockNumber HaveOpsForBlock(BlockNumber) bool StoreBlockOps(BlockNumber, *[]byte) error } @@ -24,11 +27,35 @@ type SteemDataStore struct { func NewSteemDataStore(hostname string) SteemDataStorer { self := new(SteemDataStore) self.kv = NewRedisKVStore(hostname) + self.init() return self } -func (self *SteemDataStore) SetCurrentBlockHeight(blockNum BlockNumber) error { - keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix) +func (self *SteemDataStore) init() { + self.UpdateCurrentLocalBlockHeight() +} + +func (self *SteemDataStore) UpdateCurrentLocalBlockHeight() { + cur := self.CurrentLocalBlockHeight() + next := self.FindHighestContiguousBlockInDb(cur) + if next != cur { + err := self.SetCurrentLocalBlockHeight(next) + log.Infof("current highest contig block in db is now %d", next) + if err != nil { + log.Panic(err) + } + return + } +} + +func (self *SteemDataStore) SetCurrentLocalBlockHeight(blockNum BlockNumber) error { + keyname := fmt.Sprintf("%s.meta.CurrentLocalBlockHeight", appPrefix) + value := fmt.Sprintf("%d", blockNum) + return self.kv.Put(&keyname, &value) +} + +func (self *SteemDataStore) SetCurrentNetworkBlockHeight(blockNum BlockNumber) error { + keyname := fmt.Sprintf("%s.meta.CurrentNetworkBlockHeight", appPrefix) value := fmt.Sprintf("%d", blockNum) return self.kv.Put(&keyname, &value) } @@ -42,7 +69,10 @@ func (self *SteemDataStore) FindHighestContiguousBlockInDb(from BlockNumber) Blo for { try = BlockNumber(uint64(last) + 1) keyname = fmt.Sprintf("%s.ops_in_block.%d", appPrefix, try) - exists, _ := self.kv.Exists(&keyname) + exists, err := self.kv.Exists(&keyname) + if err != nil { + log.Panic(err) + } if exists == false { log.Debugf("cannot find block %d in db, highest found is %d", try, last) return last @@ -64,14 +94,27 @@ func (self *SteemDataStore) HaveOpsForBlock(blockNum BlockNumber) bool { return exists } -func (self *SteemDataStore) CurrentBlockHeight() BlockNumber { - keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix) +func (self *SteemDataStore) CurrentNetworkBlockHeight() BlockNumber { + keyname := fmt.Sprintf("%s.meta.CurrentNetworkBlockHeight", appPrefix) val, err := self.kv.Get(&keyname) if err != nil { // assume this is key not found, initialize key to default - self.SetCurrentBlockHeight(0) + self.SetCurrentNetworkBlockHeight(0) // retry - return self.CurrentBlockHeight() + return self.CurrentNetworkBlockHeight() + } + intval, err := strconv.ParseUint(*val, 10, 64) + return BlockNumber(intval) +} + +func (self *SteemDataStore) CurrentLocalBlockHeight() BlockNumber { + keyname := fmt.Sprintf("%s.meta.CurrentLocalBlockHeight", appPrefix) + val, err := self.kv.Get(&keyname) + if err != nil { + // assume this is key not found, initialize key to default + self.SetCurrentLocalBlockHeight(0) + // retry + return self.CurrentLocalBlockHeight() } intval, err := strconv.ParseUint(*val, 10, 64) return BlockNumber(intval) diff --git a/main.go b/main.go index 8ea0f29..b7790fd 100644 --- a/main.go +++ b/main.go @@ -11,26 +11,11 @@ const redisUrl = "localhost:6379" //const steemAPIURL = "http://las2.local:8090" func main() { - log.SetLevel(log.DebugLevel) - var x *BlockFetcher - x = NewBlockFetcher(&BlockFetcherConfig{ - api: nil, - desiredFetcherThreads: 40, - startBlock: 10000, - endBlock: 10005, - }) - _ = x -} - -/* -func mainx() { app := NewApp(&appconfig{ - logLevel: log.DebugLevel, - apiUrl: steemAPIURL, - redisUrl: redisUrl, - fetcherThreads: 40, + logLevel: log.InfoLevel, + apiUrl: steemAPIURL, + redisUrl: redisUrl, + desiredFetcherThreads: 20, }) app.main() } - -*/ diff --git a/steemapi.go b/steemapi.go index 48b2f63..4559d90 100644 --- a/steemapi.go +++ b/steemapi.go @@ -9,10 +9,15 @@ type SteemAPI struct { rpc *JSONRPC } +type SteemAPIShape interface { + GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesResponse, error) + GetOpsInBlock(blockNum BlockNumber) (GetOpsInBlockResponse, error) +} + var EmptyParams = []string{} var EmptyParamsRaw, _ = json.Marshal(EmptyParams) -func NewSteemAPI(url string, options ...func(s *SteemAPI)) *SteemAPI { +func NewSteemAPI(url string, options ...func(s SteemAPIShape)) *SteemAPI { rpc := NewJSONRPC(url, func(x *JSONRPC) {}) @@ -46,9 +51,12 @@ func (self *SteemAPI) GetOpsInBlock(blockNum BlockNumber) (GetOpsInBlockResponse // api.steemit.com. realOpsParams := &GetOpsInBlockRequestParams{BlockNum: blockNum, VirtualOps: false} rop, err := realOpsParams.MarshalJSON() - realOpsResponse, err := self.rpc.Call("condenser_api.get_ops_in_block", rop) + if err != nil { + return nil, err + } + rawOpsResponse, err := self.rpc.Call("condenser_api.get_ops_in_block", rop) var result []OperationObject - err = json.Unmarshal(realOpsResponse, &result) + err = json.Unmarshal(rawOpsResponse, &result) if err != nil { return nil, err }