steem-block-db/main.go

169 lines
4.5 KiB
Go
Raw Normal View History

2018-10-04 18:41:39 +00:00
package main
2018-10-31 08:48:53 +00:00
//import "github.com/spf13/viper"
//import "encoding/json"
//import "fmt"
import "sync"
import "time"
2018-10-31 09:44:19 +00:00
import "encoding/json"
2018-10-31 08:48:53 +00:00
import log "github.com/sirupsen/logrus"
2018-10-04 18:41:39 +00:00
2018-10-18 08:39:41 +00:00
const steemAPIURL = "https://api.steemit.com"
2018-10-18 07:09:30 +00:00
2018-10-04 18:41:39 +00:00
func main() {
2018-10-31 08:48:53 +00:00
processinit()
app := NewApp(&appconfig{})
app.main()
}
func processinit() {
//FIXME(sneak) use viper to set loglevel
2018-10-31 10:25:37 +00:00
//log.SetLevel(log.DebugLevel)
log.SetLevel(log.InfoLevel)
2018-10-31 08:48:53 +00:00
}
2018-10-31 09:44:19 +00:00
// BlockNumber is a block height. It is used for the network's reported
// height, the locally stored height, and to identify the block to fetch.
type BlockNumber uint64
2018-10-31 08:48:53 +00:00
type App struct {
2018-10-31 09:44:19 +00:00
datastore SteemDataStorer
2018-10-31 08:48:53 +00:00
api *SteemAPI
2018-10-31 09:44:19 +00:00
currentNetworkBlockHeight BlockNumber
currentLocalBlockHeight BlockNumber
currentFetchingThreads uint
desiredFetchingThreads uint
2018-10-31 08:48:53 +00:00
lock *sync.Mutex
}
// appconfig is a string-to-string settings map handed to NewApp.
// NOTE(review): init accepts it but does not read any keys yet.
type appconfig map[string]string
// NewApp allocates an App and initializes it with config before
// returning it.
func NewApp(config *appconfig) *App {
	app := &App{}
	app.init(config)
	return app
}
func (self *App) init(config *appconfig) {
self.api = NewSteemAPI(steemAPIURL)
self.datastore = NewSteemDataStore("./d")
2018-10-31 11:05:25 +00:00
self.desiredFetchingThreads = 40
2018-10-31 08:48:53 +00:00
self.currentFetchingThreads = 0
self.lock = &sync.Mutex{}
}
2018-10-25 13:12:43 +00:00
2018-10-31 08:48:53 +00:00
// updateCurrentBlockHeight asks the network for its current height and,
// if it is higher than the recorded one, stores and logs the new value.
// A height that is equal or lower leaves the recorded value untouched.
func (self *App) updateCurrentBlockHeight() {
	latest := self.fetchCurrentBlockHeight()
	if latest <= self.currentNetworkBlockHeight {
		// Nothing new; never move the recorded height backwards.
		return
	}

	self.lock.Lock()
	defer self.lock.Unlock()
	self.currentNetworkBlockHeight = latest
	log.Infof("current block height is now %d", self.currentNetworkBlockHeight)
}
// main logs a startup message and then enters mainloop, which loops
// forever.
func (self *App) main() {
log.Infof("steem block data fetcher starting up...")
self.mainloop()
}
2018-10-31 09:44:19 +00:00
func (self *App) numFetchers() uint {
2018-10-31 08:48:53 +00:00
self.lock.Lock()
defer self.lock.Unlock()
return self.currentFetchingThreads
}
2018-10-04 18:41:39 +00:00
2018-10-31 08:48:53 +00:00
// incrFetchers records that one more fetcher is active. The counter is
// bumped while holding the lock.
func (self *App) incrFetchers() {
	self.lock.Lock()
	self.currentFetchingThreads++
	self.lock.Unlock()
}
2018-10-04 18:41:39 +00:00
2018-10-31 08:48:53 +00:00
// decrFetchers records that one fetcher has finished. The counter is
// lowered while holding the lock.
func (self *App) decrFetchers() {
	self.lock.Lock()
	self.currentFetchingThreads--
	self.lock.Unlock()
}
2018-10-28 16:31:26 +00:00
2018-10-31 09:44:19 +00:00
func (self *App) spawnNewFetcher(blockNum BlockNumber) {
2018-10-31 11:05:25 +00:00
log.Debugf("spawning fetcher for block %d", blockNum)
2018-10-31 09:44:19 +00:00
go func() {
2018-10-31 11:05:25 +00:00
// this is so hacky, make a queue like a grownup would you
time.Sleep(100 * time.Millisecond)
if self.datastore.HaveOpsForBlock(blockNum) {
log.Infof("already have ops for block %d, not re-fetching", blockNum)
return
}
2018-10-31 09:44:19 +00:00
self.incrFetchers()
self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum))
self.decrFetchers()
2018-10-31 10:25:37 +00:00
2018-10-31 09:44:19 +00:00
}()
}
// storeBlockOps passes the serialized operations for blockNum to the
// datastore's StoreBlockOps.
// NOTE(review): if StoreBlockOps returns an error it is dropped here —
// confirm its signature.
func (self *App) storeBlockOps(blockNum BlockNumber, blockOps *[]byte) {
self.datastore.StoreBlockOps(blockNum, blockOps)
}
// note that parallelFetchAndStoreBlocks does not respect the
// limitation on number of desired fetchers, that is for the caller
func (self *App) parallelFetchAndStoreBlocks(start BlockNumber, end BlockNumber) {
var diff = uint64(end) - uint64(start)
var i uint64
for i = 0; i < diff; i++ {
self.spawnNewFetcher(BlockNumber(uint64(start) + i))
}
2018-10-28 16:31:26 +00:00
}
2018-10-31 08:48:53 +00:00
func (self *App) mainloop() {
2018-10-31 09:44:19 +00:00
log.Infof("using %d fetching threads", self.desiredFetchingThreads)
2018-10-31 08:48:53 +00:00
for {
self.updateCurrentBlockHeight()
2018-10-31 11:05:25 +00:00
log.Infof("current number of active fetchers: %d", self.numFetchers())
time.Sleep(1500 * time.Millisecond)
self.datastore.SetCurrentBlockHeight()
2018-10-31 09:44:19 +00:00
localHeight := self.datastore.CurrentBlockHeight()
2018-10-31 10:25:37 +00:00
log.Infof("our highest fetched block height is %d", localHeight)
2018-10-31 09:44:19 +00:00
if localHeight < self.currentNetworkBlockHeight {
// we need to fetch some blocks from the network
avail := self.desiredFetchingThreads - self.numFetchers()
diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight)
log.Infof("we need to fetch %d blocks", diff)
if uint64(diff) > uint64(avail) {
2018-10-31 10:25:37 +00:00
self.parallelFetchAndStoreBlocks(BlockNumber(uint64(localHeight)+1), BlockNumber(uint64(localHeight)+1+uint64(avail)))
2018-10-31 09:44:19 +00:00
} else {
// just spawn fetchers for the blocks we don't have
// spawning will update the number of running fetchers
self.parallelFetchAndStoreBlocks(localHeight+1, self.currentNetworkBlockHeight)
}
}
//needFetchers := self.desiredFetchingThreads - self.numFetchers()
2018-10-31 08:48:53 +00:00
}
}
2018-10-31 09:44:19 +00:00
func (self *App) fetchCurrentBlockHeight() BlockNumber {
2018-10-31 08:48:53 +00:00
r, err := self.api.GetDynamicGlobalProperties()
if err != nil {
panic("can't fetch global properties, bailing")
}
if r.LastIrreversibleBlockNum < 100 {
panic("can't fetch global properties, bailing")
}
return r.LastIrreversibleBlockNum
}
2018-10-31 09:44:19 +00:00
func (self *App) fetchBlockOps(blockNum BlockNumber) *[]byte {
r, err := self.api.GetOpsInBlock(blockNum)
2018-10-28 16:31:26 +00:00
if err != nil {
2018-10-31 11:05:25 +00:00
// just retry on error
// sloppy, but works
return self.fetchBlockOps(blockNum)
2018-10-28 16:31:26 +00:00
}
bytes, err := json.Marshal(r)
if err != nil {
panic(err)
}
2018-10-31 11:05:25 +00:00
count := len(*r)
log.Infof("got %d operations for block %d", count, blockNum)
2018-10-31 09:44:19 +00:00
return &bytes
//self.datastore.writeBlockOps(blockNum, bytes)
2018-10-28 16:31:26 +00:00
}