making progress

This commit is contained in:
Jeffrey Paul 2018-10-31 02:44:19 -07:00
parent b8f85eae72
commit 26bc5ae878
Signed by: sneak
GPG Key ID: 052443F4DF2A55C2
5 changed files with 129 additions and 90 deletions

37
db.go
View File

@ -5,6 +5,17 @@ import "io/ioutil"
import "github.com/dgraph-io/badger" import "github.com/dgraph-io/badger"
import "github.com/spf13/afero" import "github.com/spf13/afero"
import "github.com/go-redis/redis" import "github.com/go-redis/redis"
import "fmt"
import "strconv"
const appPrefix = "steem-block-fetcher"
type SteemDataStorer interface {
SetCurrentBlockHeight(BlockNumber) error
CurrentBlockHeight() BlockNumber
HaveOpsForBlock(BlockNumber) bool
StoreBlockOps(BlockNumber, *[]byte) error
}
// SteemDataStore is the object with which the rest of this tool interacts // SteemDataStore is the object with which the rest of this tool interacts
type SteemDataStore struct { type SteemDataStore struct {
@ -17,8 +28,28 @@ func NewSteemDataStore(dir string) *SteemDataStore {
return self return self
} }
func (self *SteemDataStore) StoreBlockOps(blockNum int, blockOps []byte) { func (self *SteemDataStore) SetCurrentBlockHeight(blockNum BlockNumber) error {
panic("not implemented") panic("unimplemented")
return nil
}
func (self *SteemDataStore) StoreBlockOps(blockNum BlockNumber, blockOps *[]byte) error {
panic("unimplemented")
}
func (self *SteemDataStore) HaveOpsForBlock(blockNum BlockNumber) bool {
panic("unimplemented")
}
func (self *SteemDataStore) CurrentBlockHeight() BlockNumber {
keyname := fmt.Sprintf("%s:global:CurrentBlockHeight", appPrefix)
val, err := self.kv.Get(&keyname)
//FIXME(sneak) make this init the key to 0 if not found
if err != nil {
panic("unable to query redis")
}
intval, err := strconv.ParseUint(*val, 10, 64)
return BlockNumber(intval)
} }
// KeyValueStorer is an interface for the backend kv store used by // KeyValueStorer is an interface for the backend kv store used by
@ -45,7 +76,7 @@ func NewRedisKVStore() *RedisKVStore {
func (self *RedisKVStore) Get(keyname *string) (*string, error) { func (self *RedisKVStore) Get(keyname *string) (*string, error) {
val, err := self.rc.Get(*keyname).Result() val, err := self.rc.Get(*keyname).Result()
if err != nil { if err != nil {
panic("unable to fetch key from redis") log.Panicf("unable to fetch key '%s' from redis", *keyname)
//FIXME(sneak) we should probably distinguish between key not //FIXME(sneak) we should probably distinguish between key not
//existing and fetch error //existing and fetch error
} }

View File

@ -43,8 +43,6 @@ type JSONRPCRequest struct {
type JSONRPC struct { type JSONRPC struct {
url string url string
client httpRPCClient client httpRPCClient
Debug bool
log *log.Logger
} }
// New create new rpc client with given url // New create new rpc client with given url
@ -53,7 +51,6 @@ func NewJSONRPC(url string, options ...func(rpc *JSONRPC)) *JSONRPC {
url: url, url: url,
client: http.DefaultClient, client: http.DefaultClient,
} }
rpc.log = log.New()
for _, option := range options { for _, option := range options {
option(rpc) option(rpc)
} }
@ -88,9 +85,9 @@ func (rpc *JSONRPC) Call(method string, params json.RawMessage) (json.RawMessage
return nil, err return nil, err
} }
if rpc.Debug { log.Debugf("%s", method)
rpc.log.Println(fmt.Sprintf("%s\nRequest: %s\nResponse: %s\n", method, body, data)) log.Debugf("Request: %s", body)
} log.Debugf("Response: %s", data)
resp := new(JSONRPCResponse) resp := new(JSONRPCResponse)
if err := json.Unmarshal(data, resp); err != nil { if err := json.Unmarshal(data, resp); err != nil {

96
main.go
View File

@ -5,6 +5,7 @@ package main
//import "fmt" //import "fmt"
import "sync" import "sync"
import "time" import "time"
import "encoding/json"
import log "github.com/sirupsen/logrus" import log "github.com/sirupsen/logrus"
const steemAPIURL = "https://api.steemit.com" const steemAPIURL = "https://api.steemit.com"
@ -20,13 +21,15 @@ func processinit() {
log.SetLevel(log.DebugLevel) log.SetLevel(log.DebugLevel)
} }
type BlockNumber uint64
type App struct { type App struct {
datastore *SteemDataStore datastore SteemDataStorer
api *SteemAPI api *SteemAPI
currentNetworkBlockHeight int currentNetworkBlockHeight BlockNumber
currentLocalBlockHeight int currentLocalBlockHeight BlockNumber
currentFetchingThreads int currentFetchingThreads uint
desiredFetchingThreads int desiredFetchingThreads uint
lock *sync.Mutex lock *sync.Mutex
} }
@ -61,7 +64,7 @@ func (self *App) main() {
self.mainloop() self.mainloop()
} }
func (self *App) numFetchers() int { func (self *App) numFetchers() uint {
self.lock.Lock() self.lock.Lock()
defer self.lock.Unlock() defer self.lock.Unlock()
return self.currentFetchingThreads return self.currentFetchingThreads
@ -79,18 +82,53 @@ func (self *App) decrFetchers() {
self.currentFetchingThreads -= 1 self.currentFetchingThreads -= 1
} }
func (self *App) spawnNewFetcher(blockNum int) { func (self *App) spawnNewFetcher(blockNum BlockNumber) {
log.Infof("spawning fetcher for block %d", blockNum)
go func() {
self.incrFetchers() self.incrFetchers()
self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum))
self.decrFetchers()
}()
} }
func (self *App) mainloop() { func (self *App) storeBlockOps(blockNum BlockNumber, blockOps *[]byte) {
for { self.datastore.StoreBlockOps(blockNum, blockOps)
self.updateCurrentBlockHeight() }
time.Sleep(1 * time.Second)
// note that parallelFetchAndStoreBlocks does not respect the
// limitation on number of desired fetchers, that is for the caller
func (self *App) parallelFetchAndStoreBlocks(start BlockNumber, end BlockNumber) {
var diff = uint64(end) - uint64(start)
var i uint64
for i = 0; i < diff; i++ {
self.spawnNewFetcher(BlockNumber(uint64(start) + i))
} }
} }
func (self *App) fetchCurrentBlockHeight() int { func (self *App) mainloop() {
log.Infof("using %d fetching threads", self.desiredFetchingThreads)
for {
self.updateCurrentBlockHeight()
localHeight := self.datastore.CurrentBlockHeight()
time.Sleep(500 * time.Millisecond)
if localHeight < self.currentNetworkBlockHeight {
// we need to fetch some blocks from the network
avail := self.desiredFetchingThreads - self.numFetchers()
diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight)
log.Infof("we need to fetch %d blocks", diff)
if uint64(diff) > uint64(avail) {
panic("not implemented yet")
} else {
// just spawn fetchers for the blocks we don't have
// spawning will update the number of running fetchers
self.parallelFetchAndStoreBlocks(localHeight+1, self.currentNetworkBlockHeight)
}
}
//needFetchers := self.desiredFetchingThreads - self.numFetchers()
}
}
func (self *App) fetchCurrentBlockHeight() BlockNumber {
r, err := self.api.GetDynamicGlobalProperties() r, err := self.api.GetDynamicGlobalProperties()
if err != nil { if err != nil {
panic("can't fetch global properties, bailing") panic("can't fetch global properties, bailing")
@ -101,30 +139,8 @@ func (self *App) fetchCurrentBlockHeight() int {
return r.LastIrreversibleBlockNum return r.LastIrreversibleBlockNum
} }
func fetchBlockRange(s *SteemAPI, state *SteemDataStore, startBlock int, endBlock int) *error { func (self *App) fetchBlockOps(blockNum BlockNumber) *[]byte {
log.Debugf("fetching block range %d to %d inclusive", startBlock, endBlock) r, err := self.api.GetOpsInBlock(blockNum)
for i := startBlock; i <= endBlock; i++ {
//fetchSingleBlock(s, fs, i)
}
return nil
}
/*
func (self *App) fetchSingleBlockOps(blockNum int) (*byte[]) {
tmpName := fmt.Sprintf("/blockOps/%d.json.tmp", blockNum)
realName := fmt.Sprintf("/blockOps/%d.json", blockNum)
done, err := afero.Exists(fs, realName)
if err != nil {
panic(err)
}
if done {
return
}
r, err := s.GetOpsInBlock(blockNum)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -132,10 +148,6 @@ func (self *App) fetchSingleBlockOps(blockNum int) (*byte[]) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = afero.WriteFile(fs, tmpName, bytes, 0) return &bytes
if err != nil { //self.datastore.writeBlockOps(blockNum, bytes)
panic(err)
}
fs.Rename(tmpName, realName)
} }
*/

View File

@ -8,7 +8,6 @@ import (
type SteemAPI struct { type SteemAPI struct {
url string url string
rpc *JSONRPC rpc *JSONRPC
Debug bool
log *log.Logger log *log.Logger
} }
@ -17,7 +16,7 @@ var EmptyParamsRaw, _ = json.Marshal(EmptyParams)
func NewSteemAPI(url string, options ...func(s *SteemAPI)) *SteemAPI { func NewSteemAPI(url string, options ...func(s *SteemAPI)) *SteemAPI {
rpc := NewJSONRPC(url, func(x *JSONRPC) { x.Debug = true }) rpc := NewJSONRPC(url, func(x *JSONRPC) {})
self := &SteemAPI{ self := &SteemAPI{
rpc: rpc, rpc: rpc,
@ -47,7 +46,7 @@ func (self *SteemAPI) GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesRe
return &resp, nil return &resp, nil
} }
func (self *SteemAPI) GetOpsInBlock(blockNum int) (GetOpsInBlockResponse, error) { func (self *SteemAPI) GetOpsInBlock(blockNum BlockNumber) (GetOpsInBlockResponse, error) {
// first fetch virtual ops // first fetch virtual ops
vOpsParams := &GetOpsInBlockRequestParams{BlockNum: blockNum, VirtualOps: true} vOpsParams := &GetOpsInBlockRequestParams{BlockNum: blockNum, VirtualOps: true}

View File

@ -4,17 +4,17 @@ import "encoding/json"
import "github.com/joeshaw/iso8601" import "github.com/joeshaw/iso8601"
type OperationObject struct { type OperationObject struct {
BlockNumber uint64 `json:"block"` BlockNumber BlockNumber `json:"block"`
OpInTx int `json:"op_in_trx"` OpInTx int `json:"op_in_trx"`
Operation []json.RawMessage `json:"op"` Operation []json.RawMessage `json:"op"`
Timestamp iso8601.Time `json:"timestamp"` Timestamp iso8601.Time `json:"timestamp"`
TransactionID string `json:"trx_id"` TransactionID string `json:"trx_id"`
TransactionInBlock uint64 `json:"trx_in_block"` TransactionInBlock BlockNumber `json:"trx_in_block"`
VirtualOperation int `json:"virtual_op"` VirtualOperation int `json:"virtual_op"`
} }
type GetOpsInBlockRequestParams struct { type GetOpsInBlockRequestParams struct {
BlockNum int BlockNum BlockNumber
VirtualOps bool VirtualOps bool
} }
@ -28,7 +28,7 @@ type DynamicGlobalProperties struct {
DelegationReturnPeriod int `json:"delegation_return_period"` DelegationReturnPeriod int `json:"delegation_return_period"`
HeadBlockID string `json:"head_block_id"` HeadBlockID string `json:"head_block_id"`
HeadBlockNumber int `json:"head_block_number"` HeadBlockNumber int `json:"head_block_number"`
LastIrreversibleBlockNum int `json:"last_irreversible_block_num"` LastIrreversibleBlockNum BlockNumber `json:"last_irreversible_block_num"`
MaximumBlockSize int `json:"maximum_block_size"` MaximumBlockSize int `json:"maximum_block_size"`
NumPowWitnesses int `json:"num_pow_witnesses"` NumPowWitnesses int `json:"num_pow_witnesses"`
ParticipationCount int `json:"participation_count"` ParticipationCount int `json:"participation_count"`