Compare commits
	
		
			7 Commits
		
	
	
		
			32874cca63
			...
			76d26a6f1e
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 76d26a6f1e | |||
| 3d6bf1e08f | |||
| 8ff07a4a0b | |||
| 43d43d3748 | |||
| ff9b67f543 | |||
| 25ccf2bc9e | |||
| c43df78d55 | 
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @ -0,0 +1 @@ | ||||
| steem-block-db | ||||
							
								
								
									
										30
									
								
								.gitlab-ci.yml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										30
									
								
								.gitlab-ci.yml
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,30 @@ | ||||
| image: golang:1.11 | ||||
| 
 | ||||
| cache: | ||||
|   paths: | ||||
|     - /apt-cache | ||||
|     - /go/src/github.com | ||||
|     - /go/src/golang.org | ||||
|     - /go/src/google.golang.org | ||||
|     - /go/src/gopkg.in | ||||
| 
 | ||||
| stages: | ||||
|   - test | ||||
|   - build | ||||
| 
 | ||||
| before_script: | ||||
|   - mkdir /go/src/steem-block-db | ||||
|   - cp $CI_PROJECT_DIR/*.go /go/src/steem-block-db | ||||
|   - cd /go/src/steem-block-db | ||||
|   - GOPATH=/go go get | ||||
| 
 | ||||
| #unit_tests: | ||||
| #  stage: test | ||||
| #  script: | ||||
| #    - make test | ||||
| 
 | ||||
| build: | ||||
|   stage: build | ||||
|   script: | ||||
|     - cd /go/src/steem-block-db | ||||
|     - GOPATH=/go go build | ||||
							
								
								
									
										19
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										19
									
								
								Makefile
									
									
									
									
									
								
							| @ -1,4 +1,19 @@ | ||||
| default: run | ||||
| GOFILES := $(shell find . -type f -name '*.go' -not -name '*_test.go') | ||||
| 
 | ||||
| default: test | ||||
| 
 | ||||
| .PHONY: run build test | ||||
| 
 | ||||
| run: | ||||
| 	go run *.go | ||||
| 	go run $(GOFILES) | ||||
| 
 | ||||
| build: steem-block-db | ||||
| 
 | ||||
| steem-block-db: *.go | ||||
| 	go build | ||||
| 
 | ||||
| test: | ||||
| 	go test | ||||
| 
 | ||||
| clean: | ||||
| 	rm steem-block-db | ||||
|  | ||||
							
								
								
									
										161
									
								
								app.go
									
									
									
									
									
								
							
							
						
						
									
										161
									
								
								app.go
									
									
									
									
									
								
							| @ -1,21 +1,23 @@ | ||||
| package main | ||||
| 
 | ||||
| import "sync" | ||||
| import "time" | ||||
| import log "github.com/sirupsen/logrus" | ||||
| import "encoding/json" | ||||
| 
 | ||||
| //import "encoding/json"
 | ||||
| 
 | ||||
| type App struct { | ||||
| 	datastore                 SteemDataStorer | ||||
| 	api                       *SteemAPI | ||||
| 	currentNetworkBlockHeight BlockNumber | ||||
| 	currentLocalBlockHeight   BlockNumber | ||||
| 	lock                      *sync.Mutex | ||||
| 	api    SteemAPIShape | ||||
| 	config *appconfig | ||||
| 	db     SteemDataStorer | ||||
| 	lock   *sync.Mutex | ||||
| } | ||||
| 
 | ||||
| type appconfig struct { | ||||
| 	logLevel log.Level | ||||
| 	apiUrl   string | ||||
| 	redisUrl string | ||||
| 	logLevel              log.Level | ||||
| 	apiUrl                string | ||||
| 	redisUrl              string | ||||
| 	desiredFetcherThreads uint | ||||
| } | ||||
| 
 | ||||
| func NewApp(config *appconfig) *App { | ||||
| @ -26,107 +28,80 @@ func NewApp(config *appconfig) *App { | ||||
| 
 | ||||
| func (self *App) init(config *appconfig) { | ||||
| 	log.SetLevel(config.logLevel) | ||||
| 	log.SetFormatter(&log.TextFormatter{ | ||||
| 		FullTimestamp: true, | ||||
| 	}) | ||||
| 	log.SetFormatter(&log.JSONFormatter{}) | ||||
| 	//log.SetReportCaller(true)
 | ||||
| 	self.api = NewSteemAPI(config.apiUrl) | ||||
| 	self.datastore = NewSteemDataStore(config.redisUrl) | ||||
| 	self.db = NewSteemDataStore(config.redisUrl) | ||||
| 	self.lock = &sync.Mutex{} | ||||
| } | ||||
| 
 | ||||
| func (self *App) updateCurrentBlockHeight() { | ||||
| 	h := self.fetchCurrentBlockHeight() | ||||
| 	if h > self.currentNetworkBlockHeight { | ||||
| 		self.lock.Lock() | ||||
| 		defer self.lock.Unlock() | ||||
| 		self.currentNetworkBlockHeight = h | ||||
| 		log.Infof("current block height is now %d", self.currentNetworkBlockHeight) | ||||
| 	} | ||||
| 	self.config = config | ||||
| } | ||||
| 
 | ||||
| func (self *App) main() { | ||||
| 	log.Infof("steem block data fetcher starting up...") | ||||
| 	//self.mainloop()
 | ||||
| } | ||||
| 
 | ||||
| /* | ||||
| func (self *App) numFetchers() uint { | ||||
| 	self.lock.Lock() | ||||
| 	defer self.lock.Unlock() | ||||
| 	return uint(len(*self.fetchingBlocks)) | ||||
| } | ||||
| 
 | ||||
| func (self *App) spawnNewFetcher(blockNum BlockNumber) { | ||||
| 	log.Debugf("spawning fetcher for block %d", blockNum) | ||||
| 	go func() { | ||||
| 		// this is so hacky, make a queue like a grownup would you
 | ||||
| 		time.Sleep(100 * time.Millisecond) | ||||
| 		if self.datastore.HaveOpsForBlock(blockNum) { | ||||
| 			log.Infof("already have ops for block %d, not re-fetching", blockNum) | ||||
| 			return | ||||
| 		} | ||||
| 		//self.incrFetchers()
 | ||||
| 		self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum)) | ||||
| 		//self.decrFetchers()
 | ||||
| 
 | ||||
| 	}() | ||||
| } | ||||
| 
 | ||||
| func (self *App) storeBlockOps(blockNum BlockNumber, blockOps *[]byte) { | ||||
| 	self.datastore.StoreBlockOps(blockNum, blockOps) | ||||
| } | ||||
| 
 | ||||
| // note that parallelFetchAndStoreBlocks does not respect the
 | ||||
| // limitation on number of desired fetchers, that is for the caller
 | ||||
| func (self *App) parallelFetchAndStoreBlocks(start BlockNumber, end BlockNumber) { | ||||
| 	var diff = uint64(end) - uint64(start) | ||||
| 	var i uint64 | ||||
| 	for i = 0; i < diff; i++ { | ||||
| 		self.spawnNewFetcher(BlockNumber(uint64(start) + i)) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (self *App) populateFetchers() { | ||||
| 	self.mainloop() | ||||
| } | ||||
| 
 | ||||
| func (self *App) mainloop() { | ||||
| 
 | ||||
| 	log.Infof("using %d fetching threads", self.desiredFetchingThreads) | ||||
| 
 | ||||
| 	self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight() | ||||
| 
 | ||||
| 	log.Infof("using %d fetching threads", self.config.desiredFetcherThreads) | ||||
| 	batchSize := uint(3000) | ||||
| 	//batchSize := uint(10)
 | ||||
| 	var start BlockNumber | ||||
| 	var end BlockNumber | ||||
| 	for { | ||||
| 		self.spawnMoreFetchers() | ||||
| 		self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight() | ||||
| 		time.Sleep(1000 * time.Millisecond) | ||||
| 		self.db.SetCurrentNetworkBlockHeight(self.fetchCurrentBlockHeight()) | ||||
| 		if self.db.CurrentNetworkBlockHeight() == self.db.CurrentLocalBlockHeight() { | ||||
| 			//we are synced
 | ||||
| 			time.Sleep(1 * time.Second) | ||||
| 			continue | ||||
| 		} | ||||
| 		log.Infof("current network block height = %d", self.db.CurrentNetworkBlockHeight()) | ||||
| 		log.Infof("current local block height = %d", self.db.CurrentLocalBlockHeight()) | ||||
| 		// we are not synced
 | ||||
| 
 | ||||
| 		// how far behind are we?
 | ||||
| 		countMustFetch := uint(self.db.CurrentNetworkBlockHeight() - self.db.CurrentLocalBlockHeight()) | ||||
| 		log.Infof("we are %d blocks behind", countMustFetch) | ||||
| 		start = self.db.CurrentLocalBlockHeight() + 1 | ||||
| 		log.Infof("beginning fetch with start block %d", start) | ||||
| 		if countMustFetch <= batchSize { | ||||
| 			end = self.db.CurrentNetworkBlockHeight() | ||||
| 			log.Infof("fetch is within our batch size (%d), end block for fetch batch is %d", batchSize, end) | ||||
| 		} else { | ||||
| 			end = BlockNumber(uint(start) + uint(batchSize) - uint(1)) | ||||
| 			log.Infof("fetch is too large for batch size (%d), end block for fetch batch is %d", batchSize, end) | ||||
| 		} | ||||
| 
 | ||||
| 		bf := NewBlockFetcher(&BlockFetcherConfig{ | ||||
| 			api:                   self.api.(*SteemAPI), | ||||
| 			desiredFetcherThreads: self.config.desiredFetcherThreads, | ||||
| 			startBlock:            start, | ||||
| 			endBlock:              end, | ||||
| 		}) | ||||
| 
 | ||||
| 		blocks := bf.fetch() | ||||
| 		log.Infof("blockfetcher has returned") | ||||
| 		self.pushBlocks(blocks) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (self *App) spawnMoreFetchers() { | ||||
| } | ||||
| 
 | ||||
| /* | ||||
| 	self.updateCurrentBlockHeight() | ||||
| 	log.Infof("current number of active fetchers: %d", self.numFetchers()) | ||||
| 	time.Sleep(1500 * time.Millisecond) | ||||
| 	self.datastore.SetCurrentBlockHeight() | ||||
| 	localHeight := self.datastore.CurrentBlockHeight() | ||||
| 	log.Infof("our highest fetched block height is %d", localHeight) | ||||
| 	if localHeight < self.currentNetworkBlockHeight { | ||||
| 		// we need to fetch some blocks from the network
 | ||||
| 		avail := self.desiredFetchingThreads - self.numFetchers() | ||||
| 		diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight) | ||||
| 		log.Infof("we need to fetch %d blocks", diff) | ||||
| 		if uint64(diff) > uint64(avail) { | ||||
| 			self.parallelFetchAndStoreBlocks(BlockNumber(uint64(localHeight)+1), BlockNumber(uint64(localHeight)+1+uint64(avail))) | ||||
| 		} else { | ||||
| 			// just spawn fetchers for the blocks we don't have
 | ||||
| 			// spawning will update the number of running fetchers
 | ||||
| 			self.parallelFetchAndStoreBlocks(localHeight+1, self.currentNetworkBlockHeight) | ||||
| func (self *App) pushBlocks(newBlocks *[]FetchedBlock) { | ||||
| 	counter := 0 | ||||
| 	for _, newBlock := range *newBlocks { | ||||
| 		counter += 1 | ||||
| 		err := self.db.StoreBlockOps(newBlock.blockNumber, newBlock.blockData) | ||||
| 		if err != nil { | ||||
| 			log.Panic(err) | ||||
| 		} | ||||
| 	} | ||||
| 	//needFetchers := self.desiredFetchingThreads - self.numFetchers()
 | ||||
| 
 | ||||
| */ | ||||
| 	log.Infof("pushed %d new blocks to db", counter) | ||||
| 	self.db.UpdateCurrentLocalBlockHeight() | ||||
| } | ||||
| 
 | ||||
