this now works to fetch blocks

This commit is contained in:
Jeffrey Paul 2018-11-11 21:34:55 -08:00
parent 3d6bf1e08f
commit 76d26a6f1e
Signed by: sneak
GPG Key ID: 052443F4DF2A55C2
4 changed files with 33 additions and 7 deletions

7
app.go
View File

@ -28,6 +28,11 @@ func NewApp(config *appconfig) *App {
func (self *App) init(config *appconfig) { func (self *App) init(config *appconfig) {
log.SetLevel(config.logLevel) log.SetLevel(config.logLevel)
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})
log.SetFormatter(&log.JSONFormatter{})
//log.SetReportCaller(true)
self.api = NewSteemAPI(config.apiUrl) self.api = NewSteemAPI(config.apiUrl)
self.db = NewSteemDataStore(config.redisUrl) self.db = NewSteemDataStore(config.redisUrl)
self.lock = &sync.Mutex{} self.lock = &sync.Mutex{}
@ -41,7 +46,7 @@ func (self *App) main() {
func (self *App) mainloop() { func (self *App) mainloop() {
log.Infof("using %d fetching threads", self.config.desiredFetcherThreads) log.Infof("using %d fetching threads", self.config.desiredFetcherThreads)
batchSize := uint(1000) batchSize := uint(3000)
//batchSize := uint(10) //batchSize := uint(10)
var start BlockNumber var start BlockNumber
var end BlockNumber var end BlockNumber

View File

@ -107,6 +107,7 @@ WorkLoop:
} else { } else {
// wait a sec and try again // wait a sec and try again
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
log.Infof("error fetching block %d, retry %d (%v)", blockNum, i+1, r.error)
} }
} }
} }
@ -124,6 +125,7 @@ func (self *BlockFetcher) sendWork(b BlockNumber) {
func (self *BlockFetcher) fetch() *[]FetchedBlock { func (self *BlockFetcher) fetch() *[]FetchedBlock {
log.Debugf("blockfetcher beginning fetch") log.Debugf("blockfetcher beginning fetch")
startTime := time.Now()
for i := uint(1); i < self.desiredFetcherThreads+uint(1); i++ { for i := uint(1); i < self.desiredFetcherThreads+uint(1); i++ {
go self.fetcher(i) go self.fetcher(i)
} }
@ -141,6 +143,21 @@ func (self *BlockFetcher) fetch() *[]FetchedBlock {
case result := <-self.resultsChannel: case result := <-self.resultsChannel:
self.receiveResult(result) self.receiveResult(result)
default: default:
if startTime.Add(30 * time.Second).Before(time.Now()) {
// fetch took too long, return anyway
log.Infof("30s fetcher batch timeout reached, but not done fetching")
// return what we have
var final []FetchedBlock
self.lock.Lock()
for _, value := range self.fetchedBlocks {
final = append(final, *value)
}
self.lock.Unlock()
self = nil //this BlockFetcher is now finished.
return &final
}
if self.Done() == true { if self.Done() == true {
log.Infof("blockfetcher %+v considers itself Done()", self) log.Infof("blockfetcher %+v considers itself Done()", self)
// if we get here, it's because workList is now empty and there // if we get here, it's because workList is now empty and there

View File

@ -11,6 +11,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"time"
) )
type httpRPCClient interface { type httpRPCClient interface {
@ -47,9 +48,14 @@ type JSONRPC struct {
// New create new rpc client with given url // New create new rpc client with given url
func NewJSONRPC(url string, options ...func(rpc *JSONRPC)) *JSONRPC { func NewJSONRPC(url string, options ...func(rpc *JSONRPC)) *JSONRPC {
netClient := &http.Client{
Timeout: time.Second * 20,
}
rpc := &JSONRPC{ rpc := &JSONRPC{
url: url, url: url,
client: http.DefaultClient, client: netClient,
} }
for _, option := range options { for _, option := range options {
option(rpc) option(rpc)
@ -77,6 +83,7 @@ func (rpc *JSONRPC) Call(method string, params json.RawMessage) (json.RawMessage
defer response.Body.Close() defer response.Body.Close()
} }
if err != nil { if err != nil {
log.Infof("jsonrpc error: %v", err)
return nil, err return nil, err
} }

View File

@ -13,10 +13,7 @@ func main() {
viper.BindEnv("debug") viper.BindEnv("debug")
viper.BindEnv("redis") viper.BindEnv("redis")
viper.BindEnv("apiurl") viper.BindEnv("apiurl")
err := viper.ReadInConfig() // Find and read the config file if exists viper.ReadInConfig() // Find and read the config file if exists
if err != nil {
log.Infof("error reading config file: %s \n", err)
}
logLevel := log.InfoLevel logLevel := log.InfoLevel
if viper.GetBool("debug") == true { if viper.GetBool("debug") == true {
logLevel = log.DebugLevel logLevel = log.DebugLevel
@ -33,7 +30,7 @@ func main() {
logLevel: logLevel, logLevel: logLevel,
apiUrl: apiurl, apiUrl: apiurl,
redisUrl: redis, redisUrl: redis,
desiredFetcherThreads: 40, desiredFetcherThreads: 30,
}) })
app.main() app.main()
} }