starting to look like a real app now

This commit is contained in:
Jeffrey Paul 2018-10-31 01:48:53 -07:00
parent f91f8e1927
commit b8f85eae72
Signed by: sneak
GPG Key ID: 052443F4DF2A55C2
2 changed files with 164 additions and 62 deletions

77
db.go
View File

@ -1,17 +1,19 @@
package main
import "log"
import log "github.com/sirupsen/logrus"
import "io/ioutil"
import "github.com/dgraph-io/badger"
import "github.com/spf13/afero"
import "github.com/go-redis/redis"
// SteemDataStore is the object with which the rest of this tool interacts
type SteemDataStore struct {
kv KeyValueStorer
kv KVStorer
}
func NewSteemDataStore(dir string) *SteemDataStore {
self := new(SteemDataStore)
self.kv = NewBadgerKeyValueStore(dir)
self.kv = NewRedisKVStore()
return self
}
@ -23,27 +25,76 @@ func (self *SteemDataStore) StoreBlockOps(blockNum int, blockOps []byte) {
// SteemDataStore
// it could be fs, badgerdb, whatever
type KeyValueStorer interface {
type KVStorer interface {
Open(string)
Get(*string) (*string, error)
Put(*string, *string) error
Close()
}
// BadgerKeyValueStore is an object that conforms to KeyValueStorer for use
type RedisKVStore struct {
rc *redis.Client
}
func NewRedisKVStore() *RedisKVStore {
rkvs := new(RedisKVStore)
rkvs.Open("localhost:6379") //FIXME(sneak) use viper
return rkvs
}
func (self *RedisKVStore) Get(keyname *string) (*string, error) {
val, err := self.rc.Get(*keyname).Result()
if err != nil {
panic("unable to fetch key from redis")
//FIXME(sneak) we should probably distinguish between key not
//existing and fetch error
}
return &val, nil
}
func (self *RedisKVStore) Put(keyname *string, value *string) error {
err := self.rc.Set(*keyname, *value, 0).Err()
if err != nil {
panic("unable to write to redis")
}
return nil
}
func (self *RedisKVStore) Close() {
self.rc.Close()
}
func (self *RedisKVStore) Open(hostname string) {
self.rc = redis.NewClient(&redis.Options{
Addr: hostname,
Password: "", // no password set
DB: 0, // use default DB
})
_, err := self.rc.Ping().Result()
if err != nil {
panic(err)
}
}
type AferoFSKVStore struct {
fs *afero.Fs
}
// BadgerKVStore is an object that conforms to KeyValueStorer for use
// by SteemDataStore to persist Steem data
type BadgerKeyValueStore struct {
type BadgerKVStore struct {
db *badger.DB
}
func NewBadgerKeyValueStore(dir string) *BadgerKeyValueStore {
kv := new(BadgerKeyValueStore)
func NewBadgerKVStore(dir string) *BadgerKVStore {
kv := new(BadgerKVStore)
kv.Open(dir)
return kv
}
func (kv *BadgerKeyValueStore) Open(dir string) {
func (kv *BadgerKVStore) Open(dir string) {
dir, err := ioutil.TempDir("", "badger")
if err != nil {
log.Fatal(err)
@ -58,11 +109,11 @@ func (kv *BadgerKeyValueStore) Open(dir string) {
}
}
func (kv *BadgerKeyValueStore) Close() {
func (kv *BadgerKVStore) Close() {
kv.db.Close()
}
func (kv *BadgerKeyValueStore) Put(key *string, value *string) error {
func (kv *BadgerKVStore) Put(key *string, value *string) error {
txn := kv.db.NewTransaction(true) // Read-write txn
err := txn.Set([]byte(*key), []byte(*value))
if err != nil {
@ -75,8 +126,8 @@ func (kv *BadgerKeyValueStore) Put(key *string, value *string) error {
return nil
}
func (kv *BadgerKeyValueStore) Get(key *string) (*string, error) {
txn := kv.db.NewTransaction(true) // Read-write txn
func (kv *BadgerKVStore) Get(key *string) (*string, error) {
txn := kv.db.NewTransaction(false)
item, err := txn.Get([]byte(*key))
if err != nil {
return nil, err

149
main.go
View File

@ -1,37 +1,116 @@
package main
import (
"encoding/json"
"fmt"
log "github.com/sirupsen/logrus"
"github.com/spf13/afero"
)
//import "github.com/spf13/viper"
//import "encoding/json"
//import "fmt"
import "sync"
import "time"
import log "github.com/sirupsen/logrus"
const steemAPIURL = "https://api.steemit.com"
func main() {
log.SetLevel(log.DebugLevel)
var fs = afero.NewBasePathFs(afero.NewOsFs(), "./d")
var s = NewSteemAPI(steemAPIURL)
var state = NewSteemDataStore("./d")
var endBlock = *currentBlockHeight(s)
var startBlock = 1
fetchBlockRange(s, state, startBlock, endBlock)
processinit()
app := NewApp(&appconfig{})
app.main()
}
func fetchBlockRange(s *SteemAPI, fs afero.Fs, startBlock int, endBlock int) *error {
func processinit() {
//FIXME(sneak) use viper to set loglevel
log.SetLevel(log.DebugLevel)
}
type App struct {
datastore *SteemDataStore
api *SteemAPI
currentNetworkBlockHeight int
currentLocalBlockHeight int
currentFetchingThreads int
desiredFetchingThreads int
lock *sync.Mutex
}
type appconfig map[string]string
func NewApp(config *appconfig) *App {
self := new(App)
self.init(config)
return self
}
func (self *App) init(config *appconfig) {
self.api = NewSteemAPI(steemAPIURL)
self.datastore = NewSteemDataStore("./d")
self.desiredFetchingThreads = 10
self.currentFetchingThreads = 0
self.lock = &sync.Mutex{}
}
func (self *App) updateCurrentBlockHeight() {
h := self.fetchCurrentBlockHeight()
if h > self.currentNetworkBlockHeight {
self.lock.Lock()
defer self.lock.Unlock()
self.currentNetworkBlockHeight = h
log.Infof("current block height is now %d", self.currentNetworkBlockHeight)
}
}
func (self *App) main() {
log.Infof("steem block data fetcher starting up...")
self.mainloop()
}
func (self *App) numFetchers() int {
self.lock.Lock()
defer self.lock.Unlock()
return self.currentFetchingThreads
}
func (self *App) incrFetchers() {
self.lock.Lock()
defer self.lock.Unlock()
self.currentFetchingThreads += 1
}
func (self *App) decrFetchers() {
self.lock.Lock()
defer self.lock.Unlock()
self.currentFetchingThreads -= 1
}
func (self *App) spawnNewFetcher(blockNum int) {
self.incrFetchers()
}
func (self *App) mainloop() {
for {
self.updateCurrentBlockHeight()
time.Sleep(1 * time.Second)
}
}
func (self *App) fetchCurrentBlockHeight() int {
r, err := self.api.GetDynamicGlobalProperties()
if err != nil {
panic("can't fetch global properties, bailing")
}
if r.LastIrreversibleBlockNum < 100 {
panic("can't fetch global properties, bailing")
}
return r.LastIrreversibleBlockNum
}
func fetchBlockRange(s *SteemAPI, state *SteemDataStore, startBlock int, endBlock int) *error {
log.Debugf("fetching block range %d to %d inclusive", startBlock, endBlock)
for i := startBlock; i <= endBlock; i++ {
fetchSingleBlock(s, fs, i)
//fetchSingleBlock(s, fs, i)
}
return nil
}
func fetchSingleBlock(s *SteemAPI, fs afero.Fs, blockNum int) {
/*
func (self *App) fetchSingleBlockOps(blockNum int) (*byte[]) {
tmpName := fmt.Sprintf("/blockOps/%d.json.tmp", blockNum)
realName := fmt.Sprintf("/blockOps/%d.json", blockNum)
@ -59,32 +138,4 @@ func fetchSingleBlock(s *SteemAPI, fs afero.Fs, blockNum int) {
}
fs.Rename(tmpName, realName)
}
func nope() {
//r2, err := s.GetOpsInBlock(20000000)
//if err != nil {
// log.Fatal(err)
//}
//spew.Dump(r)
//bytes, err := json.Marshal(r2)
//if err != nil {
// fmt.Println(err)
// }
// fmt.Println(string(bytes))
}
func currentBlockHeight(s *SteemAPI) *int {
r, err := s.GetDynamicGlobalProperties()
if err != nil {
return nil
}
if r.LastIrreversibleBlockNum > 0 {
return &r.LastIrreversibleBlockNum
}
return nil
}
*/