diff --git a/db.go b/db.go index 16880aa..f3a49f8 100644 --- a/db.go +++ b/db.go @@ -1,17 +1,19 @@ package main -import "log" +import log "github.com/sirupsen/logrus" import "io/ioutil" import "github.com/dgraph-io/badger" +import "github.com/spf13/afero" +import "github.com/go-redis/redis" // SteemDataStore is the object with which the rest of this tool interacts type SteemDataStore struct { - kv KeyValueStorer + kv KVStorer } func NewSteemDataStore(dir string) *SteemDataStore { self := new(SteemDataStore) - self.kv = NewBadgerKeyValueStore(dir) + self.kv = NewRedisKVStore() return self } @@ -23,27 +25,76 @@ func (self *SteemDataStore) StoreBlockOps(blockNum int, blockOps []byte) { // SteemDataStore // it could be fs, badgerdb, whatever -type KeyValueStorer interface { +type KVStorer interface { Open(string) Get(*string) (*string, error) Put(*string, *string) error Close() } -// BadgerKeyValueStore is an object that conforms to KeyValueStorer for use +type RedisKVStore struct { + rc *redis.Client +} + +func NewRedisKVStore() *RedisKVStore { + rkvs := new(RedisKVStore) + rkvs.Open("localhost:6379") //FIXME(sneak) use viper + return rkvs +} + +func (self *RedisKVStore) Get(keyname *string) (*string, error) { + val, err := self.rc.Get(*keyname).Result() + if err != nil { + panic("unable to fetch key from redis") + //FIXME(sneak) we should probably distinguish between key not + //existing and fetch error + } + return &val, nil +} + +func (self *RedisKVStore) Put(keyname *string, value *string) error { + err := self.rc.Set(*keyname, *value, 0).Err() + if err != nil { + panic("unable to write to redis") + } + return nil +} + +func (self *RedisKVStore) Close() { + self.rc.Close() +} + +func (self *RedisKVStore) Open(hostname string) { + self.rc = redis.NewClient(&redis.Options{ + Addr: hostname, + Password: "", // no password set + DB: 0, // use default DB + }) + + _, err := self.rc.Ping().Result() + if err != nil { + panic(err) + } +} + +type AferoFSKVStore struct { + fs *afero.Fs +} + +// BadgerKVStore is an object that conforms to KeyValueStorer for use // by SteemDataStore to persist Steem data -type BadgerKeyValueStore struct { +type BadgerKVStore struct { db *badger.DB } -func NewBadgerKeyValueStore(dir string) *BadgerKeyValueStore { - kv := new(BadgerKeyValueStore) +func NewBadgerKVStore(dir string) *BadgerKVStore { + kv := new(BadgerKVStore) kv.Open(dir) return kv } -func (kv *BadgerKeyValueStore) Open(dir string) { +func (kv *BadgerKVStore) Open(dir string) { dir, err := ioutil.TempDir("", "badger") if err != nil { log.Fatal(err) @@ -58,11 +109,11 @@ func (kv *BadgerKeyValueStore) Open(dir string) { } } -func (kv *BadgerKeyValueStore) Close() { +func (kv *BadgerKVStore) Close() { kv.db.Close() } -func (kv *BadgerKeyValueStore) Put(key *string, value *string) error { +func (kv *BadgerKVStore) Put(key *string, value *string) error { txn := kv.db.NewTransaction(true) // Read-write txn err := txn.Set([]byte(*key), []byte(*value)) if err != nil { @@ -75,8 +126,8 @@ func (kv *BadgerKeyValueStore) Put(key *string, value *string) error { return nil } -func (kv *BadgerKeyValueStore) Get(key *string) (*string, error) { - txn := kv.db.NewTransaction(true) // Read-write txn +func (kv *BadgerKVStore) Get(key *string) (*string, error) { + txn := kv.db.NewTransaction(false) item, err := txn.Get([]byte(*key)) if err != nil { return nil, err diff --git a/main.go b/main.go index cca809b..387bcaa 100644 --- a/main.go +++ b/main.go @@ -1,37 +1,116 @@ package main -import ( - "encoding/json" - "fmt" - log "github.com/sirupsen/logrus" - "github.com/spf13/afero" -) +//import "github.com/spf13/viper" +//import "encoding/json" +//import "fmt" +import "sync" +import "time" +import log "github.com/sirupsen/logrus" const steemAPIURL = "https://api.steemit.com" func main() { - log.SetLevel(log.DebugLevel) - - var fs = afero.NewBasePathFs(afero.NewOsFs(), "./d") - var s = NewSteemAPI(steemAPIURL) - var state = NewSteemDataStore("./d") - - var endBlock = *currentBlockHeight(s) - - var startBlock = 1 - - fetchBlockRange(s, state, startBlock, endBlock) + processinit() + app := NewApp(&appconfig{}) + app.main() } -func fetchBlockRange(s *SteemAPI, fs afero.Fs, startBlock int, endBlock int) *error { +func processinit() { + //FIXME(sneak) use viper to set loglevel + log.SetLevel(log.DebugLevel) +} + +type App struct { + datastore *SteemDataStore + api *SteemAPI + currentNetworkBlockHeight int + currentLocalBlockHeight int + currentFetchingThreads int + desiredFetchingThreads int + lock *sync.Mutex +} + +type appconfig map[string]string + +func NewApp(config *appconfig) *App { + self := new(App) + self.init(config) + return self +} + +func (self *App) init(config *appconfig) { + self.api = NewSteemAPI(steemAPIURL) + self.datastore = NewSteemDataStore("./d") + self.desiredFetchingThreads = 10 + self.currentFetchingThreads = 0 + self.lock = &sync.Mutex{} +} + +func (self *App) updateCurrentBlockHeight() { + h := self.fetchCurrentBlockHeight() + if h > self.currentNetworkBlockHeight { + self.lock.Lock() + defer self.lock.Unlock() + self.currentNetworkBlockHeight = h + log.Infof("current block height is now %d", self.currentNetworkBlockHeight) + } +} + +func (self *App) main() { + log.Infof("steem block data fetcher starting up...") + self.mainloop() +} + +func (self *App) numFetchers() int { + self.lock.Lock() + defer self.lock.Unlock() + return self.currentFetchingThreads +} + +func (self *App) incrFetchers() { + self.lock.Lock() + defer self.lock.Unlock() + self.currentFetchingThreads += 1 +} + +func (self *App) decrFetchers() { + self.lock.Lock() + defer self.lock.Unlock() + self.currentFetchingThreads -= 1 +} + +func (self *App) spawnNewFetcher(blockNum int) { + self.incrFetchers() +} + +func (self *App) mainloop() { + for { + self.updateCurrentBlockHeight() + time.Sleep(1 * time.Second) + } +} + +func (self *App) fetchCurrentBlockHeight() int { + r, err := self.api.GetDynamicGlobalProperties() + if err != nil { + panic("can't fetch global properties, bailing") + } + if r.LastIrreversibleBlockNum < 100 { + panic("can't fetch global properties, bailing") + } + return r.LastIrreversibleBlockNum +} + +func fetchBlockRange(s *SteemAPI, state *SteemDataStore, startBlock int, endBlock int) *error { log.Debugf("fetching block range %d to %d inclusive", startBlock, endBlock) for i := startBlock; i <= endBlock; i++ { - fetchSingleBlock(s, fs, i) + //fetchSingleBlock(s, fs, i) } return nil } -func fetchSingleBlock(s *SteemAPI, fs afero.Fs, blockNum int) { +/* +func (self *App) fetchSingleBlockOps(blockNum int) (*byte[]) { tmpName := fmt.Sprintf("/blockOps/%d.json.tmp", blockNum) realName := fmt.Sprintf("/blockOps/%d.json", blockNum) @@ -59,32 +138,4 @@ func fetchSingleBlock(s *SteemAPI, fs afero.Fs, blockNum int) { } fs.Rename(tmpName, realName) } - -func nope() { - - //r2, err := s.GetOpsInBlock(20000000) - - //if err != nil { - // log.Fatal(err) - //} - - //spew.Dump(r) - - //bytes, err := json.Marshal(r2) - - //if err != nil { - // fmt.Println(err) - // } - // fmt.Println(string(bytes)) -} - -func currentBlockHeight(s *SteemAPI) *int { - r, err := s.GetDynamicGlobalProperties() - if err != nil { - return nil - } - if r.LastIrreversibleBlockNum > 0 { - return &r.LastIrreversibleBlockNum - } - return nil -} +*/