@@ -1,4 +1,19 @@ | |||
default: run | |||
GOFILES := $(shell find . -type f -name '*.go' -not -name '*_test.go') | |||
default: test | |||
.PHONY: run build test | |||
run: | |||
go run *.go | |||
go run $(GOFILES) | |||
build: steem-block-db | |||
steem-block-db: *.go | |||
go build | |||
test: | |||
go test | |||
clean: | |||
rm steem-block-db |
@@ -1,21 +1,23 @@ | |||
package main | |||
import "sync" | |||
import "time" | |||
import log "github.com/sirupsen/logrus" | |||
import "encoding/json" | |||
//import "encoding/json" | |||
type App struct { | |||
datastore SteemDataStorer | |||
api *SteemAPI | |||
currentNetworkBlockHeight BlockNumber | |||
currentLocalBlockHeight BlockNumber | |||
lock *sync.Mutex | |||
api SteemAPIShape | |||
config *appconfig | |||
db SteemDataStorer | |||
lock *sync.Mutex | |||
} | |||
type appconfig struct { | |||
logLevel log.Level | |||
apiUrl string | |||
redisUrl string | |||
logLevel log.Level | |||
apiUrl string | |||
redisUrl string | |||
desiredFetcherThreads uint | |||
} | |||
func NewApp(config *appconfig) *App { | |||
@@ -27,106 +29,71 @@ func NewApp(config *appconfig) *App { | |||
func (self *App) init(config *appconfig) { | |||
log.SetLevel(config.logLevel) | |||
self.api = NewSteemAPI(config.apiUrl) | |||
self.datastore = NewSteemDataStore(config.redisUrl) | |||
self.db = NewSteemDataStore(config.redisUrl) | |||
self.lock = &sync.Mutex{} | |||
} | |||
func (self *App) updateCurrentBlockHeight() { | |||
h := self.fetchCurrentBlockHeight() | |||
if h > self.currentNetworkBlockHeight { | |||
self.lock.Lock() | |||
defer self.lock.Unlock() | |||
self.currentNetworkBlockHeight = h | |||
log.Infof("current block height is now %d", self.currentNetworkBlockHeight) | |||
} | |||
self.config = config | |||
} | |||
func (self *App) main() { | |||
log.Infof("steem block data fetcher starting up...") | |||
//self.mainloop() | |||
} | |||
/* | |||
func (self *App) numFetchers() uint { | |||
self.lock.Lock() | |||
defer self.lock.Unlock() | |||
return uint(len(*self.fetchingBlocks)) | |||
} | |||
func (self *App) spawnNewFetcher(blockNum BlockNumber) { | |||
log.Debugf("spawning fetcher for block %d", blockNum) | |||
go func() { | |||
// this is so hacky, make a queue like a grownup would you | |||
time.Sleep(100 * time.Millisecond) | |||
if self.datastore.HaveOpsForBlock(blockNum) { | |||
log.Infof("already have ops for block %d, not re-fetching", blockNum) | |||
return | |||
} | |||
//self.incrFetchers() | |||
self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum)) | |||
//self.decrFetchers() | |||
}() | |||
} | |||
func (self *App) storeBlockOps(blockNum BlockNumber, blockOps *[]byte) { | |||
self.datastore.StoreBlockOps(blockNum, blockOps) | |||
} | |||
// note that parallelFetchAndStoreBlocks does not respect the | |||
// limitation on number of desired fetchers, that is for the caller | |||
func (self *App) parallelFetchAndStoreBlocks(start BlockNumber, end BlockNumber) { | |||
var diff = uint64(end) - uint64(start) | |||
var i uint64 | |||
for i = 0; i < diff; i++ { | |||
self.spawnNewFetcher(BlockNumber(uint64(start) + i)) | |||
} | |||
} | |||
func (self *App) populateFetchers() { | |||
self.mainloop() | |||
} | |||
func (self *App) mainloop() { | |||
log.Infof("using %d fetching threads", self.config.desiredFetcherThreads) | |||
// we are going to do batches of 5,000 blocks | |||
batchSize := uint(1000) | |||
var start BlockNumber | |||
var end BlockNumber | |||
for { | |||
self.db.SetCurrentNetworkBlockHeight(self.fetchCurrentBlockHeight()) | |||
if self.db.CurrentNetworkBlockHeight() == self.db.CurrentLocalBlockHeight() { | |||
//we are synced | |||
time.Sleep(1 * time.Second) | |||
continue | |||
} | |||
// we are not synced | |||
// how far behind are we? | |||
countMustFetch := uint(self.db.CurrentNetworkBlockHeight() - self.db.CurrentLocalBlockHeight()) | |||
start = self.db.CurrentLocalBlockHeight() + 1 | |||
log.Infof("beginning fetch with start block %d", start) | |||
if countMustFetch <= batchSize { | |||
end = self.db.CurrentNetworkBlockHeight() | |||
log.Infof("fetch is within our batch size (%d), end block for fetch batch is %d", batchSize, end) | |||
} else { | |||
end = BlockNumber(uint(start) + uint(batchSize) - uint(1)) | |||
log.Infof("fetch is too large for batch size (%d), end block for fetch batch is %d", batchSize, end) | |||
} | |||
log.Infof("using %d fetching threads", self.desiredFetchingThreads) | |||
self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight() | |||
bf := NewBlockFetcher(&BlockFetcherConfig{ | |||
api: self.api.(*SteemAPI), | |||
desiredFetcherThreads: self.config.desiredFetcherThreads, | |||
startBlock: start, | |||
endBlock: end, | |||
}) | |||
for { | |||
self.spawnMoreFetchers() | |||
self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight() | |||
time.Sleep(1000 * time.Millisecond) | |||
blocks := bf.fetch() | |||
log.Infof("blockfetcher has returned") | |||
self.pushBlocks(blocks) | |||
} | |||
} | |||
func (self *App) spawnMoreFetchers() { | |||
} | |||
/* | |||
self.updateCurrentBlockHeight() | |||
log.Infof("current number of active fetchers: %d", self.numFetchers()) | |||
time.Sleep(1500 * time.Millisecond) | |||
self.datastore.SetCurrentBlockHeight() | |||
localHeight := self.datastore.CurrentBlockHeight() | |||
log.Infof("our highest fetched block height is %d", localHeight) | |||
if localHeight < self.currentNetworkBlockHeight { | |||
// we need to fetch some blocks from the network | |||
avail := self.desiredFetchingThreads - self.numFetchers() | |||
diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight) | |||
log.Infof("we need to fetch %d blocks", diff) | |||
if uint64(diff) > uint64(avail) { | |||
self.parallelFetchAndStoreBlocks(BlockNumber(uint64(localHeight)+1), BlockNumber(uint64(localHeight)+1+uint64(avail))) | |||
} else { | |||
// just spawn fetchers for the blocks we don't have | |||
// spawning will update the number of running fetchers | |||
self.parallelFetchAndStoreBlocks(localHeight+1, self.currentNetworkBlockHeight) | |||
func (self *App) pushBlocks(newBlocks *[]FetchedBlock) { | |||
counter := 0 | |||
for _, newBlock := range *newBlocks { | |||
counter += 1 | |||
err := self.db.StoreBlockOps(newBlock.blockNumber, newBlock.blockData) | |||
if err != nil { | |||
log.Panic(err) | |||
} | |||
} | |||
//needFetchers := self.desiredFetchingThreads - self.numFetchers() | |||
*/ | |||
log.Infof("pushed %d new blocks to db", counter) | |||
self.db.UpdateCurrentLocalBlockHeight() | |||
} | |||
func (self *App) fetchCurrentBlockHeight() BlockNumber { | |||
r, err := self.api.GetDynamicGlobalProperties() | |||
if err != nil { | |||
log.Panicf("can't fetch global properties, bailing. err: %s", err) |
@@ -1,6 +1,8 @@ | |||
package main | |||
import "encoding/json" | |||
import "sync" | |||
import "time" | |||
import log "github.com/sirupsen/logrus" | |||
type FetchedBlock struct { | |||
@@ -14,7 +16,7 @@ type BlockFetcher struct { | |||
desiredFetcherThreads uint | |||
wantBlocks map[BlockNumber]bool | |||
fetchingBlocks map[BlockNumber]bool | |||
fetchedBlocks *[]FetchedBlock | |||
fetchedBlocks map[BlockNumber]*FetchedBlock | |||
lock *sync.Mutex | |||
workChannel chan BlockNumber | |||
resultsChannel chan *FetchedBlock | |||
@@ -43,47 +45,52 @@ func (self *BlockFetcher) init(config *BlockFetcherConfig) { | |||
self.lock = &sync.Mutex{} | |||
self.api = config.api | |||
self.desiredFetcherThreads = config.desiredFetcherThreads | |||
self.workChannel = make(chan BlockNumber) | |||
self.resultsChannel = make(chan *FetchedBlock) | |||
self.wantBlocks = make(map[BlockNumber]bool) | |||
self.fetchedBlocks = make(map[BlockNumber]*FetchedBlock) | |||
diff := int(uint(config.endBlock) - uint(config.startBlock)) | |||
log.Debugf("diff is %d", diff) | |||
for i := 0; i <= diff; i++ { | |||
self.wantBlocks[BlockNumber(uint(config.startBlock)+uint(i))] = true | |||
} | |||
log.Debugf("wantblocks[] is now %v", self.wantBlocks) | |||
} | |||
func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) { | |||
self.lock.Lock() | |||
defer self.lock.Unlock() | |||
if self.wantBlocks[blockNum] == false { | |||
log.Panicf("shouldn't happen") | |||
} | |||
self.lock.Lock() | |||
defer self.lock.Unlock() | |||
delete(self.wantBlocks, blockNum) | |||
} | |||
func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) { | |||
self.lock.Lock() | |||
defer self.lock.Unlock() | |||
if self.fetchingBlocks[blockNum] == false { | |||
log.Panicf("shouldn't happen") | |||
} | |||
self.lock.Lock() | |||
defer self.lock.Unlock() | |||
delete(self.fetchingBlocks, blockNum) | |||
} | |||
func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) { | |||
self.lock.Lock() | |||
defer self.lock.Unlock() | |||
if self.fetchingBlocks[blockNum] == true { | |||
log.Panicf("shouldn't happen") | |||
} | |||
self.lock.Lock() | |||
defer self.lock.Unlock() | |||
if self.fetchingBlocks == nil { | |||
self.fetchingBlocks = make(map[BlockNumber]bool) | |||
} | |||
if self.fetchingBlocks[blockNum] == false { | |||
self.fetchingBlocks[blockNum] = true | |||
} | |||
} | |||
func (self *BlockFetcher) fetcher(index int) { | |||
func (self *BlockFetcher) fetcher(index uint) { | |||
log.Debugf("fetcher thread %d starting", index) | |||
WorkLoop: | |||
@@ -106,44 +113,65 @@ WorkLoop: | |||
log.Debugf("fetcher thread %d ending", index) | |||
} | |||
func (self *BlockFetcher) sendWork(b BlockNumber) { | |||
go func() { | |||
// yay cheap goroutines, let them block on the unbuffered channel | |||
log.Debugf("waiting to send blockNum %d into the work channel", b) | |||
self.workChannel <- b | |||
log.Debugf("sent blockNum %d into the work channel", b) | |||
}() | |||
} | |||
func (self *BlockFetcher) fetch() *[]FetchedBlock { | |||
for i := 1; i < self.desiredFetcherThreads+1; i++ { | |||
log.Debugf("blockfetcher beginning fetch") | |||
for i := uint(1); i < self.desiredFetcherThreads+uint(1); i++ { | |||
go self.fetcher(i) | |||
} | |||
for blockNum, _ := range self.wantBlocks { | |||
// yay cheap goroutines, let them block on the unbuffered channel | |||
go func() { | |||
log.Debugf("waiting to send blockNum %d into the work channel", blockNum) | |||
self.workChannel <- blockNum | |||
log.Debugf("sent blockNum %d into the work channel", blockNum) | |||
}() | |||
self.lock.Lock() | |||
for blockNum := range self.wantBlocks { | |||
self.sendWork(blockNum) | |||
} | |||
self.lock.Unlock() | |||
// now we have to start reading from the unbuffered resultsChannel | |||
// otherwise the workers will block when returning results | |||
select { | |||
case result := <-self.resultsChannel: | |||
self.receiveResult(result) | |||
default: | |||
if self.Done() == true { | |||
// if we get here, it's because workList is now empty and there | |||
// are no more results in the results channel. | |||
close(self.workChannel) // shut down the workers | |||
result := self.fetchedBlocks | |||
self = nil //this BlockFetcher is now finished. | |||
return result | |||
for { | |||
select { | |||
case result := <-self.resultsChannel: | |||
self.receiveResult(result) | |||
default: | |||
if self.Done() == true { | |||
log.Infof("blockfetcher %+v considers itself Done()", self) | |||
// if we get here, it's because workList is now empty and there | |||
// are no more results in the results channel. | |||
close(self.workChannel) // shut down the workers | |||
var final []FetchedBlock | |||
self.lock.Lock() | |||
for _, value := range self.fetchedBlocks { | |||
final = append(final, *value) | |||
} | |||
self.lock.Unlock() | |||
self = nil //this BlockFetcher is now finished. | |||
return &final | |||
} | |||
// in this case we are not done but got nothing from the result | |||
// channel so just wait a little bit to get more results and | |||
// check the channel again | |||
time.Sleep(10 * time.Millisecond) | |||
//FIXME(sneak) we maybe need to handle a case here where wantBlocks never | |||
//empties but workers need to be re-dispatched.. | |||
} | |||
time.Sleep(10 * time.Millisecond) | |||
//FIXME(sneak) we maybe need to handle a case here where wantBlocks never | |||
//empties but workers need to be re-dispatched.. | |||
} | |||
log.Panicf("this shouldn't happen") | |||
return nil //shouldn't happen, return should happen from above | |||
} | |||
func (self *BlockFetcher) receiveResult(r *FetchedBlock) { | |||
log.Debugf("got result for blocknum %d", r.blockNumber) | |||
self.removeFetchingBlock(r.blockNumber) | |||
self.lock.Lock() | |||
self.fetchedBlocks = append(self.fetchedBlocks, r) | |||
self.fetchedBlocks[r.blockNumber] = r | |||
self.lock.Unlock() | |||
self.removeWantBlock(r.blockNumber) | |||
} | |||
@@ -156,16 +184,16 @@ func (self *BlockFetcher) fetchBlockOpsFromNetwork(blockNum BlockNumber) *Fetche | |||
} | |||
r, err := self.api.GetOpsInBlock(blockNum) | |||
if err != nil { | |||
result.error = err | |||
result.error = &err | |||
return result | |||
} | |||
bytes, err := json.Marshal(r) | |||
if err != nil { | |||
result.error = err | |||
result.error = &err | |||
return result | |||
} | |||
count := len(*r) | |||
log.Infof("got %d operations for block %d", count, blockNum) | |||
log.Infof("got %d operations for block %d fetched from network", count, blockNum) | |||
result.blockData = &bytes | |||
result.error = nil // make sure this is nil if it worked | |||
return result |
@@ -0,0 +1,53 @@ | |||
package main | |||
import "testing" | |||
import log "github.com/sirupsen/logrus" | |||
/* | |||
import "github.com/stretchr/testify/mock" | |||
type MockedSteemAPI struct { | |||
mock.Mock | |||
} | |||
func (m *MockedSteemAPI) DoSomething(number int) (bool, error) { | |||
args := m.Called(number) | |||
return args.Bool(0), args.Error(1) | |||
} | |||
*/ | |||
func TestBlockfetcherInit(t *testing.T) { | |||
log.SetLevel(log.DebugLevel) | |||
bf := NewBlockFetcher(&BlockFetcherConfig{ | |||
api: nil, | |||
desiredFetcherThreads: 1, | |||
startBlock: 10000, | |||
endBlock: 10005, | |||
}) | |||
if bf == nil { | |||
t.Errorf("could not instantiate blockfetcher") | |||
} | |||
} | |||
//can't actually fetch yet until we mock the api | |||
/* | |||
func TestBlockfetcherFetch(t *testing.T) { | |||
log.SetLevel(log.DebugLevel) | |||
bf := NewBlockFetcher(&BlockFetcherConfig{ | |||
api: nil, | |||
desiredFetcherThreads: 1, | |||
startBlock: 10000, | |||
endBlock: 10005, | |||
}) | |||
bf.fetch() | |||
} | |||
*/ |
@@ -10,8 +10,11 @@ import "strconv" | |||
const appPrefix = "sbf" | |||
type SteemDataStorer interface { | |||
SetCurrentBlockHeight(BlockNumber) error | |||
CurrentBlockHeight() BlockNumber | |||
SetCurrentNetworkBlockHeight(BlockNumber) error | |||
SetCurrentLocalBlockHeight(BlockNumber) error | |||
UpdateCurrentLocalBlockHeight() | |||
CurrentLocalBlockHeight() BlockNumber | |||
CurrentNetworkBlockHeight() BlockNumber | |||
HaveOpsForBlock(BlockNumber) bool | |||
StoreBlockOps(BlockNumber, *[]byte) error | |||
} | |||
@@ -24,11 +27,35 @@ type SteemDataStore struct { | |||
func NewSteemDataStore(hostname string) SteemDataStorer { | |||
self := new(SteemDataStore) | |||
self.kv = NewRedisKVStore(hostname) | |||
self.init() | |||
return self | |||
} | |||
func (self *SteemDataStore) SetCurrentBlockHeight(blockNum BlockNumber) error { | |||
keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix) | |||
func (self *SteemDataStore) init() { | |||
self.UpdateCurrentLocalBlockHeight() | |||
} | |||
func (self *SteemDataStore) UpdateCurrentLocalBlockHeight() { | |||
cur := self.CurrentLocalBlockHeight() | |||
next := self.FindHighestContiguousBlockInDb(cur) | |||
if next != cur { | |||
err := self.SetCurrentLocalBlockHeight(next) | |||
log.Infof("current highest contig block in db is now %d", next) | |||
if err != nil { | |||
log.Panic(err) | |||
} | |||
return | |||
} | |||
} | |||
func (self *SteemDataStore) SetCurrentLocalBlockHeight(blockNum BlockNumber) error { | |||
keyname := fmt.Sprintf("%s.meta.CurrentLocalBlockHeight", appPrefix) | |||
value := fmt.Sprintf("%d", blockNum) | |||
return self.kv.Put(&keyname, &value) | |||
} | |||
func (self *SteemDataStore) SetCurrentNetworkBlockHeight(blockNum BlockNumber) error { | |||
keyname := fmt.Sprintf("%s.meta.CurrentNetworkBlockHeight", appPrefix) | |||
value := fmt.Sprintf("%d", blockNum) | |||
return self.kv.Put(&keyname, &value) | |||
} | |||
@@ -42,7 +69,10 @@ func (self *SteemDataStore) FindHighestContiguousBlockInDb(from BlockNumber) Blo | |||
for { | |||
try = BlockNumber(uint64(last) + 1) | |||
keyname = fmt.Sprintf("%s.ops_in_block.%d", appPrefix, try) | |||
exists, _ := self.kv.Exists(&keyname) | |||
exists, err := self.kv.Exists(&keyname) | |||
if err != nil { | |||
log.Panic(err) | |||
} | |||
if exists == false { | |||
log.Debugf("cannot find block %d in db, highest found is %d", try, last) | |||
return last | |||
@@ -64,14 +94,27 @@ func (self *SteemDataStore) HaveOpsForBlock(blockNum BlockNumber) bool { | |||
return exists | |||
} | |||
func (self *SteemDataStore) CurrentBlockHeight() BlockNumber { | |||
keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix) | |||
func (self *SteemDataStore) CurrentNetworkBlockHeight() BlockNumber { | |||
keyname := fmt.Sprintf("%s.meta.CurrentNetworkBlockHeight", appPrefix) | |||
val, err := self.kv.Get(&keyname) | |||
if err != nil { | |||
// assume this is key not found, initialize key to default | |||
self.SetCurrentNetworkBlockHeight(0) | |||
// retry | |||
return self.CurrentNetworkBlockHeight() | |||
} | |||
intval, err := strconv.ParseUint(*val, 10, 64) | |||
return BlockNumber(intval) | |||
} | |||
func (self *SteemDataStore) CurrentLocalBlockHeight() BlockNumber { | |||
keyname := fmt.Sprintf("%s.meta.CurrentLocalBlockHeight", appPrefix) | |||
val, err := self.kv.Get(&keyname) | |||
if err != nil { | |||
// assume this is key not found, initialize key to default | |||
self.SetCurrentBlockHeight(0) | |||
self.SetCurrentLocalBlockHeight(0) | |||
// retry | |||
return self.CurrentBlockHeight() | |||
return self.CurrentLocalBlockHeight() | |||
} | |||
intval, err := strconv.ParseUint(*val, 10, 64) | |||
return BlockNumber(intval) |
@@ -11,26 +11,11 @@ const redisUrl = "localhost:6379" | |||
//const steemAPIURL = "http://las2.local:8090" | |||
func main() { | |||
log.SetLevel(log.DebugLevel) | |||
var x *BlockFetcher | |||
x = NewBlockFetcher(&BlockFetcherConfig{ | |||
api: nil, | |||
desiredFetcherThreads: 40, | |||
startBlock: 10000, | |||
endBlock: 10005, | |||
}) | |||
_ = x | |||
} | |||
/* | |||
func mainx() { | |||
app := NewApp(&appconfig{ | |||
logLevel: log.DebugLevel, | |||
apiUrl: steemAPIURL, | |||
redisUrl: redisUrl, | |||
fetcherThreads: 40, | |||
logLevel: log.InfoLevel, | |||
apiUrl: steemAPIURL, | |||
redisUrl: redisUrl, | |||
desiredFetcherThreads: 20, | |||
}) | |||
app.main() | |||
} | |||
*/ |
@@ -9,10 +9,15 @@ type SteemAPI struct { | |||
rpc *JSONRPC | |||
} | |||
type SteemAPIShape interface { | |||
GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesResponse, error) | |||
GetOpsInBlock(blockNum BlockNumber) (GetOpsInBlockResponse, error) | |||
} | |||
var EmptyParams = []string{} | |||
var EmptyParamsRaw, _ = json.Marshal(EmptyParams) | |||
func NewSteemAPI(url string, options ...func(s *SteemAPI)) *SteemAPI { | |||
func NewSteemAPI(url string, options ...func(s SteemAPIShape)) *SteemAPI { | |||
rpc := NewJSONRPC(url, func(x *JSONRPC) {}) | |||
@@ -46,9 +51,12 @@ func (self *SteemAPI) GetOpsInBlock(blockNum BlockNumber) (GetOpsInBlockResponse | |||
// api.steemit.com. | |||
realOpsParams := &GetOpsInBlockRequestParams{BlockNum: blockNum, VirtualOps: false} | |||
rop, err := realOpsParams.MarshalJSON() | |||
realOpsResponse, err := self.rpc.Call("condenser_api.get_ops_in_block", rop) | |||
if err != nil { | |||
return nil, err | |||
} | |||
rawOpsResponse, err := self.rpc.Call("condenser_api.get_ops_in_block", rop) | |||
var result []OperationObject | |||
err = json.Unmarshal(realOpsResponse, &result) | |||
err = json.Unmarshal(rawOpsResponse, &result) | |||
if err != nil { | |||
return nil, err | |||
} |