Compare commits
No commits in common. "76d26a6f1e84475c74bd30f8d69d05e55d1b4a90" and "32874cca63768ce0d4faac6228097dd57609f7dd" have entirely different histories.
76d26a6f1e
...
32874cca63
1
.gitignore
vendored
1
.gitignore
vendored
@ -1 +0,0 @@
|
|||||||
steem-block-db
|
|
@ -1,30 +0,0 @@
|
|||||||
image: golang:1.11
|
|
||||||
|
|
||||||
cache:
|
|
||||||
paths:
|
|
||||||
- /apt-cache
|
|
||||||
- /go/src/github.com
|
|
||||||
- /go/src/golang.org
|
|
||||||
- /go/src/google.golang.org
|
|
||||||
- /go/src/gopkg.in
|
|
||||||
|
|
||||||
stages:
|
|
||||||
- test
|
|
||||||
- build
|
|
||||||
|
|
||||||
before_script:
|
|
||||||
- mkdir /go/src/steem-block-db
|
|
||||||
- cp $CI_PROJECT_DIR/*.go /go/src/steem-block-db
|
|
||||||
- cd /go/src/steem-block-db
|
|
||||||
- GOPATH=/go go get
|
|
||||||
|
|
||||||
#unit_tests:
|
|
||||||
# stage: test
|
|
||||||
# script:
|
|
||||||
# - make test
|
|
||||||
|
|
||||||
build:
|
|
||||||
stage: build
|
|
||||||
script:
|
|
||||||
- cd /go/src/steem-block-db
|
|
||||||
- GOPATH=/go go build
|
|
19
Makefile
19
Makefile
@ -1,19 +1,4 @@
|
|||||||
GOFILES := $(shell find . -type f -name '*.go' -not -name '*_test.go')
|
default: run
|
||||||
|
|
||||||
default: test
|
|
||||||
|
|
||||||
.PHONY: run build test
|
|
||||||
|
|
||||||
run:
|
run:
|
||||||
go run $(GOFILES)
|
go run *.go
|
||||||
|
|
||||||
build: steem-block-db
|
|
||||||
|
|
||||||
steem-block-db: *.go
|
|
||||||
go build
|
|
||||||
|
|
||||||
test:
|
|
||||||
go test
|
|
||||||
|
|
||||||
clean:
|
|
||||||
rm steem-block-db
|
|
||||||
|
153
app.go
153
app.go
@ -1,15 +1,14 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import "sync"
|
import "sync"
|
||||||
import "time"
|
|
||||||
import log "github.com/sirupsen/logrus"
|
import log "github.com/sirupsen/logrus"
|
||||||
|
import "encoding/json"
|
||||||
//import "encoding/json"
|
|
||||||
|
|
||||||
type App struct {
|
type App struct {
|
||||||
api SteemAPIShape
|
datastore SteemDataStorer
|
||||||
config *appconfig
|
api *SteemAPI
|
||||||
db SteemDataStorer
|
currentNetworkBlockHeight BlockNumber
|
||||||
|
currentLocalBlockHeight BlockNumber
|
||||||
lock *sync.Mutex
|
lock *sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -17,7 +16,6 @@ type appconfig struct {
|
|||||||
logLevel log.Level
|
logLevel log.Level
|
||||||
apiUrl string
|
apiUrl string
|
||||||
redisUrl string
|
redisUrl string
|
||||||
desiredFetcherThreads uint
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewApp(config *appconfig) *App {
|
func NewApp(config *appconfig) *App {
|
||||||
@ -28,80 +26,107 @@ func NewApp(config *appconfig) *App {
|
|||||||
|
|
||||||
func (self *App) init(config *appconfig) {
|
func (self *App) init(config *appconfig) {
|
||||||
log.SetLevel(config.logLevel)
|
log.SetLevel(config.logLevel)
|
||||||
log.SetFormatter(&log.TextFormatter{
|
|
||||||
FullTimestamp: true,
|
|
||||||
})
|
|
||||||
log.SetFormatter(&log.JSONFormatter{})
|
|
||||||
//log.SetReportCaller(true)
|
|
||||||
self.api = NewSteemAPI(config.apiUrl)
|
self.api = NewSteemAPI(config.apiUrl)
|
||||||
self.db = NewSteemDataStore(config.redisUrl)
|
self.datastore = NewSteemDataStore(config.redisUrl)
|
||||||
self.lock = &sync.Mutex{}
|
self.lock = &sync.Mutex{}
|
||||||
self.config = config
|
}
|
||||||
|
|
||||||
|
func (self *App) updateCurrentBlockHeight() {
|
||||||
|
h := self.fetchCurrentBlockHeight()
|
||||||
|
if h > self.currentNetworkBlockHeight {
|
||||||
|
self.lock.Lock()
|
||||||
|
defer self.lock.Unlock()
|
||||||
|
self.currentNetworkBlockHeight = h
|
||||||
|
log.Infof("current block height is now %d", self.currentNetworkBlockHeight)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *App) main() {
|
func (self *App) main() {
|
||||||
log.Infof("steem block data fetcher starting up...")
|
log.Infof("steem block data fetcher starting up...")
|
||||||
self.mainloop()
|
//self.mainloop()
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
func (self *App) numFetchers() uint {
|
||||||
|
self.lock.Lock()
|
||||||
|
defer self.lock.Unlock()
|
||||||
|
return uint(len(*self.fetchingBlocks))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *App) spawnNewFetcher(blockNum BlockNumber) {
|
||||||
|
log.Debugf("spawning fetcher for block %d", blockNum)
|
||||||
|
go func() {
|
||||||
|
// this is so hacky, make a queue like a grownup would you
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
if self.datastore.HaveOpsForBlock(blockNum) {
|
||||||
|
log.Infof("already have ops for block %d, not re-fetching", blockNum)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
//self.incrFetchers()
|
||||||
|
self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum))
|
||||||
|
//self.decrFetchers()
|
||||||
|
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *App) storeBlockOps(blockNum BlockNumber, blockOps *[]byte) {
|
||||||
|
self.datastore.StoreBlockOps(blockNum, blockOps)
|
||||||
|
}
|
||||||
|
|
||||||
|
// note that parallelFetchAndStoreBlocks does not respect the
|
||||||
|
// limitation on number of desired fetchers, that is for the caller
|
||||||
|
func (self *App) parallelFetchAndStoreBlocks(start BlockNumber, end BlockNumber) {
|
||||||
|
var diff = uint64(end) - uint64(start)
|
||||||
|
var i uint64
|
||||||
|
for i = 0; i < diff; i++ {
|
||||||
|
self.spawnNewFetcher(BlockNumber(uint64(start) + i))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *App) populateFetchers() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *App) mainloop() {
|
func (self *App) mainloop() {
|
||||||
log.Infof("using %d fetching threads", self.config.desiredFetcherThreads)
|
|
||||||
batchSize := uint(3000)
|
log.Infof("using %d fetching threads", self.desiredFetchingThreads)
|
||||||
//batchSize := uint(10)
|
|
||||||
var start BlockNumber
|
self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight()
|
||||||
var end BlockNumber
|
|
||||||
for {
|
for {
|
||||||
self.db.SetCurrentNetworkBlockHeight(self.fetchCurrentBlockHeight())
|
self.spawnMoreFetchers()
|
||||||
if self.db.CurrentNetworkBlockHeight() == self.db.CurrentLocalBlockHeight() {
|
self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight()
|
||||||
//we are synced
|
time.Sleep(1000 * time.Millisecond)
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
log.Infof("current network block height = %d", self.db.CurrentNetworkBlockHeight())
|
}
|
||||||
log.Infof("current local block height = %d", self.db.CurrentLocalBlockHeight())
|
|
||||||
// we are not synced
|
|
||||||
|
|
||||||
// how far behind are we?
|
func (self *App) spawnMoreFetchers() {
|
||||||
countMustFetch := uint(self.db.CurrentNetworkBlockHeight() - self.db.CurrentLocalBlockHeight())
|
}
|
||||||
log.Infof("we are %d blocks behind", countMustFetch)
|
|
||||||
start = self.db.CurrentLocalBlockHeight() + 1
|
/*
|
||||||
log.Infof("beginning fetch with start block %d", start)
|
self.updateCurrentBlockHeight()
|
||||||
if countMustFetch <= batchSize {
|
log.Infof("current number of active fetchers: %d", self.numFetchers())
|
||||||
end = self.db.CurrentNetworkBlockHeight()
|
time.Sleep(1500 * time.Millisecond)
|
||||||
log.Infof("fetch is within our batch size (%d), end block for fetch batch is %d", batchSize, end)
|
self.datastore.SetCurrentBlockHeight()
|
||||||
|
localHeight := self.datastore.CurrentBlockHeight()
|
||||||
|
log.Infof("our highest fetched block height is %d", localHeight)
|
||||||
|
if localHeight < self.currentNetworkBlockHeight {
|
||||||
|
// we need to fetch some blocks from the network
|
||||||
|
avail := self.desiredFetchingThreads - self.numFetchers()
|
||||||
|
diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight)
|
||||||
|
log.Infof("we need to fetch %d blocks", diff)
|
||||||
|
if uint64(diff) > uint64(avail) {
|
||||||
|
self.parallelFetchAndStoreBlocks(BlockNumber(uint64(localHeight)+1), BlockNumber(uint64(localHeight)+1+uint64(avail)))
|
||||||
} else {
|
} else {
|
||||||
end = BlockNumber(uint(start) + uint(batchSize) - uint(1))
|
// just spawn fetchers for the blocks we don't have
|
||||||
log.Infof("fetch is too large for batch size (%d), end block for fetch batch is %d", batchSize, end)
|
// spawning will update the number of running fetchers
|
||||||
|
self.parallelFetchAndStoreBlocks(localHeight+1, self.currentNetworkBlockHeight)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
//needFetchers := self.desiredFetchingThreads - self.numFetchers()
|
||||||
|
|
||||||
bf := NewBlockFetcher(&BlockFetcherConfig{
|
*/
|
||||||
api: self.api.(*SteemAPI),
|
|
||||||
desiredFetcherThreads: self.config.desiredFetcherThreads,
|
|
||||||
startBlock: start,
|
|
||||||
endBlock: end,
|
|
||||||
})
|
|
||||||
|
|
||||||
blocks := bf.fetch()
|
|
||||||
log.Infof("blockfetcher has returned")
|
|
||||||
self.pushBlocks(blocks)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *App) pushBlocks(newBlocks *[]FetchedBlock) {
|
|
||||||
counter := 0
|
|
||||||
for _, newBlock := range *newBlocks {
|
|
||||||
counter += 1
|
|
||||||
err := self.db.StoreBlockOps(newBlock.blockNumber, newBlock.blockData)
|
|
||||||
if err != nil {
|
|
||||||
log.Panic(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log.Infof("pushed %d new blocks to db", counter)
|
|
||||||
self.db.UpdateCurrentLocalBlockHeight()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *App) fetchCurrentBlockHeight() BlockNumber {
|
func (self *App) fetchCurrentBlockHeight() BlockNumber {
|
||||||
|
|
||||||
r, err := self.api.GetDynamicGlobalProperties()
|
r, err := self.api.GetDynamicGlobalProperties()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("can't fetch global properties, bailing. err: %s", err)
|
log.Panicf("can't fetch global properties, bailing. err: %s", err)
|
||||||
|
@ -1,8 +1,6 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import "encoding/json"
|
|
||||||
import "sync"
|
import "sync"
|
||||||
import "time"
|
|
||||||
import log "github.com/sirupsen/logrus"
|
import log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
type FetchedBlock struct {
|
type FetchedBlock struct {
|
||||||
@ -16,7 +14,7 @@ type BlockFetcher struct {
|
|||||||
desiredFetcherThreads uint
|
desiredFetcherThreads uint
|
||||||
wantBlocks map[BlockNumber]bool
|
wantBlocks map[BlockNumber]bool
|
||||||
fetchingBlocks map[BlockNumber]bool
|
fetchingBlocks map[BlockNumber]bool
|
||||||
fetchedBlocks map[BlockNumber]*FetchedBlock
|
fetchedBlocks *[]FetchedBlock
|
||||||
lock *sync.Mutex
|
lock *sync.Mutex
|
||||||
workChannel chan BlockNumber
|
workChannel chan BlockNumber
|
||||||
resultsChannel chan *FetchedBlock
|
resultsChannel chan *FetchedBlock
|
||||||
@ -45,52 +43,47 @@ func (self *BlockFetcher) init(config *BlockFetcherConfig) {
|
|||||||
self.lock = &sync.Mutex{}
|
self.lock = &sync.Mutex{}
|
||||||
self.api = config.api
|
self.api = config.api
|
||||||
self.desiredFetcherThreads = config.desiredFetcherThreads
|
self.desiredFetcherThreads = config.desiredFetcherThreads
|
||||||
|
|
||||||
self.workChannel = make(chan BlockNumber)
|
self.workChannel = make(chan BlockNumber)
|
||||||
self.resultsChannel = make(chan *FetchedBlock)
|
self.resultsChannel = make(chan *FetchedBlock)
|
||||||
self.wantBlocks = make(map[BlockNumber]bool)
|
|
||||||
self.fetchedBlocks = make(map[BlockNumber]*FetchedBlock)
|
|
||||||
|
|
||||||
diff := int(uint(config.endBlock) - uint(config.startBlock))
|
diff := int(uint(config.endBlock) - uint(config.startBlock))
|
||||||
|
log.Debugf("diff is %d", diff)
|
||||||
for i := 0; i <= diff; i++ {
|
for i := 0; i <= diff; i++ {
|
||||||
self.wantBlocks[BlockNumber(uint(config.startBlock)+uint(i))] = true
|
self.wantBlocks[BlockNumber(uint(config.startBlock)+uint(i))] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("wantblocks[] is now %v", self.wantBlocks)
|
log.Debugf("wantblocks[] is now %v", self.wantBlocks)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) {
|
func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) {
|
||||||
self.lock.Lock()
|
|
||||||
defer self.lock.Unlock()
|
|
||||||
if self.wantBlocks[blockNum] == false {
|
if self.wantBlocks[blockNum] == false {
|
||||||
log.Panicf("shouldn't happen")
|
log.Panicf("shouldn't happen")
|
||||||
}
|
}
|
||||||
|
self.lock.Lock()
|
||||||
|
defer self.lock.Unlock()
|
||||||
delete(self.wantBlocks, blockNum)
|
delete(self.wantBlocks, blockNum)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) {
|
func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) {
|
||||||
self.lock.Lock()
|
|
||||||
defer self.lock.Unlock()
|
|
||||||
if self.fetchingBlocks[blockNum] == false {
|
if self.fetchingBlocks[blockNum] == false {
|
||||||
log.Panicf("shouldn't happen")
|
log.Panicf("shouldn't happen")
|
||||||
}
|
}
|
||||||
|
self.lock.Lock()
|
||||||
|
defer self.lock.Unlock()
|
||||||
delete(self.fetchingBlocks, blockNum)
|
delete(self.fetchingBlocks, blockNum)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) {
|
func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) {
|
||||||
self.lock.Lock()
|
|
||||||
defer self.lock.Unlock()
|
|
||||||
if self.fetchingBlocks[blockNum] == true {
|
if self.fetchingBlocks[blockNum] == true {
|
||||||
log.Panicf("shouldn't happen")
|
log.Panicf("shouldn't happen")
|
||||||
}
|
}
|
||||||
if self.fetchingBlocks == nil {
|
self.lock.Lock()
|
||||||
self.fetchingBlocks = make(map[BlockNumber]bool)
|
defer self.lock.Unlock()
|
||||||
}
|
|
||||||
if self.fetchingBlocks[blockNum] == false {
|
if self.fetchingBlocks[blockNum] == false {
|
||||||
self.fetchingBlocks[blockNum] = true
|
self.fetchingBlocks[blockNum] = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *BlockFetcher) fetcher(index uint) {
|
func (self *BlockFetcher) fetcher(index int) {
|
||||||
log.Debugf("fetcher thread %d starting", index)
|
log.Debugf("fetcher thread %d starting", index)
|
||||||
|
|
||||||
WorkLoop:
|
WorkLoop:
|
||||||
@ -107,88 +100,50 @@ WorkLoop:
|
|||||||
} else {
|
} else {
|
||||||
// wait a sec and try again
|
// wait a sec and try again
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
log.Infof("error fetching block %d, retry %d (%v)", blockNum, i+1, r.error)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Debugf("fetcher thread %d ending", index)
|
log.Debugf("fetcher thread %d ending", index)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *BlockFetcher) sendWork(b BlockNumber) {
|
|
||||||
go func() {
|
|
||||||
// yay cheap goroutines, let them block on the unbuffered channel
|
|
||||||
log.Debugf("waiting to send blockNum %d into the work channel", b)
|
|
||||||
self.workChannel <- b
|
|
||||||
log.Debugf("sent blockNum %d into the work channel", b)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *BlockFetcher) fetch() *[]FetchedBlock {
|
func (self *BlockFetcher) fetch() *[]FetchedBlock {
|
||||||
log.Debugf("blockfetcher beginning fetch")
|
for i := 1; i < self.desiredFetcherThreads+1; i++ {
|
||||||
startTime := time.Now()
|
|
||||||
for i := uint(1); i < self.desiredFetcherThreads+uint(1); i++ {
|
|
||||||
go self.fetcher(i)
|
go self.fetcher(i)
|
||||||
}
|
}
|
||||||
|
for blockNum, _ := range self.wantBlocks {
|
||||||
self.lock.Lock()
|
// yay cheap goroutines, let them block on the unbuffered channel
|
||||||
for blockNum := range self.wantBlocks {
|
go func() {
|
||||||
self.sendWork(blockNum)
|
log.Debugf("waiting to send blockNum %d into the work channel", blockNum)
|
||||||
|
self.workChannel <- blockNum
|
||||||
|
log.Debugf("sent blockNum %d into the work channel", blockNum)
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
self.lock.Unlock()
|
|
||||||
|
|
||||||
// now we have to start reading from the unbuffered resultsChannel
|
// now we have to start reading from the unbuffered resultsChannel
|
||||||
// otherwise the workers will block when returning results
|
// otherwise the workers will block when returning results
|
||||||
for {
|
|
||||||
select {
|
select {
|
||||||
case result := <-self.resultsChannel:
|
case result := <-self.resultsChannel:
|
||||||
self.receiveResult(result)
|
self.receiveResult(result)
|
||||||
default:
|
default:
|
||||||
|
|
||||||
if startTime.Add(30 * time.Second).Before(time.Now()) {
|
|
||||||
// fetch took too long, return anyway
|
|
||||||
log.Infof("30s fetcher batch timeout reached, but not done fetching")
|
|
||||||
// return what we have
|
|
||||||
var final []FetchedBlock
|
|
||||||
self.lock.Lock()
|
|
||||||
for _, value := range self.fetchedBlocks {
|
|
||||||
final = append(final, *value)
|
|
||||||
}
|
|
||||||
self.lock.Unlock()
|
|
||||||
self = nil //this BlockFetcher is now finished.
|
|
||||||
return &final
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.Done() == true {
|
if self.Done() == true {
|
||||||
log.Infof("blockfetcher %+v considers itself Done()", self)
|
|
||||||
// if we get here, it's because workList is now empty and there
|
// if we get here, it's because workList is now empty and there
|
||||||
// are no more results in the results channel.
|
// are no more results in the results channel.
|
||||||
close(self.workChannel) // shut down the workers
|
close(self.workChannel) // shut down the workers
|
||||||
var final []FetchedBlock
|
result := self.fetchedBlocks
|
||||||
self.lock.Lock()
|
|
||||||
for _, value := range self.fetchedBlocks {
|
|
||||||
final = append(final, *value)
|
|
||||||
}
|
|
||||||
self.lock.Unlock()
|
|
||||||
self = nil //this BlockFetcher is now finished.
|
self = nil //this BlockFetcher is now finished.
|
||||||
return &final
|
return result
|
||||||
}
|
}
|
||||||
// in this case we are not done but got nothing from the result
|
|
||||||
// channel so just wait a little bit to get more results and
|
|
||||||
// check the channel again
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
//FIXME(sneak) we maybe need to handle a case here where wantBlocks never
|
//FIXME(sneak) we maybe need to handle a case here where wantBlocks never
|
||||||
//empties but workers need to be re-dispatched..
|
//empties but workers need to be re-dispatched..
|
||||||
}
|
}
|
||||||
}
|
|
||||||
log.Panicf("this shouldn't happen")
|
|
||||||
return nil //shouldn't happen, return should happen from above
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *BlockFetcher) receiveResult(r *FetchedBlock) {
|
func (self *BlockFetcher) receiveResult(r *FetchedBlock) {
|
||||||
log.Debugf("got result for blocknum %d", r.blockNumber)
|
log.Debugf("got result for blocknum %d", r.blockNumber)
|
||||||
self.removeFetchingBlock(r.blockNumber)
|
self.removeFetchingBlock(r.blockNumber)
|
||||||
self.lock.Lock()
|
self.lock.Lock()
|
||||||
self.fetchedBlocks[r.blockNumber] = r
|
self.fetchedBlocks = append(self.fetchedBlocks, r)
|
||||||
self.lock.Unlock()
|
self.lock.Unlock()
|
||||||
self.removeWantBlock(r.blockNumber)
|
self.removeWantBlock(r.blockNumber)
|
||||||
}
|
}
|
||||||
@ -201,16 +156,16 @@ func (self *BlockFetcher) fetchBlockOpsFromNetwork(blockNum BlockNumber) *Fetche
|
|||||||
}
|
}
|
||||||
r, err := self.api.GetOpsInBlock(blockNum)
|
r, err := self.api.GetOpsInBlock(blockNum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
result.error = &err
|
result.error = err
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
bytes, err := json.Marshal(r)
|
bytes, err := json.Marshal(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
result.error = &err
|
result.error = err
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
count := len(*r)
|
count := len(*r)
|
||||||
log.Infof("got %d operations for block %d fetched from network", count, blockNum)
|
log.Infof("got %d operations for block %d", count, blockNum)
|
||||||
result.blockData = &bytes
|
result.blockData = &bytes
|
||||||
result.error = nil // make sure this is nil if it worked
|
result.error = nil // make sure this is nil if it worked
|
||||||
return result
|
return result
|
||||||
|
@ -1,53 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import "testing"
|
|
||||||
import log "github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
/*
|
|
||||||
import "github.com/stretchr/testify/mock"
|
|
||||||
|
|
||||||
type MockedSteemAPI struct {
|
|
||||||
mock.Mock
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockedSteemAPI) DoSomething(number int) (bool, error) {
|
|
||||||
args := m.Called(number)
|
|
||||||
return args.Bool(0), args.Error(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
*/
|
|
||||||
|
|
||||||
func TestBlockfetcherInit(t *testing.T) {
|
|
||||||
|
|
||||||
log.SetLevel(log.DebugLevel)
|
|
||||||
|
|
||||||
bf := NewBlockFetcher(&BlockFetcherConfig{
|
|
||||||
api: nil,
|
|
||||||
desiredFetcherThreads: 1,
|
|
||||||
startBlock: 10000,
|
|
||||||
endBlock: 10005,
|
|
||||||
})
|
|
||||||
|
|
||||||
if bf == nil {
|
|
||||||
t.Errorf("could not instantiate blockfetcher")
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
//can't actually fetch yet until we mock the api
|
|
||||||
/*
|
|
||||||
func TestBlockfetcherFetch(t *testing.T) {
|
|
||||||
|
|
||||||
log.SetLevel(log.DebugLevel)
|
|
||||||
|
|
||||||
bf := NewBlockFetcher(&BlockFetcherConfig{
|
|
||||||
api: nil,
|
|
||||||
desiredFetcherThreads: 1,
|
|
||||||
startBlock: 10000,
|
|
||||||
endBlock: 10005,
|
|
||||||
})
|
|
||||||
|
|
||||||
bf.fetch()
|
|
||||||
}
|
|
||||||
|
|
||||||
*/
|
|
61
db.go
61
db.go
@ -10,11 +10,8 @@ import "strconv"
|
|||||||
const appPrefix = "sbf"
|
const appPrefix = "sbf"
|
||||||
|
|
||||||
type SteemDataStorer interface {
|
type SteemDataStorer interface {
|
||||||
SetCurrentNetworkBlockHeight(BlockNumber) error
|
SetCurrentBlockHeight(BlockNumber) error
|
||||||
SetCurrentLocalBlockHeight(BlockNumber) error
|
CurrentBlockHeight() BlockNumber
|
||||||
UpdateCurrentLocalBlockHeight()
|
|
||||||
CurrentLocalBlockHeight() BlockNumber
|
|
||||||
CurrentNetworkBlockHeight() BlockNumber
|
|
||||||
HaveOpsForBlock(BlockNumber) bool
|
HaveOpsForBlock(BlockNumber) bool
|
||||||
StoreBlockOps(BlockNumber, *[]byte) error
|
StoreBlockOps(BlockNumber, *[]byte) error
|
||||||
}
|
}
|
||||||
@ -27,35 +24,11 @@ type SteemDataStore struct {
|
|||||||
func NewSteemDataStore(hostname string) SteemDataStorer {
|
func NewSteemDataStore(hostname string) SteemDataStorer {
|
||||||
self := new(SteemDataStore)
|
self := new(SteemDataStore)
|
||||||
self.kv = NewRedisKVStore(hostname)
|
self.kv = NewRedisKVStore(hostname)
|
||||||
self.init()
|
|
||||||
return self
|
return self
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *SteemDataStore) init() {
|
func (self *SteemDataStore) SetCurrentBlockHeight(blockNum BlockNumber) error {
|
||||||
self.UpdateCurrentLocalBlockHeight()
|
keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix)
|
||||||
}
|
|
||||||
|
|
||||||
func (self *SteemDataStore) UpdateCurrentLocalBlockHeight() {
|
|
||||||
cur := self.CurrentLocalBlockHeight()
|
|
||||||
next := self.FindHighestContiguousBlockInDb(cur)
|
|
||||||
if next != cur {
|
|
||||||
err := self.SetCurrentLocalBlockHeight(next)
|
|
||||||
log.Infof("current highest contig block in db is now %d", next)
|
|
||||||
if err != nil {
|
|
||||||
log.Panic(err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *SteemDataStore) SetCurrentLocalBlockHeight(blockNum BlockNumber) error {
|
|
||||||
keyname := fmt.Sprintf("%s.meta.CurrentLocalBlockHeight", appPrefix)
|
|
||||||
value := fmt.Sprintf("%d", blockNum)
|
|
||||||
return self.kv.Put(&keyname, &value)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *SteemDataStore) SetCurrentNetworkBlockHeight(blockNum BlockNumber) error {
|
|
||||||
keyname := fmt.Sprintf("%s.meta.CurrentNetworkBlockHeight", appPrefix)
|
|
||||||
value := fmt.Sprintf("%d", blockNum)
|
value := fmt.Sprintf("%d", blockNum)
|
||||||
return self.kv.Put(&keyname, &value)
|
return self.kv.Put(&keyname, &value)
|
||||||
}
|
}
|
||||||
@ -69,10 +42,7 @@ func (self *SteemDataStore) FindHighestContiguousBlockInDb(from BlockNumber) Blo
|
|||||||
for {
|
for {
|
||||||
try = BlockNumber(uint64(last) + 1)
|
try = BlockNumber(uint64(last) + 1)
|
||||||
keyname = fmt.Sprintf("%s.ops_in_block.%d", appPrefix, try)
|
keyname = fmt.Sprintf("%s.ops_in_block.%d", appPrefix, try)
|
||||||
exists, err := self.kv.Exists(&keyname)
|
exists, _ := self.kv.Exists(&keyname)
|
||||||
if err != nil {
|
|
||||||
log.Panic(err)
|
|
||||||
}
|
|
||||||
if exists == false {
|
if exists == false {
|
||||||
log.Debugf("cannot find block %d in db, highest found is %d", try, last)
|
log.Debugf("cannot find block %d in db, highest found is %d", try, last)
|
||||||
return last
|
return last
|
||||||
@ -94,27 +64,14 @@ func (self *SteemDataStore) HaveOpsForBlock(blockNum BlockNumber) bool {
|
|||||||
return exists
|
return exists
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *SteemDataStore) CurrentNetworkBlockHeight() BlockNumber {
|
func (self *SteemDataStore) CurrentBlockHeight() BlockNumber {
|
||||||
keyname := fmt.Sprintf("%s.meta.CurrentNetworkBlockHeight", appPrefix)
|
keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix)
|
||||||
val, err := self.kv.Get(&keyname)
|
val, err := self.kv.Get(&keyname)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// assume this is key not found, initialize key to default
|
// assume this is key not found, initialize key to default
|
||||||
self.SetCurrentNetworkBlockHeight(0)
|
self.SetCurrentBlockHeight(0)
|
||||||
// retry
|
// retry
|
||||||
return self.CurrentNetworkBlockHeight()
|
return self.CurrentBlockHeight()
|
||||||
}
|
|
||||||
intval, err := strconv.ParseUint(*val, 10, 64)
|
|
||||||
return BlockNumber(intval)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (self *SteemDataStore) CurrentLocalBlockHeight() BlockNumber {
|
|
||||||
keyname := fmt.Sprintf("%s.meta.CurrentLocalBlockHeight", appPrefix)
|
|
||||||
val, err := self.kv.Get(&keyname)
|
|
||||||
if err != nil {
|
|
||||||
// assume this is key not found, initialize key to default
|
|
||||||
self.SetCurrentLocalBlockHeight(0)
|
|
||||||
// retry
|
|
||||||
return self.CurrentLocalBlockHeight()
|
|
||||||
}
|
}
|
||||||
intval, err := strconv.ParseUint(*val, 10, 64)
|
intval, err := strconv.ParseUint(*val, 10, 64)
|
||||||
return BlockNumber(intval)
|
return BlockNumber(intval)
|
||||||
|
@ -11,7 +11,6 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type httpRPCClient interface {
|
type httpRPCClient interface {
|
||||||
@ -48,14 +47,9 @@ type JSONRPC struct {
|
|||||||
|
|
||||||
// New create new rpc client with given url
|
// New create new rpc client with given url
|
||||||
func NewJSONRPC(url string, options ...func(rpc *JSONRPC)) *JSONRPC {
|
func NewJSONRPC(url string, options ...func(rpc *JSONRPC)) *JSONRPC {
|
||||||
|
|
||||||
netClient := &http.Client{
|
|
||||||
Timeout: time.Second * 20,
|
|
||||||
}
|
|
||||||
|
|
||||||
rpc := &JSONRPC{
|
rpc := &JSONRPC{
|
||||||
url: url,
|
url: url,
|
||||||
client: netClient,
|
client: http.DefaultClient,
|
||||||
}
|
}
|
||||||
for _, option := range options {
|
for _, option := range options {
|
||||||
option(rpc)
|
option(rpc)
|
||||||
@ -83,7 +77,6 @@ func (rpc *JSONRPC) Call(method string, params json.RawMessage) (json.RawMessage
|
|||||||
defer response.Body.Close()
|
defer response.Body.Close()
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Infof("jsonrpc error: %v", err)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,7 +115,7 @@ func (kv *BadgerKVStore) Put(key *string, value *string) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
err = txn.Commit()
|
err = txn.Commit(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
52
main.go
52
main.go
@ -1,36 +1,36 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import "github.com/spf13/viper"
|
//import "github.com/spf13/viper"
|
||||||
|
//import "encoding/json"
|
||||||
import log "github.com/sirupsen/logrus"
|
import log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
// STEEM_APIURL=https://api.steem.house ./steem-block-db
|
const steemAPIURL = "https://api.steemit.com"
|
||||||
|
const redisUrl = "localhost:6379"
|
||||||
|
|
||||||
|
//const steemAPIURL = "http://10.100.202.175:8090"
|
||||||
|
//const steemAPIURL = "http://las2.local:8090"
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
viper.SetConfigName("steem")
|
log.SetLevel(log.DebugLevel)
|
||||||
viper.AddConfigPath("/etc/steem")
|
var x *BlockFetcher
|
||||||
viper.AddConfigPath("$HOME/.config/steem")
|
x = NewBlockFetcher(&BlockFetcherConfig{
|
||||||
viper.SetEnvPrefix("steem")
|
api: nil,
|
||||||
viper.BindEnv("debug")
|
desiredFetcherThreads: 40,
|
||||||
viper.BindEnv("redis")
|
startBlock: 10000,
|
||||||
viper.BindEnv("apiurl")
|
endBlock: 10005,
|
||||||
viper.ReadInConfig() // Find and read the config file if exists
|
})
|
||||||
logLevel := log.InfoLevel
|
_ = x
|
||||||
if viper.GetBool("debug") == true {
|
}
|
||||||
logLevel = log.DebugLevel
|
|
||||||
}
|
/*
|
||||||
redis := "localhost:6379"
|
func mainx() {
|
||||||
if viper.Get("redis") != nil {
|
|
||||||
redis = viper.GetString("redis")
|
|
||||||
}
|
|
||||||
apiurl := "https://api.steemit.com"
|
|
||||||
if viper.Get("apiurl") != nil {
|
|
||||||
apiurl = viper.GetString("apiurl")
|
|
||||||
}
|
|
||||||
app := NewApp(&appconfig{
|
app := NewApp(&appconfig{
|
||||||
logLevel: logLevel,
|
logLevel: log.DebugLevel,
|
||||||
apiUrl: apiurl,
|
apiUrl: steemAPIURL,
|
||||||
redisUrl: redis,
|
redisUrl: redisUrl,
|
||||||
desiredFetcherThreads: 30,
|
fetcherThreads: 40,
|
||||||
})
|
})
|
||||||
app.main()
|
app.main()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*/
|
||||||
|
16
steemapi.go
16
steemapi.go
@ -9,15 +9,10 @@ type SteemAPI struct {
|
|||||||
rpc *JSONRPC
|
rpc *JSONRPC
|
||||||
}
|
}
|
||||||
|
|
||||||
type SteemAPIShape interface {
|
|
||||||
GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesResponse, error)
|
|
||||||
GetOpsInBlock(blockNum BlockNumber) (GetOpsInBlockResponse, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
var EmptyParams = []string{}
|
var EmptyParams = []string{}
|
||||||
var EmptyParamsRaw, _ = json.Marshal(EmptyParams)
|
var EmptyParamsRaw, _ = json.Marshal(EmptyParams)
|
||||||
|
|
||||||
func NewSteemAPI(url string, options ...func(s SteemAPIShape)) *SteemAPI {
|
func NewSteemAPI(url string, options ...func(s *SteemAPI)) *SteemAPI {
|
||||||
|
|
||||||
rpc := NewJSONRPC(url, func(x *JSONRPC) {})
|
rpc := NewJSONRPC(url, func(x *JSONRPC) {})
|
||||||
|
|
||||||
@ -34,7 +29,7 @@ func NewSteemAPI(url string, options ...func(s SteemAPIShape)) *SteemAPI {
|
|||||||
|
|
||||||
func (self *SteemAPI) GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesResponse, error) {
|
func (self *SteemAPI) GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesResponse, error) {
|
||||||
var resp DynamicGlobalProperties
|
var resp DynamicGlobalProperties
|
||||||
raw, err := self.rpc.Call("condenser_api.get_dynamic_global_properties", EmptyParamsRaw)
|
raw, err := self.rpc.Call("get_dynamic_global_properties", EmptyParamsRaw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -51,12 +46,9 @@ func (self *SteemAPI) GetOpsInBlock(blockNum BlockNumber) (GetOpsInBlockResponse
|
|||||||
// api.steemit.com.
|
// api.steemit.com.
|
||||||
realOpsParams := &GetOpsInBlockRequestParams{BlockNum: blockNum, VirtualOps: false}
|
realOpsParams := &GetOpsInBlockRequestParams{BlockNum: blockNum, VirtualOps: false}
|
||||||
rop, err := realOpsParams.MarshalJSON()
|
rop, err := realOpsParams.MarshalJSON()
|
||||||
if err != nil {
|
realOpsResponse, err := self.rpc.Call("condenser_api.get_ops_in_block", rop)
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
rawOpsResponse, err := self.rpc.Call("condenser_api.get_ops_in_block", rop)
|
|
||||||
var result []OperationObject
|
var result []OperationObject
|
||||||
err = json.Unmarshal(rawOpsResponse, &result)
|
err = json.Unmarshal(realOpsResponse, &result)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user