| func (self *App) fetchCurrentBlockHeight() BlockNumber { | ||||
| 
 | ||||
| 	r, err := self.api.GetDynamicGlobalProperties() | ||||
| 	if err != nil { | ||||
| 		log.Panicf("can't fetch global properties, bailing. err: %s", err) | ||||
|  | ||||
							
								
								
									
										117
									
								
								blockfetcher.go
									
									
									
									
									
								
							
							
						
						
									
										117
									
								
								blockfetcher.go
									
									
									
									
									
								
							| @ -1,6 +1,8 @@ | ||||
| package main | ||||
| 
 | ||||
| import "encoding/json" | ||||
| import "sync" | ||||
| import "time" | ||||
| import log "github.com/sirupsen/logrus" | ||||
| 
 | ||||
| type FetchedBlock struct { | ||||
| @ -14,7 +16,7 @@ type BlockFetcher struct { | ||||
| 	desiredFetcherThreads uint | ||||
| 	wantBlocks            map[BlockNumber]bool | ||||
| 	fetchingBlocks        map[BlockNumber]bool | ||||
| 	fetchedBlocks         *[]FetchedBlock | ||||
| 	fetchedBlocks         map[BlockNumber]*FetchedBlock | ||||
| 	lock                  *sync.Mutex | ||||
| 	workChannel           chan BlockNumber | ||||
| 	resultsChannel        chan *FetchedBlock | ||||
| @ -43,47 +45,52 @@ func (self *BlockFetcher) init(config *BlockFetcherConfig) { | ||||
| 	self.lock = &sync.Mutex{} | ||||
| 	self.api = config.api | ||||
| 	self.desiredFetcherThreads = config.desiredFetcherThreads | ||||
| 
 | ||||
| 	self.workChannel = make(chan BlockNumber) | ||||
| 	self.resultsChannel = make(chan *FetchedBlock) | ||||
| 	self.wantBlocks = make(map[BlockNumber]bool) | ||||
| 	self.fetchedBlocks = make(map[BlockNumber]*FetchedBlock) | ||||
| 
 | ||||
| 	diff := int(uint(config.endBlock) - uint(config.startBlock)) | ||||
| 	log.Debugf("diff is %d", diff) | ||||
| 	for i := 0; i <= diff; i++ { | ||||
| 		self.wantBlocks[BlockNumber(uint(config.startBlock)+uint(i))] = true | ||||
| 	} | ||||
| 
 | ||||
| 	log.Debugf("wantblocks[] is now %v", self.wantBlocks) | ||||
| } | ||||
| 
 | ||||
| func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) { | ||||
| 	self.lock.Lock() | ||||
| 	defer self.lock.Unlock() | ||||
| 	if self.wantBlocks[blockNum] == false { | ||||
| 		log.Panicf("shouldn't happen") | ||||
| 	} | ||||
| 	self.lock.Lock() | ||||
| 	defer self.lock.Unlock() | ||||
| 	delete(self.wantBlocks, blockNum) | ||||
| } | ||||
| 
 | ||||
| func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) { | ||||
| 	self.lock.Lock() | ||||
| 	defer self.lock.Unlock() | ||||
| 	if self.fetchingBlocks[blockNum] == false { | ||||
| 		log.Panicf("shouldn't happen") | ||||
| 	} | ||||
| 	self.lock.Lock() | ||||
| 	defer self.lock.Unlock() | ||||
| 	delete(self.fetchingBlocks, blockNum) | ||||
| } | ||||
| 
 | ||||
| func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) { | ||||
| 	self.lock.Lock() | ||||
| 	defer self.lock.Unlock() | ||||
| 	if self.fetchingBlocks[blockNum] == true { | ||||
| 		log.Panicf("shouldn't happen") | ||||
| 	} | ||||
| 	self.lock.Lock() | ||||
| 	defer self.lock.Unlock() | ||||
| 	if self.fetchingBlocks == nil { | ||||
| 		self.fetchingBlocks = make(map[BlockNumber]bool) | ||||
| 	} | ||||
| 	if self.fetchingBlocks[blockNum] == false { | ||||
| 		self.fetchingBlocks[blockNum] = true | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (self *BlockFetcher) fetcher(index int) { | ||||
| func (self *BlockFetcher) fetcher(index uint) { | ||||
| 	log.Debugf("fetcher thread %d starting", index) | ||||
| 
 | ||||
| WorkLoop: | ||||
| @ -100,50 +107,88 @@ WorkLoop: | ||||
| 			} else { | ||||
| 				// wait a sec and try again
 | ||||
| 				time.Sleep(1 * time.Second) | ||||
| 				log.Infof("error fetching block %d, retry %d (%v)", blockNum, i+1, r.error) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	log.Debugf("fetcher thread %d ending", index) | ||||
| } | ||||
| 
 | ||||
| func (self *BlockFetcher) sendWork(b BlockNumber) { | ||||
| 	go func() { | ||||
| 		// yay cheap goroutines, let them block on the unbuffered channel
 | ||||
| 		log.Debugf("waiting to send blockNum %d into the work channel", b) | ||||
| 		self.workChannel <- b | ||||
| 		log.Debugf("sent blockNum %d into the work channel", b) | ||||
| 	}() | ||||
| } | ||||
| 
 | ||||
| func (self *BlockFetcher) fetch() *[]FetchedBlock { | ||||
| 	for i := 1; i < self.desiredFetcherThreads+1; i++ { | ||||
| 	log.Debugf("blockfetcher beginning fetch") | ||||
| 	startTime := time.Now() | ||||
| 	for i := uint(1); i < self.desiredFetcherThreads+uint(1); i++ { | ||||
| 		go self.fetcher(i) | ||||
| 	} | ||||
| 	for blockNum, _ := range self.wantBlocks { | ||||
| 		// yay cheap goroutines, let them block on the unbuffered channel
 | ||||
| 		go func() { | ||||
| 			log.Debugf("waiting to send blockNum %d into the work channel", blockNum) | ||||
| 			self.workChannel <- blockNum | ||||
| 			log.Debugf("sent blockNum %d into the work channel", blockNum) | ||||
| 		}() | ||||
| 
 | ||||
| 	self.lock.Lock() | ||||
| 	for blockNum := range self.wantBlocks { | ||||
| 		self.sendWork(blockNum) | ||||
| 	} | ||||
| 	self.lock.Unlock() | ||||
| 
 | ||||
| 	// now we have to start reading from the unbuffered resultsChannel
 | ||||
| 	// otherwise the workers will block when returning results
 | ||||
| 	select { | ||||
| 	case result := <-self.resultsChannel: | ||||
| 		self.receiveResult(result) | ||||
| 	default: | ||||
| 		if self.Done() == true { | ||||
| 			// if we get here, it's because workList is now empty and there
 | ||||
| 			// are no more results in the results channel.
 | ||||
| 			close(self.workChannel) // shut down the workers
 | ||||
| 			result := self.fetchedBlocks | ||||
| 			self = nil //this BlockFetcher is now finished.
 | ||||
| 			return result | ||||
| 	for { | ||||
| 		select { | ||||
| 		case result := <-self.resultsChannel: | ||||
| 			self.receiveResult(result) | ||||
| 		default: | ||||
| 
 | ||||
| 			if startTime.Add(30 * time.Second).Before(time.Now()) { | ||||
| 				// fetch took too long, return anyway
 | ||||
| 				log.Infof("30s fetcher batch timeout reached, but not done fetching") | ||||
| 				// return what we have
 | ||||
| 				var final []FetchedBlock | ||||
| 				self.lock.Lock() | ||||
| 				for _, value := range self.fetchedBlocks { | ||||
| 					final = append(final, *value) | ||||
| 				} | ||||
| 				self.lock.Unlock() | ||||
| 				self = nil //this BlockFetcher is now finished.
 | ||||
| 				return &final | ||||
| 			} | ||||
| 
 | ||||
| 			if self.Done() == true { | ||||
| 				log.Infof("blockfetcher %+v considers itself Done()", self) | ||||
| 				// if we get here, it's because workList is now empty and there
 | ||||
| 				// are no more results in the results channel.
 | ||||
| 				close(self.workChannel) // shut down the workers
 | ||||
| 				var final []FetchedBlock | ||||
| 				self.lock.Lock() | ||||
| 				for _, value := range self.fetchedBlocks { | ||||
| 					final = append(final, *value) | ||||
| 				} | ||||
| 				self.lock.Unlock() | ||||
| 				self = nil //this BlockFetcher is now finished.
 | ||||
| 				return &final | ||||
| 			} | ||||
| 			// in this case we are not done but got nothing from the result
 | ||||
| 			// channel so just wait a little bit to get more results and
 | ||||
| 			// check the channel again
 | ||||
| 			time.Sleep(10 * time.Millisecond) | ||||
| 			//FIXME(sneak) we maybe need to handle a case here where wantBlocks never
 | ||||
| 			//empties but workers need to be re-dispatched..
 | ||||
| 		} | ||||
| 		time.Sleep(10 * time.Millisecond) | ||||
| 		//FIXME(sneak) we maybe need to handle a case here where wantBlocks never
 | ||||
| 		//empties but workers need to be re-dispatched..
 | ||||
| 	} | ||||
| 	log.Panicf("this shouldn't happen") | ||||
| 	return nil //shouldn't happen, return should happen from above
 | ||||
| } | ||||
| 
 | ||||
| func (self *BlockFetcher) receiveResult(r *FetchedBlock) { | ||||
| 	log.Debugf("got result for blocknum %d", r.blockNumber) | ||||
| 	self.removeFetchingBlock(r.blockNumber) | ||||
| 	self.lock.Lock() | ||||
| 	self.fetchedBlocks = append(self.fetchedBlocks, r) | ||||
| 	self.fetchedBlocks[r.blockNumber] = r | ||||
| 	self.lock.Unlock() | ||||
| 	self.removeWantBlock(r.blockNumber) | ||||
| } | ||||
| @ -156,16 +201,16 @@ func (self *BlockFetcher) fetchBlockOpsFromNetwork(blockNum BlockNumber) *Fetche | ||||
| 	} | ||||
| 	r, err := self.api.GetOpsInBlock(blockNum) | ||||
| 	if err != nil { | ||||
| 		result.error = err | ||||
| 		result.error = &err | ||||
| 		return result | ||||
| 	} | ||||
| 	bytes, err := json.Marshal(r) | ||||
| 	if err != nil { | ||||
| 		result.error = err | ||||
| 		result.error = &err | ||||
| 		return result | ||||
| 	} | ||||
| 	count := len(*r) | ||||
| 	log.Infof("got %d operations for block %d", count, blockNum) | ||||
| 	log.Infof("got %d operations for block %d fetched from network", count, blockNum) | ||||
| 	result.blockData = &bytes | ||||
| 	result.error = nil // make sure this is nil if it worked
 | ||||
| 	return result | ||||
|  | ||||
							
								
								
									
										53
									
								
								blockfetcher_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										53
									
								
								blockfetcher_test.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,53 @@ | ||||
| package main | ||||
| 
 | ||||
| import "testing" | ||||
| import log "github.com/sirupsen/logrus" | ||||
| 
 | ||||
| /* | ||||
| import "github.com/stretchr/testify/mock" | ||||
| 
 | ||||
| type MockedSteemAPI struct { | ||||
| 	mock.Mock | ||||
| } | ||||
| 
 | ||||
| func (m *MockedSteemAPI) DoSomething(number int) (bool, error) { | ||||
| 	args := m.Called(number) | ||||
| 	return args.Bool(0), args.Error(1) | ||||
| } | ||||
| 
 | ||||
| */ | ||||
| 
 | ||||
| func TestBlockfetcherInit(t *testing.T) { | ||||
| 
 | ||||
| 	log.SetLevel(log.DebugLevel) | ||||
| 
 | ||||
| 	bf := NewBlockFetcher(&BlockFetcherConfig{ | ||||
| 		api:                   nil, | ||||
| 		desiredFetcherThreads: 1, | ||||
| 		startBlock:            10000, | ||||
| 		endBlock:              10005, | ||||
| 	}) | ||||
| 
 | ||||
| 	if bf == nil { | ||||
| 		t.Errorf("could not instantiate blockfetcher") | ||||
| 	} | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| //can't actually fetch yet until we mock the api
 | ||||
| /* | ||||
| func TestBlockfetcherFetch(t *testing.T) { | ||||
| 
 | ||||
| 	log.SetLevel(log.DebugLevel) | ||||
| 
 | ||||
| 	bf := NewBlockFetcher(&BlockFetcherConfig{ | ||||
| 		api:                   nil, | ||||
| 		desiredFetcherThreads: 1, | ||||
| 		startBlock:            10000, | ||||
| 		endBlock:              10005, | ||||
| 	}) | ||||
| 
 | ||||
| 	bf.fetch() | ||||
| } | ||||
| 
 | ||||
| */ | ||||
							
								
								
									
										61
									
								
								db.go
									
									
									
									
									
								
							
							
						
						
									
										61
									
								
								db.go
									
									
									
									
									
								
							| @ -10,8 +10,11 @@ import "strconv" | ||||
| const appPrefix = "sbf" | ||||
| 
 | ||||
| type SteemDataStorer interface { | ||||
| 	SetCurrentBlockHeight(BlockNumber) error | ||||
| 	CurrentBlockHeight() BlockNumber | ||||
| 	SetCurrentNetworkBlockHeight(BlockNumber) error | ||||
| 	SetCurrentLocalBlockHeight(BlockNumber) error | ||||
| 	UpdateCurrentLocalBlockHeight() | ||||
| 	CurrentLocalBlockHeight() BlockNumber | ||||
| 	CurrentNetworkBlockHeight() BlockNumber | ||||
| 	HaveOpsForBlock(BlockNumber) bool | ||||
| 	StoreBlockOps(BlockNumber, *[]byte) error | ||||
| } | ||||
| @ -24,11 +27,35 @@ type SteemDataStore struct { | ||||
| func NewSteemDataStore(hostname string) SteemDataStorer { | ||||
| 	self := new(SteemDataStore) | ||||
| 	self.kv = NewRedisKVStore(hostname) | ||||
| 	self.init() | ||||
| 	return self | ||||
| } | ||||
| 
 | ||||
| func (self *SteemDataStore) SetCurrentBlockHeight(blockNum BlockNumber) error { | ||||
| 	keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix) | ||||
| func (self *SteemDataStore) init() { | ||||
| 	self.UpdateCurrentLocalBlockHeight() | ||||
| } | ||||
| 
 | ||||
| func (self *SteemDataStore) UpdateCurrentLocalBlockHeight() { | ||||
| 	cur := self.CurrentLocalBlockHeight() | ||||
| 	next := self.FindHighestContiguousBlockInDb(cur) | ||||
| 	if next != cur { | ||||
| 		err := self.SetCurrentLocalBlockHeight(next) | ||||
| 		log.Infof("current highest contig block in db is now %d", next) | ||||
| 		if err != nil { | ||||
| 			log.Panic(err) | ||||
| 		} | ||||
| 		return | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (self *SteemDataStore) SetCurrentLocalBlockHeight(blockNum BlockNumber) error { | ||||
| 	keyname := fmt.Sprintf("%s.meta.CurrentLocalBlockHeight", appPrefix) | ||||
| 	value := fmt.Sprintf("%d", blockNum) | ||||
| 	return self.kv.Put(&keyname, &value) | ||||
| } | ||||
| 
 | ||||
| func (self *SteemDataStore) SetCurrentNetworkBlockHeight(blockNum BlockNumber) error { | ||||
| 	keyname := fmt.Sprintf("%s.meta.CurrentNetworkBlockHeight", appPrefix) | ||||
| 	value := fmt.Sprintf("%d", blockNum) | ||||
| 	return self.kv.Put(&keyname, &value) | ||||
| } | ||||
| @ -42,7 +69,10 @@ func (self *SteemDataStore) FindHighestContiguousBlockInDb(from BlockNumber) Blo | ||||
| 	for { | ||||
| 		try = BlockNumber(uint64(last) + 1) | ||||
| 		keyname = fmt.Sprintf("%s.ops_in_block.%d", appPrefix, try) | ||||
| 		exists, _ := self.kv.Exists(&keyname) | ||||
| 		exists, err := self.kv.Exists(&keyname) | ||||
| 		if err != nil { | ||||
| 			log.Panic(err) | ||||
| 		} | ||||
| 		if exists == false { | ||||
| 			log.Debugf("cannot find block %d in db, highest found is %d", try, last) | ||||
| 			return last | ||||
| @ -64,14 +94,27 @@ func (self *SteemDataStore) HaveOpsForBlock(blockNum BlockNumber) bool { | ||||
| 	return exists | ||||
| } | ||||
| 
 | ||||
| func (self *SteemDataStore) CurrentBlockHeight() BlockNumber { | ||||
| 	keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix) | ||||
| func (self *SteemDataStore) CurrentNetworkBlockHeight() BlockNumber { | ||||
| 	keyname := fmt.Sprintf("%s.meta.CurrentNetworkBlockHeight", appPrefix) | ||||
| 	val, err := self.kv.Get(&keyname) | ||||
| 	if err != nil { | ||||
| 		// assume this is key not found, initialize key to default
 | ||||
| 		self.SetCurrentBlockHeight(0) | ||||
| 		self.SetCurrentNetworkBlockHeight(0) | ||||
| 		// retry
 | ||||
| 		return self.CurrentBlockHeight() | ||||
| 		return self.CurrentNetworkBlockHeight() | ||||
| 	} | ||||
| 	intval, err := strconv.ParseUint(*val, 10, 64) | ||||
| 	return BlockNumber(intval) | ||||
| } | ||||
| 
 | ||||
| func (self *SteemDataStore) CurrentLocalBlockHeight() BlockNumber { | ||||
| 	keyname := fmt.Sprintf("%s.meta.CurrentLocalBlockHeight", appPrefix) | ||||
| 	val, err := self.kv.Get(&keyname) | ||||
| 	if err != nil { | ||||
| 		// assume this is key not found, initialize key to default
 | ||||
| 		self.SetCurrentLocalBlockHeight(0) | ||||
| 		// retry
 | ||||
| 		return self.CurrentLocalBlockHeight() | ||||
| 	} | ||||
| 	intval, err := strconv.ParseUint(*val, 10, 64) | ||||
| 	return BlockNumber(intval) | ||||
|  | ||||
| @ -11,6 +11,7 @@ import ( | ||||
| 	"io" | ||||
| 	"io/ioutil" | ||||
| 	"net/http" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| type httpRPCClient interface { | ||||
| @ -47,9 +48,14 @@ type JSONRPC struct { | ||||
| 
 | ||||
| // New create new rpc client with given url
 | ||||
| func NewJSONRPC(url string, options ...func(rpc *JSONRPC)) *JSONRPC { | ||||
| 
 | ||||
| 	netClient := &http.Client{ | ||||
| 		Timeout: time.Second * 20, | ||||
| 	} | ||||
| 
 | ||||
| 	rpc := &JSONRPC{ | ||||
| 		url:    url, | ||||
| 		client: http.DefaultClient, | ||||
| 		client: netClient, | ||||
| 	} | ||||
| 	for _, option := range options { | ||||
| 		option(rpc) | ||||
| @ -77,6 +83,7 @@ func (rpc *JSONRPC) Call(method string, params json.RawMessage) (json.RawMessage | ||||
| 		defer response.Body.Close() | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		log.Infof("jsonrpc error: %v", err) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
|  | ||||
| @ -115,7 +115,7 @@ func (kv *BadgerKVStore) Put(key *string, value *string) error { | ||||
| 	if err != nil { | ||||
| 		log.Fatal(err) | ||||
| 	} | ||||
| 	err = txn.Commit(nil) | ||||
| 	err = txn.Commit() | ||||
| 	if err != nil { | ||||
| 		log.Fatal(err) | ||||
| 	} | ||||
|  | ||||
							
								
								
									
										52
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										52
									
								
								main.go
									
									
									
									
									
								
							| @ -1,36 +1,36 @@ | ||||
| package main | ||||
| 
 | ||||
| //import "github.com/spf13/viper"
 | ||||
| //import "encoding/json"
 | ||||
| import "github.com/spf13/viper" | ||||
| import log "github.com/sirupsen/logrus" | ||||
| 
 | ||||
| const steemAPIURL = "https://api.steemit.com" | ||||
| const redisUrl = "localhost:6379" | ||||
| 
 | ||||
| //const steemAPIURL = "http://10.100.202.175:8090"
 | ||||
| //const steemAPIURL = "http://las2.local:8090"
 | ||||
| // STEEM_APIURL=https://api.steem.house ./steem-block-db
 | ||||
| 
 | ||||
| func main() { | ||||
| 	log.SetLevel(log.DebugLevel) | ||||
| 	var x *BlockFetcher | ||||
| 	x = NewBlockFetcher(&BlockFetcherConfig{ | ||||
| 		api:                   nil, | ||||
| 		desiredFetcherThreads: 40, | ||||
| 		startBlock:            10000, | ||||
| 		endBlock:              10005, | ||||
| 	}) | ||||
| 	_ = x | ||||
| } | ||||
| 
 | ||||
| /* | ||||
| func mainx() { | ||||
| 	viper.SetConfigName("steem") | ||||
| 	viper.AddConfigPath("/etc/steem") | ||||
| 	viper.AddConfigPath("$HOME/.config/steem") | ||||
| 	viper.SetEnvPrefix("steem") | ||||
| 	viper.BindEnv("debug") | ||||
| 	viper.BindEnv("redis") | ||||
| 	viper.BindEnv("apiurl") | ||||
| 	viper.ReadInConfig() // Find and read the config file if exists
 | ||||
| 	logLevel := log.InfoLevel | ||||
| 	if viper.GetBool("debug") == true { | ||||
| 		logLevel = log.DebugLevel | ||||
| 	} | ||||
| 	redis := "localhost:6379" | ||||
| 	if viper.Get("redis") != nil { | ||||
| 		redis = viper.GetString("redis") | ||||
| 	} | ||||
| 	apiurl := "https://api.steemit.com" | ||||
| 	if viper.Get("apiurl") != nil { | ||||
| 		apiurl = viper.GetString("apiurl") | ||||
| 	} | ||||
| 	app := NewApp(&appconfig{ | ||||
| 		logLevel:       log.DebugLevel, | ||||
| 		apiUrl:         steemAPIURL, | ||||
| 		redisUrl:       redisUrl, | ||||
| 		fetcherThreads: 40, | ||||
| 		logLevel:              logLevel, | ||||
| 		apiUrl:                apiurl, | ||||
| 		redisUrl:              redis, | ||||
| 		desiredFetcherThreads: 30, | ||||
| 	}) | ||||
| 	app.main() | ||||
| } | ||||
| 
 | ||||
| */ | ||||
|  | ||||
							
								
								
									
										16
									
								
								steemapi.go
									
									
									
									
									
								
							
							
						
						
									
										16
									
								
								steemapi.go
									
									
									
									
									
								
							| @ -9,10 +9,15 @@ type SteemAPI struct { | ||||
| 	rpc *JSONRPC | ||||
| } | ||||
| 
 | ||||
| type SteemAPIShape interface { | ||||
| 	GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesResponse, error) | ||||
| 	GetOpsInBlock(blockNum BlockNumber) (GetOpsInBlockResponse, error) | ||||
| } | ||||
| 
 | ||||
| var EmptyParams = []string{} | ||||
| var EmptyParamsRaw, _ = json.Marshal(EmptyParams) | ||||
| 
 | ||||
| func NewSteemAPI(url string, options ...func(s *SteemAPI)) *SteemAPI { | ||||
| func NewSteemAPI(url string, options ...func(s SteemAPIShape)) *SteemAPI { | ||||
| 
 | ||||
| 	rpc := NewJSONRPC(url, func(x *JSONRPC) {}) | ||||
| 
 | ||||
| @ -29,7 +34,7 @@ func NewSteemAPI(url string, options ...func(s *SteemAPI)) *SteemAPI { | ||||
| 
 | ||||
| func (self *SteemAPI) GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesResponse, error) { | ||||
| 	var resp DynamicGlobalProperties | ||||
| 	raw, err := self.rpc.Call("get_dynamic_global_properties", EmptyParamsRaw) | ||||
| 	raw, err := self.rpc.Call("condenser_api.get_dynamic_global_properties", EmptyParamsRaw) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @ -46,9 +51,12 @@ func (self *SteemAPI) GetOpsInBlock(blockNum BlockNumber) (GetOpsInBlockResponse | ||||
| 	// api.steemit.com.
 | ||||
| 	realOpsParams := &GetOpsInBlockRequestParams{BlockNum: blockNum, VirtualOps: false} | ||||
| 	rop, err := realOpsParams.MarshalJSON() | ||||
| 	realOpsResponse, err := self.rpc.Call("condenser_api.get_ops_in_block", rop) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	rawOpsResponse, err := self.rpc.Call("condenser_api.get_ops_in_block", rop) | ||||
| 	var result []OperationObject | ||||
| 	err = json.Unmarshal(realOpsResponse, &result) | ||||
| 	err = json.Unmarshal(rawOpsResponse, &result) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user