Compare commits

...

7 Commits

Author SHA1 Message Date
76d26a6f1e
this now works to fetch blocks 2018-11-11 21:34:55 -08:00
3d6bf1e08f
now uses viper for configuration 2018-11-03 08:32:24 -07:00
8ff07a4a0b
minor tweaks 2018-11-03 07:59:46 -07:00
43d43d3748
works now 2018-11-03 07:41:53 -07:00
ff9b67f543
Merge branch 'master' into next 2018-11-01 08:11:41 -07:00
25ccf2bc9e Merge branch 'add-ci' into 'master'
Add ci

See merge request sneakdotberlin/steem-block-db!1
2018-11-01 12:32:28 +00:00
c43df78d55 Add ci 2018-11-01 12:32:28 +00:00
11 changed files with 349 additions and 172 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
steem-block-db

30
.gitlab-ci.yml Normal file
View File

@ -0,0 +1,30 @@
image: golang:1.11
cache:
paths:
- /apt-cache
- /go/src/github.com
- /go/src/golang.org
- /go/src/google.golang.org
- /go/src/gopkg.in
stages:
- test
- build
before_script:
- mkdir /go/src/steem-block-db
- cp $CI_PROJECT_DIR/*.go /go/src/steem-block-db
- cd /go/src/steem-block-db
- GOPATH=/go go get
#unit_tests:
# stage: test
# script:
# - make test
build:
stage: build
script:
- cd /go/src/steem-block-db
- GOPATH=/go go build

View File

@ -1,4 +1,19 @@
default: run GOFILES := $(shell find . -type f -name '*.go' -not -name '*_test.go')
default: test
.PHONY: run build test
run: run:
go run *.go go run $(GOFILES)
build: steem-block-db
steem-block-db: *.go
go build
test:
go test
clean:
rm steem-block-db

153
app.go
View File

@ -1,14 +1,15 @@
package main package main
import "sync" import "sync"
import "time"
import log "github.com/sirupsen/logrus" import log "github.com/sirupsen/logrus"
import "encoding/json"
//import "encoding/json"
type App struct { type App struct {
datastore SteemDataStorer api SteemAPIShape
api *SteemAPI config *appconfig
currentNetworkBlockHeight BlockNumber db SteemDataStorer
currentLocalBlockHeight BlockNumber
lock *sync.Mutex lock *sync.Mutex
} }
@ -16,6 +17,7 @@ type appconfig struct {
logLevel log.Level logLevel log.Level
apiUrl string apiUrl string
redisUrl string redisUrl string
desiredFetcherThreads uint
} }
func NewApp(config *appconfig) *App { func NewApp(config *appconfig) *App {
@ -26,107 +28,80 @@ func NewApp(config *appconfig) *App {
func (self *App) init(config *appconfig) { func (self *App) init(config *appconfig) {
log.SetLevel(config.logLevel) log.SetLevel(config.logLevel)
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})
log.SetFormatter(&log.JSONFormatter{})
//log.SetReportCaller(true)
self.api = NewSteemAPI(config.apiUrl) self.api = NewSteemAPI(config.apiUrl)
self.datastore = NewSteemDataStore(config.redisUrl) self.db = NewSteemDataStore(config.redisUrl)
self.lock = &sync.Mutex{} self.lock = &sync.Mutex{}
} self.config = config
func (self *App) updateCurrentBlockHeight() {
h := self.fetchCurrentBlockHeight()
if h > self.currentNetworkBlockHeight {
self.lock.Lock()
defer self.lock.Unlock()
self.currentNetworkBlockHeight = h
log.Infof("current block height is now %d", self.currentNetworkBlockHeight)
}
} }
func (self *App) main() { func (self *App) main() {
log.Infof("steem block data fetcher starting up...") log.Infof("steem block data fetcher starting up...")
//self.mainloop() self.mainloop()
}
/*
func (self *App) numFetchers() uint {
self.lock.Lock()
defer self.lock.Unlock()
return uint(len(*self.fetchingBlocks))
}
func (self *App) spawnNewFetcher(blockNum BlockNumber) {
log.Debugf("spawning fetcher for block %d", blockNum)
go func() {
// this is so hacky, make a queue like a grownup would you
time.Sleep(100 * time.Millisecond)
if self.datastore.HaveOpsForBlock(blockNum) {
log.Infof("already have ops for block %d, not re-fetching", blockNum)
return
}
//self.incrFetchers()
self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum))
//self.decrFetchers()
}()
}
func (self *App) storeBlockOps(blockNum BlockNumber, blockOps *[]byte) {
self.datastore.StoreBlockOps(blockNum, blockOps)
}
// note that parallelFetchAndStoreBlocks does not respect the
// limitation on number of desired fetchers, that is for the caller
func (self *App) parallelFetchAndStoreBlocks(start BlockNumber, end BlockNumber) {
var diff = uint64(end) - uint64(start)
var i uint64
for i = 0; i < diff; i++ {
self.spawnNewFetcher(BlockNumber(uint64(start) + i))
}
}
func (self *App) populateFetchers() {
} }
func (self *App) mainloop() { func (self *App) mainloop() {
log.Infof("using %d fetching threads", self.config.desiredFetcherThreads)
log.Infof("using %d fetching threads", self.desiredFetchingThreads) batchSize := uint(3000)
//batchSize := uint(10)
self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight() var start BlockNumber
var end BlockNumber
for { for {
self.spawnMoreFetchers() self.db.SetCurrentNetworkBlockHeight(self.fetchCurrentBlockHeight())
self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight() if self.db.CurrentNetworkBlockHeight() == self.db.CurrentLocalBlockHeight() {
time.Sleep(1000 * time.Millisecond) //we are synced
} time.Sleep(1 * time.Second)
continue
} }
log.Infof("current network block height = %d", self.db.CurrentNetworkBlockHeight())
log.Infof("current local block height = %d", self.db.CurrentLocalBlockHeight())
// we are not synced
func (self *App) spawnMoreFetchers() { // how far behind are we?
} countMustFetch := uint(self.db.CurrentNetworkBlockHeight() - self.db.CurrentLocalBlockHeight())
log.Infof("we are %d blocks behind", countMustFetch)
/* start = self.db.CurrentLocalBlockHeight() + 1
self.updateCurrentBlockHeight() log.Infof("beginning fetch with start block %d", start)
log.Infof("current number of active fetchers: %d", self.numFetchers()) if countMustFetch <= batchSize {
time.Sleep(1500 * time.Millisecond) end = self.db.CurrentNetworkBlockHeight()
self.datastore.SetCurrentBlockHeight() log.Infof("fetch is within our batch size (%d), end block for fetch batch is %d", batchSize, end)
localHeight := self.datastore.CurrentBlockHeight()
log.Infof("our highest fetched block height is %d", localHeight)
if localHeight < self.currentNetworkBlockHeight {
// we need to fetch some blocks from the network
avail := self.desiredFetchingThreads - self.numFetchers()
diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight)
log.Infof("we need to fetch %d blocks", diff)
if uint64(diff) > uint64(avail) {
self.parallelFetchAndStoreBlocks(BlockNumber(uint64(localHeight)+1), BlockNumber(uint64(localHeight)+1+uint64(avail)))
} else { } else {
// just spawn fetchers for the blocks we don't have end = BlockNumber(uint(start) + uint(batchSize) - uint(1))
// spawning will update the number of running fetchers log.Infof("fetch is too large for batch size (%d), end block for fetch batch is %d", batchSize, end)
self.parallelFetchAndStoreBlocks(localHeight+1, self.currentNetworkBlockHeight)
} }
}
//needFetchers := self.desiredFetchingThreads - self.numFetchers()
*/ bf := NewBlockFetcher(&BlockFetcherConfig{
api: self.api.(*SteemAPI),
desiredFetcherThreads: self.config.desiredFetcherThreads,
startBlock: start,
endBlock: end,
})
blocks := bf.fetch()
log.Infof("blockfetcher has returned")
self.pushBlocks(blocks)
}
}
func (self *App) pushBlocks(newBlocks *[]FetchedBlock) {
counter := 0
for _, newBlock := range *newBlocks {
counter += 1
err := self.db.StoreBlockOps(newBlock.blockNumber, newBlock.blockData)
if err != nil {
log.Panic(err)
}
}
log.Infof("pushed %d new blocks to db", counter)
self.db.UpdateCurrentLocalBlockHeight()
}
func (self *App) fetchCurrentBlockHeight() BlockNumber { func (self *App) fetchCurrentBlockHeight() BlockNumber {
r, err := self.api.GetDynamicGlobalProperties() r, err := self.api.GetDynamicGlobalProperties()
if err != nil { if err != nil {
log.Panicf("can't fetch global properties, bailing. err: %s", err) log.Panicf("can't fetch global properties, bailing. err: %s", err)

View File

@ -1,6 +1,8 @@
package main package main
import "encoding/json"
import "sync" import "sync"
import "time"
import log "github.com/sirupsen/logrus" import log "github.com/sirupsen/logrus"
type FetchedBlock struct { type FetchedBlock struct {
@ -14,7 +16,7 @@ type BlockFetcher struct {
desiredFetcherThreads uint desiredFetcherThreads uint
wantBlocks map[BlockNumber]bool wantBlocks map[BlockNumber]bool
fetchingBlocks map[BlockNumber]bool fetchingBlocks map[BlockNumber]bool
fetchedBlocks *[]FetchedBlock fetchedBlocks map[BlockNumber]*FetchedBlock
lock *sync.Mutex lock *sync.Mutex
workChannel chan BlockNumber workChannel chan BlockNumber
resultsChannel chan *FetchedBlock resultsChannel chan *FetchedBlock
@ -43,47 +45,52 @@ func (self *BlockFetcher) init(config *BlockFetcherConfig) {
self.lock = &sync.Mutex{} self.lock = &sync.Mutex{}
self.api = config.api self.api = config.api
self.desiredFetcherThreads = config.desiredFetcherThreads self.desiredFetcherThreads = config.desiredFetcherThreads
self.workChannel = make(chan BlockNumber) self.workChannel = make(chan BlockNumber)
self.resultsChannel = make(chan *FetchedBlock) self.resultsChannel = make(chan *FetchedBlock)
self.wantBlocks = make(map[BlockNumber]bool)
self.fetchedBlocks = make(map[BlockNumber]*FetchedBlock)
diff := int(uint(config.endBlock) - uint(config.startBlock)) diff := int(uint(config.endBlock) - uint(config.startBlock))
log.Debugf("diff is %d", diff)
for i := 0; i <= diff; i++ { for i := 0; i <= diff; i++ {
self.wantBlocks[BlockNumber(uint(config.startBlock)+uint(i))] = true self.wantBlocks[BlockNumber(uint(config.startBlock)+uint(i))] = true
} }
log.Debugf("wantblocks[] is now %v", self.wantBlocks) log.Debugf("wantblocks[] is now %v", self.wantBlocks)
} }
func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) { func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) {
self.lock.Lock()
defer self.lock.Unlock()
if self.wantBlocks[blockNum] == false { if self.wantBlocks[blockNum] == false {
log.Panicf("shouldn't happen") log.Panicf("shouldn't happen")
} }
self.lock.Lock()
defer self.lock.Unlock()
delete(self.wantBlocks, blockNum) delete(self.wantBlocks, blockNum)
} }
func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) { func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) {
self.lock.Lock()
defer self.lock.Unlock()
if self.fetchingBlocks[blockNum] == false { if self.fetchingBlocks[blockNum] == false {
log.Panicf("shouldn't happen") log.Panicf("shouldn't happen")
} }
self.lock.Lock()
defer self.lock.Unlock()
delete(self.fetchingBlocks, blockNum) delete(self.fetchingBlocks, blockNum)
} }
func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) { func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) {
self.lock.Lock()
defer self.lock.Unlock()
if self.fetchingBlocks[blockNum] == true { if self.fetchingBlocks[blockNum] == true {
log.Panicf("shouldn't happen") log.Panicf("shouldn't happen")
} }
self.lock.Lock() if self.fetchingBlocks == nil {
defer self.lock.Unlock() self.fetchingBlocks = make(map[BlockNumber]bool)
}
if self.fetchingBlocks[blockNum] == false { if self.fetchingBlocks[blockNum] == false {
self.fetchingBlocks[blockNum] = true self.fetchingBlocks[blockNum] = true
} }
} }
func (self *BlockFetcher) fetcher(index int) { func (self *BlockFetcher) fetcher(index uint) {
log.Debugf("fetcher thread %d starting", index) log.Debugf("fetcher thread %d starting", index)
WorkLoop: WorkLoop:
@ -100,50 +107,88 @@ WorkLoop:
} else { } else {
// wait a sec and try again // wait a sec and try again
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
log.Infof("error fetching block %d, retry %d (%v)", blockNum, i+1, r.error)
} }
} }
} }
log.Debugf("fetcher thread %d ending", index) log.Debugf("fetcher thread %d ending", index)
} }
func (self *BlockFetcher) fetch() *[]FetchedBlock { func (self *BlockFetcher) sendWork(b BlockNumber) {
for i := 1; i < self.desiredFetcherThreads+1; i++ {
go self.fetcher(i)
}
for blockNum, _ := range self.wantBlocks {
// yay cheap goroutines, let them block on the unbuffered channel
go func() { go func() {
log.Debugf("waiting to send blockNum %d into the work channel", blockNum) // yay cheap goroutines, let them block on the unbuffered channel
self.workChannel <- blockNum log.Debugf("waiting to send blockNum %d into the work channel", b)
log.Debugf("sent blockNum %d into the work channel", blockNum) self.workChannel <- b
log.Debugf("sent blockNum %d into the work channel", b)
}() }()
} }
func (self *BlockFetcher) fetch() *[]FetchedBlock {
log.Debugf("blockfetcher beginning fetch")
startTime := time.Now()
for i := uint(1); i < self.desiredFetcherThreads+uint(1); i++ {
go self.fetcher(i)
}
self.lock.Lock()
for blockNum := range self.wantBlocks {
self.sendWork(blockNum)
}
self.lock.Unlock()
// now we have to start reading from the unbuffered resultsChannel // now we have to start reading from the unbuffered resultsChannel
// otherwise the workers will block when returning results // otherwise the workers will block when returning results
for {
select { select {
case result := <-self.resultsChannel: case result := <-self.resultsChannel:
self.receiveResult(result) self.receiveResult(result)
default: default:
if startTime.Add(30 * time.Second).Before(time.Now()) {
// fetch took too long, return anyway
log.Infof("30s fetcher batch timeout reached, but not done fetching")
// return what we have
var final []FetchedBlock
self.lock.Lock()
for _, value := range self.fetchedBlocks {
final = append(final, *value)
}
self.lock.Unlock()
self = nil //this BlockFetcher is now finished.
return &final
}
if self.Done() == true { if self.Done() == true {
log.Infof("blockfetcher %+v considers itself Done()", self)
// if we get here, it's because workList is now empty and there // if we get here, it's because workList is now empty and there
// are no more results in the results channel. // are no more results in the results channel.
close(self.workChannel) // shut down the workers close(self.workChannel) // shut down the workers
result := self.fetchedBlocks var final []FetchedBlock
self = nil //this BlockFetcher is now finished. self.lock.Lock()
return result for _, value := range self.fetchedBlocks {
final = append(final, *value)
} }
self.lock.Unlock()
self = nil //this BlockFetcher is now finished.
return &final
}
// in this case we are not done but got nothing from the result
// channel so just wait a little bit to get more results and
// check the channel again
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
//FIXME(sneak) we maybe need to handle a case here where wantBlocks never //FIXME(sneak) we maybe need to handle a case here where wantBlocks never
//empties but workers need to be re-dispatched.. //empties but workers need to be re-dispatched..
} }
} }
log.Panicf("this shouldn't happen")
return nil //shouldn't happen, return should happen from above
}
func (self *BlockFetcher) receiveResult(r *FetchedBlock) { func (self *BlockFetcher) receiveResult(r *FetchedBlock) {
log.Debugf("got result for blocknum %d", r.blockNumber) log.Debugf("got result for blocknum %d", r.blockNumber)
self.removeFetchingBlock(r.blockNumber) self.removeFetchingBlock(r.blockNumber)
self.lock.Lock() self.lock.Lock()
self.fetchedBlocks = append(self.fetchedBlocks, r) self.fetchedBlocks[r.blockNumber] = r
self.lock.Unlock() self.lock.Unlock()
self.removeWantBlock(r.blockNumber) self.removeWantBlock(r.blockNumber)
} }
@ -156,16 +201,16 @@ func (self *BlockFetcher) fetchBlockOpsFromNetwork(blockNum BlockNumber) *Fetche
} }
r, err := self.api.GetOpsInBlock(blockNum) r, err := self.api.GetOpsInBlock(blockNum)
if err != nil { if err != nil {
result.error = err result.error = &err
return result return result
} }
bytes, err := json.Marshal(r) bytes, err := json.Marshal(r)
if err != nil { if err != nil {
result.error = err result.error = &err
return result return result
} }
count := len(*r) count := len(*r)
log.Infof("got %d operations for block %d", count, blockNum) log.Infof("got %d operations for block %d fetched from network", count, blockNum)
result.blockData = &bytes result.blockData = &bytes
result.error = nil // make sure this is nil if it worked result.error = nil // make sure this is nil if it worked
return result return result

