diff --git a/db.go b/db.go index 4ec19c5..03b9307 100644 --- a/db.go +++ b/db.go @@ -1,17 +1,16 @@ package main import log "github.com/sirupsen/logrus" -import "io/ioutil" -import "github.com/dgraph-io/badger" -import "github.com/spf13/afero" -import "github.com/go-redis/redis" + +//import "io/ioutil" import "fmt" import "strconv" const appPrefix = "steem-block-fetcher" type SteemDataStorer interface { - SetCurrentBlockHeight(BlockNumber) error + SetCurrentBlockHeight() error + ForceSetCurrentBlockHeight(BlockNumber) error CurrentBlockHeight() BlockNumber HaveOpsForBlock(BlockNumber) bool StoreBlockOps(BlockNumber, *[]byte) error @@ -28,13 +27,45 @@ func NewSteemDataStore(dir string) *SteemDataStore { return self } -func (self *SteemDataStore) SetCurrentBlockHeight(blockNum BlockNumber) error { - panic("unimplemented") - return nil +func (self *SteemDataStore) ForceSetCurrentBlockHeight(blockNum BlockNumber) error { + keyname := fmt.Sprintf("%s:global:CurrentBlockHeight", appPrefix) + value := fmt.Sprintf("%d", blockNum) + return self.kv.Put(&keyname, &value) +} + +// this function searches for the highest contiguously stored blocknum +// and updates the memo in the db +func (self *SteemDataStore) SetCurrentBlockHeight() error { + nextVal := self.FindHighestContiguousBlockInDb(self.CurrentBlockHeight()) + keyname := fmt.Sprintf("%s:global:CurrentBlockHeight", appPrefix) + value := fmt.Sprintf("%d", nextVal) + log.Infof("updating our current highest block in db to %d", nextVal) + return self.kv.Put(&keyname, &value) +} + +func (self *SteemDataStore) FindHighestContiguousBlockInDb(from BlockNumber) BlockNumber { + last := from + + var keyname string + var try BlockNumber + + for { + try = BlockNumber(uint64(last) + 1) + keyname = fmt.Sprintf("%s:BlockOps:%d", appPrefix, try) + exists, _ := self.kv.Exists(&keyname) + if exists == false { + log.Debugf("cannot find block %d in db, highest found is %d", try, last) + return last + } else { + last = try + } + } } func (self *SteemDataStore) StoreBlockOps(blockNum BlockNumber, blockOps *[]byte) error { - panic("unimplemented") + keyname := fmt.Sprintf("%s:BlockOps:%d", appPrefix, blockNum) + value := string(*blockOps) + return self.kv.Put(&keyname, &value) } func (self *SteemDataStore) HaveOpsForBlock(blockNum BlockNumber) bool { @@ -44,130 +75,12 @@ func (self *SteemDataStore) HaveOpsForBlock(blockNum BlockNumber) bool { func (self *SteemDataStore) CurrentBlockHeight() BlockNumber { keyname := fmt.Sprintf("%s:global:CurrentBlockHeight", appPrefix) val, err := self.kv.Get(&keyname) - //FIXME(sneak) make this init the key to 0 if not found if err != nil { - panic("unable to query redis") + // assume this is key not found, initialize key to default + self.ForceSetCurrentBlockHeight(0) + // retry + return self.CurrentBlockHeight() } intval, err := strconv.ParseUint(*val, 10, 64) return BlockNumber(intval) } - -// KeyValueStorer is an interface for the backend kv store used by -// SteemDataStore -// it could be fs, badgerdb, whatever - -type KVStorer interface { - Open(string) - Get(*string) (*string, error) - Put(*string, *string) error - Close() -} - -type RedisKVStore struct { - rc *redis.Client -} - -func NewRedisKVStore() *RedisKVStore { - rkvs := new(RedisKVStore) - rkvs.Open("localhost:6379") //FIXME(sneak) use viper - return rkvs -} - -func (self *RedisKVStore) Get(keyname *string) (*string, error) { - val, err := self.rc.Get(*keyname).Result() - if err != nil { - log.Panicf("unable to fetch key '%s' from redis", *keyname) - //FIXME(sneak) we should probably distinguish between key not - //existing and fetch error - } - return &val, nil -} - -func (self *RedisKVStore) Put(keyname *string, value *string) error { - err := self.rc.Set(*keyname, *value, 0).Err() - if err != nil { - panic("unable to write to redis") - } - return nil -} - -func (self *RedisKVStore) Close() { - self.rc.Close() -} - -func (self *RedisKVStore) Open(hostname string) { - self.rc = redis.NewClient(&redis.Options{ - Addr: hostname, - Password: "", // no password set - DB: 0, // use default DB - }) - - _, err := self.rc.Ping().Result() - if err != nil { - panic(err) - } -} - -type AferoFSKVStore struct { - fs *afero.Fs -} - -// BadgerKVStore is an object that conforms to KeyValueStorer for use -// by SteemDataStore to persist Steem data - -type BadgerKVStore struct { - db *badger.DB -} - -func NewBadgerKVStore(dir string) *BadgerKVStore { - kv := new(BadgerKVStore) - kv.Open(dir) - return kv -} - -func (kv *BadgerKVStore) Open(dir string) { - dir, err := ioutil.TempDir("", "badger") - if err != nil { - log.Fatal(err) - } - - opts := badger.DefaultOptions - opts.Dir = dir - opts.ValueDir = dir - kv.db, err = badger.Open(opts) - if err != nil { - log.Fatal(err) - } -} - -func (kv *BadgerKVStore) Close() { - kv.db.Close() -} - -func (kv *BadgerKVStore) Put(key *string, value *string) error { - txn := kv.db.NewTransaction(true) // Read-write txn - err := txn.Set([]byte(*key), []byte(*value)) - if err != nil { - log.Fatal(err) - } - err = txn.Commit(nil) - if err != nil { - log.Fatal(err) - } - return nil -} - -func (kv *BadgerKVStore) Get(key *string) (*string, error) { - txn := kv.db.NewTransaction(false) - item, err := txn.Get([]byte(*key)) - if err != nil { - return nil, err - } - - val, err := item.ValueCopy(nil) - if err != nil { - return nil, err - } - s := string(val) - return &s, nil -} diff --git a/kvstore.go b/kvstore.go new file mode 100644 index 0000000..f1e5cb8 --- /dev/null +++ b/kvstore.go @@ -0,0 +1,138 @@ +package main + +import log "github.com/sirupsen/logrus" +import "io/ioutil" +import "github.com/dgraph-io/badger" +import "github.com/spf13/afero" +import "github.com/go-redis/redis" + +// KeyValueStorer is an interface for the backend kv store used by +// SteemDataStore +// it could be fs, badgerdb, whatever + +type KVStorer interface { + Open(string) + Get(*string) (*string, error) + Exists(*string) (bool, error) + Put(*string, *string) error + Close() +} + +type RedisKVStore struct { + rc *redis.Client +} + +func NewRedisKVStore() *RedisKVStore { + rkvs := new(RedisKVStore) + rkvs.Open("localhost:6379") //FIXME(sneak) use viper + return rkvs +} + +func (self *RedisKVStore) Exists(keyname *string) (bool, error) { + val, err := self.rc.Exists(*keyname).Result() + if err != nil { + return false, err + } + if val == int64(1) { + return true, nil + } else { + return false, nil + } +} + +func (self *RedisKVStore) Get(keyname *string) (*string, error) { + val, err := self.rc.Get(*keyname).Result() + if err != nil { + return nil, err + } + return &val, nil +} + +func (self *RedisKVStore) Put(keyname *string, value *string) error { + err := self.rc.Set(*keyname, *value, 0).Err() + if err != nil { + panic("unable to write to redis") + } + return nil +} + +func (self *RedisKVStore) Close() { + self.rc.Close() +} + +func (self *RedisKVStore) Open(hostname string) { + self.rc = redis.NewClient(&redis.Options{ + Addr: hostname, + Password: "", // no password set + DB: 0, // use default DB + }) + + _, err := self.rc.Ping().Result() + if err != nil { + panic(err) + } +} + +type AferoFSKVStore struct { + fs *afero.Fs +} + +// BadgerKVStore is an object that conforms to KeyValueStorer for use +// by SteemDataStore to persist Steem data + +type BadgerKVStore struct { + db *badger.DB +} + +func NewBadgerKVStore(dir string) *BadgerKVStore { + kv := new(BadgerKVStore) + kv.Open(dir) + return kv +} + +func (kv *BadgerKVStore) Open(dir string) { + dir, err := ioutil.TempDir("", "badger") + if err != nil { + log.Fatal(err) + } + + opts := badger.DefaultOptions + opts.Dir = dir + opts.ValueDir = dir + kv.db, err = badger.Open(opts) + if err != nil { + log.Fatal(err) + } +} + +func (kv *BadgerKVStore) Close() { + kv.db.Close() +} + +func (kv *BadgerKVStore) Put(key *string, value *string) error { + txn := kv.db.NewTransaction(true) // Read-write txn + err := txn.Set([]byte(*key), []byte(*value)) + if err != nil { + log.Fatal(err) + } + err = txn.Commit(nil) + if err != nil { + log.Fatal(err) + } + return nil +} + +func (kv *BadgerKVStore) Get(key *string) (*string, error) { + txn := kv.db.NewTransaction(false) + item, err := txn.Get([]byte(*key)) + if err != nil { + return nil, err + } + + val, err := item.ValueCopy(nil) + if err != nil { + return nil, err + } + s := string(val) + return &s, nil +} diff --git a/main.go b/main.go index 9d3b5b2..ffbd187 100644 --- a/main.go +++ b/main.go @@ -18,7 +18,8 @@ func main() { func processinit() { //FIXME(sneak) use viper to set loglevel - log.SetLevel(log.DebugLevel) + //log.SetLevel(log.DebugLevel) + log.SetLevel(log.InfoLevel) } type BlockNumber uint64 @@ -44,7 +45,7 @@ func NewApp(config *appconfig) *App { func (self *App) init(config *appconfig) { self.api = NewSteemAPI(steemAPIURL) self.datastore = NewSteemDataStore("./d") - self.desiredFetchingThreads = 10 + self.desiredFetchingThreads = 20 self.currentFetchingThreads = 0 self.lock = &sync.Mutex{} } @@ -87,7 +88,10 @@ func (self *App) spawnNewFetcher(blockNum BlockNumber) { go func() { self.incrFetchers() self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum)) + self.datastore.SetCurrentBlockHeight() + time.Sleep(250 * time.Millisecond) self.decrFetchers() + }() } @@ -110,14 +114,16 @@ func (self *App) mainloop() { for { self.updateCurrentBlockHeight() localHeight := self.datastore.CurrentBlockHeight() - time.Sleep(500 * time.Millisecond) + log.Infof("our highest fetched block height is %d", localHeight) + log.Infof("current number of active fetchers: %d", self.numFetchers()) + time.Sleep(1 * time.Second) if localHeight < self.currentNetworkBlockHeight { // we need to fetch some blocks from the network avail := self.desiredFetchingThreads - self.numFetchers() diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight) log.Infof("we need to fetch %d blocks", diff) if uint64(diff) > uint64(avail) { - panic("not implemented yet") + self.parallelFetchAndStoreBlocks(BlockNumber(uint64(localHeight)+1), BlockNumber(uint64(localHeight)+1+uint64(avail))) } else { // just spawn fetchers for the blocks we don't have // spawning will update the number of running fetchers