@@ -0,0 +1,160 @@ | |||
package main | |||
type FetchedBlock struct { | |||
blockNumber BlockNumber | |||
data []byte | |||
} | |||
type App struct { | |||
datastore SteemDataStorer | |||
api *SteemAPI | |||
currentNetworkBlockHeight BlockNumber | |||
currentLocalBlockHeight BlockNumber | |||
desiredFetcherThreads uint | |||
wantBlocks []BlockNumber | |||
fetchingBlocks []BlockNumber | |||
fetchedBlocks *[]FetchedBlock | |||
lock *sync.Mutex | |||
} | |||
type appconfig struct { | |||
logLevel log.Level | |||
apiUrl string | |||
redisUrl string | |||
} | |||
func NewApp(config *appconfig) *App { | |||
self := new(App) | |||
self.init(config) | |||
return self | |||
} | |||
func (self *App) init(config *appconfig) { | |||
log.SetLevel(config.logLevel) | |||
self.api = NewSteemAPI(config.apiUrl) | |||
self.datastore = NewSteemDataStore(config.redisUrl) | |||
self.desiredFetcherThreads = config.fetcherThreads | |||
self.lock = &sync.Mutex{} | |||
} | |||
func (self *App) updateCurrentBlockHeight() { | |||
h := self.fetchCurrentBlockHeight() | |||
if h > self.currentNetworkBlockHeight { | |||
self.lock.Lock() | |||
defer self.lock.Unlock() | |||
self.currentNetworkBlockHeight = h | |||
log.Infof("current block height is now %d", self.currentNetworkBlockHeight) | |||
} | |||
} | |||
func (self *App) main() { | |||
log.Infof("steem block data fetcher starting up...") | |||
self.mainloop() | |||
} | |||
func (self *App) numFetchers() uint { | |||
self.lock.Lock() | |||
defer self.lock.Unlock() | |||
return uint(len(*self.fetchingBlocks)) | |||
} | |||
func (self *App) spawnNewFetcher(blockNum BlockNumber) { | |||
log.Debugf("spawning fetcher for block %d", blockNum) | |||
go func() { | |||
// this is so hacky, make a queue like a grownup would you | |||
time.Sleep(100 * time.Millisecond) | |||
if self.datastore.HaveOpsForBlock(blockNum) { | |||
log.Infof("already have ops for block %d, not re-fetching", blockNum) | |||
return | |||
} | |||
//self.incrFetchers() | |||
self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum)) | |||
//self.decrFetchers() | |||
}() | |||
} | |||
func (self *App) storeBlockOps(blockNum BlockNumber, blockOps *[]byte) { | |||
self.datastore.StoreBlockOps(blockNum, blockOps) | |||
} | |||
// note that parallelFetchAndStoreBlocks does not respect the | |||
// limitation on number of desired fetchers, that is for the caller | |||
func (self *App) parallelFetchAndStoreBlocks(start BlockNumber, end BlockNumber) { | |||
var diff = uint64(end) - uint64(start) | |||
var i uint64 | |||
for i = 0; i < diff; i++ { | |||
self.spawnNewFetcher(BlockNumber(uint64(start) + i)) | |||
} | |||
} | |||
func (self *App) populateFetchers() { | |||
} | |||
func (self *App) mainloop() { | |||
log.Infof("using %d fetching threads", self.desiredFetchingThreads) | |||
self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight() | |||
for { | |||
self.spawnMoreFetchers() | |||
self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight() | |||
time.Sleep(1000 * time.Millisecond) | |||
} | |||
} | |||
func (self *App) spawnMoreFetchers() { | |||
} | |||
/* | |||
self.updateCurrentBlockHeight() | |||
log.Infof("current number of active fetchers: %d", self.numFetchers()) | |||
time.Sleep(1500 * time.Millisecond) | |||
self.datastore.SetCurrentBlockHeight() | |||
localHeight := self.datastore.CurrentBlockHeight() | |||
log.Infof("our highest fetched block height is %d", localHeight) | |||
if localHeight < self.currentNetworkBlockHeight { | |||
// we need to fetch some blocks from the network | |||
avail := self.desiredFetchingThreads - self.numFetchers() | |||
diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight) | |||
log.Infof("we need to fetch %d blocks", diff) | |||
if uint64(diff) > uint64(avail) { | |||
self.parallelFetchAndStoreBlocks(BlockNumber(uint64(localHeight)+1), BlockNumber(uint64(localHeight)+1+uint64(avail))) | |||
} else { | |||
// just spawn fetchers for the blocks we don't have | |||
// spawning will update the number of running fetchers | |||
self.parallelFetchAndStoreBlocks(localHeight+1, self.currentNetworkBlockHeight) | |||
} | |||
} | |||
//needFetchers := self.desiredFetchingThreads - self.numFetchers() | |||
*/ | |||
func (self *App) fetchCurrentBlockHeight() BlockNumber { | |||
r, err := self.api.GetDynamicGlobalProperties() | |||
if err != nil { | |||
log.Panicf("can't fetch global properties, bailing. err: %s", err) | |||
} | |||
if r.LastIrreversibleBlockNum < 100 { | |||
log.Panicf("can't fetch global properties, bailing") | |||
} | |||
return r.LastIrreversibleBlockNum | |||
} | |||
func (self *App) fetchBlockOps(blockNum BlockNumber) *[]byte { | |||
r, err := self.api.GetOpsInBlock(blockNum) | |||
if err != nil { | |||
// just retry on error | |||
// sloppy, but works | |||
return self.fetchBlockOps(blockNum) | |||
} | |||
bytes, err := json.Marshal(r) | |||
if err != nil { | |||
panic(err) | |||
} | |||
count := len(*r) | |||
log.Infof("got %d operations for block %d", count, blockNum) | |||
return &bytes | |||
//self.datastore.writeBlockOps(blockNum, bytes) | |||
} |
@@ -10,8 +10,7 @@ import "strconv" | |||
const appPrefix = "sbf" | |||
type SteemDataStorer interface { | |||
SetCurrentBlockHeight() error | |||
ForceSetCurrentBlockHeight(BlockNumber) error | |||
SetCurrentBlockHeight(BlockNumber) error | |||
CurrentBlockHeight() BlockNumber | |||
HaveOpsForBlock(BlockNumber) bool | |||
StoreBlockOps(BlockNumber, *[]byte) error | |||
@@ -22,28 +21,18 @@ type SteemDataStore struct { | |||
kv KVStorer | |||
} | |||
func NewSteemDataStore(dir string) *SteemDataStore { | |||
func NewSteemDataStore(hostname string) SteemDataStorer { | |||
self := new(SteemDataStore) | |||
self.kv = NewRedisKVStore() | |||
self.kv = NewRedisKVStore(hostname) | |||
return self | |||
} | |||
func (self *SteemDataStore) ForceSetCurrentBlockHeight(blockNum BlockNumber) error { | |||
keyname := fmt.Sprintf("%s.global.CurrentBlockHeight", appPrefix) | |||
func (self *SteemDataStore) SetCurrentBlockHeight(blockNum BlockNumber) error { | |||
keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix) | |||
value := fmt.Sprintf("%d", blockNum) | |||
return self.kv.Put(&keyname, &value) | |||
} | |||
// this function searches for the highest contiguously stored blocknum | |||
// and updates the memo in the db | |||
func (self *SteemDataStore) SetCurrentBlockHeight() error { | |||
nextVal := self.FindHighestContiguousBlockInDb(self.CurrentBlockHeight()) | |||
keyname := fmt.Sprintf("%s.global.CurrentBlockHeight", appPrefix) | |||
value := fmt.Sprintf("%d", nextVal) | |||
log.Infof("updating our current highest block in db to %d", nextVal) | |||
return self.kv.Put(&keyname, &value) | |||
} | |||
func (self *SteemDataStore) FindHighestContiguousBlockInDb(from BlockNumber) BlockNumber { | |||
last := from | |||
@@ -76,11 +65,11 @@ func (self *SteemDataStore) HaveOpsForBlock(blockNum BlockNumber) bool { | |||
} | |||
func (self *SteemDataStore) CurrentBlockHeight() BlockNumber { | |||
keyname := fmt.Sprintf("%s.global.CurrentBlockHeight", appPrefix) | |||
keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix) | |||
val, err := self.kv.Get(&keyname) | |||
if err != nil { | |||
// assume this is key not found, initialize key to default | |||
self.ForceSetCurrentBlockHeight(0) | |||
self.SetCurrentBlockHeight(0) | |||
// retry | |||
return self.CurrentBlockHeight() | |||
} |
@@ -22,9 +22,9 @@ type RedisKVStore struct { | |||
rc *redis.Client | |||
} | |||
func NewRedisKVStore() *RedisKVStore { | |||
func NewRedisKVStore(hostname string) *RedisKVStore { | |||
rkvs := new(RedisKVStore) | |||
rkvs.Open("localhost:6379") //FIXME(sneak) use viper | |||
rkvs.Open(hostname) //FIXME(sneak) use viper | |||
return rkvs | |||
} | |||
@@ -9,160 +9,17 @@ import "encoding/json" | |||
import log "github.com/sirupsen/logrus" | |||
const steemAPIURL = "https://api.steemit.com" | |||
const redisUrl = "localhost:6379" | |||
//const steemAPIURL = "http://10.100.202.175:8090" | |||
//const steemAPIURL = "http://las2.local:8090" | |||
func main() { | |||
processinit() | |||
app := NewApp(&appconfig{}) | |||
app := NewApp(&appconfig{ | |||
logLevel: log.DebugLevel, | |||
apiUrl: steemAPIURL, | |||
redisUrl: redisUrl, | |||
fetcherThreads: 40, | |||
}) | |||
app.main() | |||
} | |||
func processinit() { | |||
//FIXME(sneak) use viper to set loglevel | |||
//log.SetLevel(log.DebugLevel) | |||
log.SetLevel(log.InfoLevel) | |||
} | |||
type BlockNumber uint64 | |||
type App struct { | |||
datastore SteemDataStorer | |||
api *SteemAPI | |||
currentNetworkBlockHeight BlockNumber | |||
currentLocalBlockHeight BlockNumber | |||
currentFetchingThreads uint | |||
desiredFetchingThreads uint | |||
lock *sync.Mutex | |||
} | |||
type appconfig map[string]string | |||
func NewApp(config *appconfig) *App { | |||
self := new(App) | |||
self.init(config) | |||
return self | |||
} | |||
func (self *App) init(config *appconfig) { | |||
self.api = NewSteemAPI(steemAPIURL) | |||
self.datastore = NewSteemDataStore("./d") | |||
self.desiredFetchingThreads = 40 | |||
self.currentFetchingThreads = 0 | |||
self.lock = &sync.Mutex{} | |||
} | |||
func (self *App) updateCurrentBlockHeight() { | |||
h := self.fetchCurrentBlockHeight() | |||
if h > self.currentNetworkBlockHeight { | |||
self.lock.Lock() | |||
defer self.lock.Unlock() | |||
self.currentNetworkBlockHeight = h | |||
log.Infof("current block height is now %d", self.currentNetworkBlockHeight) | |||
} | |||
} | |||
func (self *App) main() { | |||
log.Infof("steem block data fetcher starting up...") | |||
self.mainloop() | |||
} | |||
func (self *App) numFetchers() uint { | |||
self.lock.Lock() | |||
defer self.lock.Unlock() | |||
return self.currentFetchingThreads | |||
} | |||
func (self *App) incrFetchers() { | |||
self.lock.Lock() | |||
defer self.lock.Unlock() | |||
self.currentFetchingThreads += 1 | |||
} | |||
func (self *App) decrFetchers() { | |||
self.lock.Lock() | |||
defer self.lock.Unlock() | |||
self.currentFetchingThreads -= 1 | |||
} | |||
func (self *App) spawnNewFetcher(blockNum BlockNumber) { | |||
log.Debugf("spawning fetcher for block %d", blockNum) | |||
go func() { | |||
// this is so hacky, make a queue like a grownup would you | |||
time.Sleep(100 * time.Millisecond) | |||
if self.datastore.HaveOpsForBlock(blockNum) { | |||
log.Infof("already have ops for block %d, not re-fetching", blockNum) | |||
return | |||
} | |||
self.incrFetchers() | |||
self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum)) | |||
self.decrFetchers() | |||
}() | |||
} | |||
func (self *App) storeBlockOps(blockNum BlockNumber, blockOps *[]byte) { | |||
self.datastore.StoreBlockOps(blockNum, blockOps) | |||
} | |||
// note that parallelFetchAndStoreBlocks does not respect the | |||
// limitation on number of desired fetchers, that is for the caller | |||
func (self *App) parallelFetchAndStoreBlocks(start BlockNumber, end BlockNumber) { | |||
var diff = uint64(end) - uint64(start) | |||
var i uint64 | |||
for i = 0; i < diff; i++ { | |||
self.spawnNewFetcher(BlockNumber(uint64(start) + i)) | |||
} | |||
} | |||
func (self *App) mainloop() { | |||
log.Infof("using %d fetching threads", self.desiredFetchingThreads) | |||
for { | |||
self.updateCurrentBlockHeight() | |||
log.Infof("current number of active fetchers: %d", self.numFetchers()) | |||
time.Sleep(1500 * time.Millisecond) | |||
self.datastore.SetCurrentBlockHeight() | |||
localHeight := self.datastore.CurrentBlockHeight() | |||
log.Infof("our highest fetched block height is %d", localHeight) | |||
if localHeight < self.currentNetworkBlockHeight { | |||
// we need to fetch some blocks from the network | |||
avail := self.desiredFetchingThreads - self.numFetchers() | |||
diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight) | |||
log.Infof("we need to fetch %d blocks", diff) | |||
if uint64(diff) > uint64(avail) { | |||
self.parallelFetchAndStoreBlocks(BlockNumber(uint64(localHeight)+1), BlockNumber(uint64(localHeight)+1+uint64(avail))) | |||
} else { | |||
// just spawn fetchers for the blocks we don't have | |||
// spawning will update the number of running fetchers | |||
self.parallelFetchAndStoreBlocks(localHeight+1, self.currentNetworkBlockHeight) | |||
} | |||
} | |||
//needFetchers := self.desiredFetchingThreads - self.numFetchers() | |||
} | |||
} | |||
func (self *App) fetchCurrentBlockHeight() BlockNumber { | |||
r, err := self.api.GetDynamicGlobalProperties() | |||
if err != nil { | |||
panic("can't fetch global properties, bailing") | |||
} | |||
if r.LastIrreversibleBlockNum < 100 { | |||
panic("can't fetch global properties, bailing") | |||
} | |||
return r.LastIrreversibleBlockNum | |||
} | |||
func (self *App) fetchBlockOps(blockNum BlockNumber) *[]byte { | |||
r, err := self.api.GetOpsInBlock(blockNum) | |||
if err != nil { | |||
// just retry on error | |||
// sloppy, but works | |||
return self.fetchBlockOps(blockNum) | |||
} | |||
bytes, err := json.Marshal(r) | |||
if err != nil { | |||
panic(err) | |||
} | |||
count := len(*r) | |||
log.Infof("got %d operations for block %d", count, blockNum) | |||
return &bytes | |||
//self.datastore.writeBlockOps(blockNum, bytes) | |||
} |
@@ -1,14 +1,11 @@ | |||
package main | |||
import ( | |||
"encoding/json" | |||
log "github.com/sirupsen/logrus" | |||
) | |||
import "encoding/json" | |||
import log "github.com/sirupsen/logrus" | |||
type SteemAPI struct { | |||
url string | |||
rpc *JSONRPC | |||
log *log.Logger | |||
} | |||
var EmptyParams = []string{} | |||
@@ -22,8 +19,6 @@ func NewSteemAPI(url string, options ...func(s *SteemAPI)) *SteemAPI { | |||
rpc: rpc, | |||
} | |||
self.log = log.New() | |||
for _, option := range options { | |||
option(self) | |||
} |
@@ -3,6 +3,8 @@ package main | |||
import "encoding/json" | |||
import "github.com/joeshaw/iso8601" | |||
type BlockNumber uint64 | |||
type OperationObject struct { | |||
BlockNumber BlockNumber `json:"block"` | |||
OpInTx int `json:"op_in_trx"` | |||
@@ -18,6 +20,11 @@ type GetOpsInBlockRequestParams struct { | |||
VirtualOps bool | |||
} | |||
func (r *GetOpsInBlockRequestParams) MarshalJSON() ([]byte, error) { | |||
arr := []interface{}{r.BlockNum, r.VirtualOps} | |||
return json.Marshal(arr) | |||
} | |||
type DynamicGlobalProperties struct { | |||
ConfidentialSbdSupply string `json:"confidential_sbd_supply"` | |||
ConfidentialSupply string `json:"confidential_supply"` | |||
@@ -51,9 +58,5 @@ type DynamicGlobalProperties struct { | |||
} | |||
type GetOpsInBlockResponse *[]OperationObject | |||
type GetDynamicGlobalPropertiesResponse *DynamicGlobalProperties | |||
func (r *GetOpsInBlockRequestParams) MarshalJSON() ([]byte, error) { | |||
arr := []interface{}{r.BlockNum, r.VirtualOps} | |||
return json.Marshal(arr) | |||
} | |||
type GetDynamicGlobalPropertiesResponse *DynamicGlobalProperties |