53
blockfetcher_test.go Normal file
View File

@ -0,0 +1,53 @@
package main
import "testing"
import log "github.com/sirupsen/logrus"
/*
import "github.com/stretchr/testify/mock"
type MockedSteemAPI struct {
mock.Mock
}
func (m *MockedSteemAPI) DoSomething(number int) (bool, error) {
args := m.Called(number)
return args.Bool(0), args.Error(1)
}
*/
func TestBlockfetcherInit(t *testing.T) {
log.SetLevel(log.DebugLevel)
bf := NewBlockFetcher(&BlockFetcherConfig{
api: nil,
desiredFetcherThreads: 1,
startBlock: 10000,
endBlock: 10005,
})
if bf == nil {
t.Errorf("could not instantiate blockfetcher")
}
}
//can't actually fetch yet until we mock the api
/*
func TestBlockfetcherFetch(t *testing.T) {
log.SetLevel(log.DebugLevel)
bf := NewBlockFetcher(&BlockFetcherConfig{
api: nil,
desiredFetcherThreads: 1,
startBlock: 10000,
endBlock: 10005,
})
bf.fetch()
}
*/

61
db.go
View File

@ -10,8 +10,11 @@ import "strconv"
const appPrefix = "sbf" const appPrefix = "sbf"
type SteemDataStorer interface { type SteemDataStorer interface {
SetCurrentBlockHeight(BlockNumber) error SetCurrentNetworkBlockHeight(BlockNumber) error
CurrentBlockHeight() BlockNumber SetCurrentLocalBlockHeight(BlockNumber) error
UpdateCurrentLocalBlockHeight()
CurrentLocalBlockHeight() BlockNumber
CurrentNetworkBlockHeight() BlockNumber
HaveOpsForBlock(BlockNumber) bool HaveOpsForBlock(BlockNumber) bool
StoreBlockOps(BlockNumber, *[]byte) error StoreBlockOps(BlockNumber, *[]byte) error
} }
@ -24,11 +27,35 @@ type SteemDataStore struct {
func NewSteemDataStore(hostname string) SteemDataStorer { func NewSteemDataStore(hostname string) SteemDataStorer {
self := new(SteemDataStore) self := new(SteemDataStore)
self.kv = NewRedisKVStore(hostname) self.kv = NewRedisKVStore(hostname)
self.init()
return self return self
} }
func (self *SteemDataStore) SetCurrentBlockHeight(blockNum BlockNumber) error { func (self *SteemDataStore) init() {
keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix) self.UpdateCurrentLocalBlockHeight()
}
func (self *SteemDataStore) UpdateCurrentLocalBlockHeight() {
cur := self.CurrentLocalBlockHeight()
next := self.FindHighestContiguousBlockInDb(cur)
if next != cur {
err := self.SetCurrentLocalBlockHeight(next)
log.Infof("current highest contig block in db is now %d", next)
if err != nil {
log.Panic(err)
}
return
}
}
func (self *SteemDataStore) SetCurrentLocalBlockHeight(blockNum BlockNumber) error {
keyname := fmt.Sprintf("%s.meta.CurrentLocalBlockHeight", appPrefix)
value := fmt.Sprintf("%d", blockNum)
return self.kv.Put(&keyname, &value)
}
func (self *SteemDataStore) SetCurrentNetworkBlockHeight(blockNum BlockNumber) error {
keyname := fmt.Sprintf("%s.meta.CurrentNetworkBlockHeight", appPrefix)
value := fmt.Sprintf("%d", blockNum) value := fmt.Sprintf("%d", blockNum)
return self.kv.Put(&keyname, &value) return self.kv.Put(&keyname, &value)
} }
@ -42,7 +69,10 @@ func (self *SteemDataStore) FindHighestContiguousBlockInDb(from BlockNumber) Blo
for { for {
try = BlockNumber(uint64(last) + 1) try = BlockNumber(uint64(last) + 1)
keyname = fmt.Sprintf("%s.ops_in_block.%d", appPrefix, try) keyname = fmt.Sprintf("%s.ops_in_block.%d", appPrefix, try)
exists, _ := self.kv.Exists(&keyname) exists, err := self.kv.Exists(&keyname)
if err != nil {
log.Panic(err)
}
if exists == false { if exists == false {
log.Debugf("cannot find block %d in db, highest found is %d", try, last) log.Debugf("cannot find block %d in db, highest found is %d", try, last)
return last return last
@ -64,14 +94,27 @@ func (self *SteemDataStore) HaveOpsForBlock(blockNum BlockNumber) bool {
return exists return exists
} }
func (self *SteemDataStore) CurrentBlockHeight() BlockNumber { func (self *SteemDataStore) CurrentNetworkBlockHeight() BlockNumber {
keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix) keyname := fmt.Sprintf("%s.meta.CurrentNetworkBlockHeight", appPrefix)
val, err := self.kv.Get(&keyname) val, err := self.kv.Get(&keyname)
if err != nil { if err != nil {
// assume this is key not found, initialize key to default // assume this is key not found, initialize key to default
self.SetCurrentBlockHeight(0) self.SetCurrentNetworkBlockHeight(0)
// retry // retry
return self.CurrentBlockHeight() return self.CurrentNetworkBlockHeight()
}
intval, err := strconv.ParseUint(*val, 10, 64)
return BlockNumber(intval)
}
func (self *SteemDataStore) CurrentLocalBlockHeight() BlockNumber {
keyname := fmt.Sprintf("%s.meta.CurrentLocalBlockHeight", appPrefix)
val, err := self.kv.Get(&keyname)
if err != nil {
// assume this is key not found, initialize key to default
self.SetCurrentLocalBlockHeight(0)
// retry
return self.CurrentLocalBlockHeight()
} }
intval, err := strconv.ParseUint(*val, 10, 64) intval, err := strconv.ParseUint(*val, 10, 64)
return BlockNumber(intval) return BlockNumber(intval)

View File

@ -11,6 +11,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"time"
) )
type httpRPCClient interface { type httpRPCClient interface {
@ -47,9 +48,14 @@ type JSONRPC struct {
// New create new rpc client with given url // New create new rpc client with given url
func NewJSONRPC(url string, options ...func(rpc *JSONRPC)) *JSONRPC { func NewJSONRPC(url string, options ...func(rpc *JSONRPC)) *JSONRPC {
netClient := &http.Client{
Timeout: time.Second * 20,
}
rpc := &JSONRPC{ rpc := &JSONRPC{
url: url, url: url,
client: http.DefaultClient, client: netClient,
} }
for _, option := range options { for _, option := range options {
option(rpc) option(rpc)
@ -77,6 +83,7 @@ func (rpc *JSONRPC) Call(method string, params json.RawMessage) (json.RawMessage
defer response.Body.Close() defer response.Body.Close()
} }
if err != nil { if err != nil {
log.Infof("jsonrpc error: %v", err)
return nil, err return nil, err
} }

View File

@ -115,7 +115,7 @@ func (kv *BadgerKVStore) Put(key *string, value *string) error {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
err = txn.Commit(nil) err = txn.Commit()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

50
main.go
View File

@ -1,36 +1,36 @@
package main package main
//import "github.com/spf13/viper" import "github.com/spf13/viper"
//import "encoding/json"
import log "github.com/sirupsen/logrus" import log "github.com/sirupsen/logrus"
const steemAPIURL = "https://api.steemit.com" // STEEM_APIURL=https://api.steem.house ./steem-block-db
const redisUrl = "localhost:6379"
//const steemAPIURL = "http://10.100.202.175:8090"
//const steemAPIURL = "http://las2.local:8090"
func main() { func main() {
log.SetLevel(log.DebugLevel) viper.SetConfigName("steem")
var x *BlockFetcher viper.AddConfigPath("/etc/steem")
x = NewBlockFetcher(&BlockFetcherConfig{ viper.AddConfigPath("$HOME/.config/steem")
api: nil, viper.SetEnvPrefix("steem")
desiredFetcherThreads: 40, viper.BindEnv("debug")
startBlock: 10000, viper.BindEnv("redis")
endBlock: 10005, viper.BindEnv("apiurl")
}) viper.ReadInConfig() // Find and read the config file if exists
_ = x logLevel := log.InfoLevel
if viper.GetBool("debug") == true {
logLevel = log.DebugLevel
}
redis := "localhost:6379"
if viper.Get("redis") != nil {
redis = viper.GetString("redis")
}
apiurl := "https://api.steemit.com"
if viper.Get("apiurl") != nil {
apiurl = viper.GetString("apiurl")
} }
/*
func mainx() {
app := NewApp(&appconfig{ app := NewApp(&appconfig{
logLevel: log.DebugLevel, logLevel: logLevel,
apiUrl: steemAPIURL, apiUrl: apiurl,
redisUrl: redisUrl, redisUrl: redis,
fetcherThreads: 40, desiredFetcherThreads: 30,
}) })
app.main() app.main()
} }
*/

View File

@ -9,10 +9,15 @@ type SteemAPI struct {
rpc *JSONRPC rpc *JSONRPC
} }
type SteemAPIShape interface {
GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesResponse, error)
GetOpsInBlock(blockNum BlockNumber) (GetOpsInBlockResponse, error)
}
var EmptyParams = []string{} var EmptyParams = []string{}
var EmptyParamsRaw, _ = json.Marshal(EmptyParams) var EmptyParamsRaw, _ = json.Marshal(EmptyParams)
func NewSteemAPI(url string, options ...func(s *SteemAPI)) *SteemAPI { func NewSteemAPI(url string, options ...func(s SteemAPIShape)) *SteemAPI {
rpc := NewJSONRPC(url, func(x *JSONRPC) {}) rpc := NewJSONRPC(url, func(x *JSONRPC) {})
@ -29,7 +34,7 @@ func NewSteemAPI(url string, options ...func(s *SteemAPI)) *SteemAPI {
func (self *SteemAPI) GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesResponse, error) { func (self *SteemAPI) GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesResponse, error) {
var resp DynamicGlobalProperties var resp DynamicGlobalProperties
raw, err := self.rpc.Call("get_dynamic_global_properties", EmptyParamsRaw) raw, err := self.rpc.Call("condenser_api.get_dynamic_global_properties", EmptyParamsRaw)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -46,9 +51,12 @@ func (self *SteemAPI) GetOpsInBlock(blockNum BlockNumber) (GetOpsInBlockResponse
// api.steemit.com. // api.steemit.com.
realOpsParams := &GetOpsInBlockRequestParams{BlockNum: blockNum, VirtualOps: false} realOpsParams := &GetOpsInBlockRequestParams{BlockNum: blockNum, VirtualOps: false}
rop, err := realOpsParams.MarshalJSON() rop, err := realOpsParams.MarshalJSON()
realOpsResponse, err := self.rpc.Call("condenser_api.get_ops_in_block", rop) if err != nil {
return nil, err
}
rawOpsResponse, err := self.rpc.Call("condenser_api.get_ops_in_block", rop)
var result []OperationObject var result []OperationObject
err = json.Unmarshal(realOpsResponse, &result) err = json.Unmarshal(rawOpsResponse, &result)
if err != nil { if err != nil {
return nil, err return nil, err
} }