diff --git a/app.go b/app.go index 70198d1..ac10e68 100644 --- a/app.go +++ b/app.go @@ -28,6 +28,11 @@ func NewApp(config *appconfig) *App { func (self *App) init(config *appconfig) { log.SetLevel(config.logLevel) + log.SetFormatter(&log.TextFormatter{ + FullTimestamp: true, + }) + log.SetFormatter(&log.JSONFormatter{}) + //log.SetReportCaller(true) self.api = NewSteemAPI(config.apiUrl) self.db = NewSteemDataStore(config.redisUrl) self.lock = &sync.Mutex{} @@ -41,7 +46,7 @@ func (self *App) main() { func (self *App) mainloop() { log.Infof("using %d fetching threads", self.config.desiredFetcherThreads) - batchSize := uint(1000) + batchSize := uint(3000) //batchSize := uint(10) var start BlockNumber var end BlockNumber diff --git a/blockfetcher.go b/blockfetcher.go index 1461fa1..7dcd4c1 100644 --- a/blockfetcher.go +++ b/blockfetcher.go @@ -107,6 +107,7 @@ WorkLoop: } else { // wait a sec and try again time.Sleep(1 * time.Second) + log.Infof("error fetching block %d, retry %d (%v)", blockNum, i+1, r.error) } } } @@ -124,6 +125,7 @@ func (self *BlockFetcher) sendWork(b BlockNumber) { func (self *BlockFetcher) fetch() *[]FetchedBlock { log.Debugf("blockfetcher beginning fetch") + startTime := time.Now() for i := uint(1); i < self.desiredFetcherThreads+uint(1); i++ { go self.fetcher(i) } @@ -141,6 +143,21 @@ func (self *BlockFetcher) fetch() *[]FetchedBlock { case result := <-self.resultsChannel: self.receiveResult(result) default: + + if startTime.Add(30 * time.Second).Before(time.Now()) { + // fetch took too long, return anyway + log.Infof("30s fetcher batch timeout reached, but not done fetching") + // return what we have + var final []FetchedBlock + self.lock.Lock() + for _, value := range self.fetchedBlocks { + final = append(final, *value) + } + self.lock.Unlock() + self = nil //this BlockFetcher is now finished. + return &final + } + if self.Done() == true { log.Infof("blockfetcher %+v considers itself Done()", self) // if we get here, it's because workList is now empty and there diff --git a/jsonrpc.go b/jsonrpc.go index d2fa802..fff6799 100644 --- a/jsonrpc.go +++ b/jsonrpc.go @@ -11,6 +11,7 @@ import ( "io" "io/ioutil" "net/http" + "time" ) type httpRPCClient interface { @@ -47,9 +48,14 @@ type JSONRPC struct { // New create new rpc client with given url func NewJSONRPC(url string, options ...func(rpc *JSONRPC)) *JSONRPC { + + netClient := &http.Client{ + Timeout: time.Second * 20, + } + rpc := &JSONRPC{ url: url, - client: http.DefaultClient, + client: netClient, } for _, option := range options { option(rpc) @@ -77,6 +83,7 @@ func (rpc *JSONRPC) Call(method string, params json.RawMessage) (json.RawMessage defer response.Body.Close() } if err != nil { + log.Infof("jsonrpc error: %v", err) return nil, err } diff --git a/main.go b/main.go index f8d1024..3d4de54 100644 --- a/main.go +++ b/main.go @@ -13,10 +13,7 @@ func main() { viper.BindEnv("debug") viper.BindEnv("redis") viper.BindEnv("apiurl") - err := viper.ReadInConfig() // Find and read the config file if exists - if err != nil { - log.Infof("error reading config file: %s \n", err) - } + viper.ReadInConfig() // Find and read the config file if exists logLevel := log.InfoLevel if viper.GetBool("debug") == true { logLevel = log.DebugLevel @@ -33,7 +30,7 @@ func main() { logLevel: logLevel, apiUrl: apiurl, redisUrl: redis, - desiredFetcherThreads: 40, + desiredFetcherThreads: 30, }) app.main() }