diff --git a/db.go b/db.go index f3a49f8..4ec19c5 100644 --- a/db.go +++ b/db.go @@ -5,6 +5,17 @@ import "io/ioutil" import "github.com/dgraph-io/badger" import "github.com/spf13/afero" import "github.com/go-redis/redis" +import "fmt" +import "strconv" + +const appPrefix = "steem-block-fetcher" + +type SteemDataStorer interface { + SetCurrentBlockHeight(BlockNumber) error + CurrentBlockHeight() BlockNumber + HaveOpsForBlock(BlockNumber) bool + StoreBlockOps(BlockNumber, *[]byte) error +} // SteemDataStore is the object with which the rest of this tool interacts type SteemDataStore struct { @@ -17,8 +28,28 @@ func NewSteemDataStore(dir string) *SteemDataStore { return self } -func (self *SteemDataStore) StoreBlockOps(blockNum int, blockOps []byte) { - panic("not implemented") +func (self *SteemDataStore) SetCurrentBlockHeight(blockNum BlockNumber) error { + panic("unimplemented") + return nil +} + +func (self *SteemDataStore) StoreBlockOps(blockNum BlockNumber, blockOps *[]byte) error { + panic("unimplemented") +} + +func (self *SteemDataStore) HaveOpsForBlock(blockNum BlockNumber) bool { + panic("unimplemented") +} + +func (self *SteemDataStore) CurrentBlockHeight() BlockNumber { + keyname := fmt.Sprintf("%s:global:CurrentBlockHeight", appPrefix) + val, err := self.kv.Get(&keyname) + //FIXME(sneak) make this init the key to 0 if not found + if err != nil { + panic("unable to query redis") + } + intval, err := strconv.ParseUint(*val, 10, 64) + return BlockNumber(intval) } // KeyValueStorer is an interface for the backend kv store used by @@ -45,7 +76,7 @@ func NewRedisKVStore() *RedisKVStore { func (self *RedisKVStore) Get(keyname *string) (*string, error) { val, err := self.rc.Get(*keyname).Result() if err != nil { - panic("unable to fetch key from redis") + log.Panicf("unable to fetch key '%s' from redis", *keyname) //FIXME(sneak) we should probably distinguish between key not //existing and fetch error } diff --git a/jsonrpc.go b/jsonrpc.go index 5faf6fa..d2fa802 100644 --- a/jsonrpc.go +++ b/jsonrpc.go @@ -43,8 +43,6 @@ type JSONRPCRequest struct { type JSONRPC struct { url string client httpRPCClient - Debug bool - log *log.Logger } // New create new rpc client with given url @@ -53,7 +51,6 @@ func NewJSONRPC(url string, options ...func(rpc *JSONRPC)) *JSONRPC { url: url, client: http.DefaultClient, } - rpc.log = log.New() for _, option := range options { option(rpc) } @@ -88,9 +85,9 @@ func (rpc *JSONRPC) Call(method string, params json.RawMessage) (json.RawMessage return nil, err } - if rpc.Debug { - rpc.log.Println(fmt.Sprintf("%s\nRequest: %s\nResponse: %s\n", method, body, data)) - } + log.Debugf("%s", method) + log.Debugf("Request: %s", body) + log.Debugf("Response: %s", data) resp := new(JSONRPCResponse) if err := json.Unmarshal(data, resp); err != nil { diff --git a/main.go b/main.go index 387bcaa..9d3b5b2 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ package main //import "fmt" import "sync" import "time" +import "encoding/json" import log "github.com/sirupsen/logrus" const steemAPIURL = "https://api.steemit.com" @@ -20,13 +21,15 @@ func processinit() { log.SetLevel(log.DebugLevel) } +type BlockNumber uint64 + type App struct { - datastore *SteemDataStore + datastore SteemDataStorer api *SteemAPI - currentNetworkBlockHeight int - currentLocalBlockHeight int - currentFetchingThreads int - desiredFetchingThreads int + currentNetworkBlockHeight BlockNumber + currentLocalBlockHeight BlockNumber + currentFetchingThreads uint + desiredFetchingThreads uint lock *sync.Mutex } @@ -61,7 +64,7 @@ func (self *App) main() { self.mainloop() } -func (self *App) numFetchers() int { +func (self *App) numFetchers() uint { self.lock.Lock() defer self.lock.Unlock() return self.currentFetchingThreads @@ -79,18 +82,53 @@ func (self *App) decrFetchers() { self.currentFetchingThreads -= 1 } -func (self *App) spawnNewFetcher(blockNum int) { - self.incrFetchers() +func (self *App) spawnNewFetcher(blockNum BlockNumber) { + log.Infof("spawning fetcher for block %d", blockNum) + go func() { + self.incrFetchers() + self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum)) + self.decrFetchers() + }() } -func (self *App) mainloop() { - for { - self.updateCurrentBlockHeight() - time.Sleep(1 * time.Second) +func (self *App) storeBlockOps(blockNum BlockNumber, blockOps *[]byte) { + self.datastore.StoreBlockOps(blockNum, blockOps) +} + +// note that parallelFetchAndStoreBlocks does not respect the +// limitation on number of desired fetchers, that is for the caller +func (self *App) parallelFetchAndStoreBlocks(start BlockNumber, end BlockNumber) { + var diff = uint64(end) - uint64(start) + var i uint64 + for i = 0; i < diff; i++ { + self.spawnNewFetcher(BlockNumber(uint64(start) + i)) } } -func (self *App) fetchCurrentBlockHeight() int { +func (self *App) mainloop() { + log.Infof("using %d fetching threads", self.desiredFetchingThreads) + for { + self.updateCurrentBlockHeight() + localHeight := self.datastore.CurrentBlockHeight() + time.Sleep(500 * time.Millisecond) + if localHeight < self.currentNetworkBlockHeight { + // we need to fetch some blocks from the network + avail := self.desiredFetchingThreads - self.numFetchers() + diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight) + log.Infof("we need to fetch %d blocks", diff) + if uint64(diff) > uint64(avail) { + panic("not implemented yet") + } else { + // just spawn fetchers for the blocks we don't have + // spawning will update the number of running fetchers + self.parallelFetchAndStoreBlocks(localHeight+1, self.currentNetworkBlockHeight) + } + } + //needFetchers := self.desiredFetchingThreads - self.numFetchers() + } +} + +func (self *App) fetchCurrentBlockHeight() BlockNumber { r, err := self.api.GetDynamicGlobalProperties() if err != nil { panic("can't fetch global properties, bailing") @@ -101,30 +139,8 @@ func (self *App) fetchCurrentBlockHeight() int { return r.LastIrreversibleBlockNum } -func fetchBlockRange(s *SteemAPI, state *SteemDataStore, startBlock int, endBlock int) *error { - log.Debugf("fetching block range %d to %d inclusive", startBlock, endBlock) - for i := startBlock; i <= endBlock; i++ { - //fetchSingleBlock(s, fs, i) - } - return nil -} - -/* -func (self *App) fetchSingleBlockOps(blockNum int) (*byte[]) { - tmpName := fmt.Sprintf("/blockOps/%d.json.tmp", blockNum) - realName := fmt.Sprintf("/blockOps/%d.json", blockNum) - - done, err := afero.Exists(fs, realName) - - if err != nil { - panic(err) - } - - if done { - return - } - - r, err := s.GetOpsInBlock(blockNum) +func (self *App) fetchBlockOps(blockNum BlockNumber) *[]byte { + r, err := self.api.GetOpsInBlock(blockNum) if err != nil { panic(err) } @@ -132,10 +148,6 @@ func (self *App) fetchSingleBlockOps(blockNum int) (*byte[]) { if err != nil { panic(err) } - err = afero.WriteFile(fs, tmpName, bytes, 0) - if err != nil { - panic(err) - } - fs.Rename(tmpName, realName) + return &bytes + //self.datastore.writeBlockOps(blockNum, bytes) } -*/ diff --git a/steemapi.go b/steemapi.go index 82968e1..f385132 100644 --- a/steemapi.go +++ b/steemapi.go @@ -6,10 +6,9 @@ import ( ) type SteemAPI struct { - url string - rpc *JSONRPC - Debug bool - log *log.Logger + url string + rpc *JSONRPC + log *log.Logger } var EmptyParams = []string{} @@ -17,7 +16,7 @@ var EmptyParamsRaw, _ = json.Marshal(EmptyParams) func NewSteemAPI(url string, options ...func(s *SteemAPI)) *SteemAPI { - rpc := NewJSONRPC(url, func(x *JSONRPC) { x.Debug = true }) + rpc := NewJSONRPC(url, func(x *JSONRPC) {}) self := &SteemAPI{ rpc: rpc, @@ -47,7 +46,7 @@ func (self *SteemAPI) GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesRe return &resp, nil } -func (self *SteemAPI) GetOpsInBlock(blockNum int) (GetOpsInBlockResponse, error) { +func (self *SteemAPI) GetOpsInBlock(blockNum BlockNumber) (GetOpsInBlockResponse, error) { // first fetch virtual ops vOpsParams := &GetOpsInBlockRequestParams{BlockNum: blockNum, VirtualOps: true} diff --git a/types.go b/types.go index b9a13b6..91888a3 100644 --- a/types.go +++ b/types.go @@ -4,50 +4,50 @@ import "encoding/json" import "github.com/joeshaw/iso8601" type OperationObject struct { - BlockNumber uint64 `json:"block"` + BlockNumber BlockNumber `json:"block"` OpInTx int `json:"op_in_trx"` Operation []json.RawMessage `json:"op"` Timestamp iso8601.Time `json:"timestamp"` TransactionID string `json:"trx_id"` - TransactionInBlock uint64 `json:"trx_in_block"` + TransactionInBlock BlockNumber `json:"trx_in_block"` VirtualOperation int `json:"virtual_op"` } type GetOpsInBlockRequestParams struct { - BlockNum int + BlockNum BlockNumber VirtualOps bool } type DynamicGlobalProperties struct { - ConfidentialSbdSupply string `json:"confidential_sbd_supply"` - ConfidentialSupply string `json:"confidential_supply"` - CurrentAslot int `json:"current_aslot"` - CurrentSbdSupply string `json:"current_sbd_supply"` - CurrentSupply string `json:"current_supply"` - CurrentWitness string `json:"current_witness"` - DelegationReturnPeriod int `json:"delegation_return_period"` - HeadBlockID string `json:"head_block_id"` - HeadBlockNumber int `json:"head_block_number"` - LastIrreversibleBlockNum int `json:"last_irreversible_block_num"` - MaximumBlockSize int `json:"maximum_block_size"` - NumPowWitnesses int `json:"num_pow_witnesses"` - ParticipationCount int `json:"participation_count"` - PendingRewardedVestingShares string `json:"pending_rewarded_vesting_shares"` - PendingRewardedVestingSteem string `json:"pending_rewarded_vesting_steem"` - RecentSlotsFilled string `json:"recent_slots_filled"` - ReverseAuctionSeconds int `json:"reverse_auction_seconds"` - SbdInterestRate int `json:"sbd_interest_rate"` - SbdPrintRate int `json:"sbd_print_rate"` - SbdStartPercent int `json:"sbd_start_percent"` - SbdStopPercent int `json:"sbd_stop_percent"` - Time string `json:"time"` - TotalPow int `json:"total_pow"` - TotalRewardFundSteem string `json:"total_reward_fund_steem"` - TotalRewardShares2 string `json:"total_reward_shares2"` - TotalVestingFundSteem string `json:"total_vesting_fund_steem"` - TotalVestingShares string `json:"total_vesting_shares"` - VirtualSupply string `json:"virtual_supply"` - VotePowerReserveRate int `json:"vote_power_reserve_rate"` + ConfidentialSbdSupply string `json:"confidential_sbd_supply"` + ConfidentialSupply string `json:"confidential_supply"` + CurrentAslot int `json:"current_aslot"` + CurrentSbdSupply string `json:"current_sbd_supply"` + CurrentSupply string `json:"current_supply"` + CurrentWitness string `json:"current_witness"` + DelegationReturnPeriod int `json:"delegation_return_period"` + HeadBlockID string `json:"head_block_id"` + HeadBlockNumber int `json:"head_block_number"` + LastIrreversibleBlockNum BlockNumber `json:"last_irreversible_block_num"` + MaximumBlockSize int `json:"maximum_block_size"` + NumPowWitnesses int `json:"num_pow_witnesses"` + ParticipationCount int `json:"participation_count"` + PendingRewardedVestingShares string `json:"pending_rewarded_vesting_shares"` + PendingRewardedVestingSteem string `json:"pending_rewarded_vesting_steem"` + RecentSlotsFilled string `json:"recent_slots_filled"` + ReverseAuctionSeconds int `json:"reverse_auction_seconds"` + SbdInterestRate int `json:"sbd_interest_rate"` + SbdPrintRate int `json:"sbd_print_rate"` + SbdStartPercent int `json:"sbd_start_percent"` + SbdStopPercent int `json:"sbd_stop_percent"` + Time string `json:"time"` + TotalPow int `json:"total_pow"` + TotalRewardFundSteem string `json:"total_reward_fund_steem"` + TotalRewardShares2 string `json:"total_reward_shares2"` + TotalVestingFundSteem string `json:"total_vesting_fund_steem"` + TotalVestingShares string `json:"total_vesting_shares"` + VirtualSupply string `json:"virtual_supply"` + VotePowerReserveRate int `json:"vote_power_reserve_rate"` } type GetOpsInBlockResponse *[]OperationObject