works but doesn't sync up properly

This commit is contained in:
Jeffrey Paul 2018-10-31 03:25:37 -07:00
parent 26bc5ae878
commit 80033e3f33
Signed by: sneak
GPG Key ID: 052443F4DF2A55C2
3 changed files with 192 additions and 135 deletions

175
db.go
View File

@ -1,17 +1,16 @@
package main package main
import log "github.com/sirupsen/logrus" import log "github.com/sirupsen/logrus"
import "io/ioutil"
import "github.com/dgraph-io/badger" //import "io/ioutil"
import "github.com/spf13/afero"
import "github.com/go-redis/redis"
import "fmt" import "fmt"
import "strconv" import "strconv"
const appPrefix = "steem-block-fetcher" const appPrefix = "steem-block-fetcher"
type SteemDataStorer interface { type SteemDataStorer interface {
SetCurrentBlockHeight(BlockNumber) error SetCurrentBlockHeight() error
ForceSetCurrentBlockHeight(BlockNumber) error
CurrentBlockHeight() BlockNumber CurrentBlockHeight() BlockNumber
HaveOpsForBlock(BlockNumber) bool HaveOpsForBlock(BlockNumber) bool
StoreBlockOps(BlockNumber, *[]byte) error StoreBlockOps(BlockNumber, *[]byte) error
@ -28,13 +27,45 @@ func NewSteemDataStore(dir string) *SteemDataStore {
return self return self
} }
func (self *SteemDataStore) SetCurrentBlockHeight(blockNum BlockNumber) error { func (self *SteemDataStore) ForceSetCurrentBlockHeight(blockNum BlockNumber) error {
panic("unimplemented") keyname := fmt.Sprintf("%s:global:CurrentBlockHeight", appPrefix)
return nil value := fmt.Sprintf("%d", blockNum)
return self.kv.Put(&keyname, &value)
}
// this function searches for the highest contiguously stored blocknum
// and updates the memo in the db
func (self *SteemDataStore) SetCurrentBlockHeight() error {
nextVal := self.FindHighestContiguousBlockInDb(self.CurrentBlockHeight())
keyname := fmt.Sprintf("%s:global:CurrentBlockHeight", appPrefix)
value := fmt.Sprintf("%d", nextVal)
log.Infof("updating our current highest block in db to %d", nextVal)
return self.kv.Put(&keyname, &value)
}
func (self *SteemDataStore) FindHighestContiguousBlockInDb(from BlockNumber) BlockNumber {
last := from
var keyname string
var try BlockNumber
for {
try = BlockNumber(uint64(last) + 1)
keyname = fmt.Sprintf("%s:BlockOps:%d", appPrefix, try)
exists, _ := self.kv.Exists(&keyname)
if exists == false {
log.Debugf("cannot find block %d in db, highest found is %d", try, last)
return last
} else {
last = try
}
}
} }
func (self *SteemDataStore) StoreBlockOps(blockNum BlockNumber, blockOps *[]byte) error { func (self *SteemDataStore) StoreBlockOps(blockNum BlockNumber, blockOps *[]byte) error {
panic("unimplemented") keyname := fmt.Sprintf("%s:BlockOps:%d", appPrefix, blockNum)
value := string(*blockOps)
return self.kv.Put(&keyname, &value)
} }
func (self *SteemDataStore) HaveOpsForBlock(blockNum BlockNumber) bool { func (self *SteemDataStore) HaveOpsForBlock(blockNum BlockNumber) bool {
@ -44,130 +75,12 @@ func (self *SteemDataStore) HaveOpsForBlock(blockNum BlockNumber) bool {
func (self *SteemDataStore) CurrentBlockHeight() BlockNumber { func (self *SteemDataStore) CurrentBlockHeight() BlockNumber {
keyname := fmt.Sprintf("%s:global:CurrentBlockHeight", appPrefix) keyname := fmt.Sprintf("%s:global:CurrentBlockHeight", appPrefix)
val, err := self.kv.Get(&keyname) val, err := self.kv.Get(&keyname)
//FIXME(sneak) make this init the key to 0 if not found
if err != nil { if err != nil {
panic("unable to query redis") // assume this is key not found, initialize key to default
self.ForceSetCurrentBlockHeight(0)
// retry
return self.CurrentBlockHeight()
} }
intval, err := strconv.ParseUint(*val, 10, 64) intval, err := strconv.ParseUint(*val, 10, 64)
return BlockNumber(intval) return BlockNumber(intval)
} }
// KeyValueStorer is an interface for the backend kv store used by
// SteemDataStore
// it could be fs, badgerdb, whatever
type KVStorer interface {
Open(string)
Get(*string) (*string, error)
Put(*string, *string) error
Close()
}
type RedisKVStore struct {
rc *redis.Client
}
func NewRedisKVStore() *RedisKVStore {
rkvs := new(RedisKVStore)
rkvs.Open("localhost:6379") //FIXME(sneak) use viper
return rkvs
}
func (self *RedisKVStore) Get(keyname *string) (*string, error) {
val, err := self.rc.Get(*keyname).Result()
if err != nil {
log.Panicf("unable to fetch key '%s' from redis", *keyname)
//FIXME(sneak) we should probably distinguish between key not
//existing and fetch error
}
return &val, nil
}
func (self *RedisKVStore) Put(keyname *string, value *string) error {
err := self.rc.Set(*keyname, *value, 0).Err()
if err != nil {
panic("unable to write to redis")
}
return nil
}
func (self *RedisKVStore) Close() {
self.rc.Close()
}
func (self *RedisKVStore) Open(hostname string) {
self.rc = redis.NewClient(&redis.Options{
Addr: hostname,
Password: "", // no password set
DB: 0, // use default DB
})
_, err := self.rc.Ping().Result()
if err != nil {
panic(err)
}
}
type AferoFSKVStore struct {
fs *afero.Fs
}
// BadgerKVStore is an object that conforms to KeyValueStorer for use
// by SteemDataStore to persist Steem data
type BadgerKVStore struct {
db *badger.DB
}
func NewBadgerKVStore(dir string) *BadgerKVStore {
kv := new(BadgerKVStore)
kv.Open(dir)
return kv
}
func (kv *BadgerKVStore) Open(dir string) {
dir, err := ioutil.TempDir("", "badger")
if err != nil {
log.Fatal(err)
}
opts := badger.DefaultOptions
opts.Dir = dir
opts.ValueDir = dir
kv.db, err = badger.Open(opts)
if err != nil {
log.Fatal(err)
}
}
func (kv *BadgerKVStore) Close() {
kv.db.Close()
}
func (kv *BadgerKVStore) Put(key *string, value *string) error {
txn := kv.db.NewTransaction(true) // Read-write txn
err := txn.Set([]byte(*key), []byte(*value))
if err != nil {
log.Fatal(err)
}
err = txn.Commit(nil)
if err != nil {
log.Fatal(err)
}
return nil
}
func (kv *BadgerKVStore) Get(key *string) (*string, error) {
txn := kv.db.NewTransaction(false)
item, err := txn.Get([]byte(*key))
if err != nil {
return nil, err
}
val, err := item.ValueCopy(nil)
if err != nil {
return nil, err
}
s := string(val)
return &s, nil
}

138
kvstore.go Normal file
View File

@ -0,0 +1,138 @@
package main
import log "github.com/sirupsen/logrus"
import "io/ioutil"
import "github.com/dgraph-io/badger"
import "github.com/spf13/afero"
import "github.com/go-redis/redis"
// KeyValueStorer is an interface for the backend kv store used by
// SteemDataStore
// it could be fs, badgerdb, whatever
type KVStorer interface {
Open(string)
Get(*string) (*string, error)
Exists(*string) (bool, error)
Put(*string, *string) error
Close()
}
type RedisKVStore struct {
rc *redis.Client
}
func NewRedisKVStore() *RedisKVStore {
rkvs := new(RedisKVStore)
rkvs.Open("localhost:6379") //FIXME(sneak) use viper
return rkvs
}
func (self *RedisKVStore) Exists(keyname *string) (bool, error) {
val, err := self.rc.Exists(*keyname).Result()
if err != nil {
return false, err
}
if val == int64(1) {
return true, nil
} else {
return false, nil
}
}
func (self *RedisKVStore) Get(keyname *string) (*string, error) {
val, err := self.rc.Get(*keyname).Result()
if err != nil {
return nil, err
}
return &val, nil
}
func (self *RedisKVStore) Put(keyname *string, value *string) error {
err := self.rc.Set(*keyname, *value, 0).Err()
if err != nil {
panic("unable to write to redis")
}
return nil
}
func (self *RedisKVStore) Close() {
self.rc.Close()
}
func (self *RedisKVStore) Open(hostname string) {
self.rc = redis.NewClient(&redis.Options{
Addr: hostname,
Password: "", // no password set
DB: 0, // use default DB
})
_, err := self.rc.Ping().Result()
if err != nil {
panic(err)
}
}
type AferoFSKVStore struct {
fs *afero.Fs
}
// BadgerKVStore is an object that conforms to KeyValueStorer for use
// by SteemDataStore to persist Steem data
type BadgerKVStore struct {
db *badger.DB
}
func NewBadgerKVStore(dir string) *BadgerKVStore {
kv := new(BadgerKVStore)
kv.Open(dir)
return kv
}
func (kv *BadgerKVStore) Open(dir string) {
dir, err := ioutil.TempDir("", "badger")
if err != nil {
log.Fatal(err)
}
opts := badger.DefaultOptions
opts.Dir = dir
opts.ValueDir = dir
kv.db, err = badger.Open(opts)
if err != nil {
log.Fatal(err)
}
}
func (kv *BadgerKVStore) Close() {
kv.db.Close()
}
func (kv *BadgerKVStore) Put(key *string, value *string) error {
txn := kv.db.NewTransaction(true) // Read-write txn
err := txn.Set([]byte(*key), []byte(*value))
if err != nil {
log.Fatal(err)
}
err = txn.Commit(nil)
if err != nil {
log.Fatal(err)
}
return nil
}
func (kv *BadgerKVStore) Get(key *string) (*string, error) {
txn := kv.db.NewTransaction(false)
item, err := txn.Get([]byte(*key))
if err != nil {
return nil, err
}
val, err := item.ValueCopy(nil)
if err != nil {
return nil, err
}
s := string(val)
return &s, nil
}

14
main.go
View File

@ -18,7 +18,8 @@ func main() {
func processinit() { func processinit() {
//FIXME(sneak) use viper to set loglevel //FIXME(sneak) use viper to set loglevel
log.SetLevel(log.DebugLevel) //log.SetLevel(log.DebugLevel)
log.SetLevel(log.InfoLevel)
} }
type BlockNumber uint64 type BlockNumber uint64
@ -44,7 +45,7 @@ func NewApp(config *appconfig) *App {
func (self *App) init(config *appconfig) { func (self *App) init(config *appconfig) {
self.api = NewSteemAPI(steemAPIURL) self.api = NewSteemAPI(steemAPIURL)
self.datastore = NewSteemDataStore("./d") self.datastore = NewSteemDataStore("./d")
self.desiredFetchingThreads = 10 self.desiredFetchingThreads = 20
self.currentFetchingThreads = 0 self.currentFetchingThreads = 0
self.lock = &sync.Mutex{} self.lock = &sync.Mutex{}
} }
@ -87,7 +88,10 @@ func (self *App) spawnNewFetcher(blockNum BlockNumber) {
go func() { go func() {
self.incrFetchers() self.incrFetchers()
self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum)) self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum))
self.datastore.SetCurrentBlockHeight()
time.Sleep(250 * time.Millisecond)
self.decrFetchers() self.decrFetchers()
}() }()
} }
@ -110,14 +114,16 @@ func (self *App) mainloop() {
for { for {
self.updateCurrentBlockHeight() self.updateCurrentBlockHeight()
localHeight := self.datastore.CurrentBlockHeight() localHeight := self.datastore.CurrentBlockHeight()
time.Sleep(500 * time.Millisecond) log.Infof("our highest fetched block height is %d", localHeight)
log.Infof("current number of active fetchers: %d", self.numFetchers())
time.Sleep(1 * time.Second)
if localHeight < self.currentNetworkBlockHeight { if localHeight < self.currentNetworkBlockHeight {
// we need to fetch some blocks from the network // we need to fetch some blocks from the network
avail := self.desiredFetchingThreads - self.numFetchers() avail := self.desiredFetchingThreads - self.numFetchers()
diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight) diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight)
log.Infof("we need to fetch %d blocks", diff) log.Infof("we need to fetch %d blocks", diff)
if uint64(diff) > uint64(avail) { if uint64(diff) > uint64(avail) {
panic("not implemented yet") self.parallelFetchAndStoreBlocks(BlockNumber(uint64(localHeight)+1), BlockNumber(uint64(localHeight)+1+uint64(avail)))
} else { } else {
// just spawn fetchers for the blocks we don't have // just spawn fetchers for the blocks we don't have
// spawning will update the number of running fetchers // spawning will update the number of running fetchers