169 lines
4.5 KiB
Go
169 lines
4.5 KiB
Go
package main
|
|
|
|
//import "github.com/spf13/viper"
|
|
//import "encoding/json"
|
|
//import "fmt"
|
|
import "sync"
|
|
import "time"
|
|
import "encoding/json"
|
|
import log "github.com/sirupsen/logrus"
|
|
|
|
// steemAPIURL is the Steem API endpoint that App.init passes to NewSteemAPI.
const steemAPIURL = "https://api.steemit.com"
|
|
|
|
func main() {
|
|
processinit()
|
|
app := NewApp(&appconfig{})
|
|
app.main()
|
|
}
|
|
|
|
func processinit() {
|
|
//FIXME(sneak) use viper to set loglevel
|
|
//log.SetLevel(log.DebugLevel)
|
|
log.SetLevel(log.InfoLevel)
|
|
}
|
|
|
|
// BlockNumber identifies a block by its height in the chain; the underlying
// type is uint64.
type BlockNumber uint64
|
|
|
|
type App struct {
|
|
datastore SteemDataStorer
|
|
api *SteemAPI
|
|
currentNetworkBlockHeight BlockNumber
|
|
currentLocalBlockHeight BlockNumber
|
|
currentFetchingThreads uint
|
|
desiredFetchingThreads uint
|
|
lock *sync.Mutex
|
|
}
|
|
|
|
// appconfig is a string-to-string settings map handed to NewApp. Nothing in
// this part of the file reads a value out of it yet.
type appconfig map[string]string
|
|
|
|
func NewApp(config *appconfig) *App {
|
|
self := new(App)
|
|
self.init(config)
|
|
return self
|
|
}
|
|
|
|
func (self *App) init(config *appconfig) {
|
|
self.api = NewSteemAPI(steemAPIURL)
|
|
self.datastore = NewSteemDataStore("./d")
|
|
self.desiredFetchingThreads = 40
|
|
self.currentFetchingThreads = 0
|
|
self.lock = &sync.Mutex{}
|
|
}
|
|
|
|
func (self *App) updateCurrentBlockHeight() {
|
|
h := self.fetchCurrentBlockHeight()
|
|
if h > self.currentNetworkBlockHeight {
|
|
self.lock.Lock()
|
|
defer self.lock.Unlock()
|
|
self.currentNetworkBlockHeight = h
|
|
log.Infof("current block height is now %d", self.currentNetworkBlockHeight)
|
|
}
|
|
}
|
|
|
|
func (self *App) main() {
|
|
log.Infof("steem block data fetcher starting up...")
|
|
self.mainloop()
|
|
}
|
|
|
|
func (self *App) numFetchers() uint {
|
|
self.lock.Lock()
|
|
defer self.lock.Unlock()
|
|
return self.currentFetchingThreads
|
|
}
|
|
|
|
func (self *App) incrFetchers() {
|
|
self.lock.Lock()
|
|
defer self.lock.Unlock()
|
|
self.currentFetchingThreads += 1
|
|
}
|
|
|
|
func (self *App) decrFetchers() {
|
|
self.lock.Lock()
|
|
defer self.lock.Unlock()
|
|
self.currentFetchingThreads -= 1
|
|
}
|
|
|
|
func (self *App) spawnNewFetcher(blockNum BlockNumber) {
|
|
log.Debugf("spawning fetcher for block %d", blockNum)
|
|
go func() {
|
|
// this is so hacky, make a queue like a grownup would you
|
|
time.Sleep(100 * time.Millisecond)
|
|
if self.datastore.HaveOpsForBlock(blockNum) {
|
|
log.Infof("already have ops for block %d, not re-fetching", blockNum)
|
|
return
|
|
}
|
|
self.incrFetchers()
|
|
self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum))
|
|
self.decrFetchers()
|
|
|
|
}()
|
|
}
|
|
|
|
func (self *App) storeBlockOps(blockNum BlockNumber, blockOps *[]byte) {
|
|
self.datastore.StoreBlockOps(blockNum, blockOps)
|
|
}
|
|
|
|
// note that parallelFetchAndStoreBlocks does not respect the
|
|
// limitation on number of desired fetchers, that is for the caller
|
|
func (self *App) parallelFetchAndStoreBlocks(start BlockNumber, end BlockNumber) {
|
|
var diff = uint64(end) - uint64(start)
|
|
var i uint64
|
|
for i = 0; i < diff; i++ {
|
|
self.spawnNewFetcher(BlockNumber(uint64(start) + i))
|
|
}
|
|
}
|
|
|
|
func (self *App) mainloop() {
|
|
log.Infof("using %d fetching threads", self.desiredFetchingThreads)
|
|
for {
|
|
self.updateCurrentBlockHeight()
|
|
log.Infof("current number of active fetchers: %d", self.numFetchers())
|
|
time.Sleep(1500 * time.Millisecond)
|
|
self.datastore.SetCurrentBlockHeight()
|
|
localHeight := self.datastore.CurrentBlockHeight()
|
|
log.Infof("our highest fetched block height is %d", localHeight)
|
|
if localHeight < self.currentNetworkBlockHeight {
|
|
// we need to fetch some blocks from the network
|
|
avail := self.desiredFetchingThreads - self.numFetchers()
|
|
diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight)
|
|
log.Infof("we need to fetch %d blocks", diff)
|
|
if uint64(diff) > uint64(avail) {
|
|
self.parallelFetchAndStoreBlocks(BlockNumber(uint64(localHeight)+1), BlockNumber(uint64(localHeight)+1+uint64(avail)))
|
|
} else {
|
|
// just spawn fetchers for the blocks we don't have
|
|
// spawning will update the number of running fetchers
|
|
self.parallelFetchAndStoreBlocks(localHeight+1, self.currentNetworkBlockHeight)
|
|
}
|
|
}
|
|
//needFetchers := self.desiredFetchingThreads - self.numFetchers()
|
|
}
|
|
}
|
|
|
|
func (self *App) fetchCurrentBlockHeight() BlockNumber {
|
|
r, err := self.api.GetDynamicGlobalProperties()
|
|
if err != nil {
|
|
panic("can't fetch global properties, bailing")
|
|
}
|
|
if r.LastIrreversibleBlockNum < 100 {
|
|
panic("can't fetch global properties, bailing")
|
|
}
|
|
return r.LastIrreversibleBlockNum
|
|
}
|
|
|
|
func (self *App) fetchBlockOps(blockNum BlockNumber) *[]byte {
|
|
r, err := self.api.GetOpsInBlock(blockNum)
|
|
if err != nil {
|
|
// just retry on error
|
|
// sloppy, but works
|
|
return self.fetchBlockOps(blockNum)
|
|
}
|
|
bytes, err := json.Marshal(r)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
count := len(*r)
|
|
log.Infof("got %d operations for block %d", count, blockNum)
|
|
return &bytes
|
|
//self.datastore.writeBlockOps(blockNum, bytes)
|
|
}
|