Compare commits
	
		
			No commits in common. "76d26a6f1e84475c74bd30f8d69d05e55d1b4a90" and "32874cca63768ce0d4faac6228097dd57609f7dd" have entirely different histories.
		
	
	
		
			76d26a6f1e
			...
			32874cca63
		
	
		
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @ -1 +0,0 @@ | |||||||
| steem-block-db |  | ||||||
| @ -1,30 +0,0 @@ | |||||||
| image: golang:1.11 |  | ||||||
| 
 |  | ||||||
| cache: |  | ||||||
|   paths: |  | ||||||
|     - /apt-cache |  | ||||||
|     - /go/src/github.com |  | ||||||
|     - /go/src/golang.org |  | ||||||
|     - /go/src/google.golang.org |  | ||||||
|     - /go/src/gopkg.in |  | ||||||
| 
 |  | ||||||
| stages: |  | ||||||
|   - test |  | ||||||
|   - build |  | ||||||
| 
 |  | ||||||
| before_script: |  | ||||||
|   - mkdir /go/src/steem-block-db |  | ||||||
|   - cp $CI_PROJECT_DIR/*.go /go/src/steem-block-db |  | ||||||
|   - cd /go/src/steem-block-db |  | ||||||
|   - GOPATH=/go go get |  | ||||||
| 
 |  | ||||||
| #unit_tests: |  | ||||||
| #  stage: test |  | ||||||
| #  script: |  | ||||||
| #    - make test |  | ||||||
| 
 |  | ||||||
| build: |  | ||||||
|   stage: build |  | ||||||
|   script: |  | ||||||
|     - cd /go/src/steem-block-db |  | ||||||
|     - GOPATH=/go go build |  | ||||||
							
								
								
									
										19
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										19
									
								
								Makefile
									
									
									
									
									
								
							| @ -1,19 +1,4 @@ | |||||||
| GOFILES := $(shell find . -type f -name '*.go' -not -name '*_test.go') | default: run | ||||||
| 
 |  | ||||||
| default: test |  | ||||||
| 
 |  | ||||||
| .PHONY: run build test |  | ||||||
| 
 | 
 | ||||||
| run: | run: | ||||||
| 	go run $(GOFILES) | 	go run *.go | ||||||
| 
 |  | ||||||
| build: steem-block-db |  | ||||||
| 
 |  | ||||||
| steem-block-db: *.go |  | ||||||
| 	go build |  | ||||||
| 
 |  | ||||||
| test: |  | ||||||
| 	go test |  | ||||||
| 
 |  | ||||||
| clean: |  | ||||||
| 	rm steem-block-db |  | ||||||
|  | |||||||
							
								
								
									
										163
									
								
								app.go
									
									
									
									
									
								
							
							
						
						
									
										163
									
								
								app.go
									
									
									
									
									
								
							| @ -1,23 +1,21 @@ | |||||||
| package main | package main | ||||||
| 
 | 
 | ||||||
| import "sync" | import "sync" | ||||||
| import "time" |  | ||||||
| import log "github.com/sirupsen/logrus" | import log "github.com/sirupsen/logrus" | ||||||
| 
 | import "encoding/json" | ||||||
| //import "encoding/json"
 |  | ||||||
| 
 | 
 | ||||||
| type App struct { | type App struct { | ||||||
| 	api    SteemAPIShape | 	datastore                 SteemDataStorer | ||||||
| 	config *appconfig | 	api                       *SteemAPI | ||||||
| 	db     SteemDataStorer | 	currentNetworkBlockHeight BlockNumber | ||||||
| 	lock   *sync.Mutex | 	currentLocalBlockHeight   BlockNumber | ||||||
|  | 	lock                      *sync.Mutex | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type appconfig struct { | type appconfig struct { | ||||||
| 	logLevel              log.Level | 	logLevel log.Level | ||||||
| 	apiUrl                string | 	apiUrl   string | ||||||
| 	redisUrl              string | 	redisUrl string | ||||||
| 	desiredFetcherThreads uint |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewApp(config *appconfig) *App { | func NewApp(config *appconfig) *App { | ||||||
| @ -28,80 +26,107 @@ func NewApp(config *appconfig) *App { | |||||||
| 
 | 
 | ||||||
| func (self *App) init(config *appconfig) { | func (self *App) init(config *appconfig) { | ||||||
| 	log.SetLevel(config.logLevel) | 	log.SetLevel(config.logLevel) | ||||||
| 	log.SetFormatter(&log.TextFormatter{ |  | ||||||
| 		FullTimestamp: true, |  | ||||||
| 	}) |  | ||||||
| 	log.SetFormatter(&log.JSONFormatter{}) |  | ||||||
| 	//log.SetReportCaller(true)
 |  | ||||||
| 	self.api = NewSteemAPI(config.apiUrl) | 	self.api = NewSteemAPI(config.apiUrl) | ||||||
| 	self.db = NewSteemDataStore(config.redisUrl) | 	self.datastore = NewSteemDataStore(config.redisUrl) | ||||||
| 	self.lock = &sync.Mutex{} | 	self.lock = &sync.Mutex{} | ||||||
| 	self.config = config | } | ||||||
|  | 
 | ||||||
|  | func (self *App) updateCurrentBlockHeight() { | ||||||
|  | 	h := self.fetchCurrentBlockHeight() | ||||||
|  | 	if h > self.currentNetworkBlockHeight { | ||||||
|  | 		self.lock.Lock() | ||||||
|  | 		defer self.lock.Unlock() | ||||||
|  | 		self.currentNetworkBlockHeight = h | ||||||
|  | 		log.Infof("current block height is now %d", self.currentNetworkBlockHeight) | ||||||
|  | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *App) main() { | func (self *App) main() { | ||||||
| 	log.Infof("steem block data fetcher starting up...") | 	log.Infof("steem block data fetcher starting up...") | ||||||
| 	self.mainloop() | 	//self.mainloop()
 | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | /* | ||||||
|  | func (self *App) numFetchers() uint { | ||||||
|  | 	self.lock.Lock() | ||||||
|  | 	defer self.lock.Unlock() | ||||||
|  | 	return uint(len(*self.fetchingBlocks)) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (self *App) spawnNewFetcher(blockNum BlockNumber) { | ||||||
|  | 	log.Debugf("spawning fetcher for block %d", blockNum) | ||||||
|  | 	go func() { | ||||||
|  | 		// this is so hacky, make a queue like a grownup would you
 | ||||||
|  | 		time.Sleep(100 * time.Millisecond) | ||||||
|  | 		if self.datastore.HaveOpsForBlock(blockNum) { | ||||||
|  | 			log.Infof("already have ops for block %d, not re-fetching", blockNum) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 		//self.incrFetchers()
 | ||||||
|  | 		self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum)) | ||||||
|  | 		//self.decrFetchers()
 | ||||||
|  | 
 | ||||||
|  | 	}() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (self *App) storeBlockOps(blockNum BlockNumber, blockOps *[]byte) { | ||||||
|  | 	self.datastore.StoreBlockOps(blockNum, blockOps) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // note that parallelFetchAndStoreBlocks does not respect the
 | ||||||
|  | // limitation on number of desired fetchers, that is for the caller
 | ||||||
|  | func (self *App) parallelFetchAndStoreBlocks(start BlockNumber, end BlockNumber) { | ||||||
|  | 	var diff = uint64(end) - uint64(start) | ||||||
|  | 	var i uint64 | ||||||
|  | 	for i = 0; i < diff; i++ { | ||||||
|  | 		self.spawnNewFetcher(BlockNumber(uint64(start) + i)) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (self *App) populateFetchers() { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *App) mainloop() { | func (self *App) mainloop() { | ||||||
| 	log.Infof("using %d fetching threads", self.config.desiredFetcherThreads) | 
 | ||||||
| 	batchSize := uint(3000) | 	log.Infof("using %d fetching threads", self.desiredFetchingThreads) | ||||||
| 	//batchSize := uint(10)
 | 
 | ||||||
| 	var start BlockNumber | 	self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight() | ||||||
| 	var end BlockNumber | 
 | ||||||
| 	for { | 	for { | ||||||
| 		self.db.SetCurrentNetworkBlockHeight(self.fetchCurrentBlockHeight()) | 		self.spawnMoreFetchers() | ||||||
| 		if self.db.CurrentNetworkBlockHeight() == self.db.CurrentLocalBlockHeight() { | 		self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight() | ||||||
| 			//we are synced
 | 		time.Sleep(1000 * time.Millisecond) | ||||||
| 			time.Sleep(1 * time.Second) | 	} | ||||||
| 			continue | } | ||||||
| 		} |  | ||||||
| 		log.Infof("current network block height = %d", self.db.CurrentNetworkBlockHeight()) |  | ||||||
| 		log.Infof("current local block height = %d", self.db.CurrentLocalBlockHeight()) |  | ||||||
| 		// we are not synced
 |  | ||||||
| 
 | 
 | ||||||
| 		// how far behind are we?
 | func (self *App) spawnMoreFetchers() { | ||||||
| 		countMustFetch := uint(self.db.CurrentNetworkBlockHeight() - self.db.CurrentLocalBlockHeight()) | } | ||||||
| 		log.Infof("we are %d blocks behind", countMustFetch) | 
 | ||||||
| 		start = self.db.CurrentLocalBlockHeight() + 1 | /* | ||||||
| 		log.Infof("beginning fetch with start block %d", start) | 	self.updateCurrentBlockHeight() | ||||||
| 		if countMustFetch <= batchSize { | 	log.Infof("current number of active fetchers: %d", self.numFetchers()) | ||||||
| 			end = self.db.CurrentNetworkBlockHeight() | 	time.Sleep(1500 * time.Millisecond) | ||||||
| 			log.Infof("fetch is within our batch size (%d), end block for fetch batch is %d", batchSize, end) | 	self.datastore.SetCurrentBlockHeight() | ||||||
|  | 	localHeight := self.datastore.CurrentBlockHeight() | ||||||
|  | 	log.Infof("our highest fetched block height is %d", localHeight) | ||||||
|  | 	if localHeight < self.currentNetworkBlockHeight { | ||||||
|  | 		// we need to fetch some blocks from the network
 | ||||||
|  | 		avail := self.desiredFetchingThreads - self.numFetchers() | ||||||
|  | 		diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight) | ||||||
|  | 		log.Infof("we need to fetch %d blocks", diff) | ||||||
|  | 		if uint64(diff) > uint64(avail) { | ||||||
|  | 			self.parallelFetchAndStoreBlocks(BlockNumber(uint64(localHeight)+1), BlockNumber(uint64(localHeight)+1+uint64(avail))) | ||||||
| 		} else { | 		} else { | ||||||
| 			end = BlockNumber(uint(start) + uint(batchSize) - uint(1)) | 			// just spawn fetchers for the blocks we don't have
 | ||||||
| 			log.Infof("fetch is too large for batch size (%d), end block for fetch batch is %d", batchSize, end) | 			// spawning will update the number of running fetchers
 | ||||||
| 		} | 			self.parallelFetchAndStoreBlocks(localHeight+1, self.currentNetworkBlockHeight) | ||||||
| 
 |  | ||||||
| 		bf := NewBlockFetcher(&BlockFetcherConfig{ |  | ||||||
| 			api:                   self.api.(*SteemAPI), |  | ||||||
| 			desiredFetcherThreads: self.config.desiredFetcherThreads, |  | ||||||
| 			startBlock:            start, |  | ||||||
| 			endBlock:              end, |  | ||||||
| 		}) |  | ||||||
| 
 |  | ||||||
| 		blocks := bf.fetch() |  | ||||||
| 		log.Infof("blockfetcher has returned") |  | ||||||
| 		self.pushBlocks(blocks) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (self *App) pushBlocks(newBlocks *[]FetchedBlock) { |  | ||||||
| 	counter := 0 |  | ||||||
| 	for _, newBlock := range *newBlocks { |  | ||||||
| 		counter += 1 |  | ||||||
| 		err := self.db.StoreBlockOps(newBlock.blockNumber, newBlock.blockData) |  | ||||||
| 		if err != nil { |  | ||||||
| 			log.Panic(err) |  | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	log.Infof("pushed %d new blocks to db", counter) | 	//needFetchers := self.desiredFetchingThreads - self.numFetchers()
 | ||||||
| 	self.db.UpdateCurrentLocalBlockHeight() | 
 | ||||||
| } | */ | ||||||
| 
 | 
 | ||||||
| func (self *App) fetchCurrentBlockHeight() BlockNumber { | func (self *App) fetchCurrentBlockHeight() BlockNumber { | ||||||
| 
 |  | ||||||
| 	r, err := self.api.GetDynamicGlobalProperties() | 	r, err := self.api.GetDynamicGlobalProperties() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Panicf("can't fetch global properties, bailing. err: %s", err) | 		log.Panicf("can't fetch global properties, bailing. err: %s", err) | ||||||
|  | |||||||
							
								
								
									
										117
									
								
								blockfetcher.go
									
									
									
									
									
								
							
							
						
						
									
										117
									
								
								blockfetcher.go
									
									
									
									
									
								
							| @ -1,8 +1,6 @@ | |||||||
| package main | package main | ||||||
| 
 | 
 | ||||||
| import "encoding/json" |  | ||||||
| import "sync" | import "sync" | ||||||
| import "time" |  | ||||||
| import log "github.com/sirupsen/logrus" | import log "github.com/sirupsen/logrus" | ||||||
| 
 | 
 | ||||||
| type FetchedBlock struct { | type FetchedBlock struct { | ||||||
| @ -16,7 +14,7 @@ type BlockFetcher struct { | |||||||
| 	desiredFetcherThreads uint | 	desiredFetcherThreads uint | ||||||
| 	wantBlocks            map[BlockNumber]bool | 	wantBlocks            map[BlockNumber]bool | ||||||
| 	fetchingBlocks        map[BlockNumber]bool | 	fetchingBlocks        map[BlockNumber]bool | ||||||
| 	fetchedBlocks         map[BlockNumber]*FetchedBlock | 	fetchedBlocks         *[]FetchedBlock | ||||||
| 	lock                  *sync.Mutex | 	lock                  *sync.Mutex | ||||||
| 	workChannel           chan BlockNumber | 	workChannel           chan BlockNumber | ||||||
| 	resultsChannel        chan *FetchedBlock | 	resultsChannel        chan *FetchedBlock | ||||||
| @ -45,52 +43,47 @@ func (self *BlockFetcher) init(config *BlockFetcherConfig) { | |||||||
| 	self.lock = &sync.Mutex{} | 	self.lock = &sync.Mutex{} | ||||||
| 	self.api = config.api | 	self.api = config.api | ||||||
| 	self.desiredFetcherThreads = config.desiredFetcherThreads | 	self.desiredFetcherThreads = config.desiredFetcherThreads | ||||||
|  | 
 | ||||||
| 	self.workChannel = make(chan BlockNumber) | 	self.workChannel = make(chan BlockNumber) | ||||||
| 	self.resultsChannel = make(chan *FetchedBlock) | 	self.resultsChannel = make(chan *FetchedBlock) | ||||||
| 	self.wantBlocks = make(map[BlockNumber]bool) |  | ||||||
| 	self.fetchedBlocks = make(map[BlockNumber]*FetchedBlock) |  | ||||||
| 
 |  | ||||||
| 	diff := int(uint(config.endBlock) - uint(config.startBlock)) | 	diff := int(uint(config.endBlock) - uint(config.startBlock)) | ||||||
|  | 	log.Debugf("diff is %d", diff) | ||||||
| 	for i := 0; i <= diff; i++ { | 	for i := 0; i <= diff; i++ { | ||||||
| 		self.wantBlocks[BlockNumber(uint(config.startBlock)+uint(i))] = true | 		self.wantBlocks[BlockNumber(uint(config.startBlock)+uint(i))] = true | ||||||
| 	} | 	} | ||||||
| 
 |  | ||||||
| 	log.Debugf("wantblocks[] is now %v", self.wantBlocks) | 	log.Debugf("wantblocks[] is now %v", self.wantBlocks) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) { | func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) { | ||||||
| 	self.lock.Lock() |  | ||||||
| 	defer self.lock.Unlock() |  | ||||||
| 	if self.wantBlocks[blockNum] == false { | 	if self.wantBlocks[blockNum] == false { | ||||||
| 		log.Panicf("shouldn't happen") | 		log.Panicf("shouldn't happen") | ||||||
| 	} | 	} | ||||||
|  | 	self.lock.Lock() | ||||||
|  | 	defer self.lock.Unlock() | ||||||
| 	delete(self.wantBlocks, blockNum) | 	delete(self.wantBlocks, blockNum) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) { | func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) { | ||||||
| 	self.lock.Lock() |  | ||||||
| 	defer self.lock.Unlock() |  | ||||||
| 	if self.fetchingBlocks[blockNum] == false { | 	if self.fetchingBlocks[blockNum] == false { | ||||||
| 		log.Panicf("shouldn't happen") | 		log.Panicf("shouldn't happen") | ||||||
| 	} | 	} | ||||||
|  | 	self.lock.Lock() | ||||||
|  | 	defer self.lock.Unlock() | ||||||
| 	delete(self.fetchingBlocks, blockNum) | 	delete(self.fetchingBlocks, blockNum) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) { | func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) { | ||||||
| 	self.lock.Lock() |  | ||||||
| 	defer self.lock.Unlock() |  | ||||||
| 	if self.fetchingBlocks[blockNum] == true { | 	if self.fetchingBlocks[blockNum] == true { | ||||||
| 		log.Panicf("shouldn't happen") | 		log.Panicf("shouldn't happen") | ||||||
| 	} | 	} | ||||||
| 	if self.fetchingBlocks == nil { | 	self.lock.Lock() | ||||||
| 		self.fetchingBlocks = make(map[BlockNumber]bool) | 	defer self.lock.Unlock() | ||||||
| 	} |  | ||||||
| 	if self.fetchingBlocks[blockNum] == false { | 	if self.fetchingBlocks[blockNum] == false { | ||||||
| 		self.fetchingBlocks[blockNum] = true | 		self.fetchingBlocks[blockNum] = true | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *BlockFetcher) fetcher(index uint) { | func (self *BlockFetcher) fetcher(index int) { | ||||||
| 	log.Debugf("fetcher thread %d starting", index) | 	log.Debugf("fetcher thread %d starting", index) | ||||||
| 
 | 
 | ||||||
| WorkLoop: | WorkLoop: | ||||||
| @ -107,88 +100,50 @@ WorkLoop: | |||||||
| 			} else { | 			} else { | ||||||
| 				// wait a sec and try again
 | 				// wait a sec and try again
 | ||||||
| 				time.Sleep(1 * time.Second) | 				time.Sleep(1 * time.Second) | ||||||
| 				log.Infof("error fetching block %d, retry %d (%v)", blockNum, i+1, r.error) |  | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	log.Debugf("fetcher thread %d ending", index) | 	log.Debugf("fetcher thread %d ending", index) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *BlockFetcher) sendWork(b BlockNumber) { |  | ||||||
| 	go func() { |  | ||||||
| 		// yay cheap goroutines, let them block on the unbuffered channel
 |  | ||||||
| 		log.Debugf("waiting to send blockNum %d into the work channel", b) |  | ||||||
| 		self.workChannel <- b |  | ||||||
| 		log.Debugf("sent blockNum %d into the work channel", b) |  | ||||||
| 	}() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (self *BlockFetcher) fetch() *[]FetchedBlock { | func (self *BlockFetcher) fetch() *[]FetchedBlock { | ||||||
| 	log.Debugf("blockfetcher beginning fetch") | 	for i := 1; i < self.desiredFetcherThreads+1; i++ { | ||||||
| 	startTime := time.Now() |  | ||||||
| 	for i := uint(1); i < self.desiredFetcherThreads+uint(1); i++ { |  | ||||||
| 		go self.fetcher(i) | 		go self.fetcher(i) | ||||||
| 	} | 	} | ||||||
| 
 | 	for blockNum, _ := range self.wantBlocks { | ||||||
| 	self.lock.Lock() | 		// yay cheap goroutines, let them block on the unbuffered channel
 | ||||||
| 	for blockNum := range self.wantBlocks { | 		go func() { | ||||||
| 		self.sendWork(blockNum) | 			log.Debugf("waiting to send blockNum %d into the work channel", blockNum) | ||||||
|  | 			self.workChannel <- blockNum | ||||||
|  | 			log.Debugf("sent blockNum %d into the work channel", blockNum) | ||||||
|  | 		}() | ||||||
| 	} | 	} | ||||||
| 	self.lock.Unlock() |  | ||||||
| 
 | 
 | ||||||
| 	// now we have to start reading from the unbuffered resultsChannel
 | 	// now we have to start reading from the unbuffered resultsChannel
 | ||||||
| 	// otherwise the workers will block when returning results
 | 	// otherwise the workers will block when returning results
 | ||||||
| 	for { | 	select { | ||||||
| 		select { | 	case result := <-self.resultsChannel: | ||||||
| 		case result := <-self.resultsChannel: | 		self.receiveResult(result) | ||||||
| 			self.receiveResult(result) | 	default: | ||||||
| 		default: | 		if self.Done() == true { | ||||||
| 
 | 			// if we get here, it's because workList is now empty and there
 | ||||||
| 			if startTime.Add(30 * time.Second).Before(time.Now()) { | 			// are no more results in the results channel.
 | ||||||
| 				// fetch took too long, return anyway
 | 			close(self.workChannel) // shut down the workers
 | ||||||
| 				log.Infof("30s fetcher batch timeout reached, but not done fetching") | 			result := self.fetchedBlocks | ||||||
| 				// return what we have
 | 			self = nil //this BlockFetcher is now finished.
 | ||||||
| 				var final []FetchedBlock | 			return result | ||||||
| 				self.lock.Lock() |  | ||||||
| 				for _, value := range self.fetchedBlocks { |  | ||||||
| 					final = append(final, *value) |  | ||||||
| 				} |  | ||||||
| 				self.lock.Unlock() |  | ||||||
| 				self = nil //this BlockFetcher is now finished.
 |  | ||||||
| 				return &final |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			if self.Done() == true { |  | ||||||
| 				log.Infof("blockfetcher %+v considers itself Done()", self) |  | ||||||
| 				// if we get here, it's because workList is now empty and there
 |  | ||||||
| 				// are no more results in the results channel.
 |  | ||||||
| 				close(self.workChannel) // shut down the workers
 |  | ||||||
| 				var final []FetchedBlock |  | ||||||
| 				self.lock.Lock() |  | ||||||
| 				for _, value := range self.fetchedBlocks { |  | ||||||
| 					final = append(final, *value) |  | ||||||
| 				} |  | ||||||
| 				self.lock.Unlock() |  | ||||||
| 				self = nil //this BlockFetcher is now finished.
 |  | ||||||
| 				return &final |  | ||||||
| 			} |  | ||||||
| 			// in this case we are not done but got nothing from the result
 |  | ||||||
| 			// channel so just wait a little bit to get more results and
 |  | ||||||
| 			// check the channel again
 |  | ||||||
| 			time.Sleep(10 * time.Millisecond) |  | ||||||
| 			//FIXME(sneak) we maybe need to handle a case here where wantBlocks never
 |  | ||||||
| 			//empties but workers need to be re-dispatched..
 |  | ||||||
| 		} | 		} | ||||||
|  | 		time.Sleep(10 * time.Millisecond) | ||||||
|  | 		//FIXME(sneak) we maybe need to handle a case here where wantBlocks never
 | ||||||
|  | 		//empties but workers need to be re-dispatched..
 | ||||||
| 	} | 	} | ||||||
| 	log.Panicf("this shouldn't happen") |  | ||||||
| 	return nil //shouldn't happen, return should happen from above
 |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *BlockFetcher) receiveResult(r *FetchedBlock) { | func (self *BlockFetcher) receiveResult(r *FetchedBlock) { | ||||||
| 	log.Debugf("got result for blocknum %d", r.blockNumber) | 	log.Debugf("got result for blocknum %d", r.blockNumber) | ||||||
| 	self.removeFetchingBlock(r.blockNumber) | 	self.removeFetchingBlock(r.blockNumber) | ||||||
| 	self.lock.Lock() | 	self.lock.Lock() | ||||||
| 	self.fetchedBlocks[r.blockNumber] = r | 	self.fetchedBlocks = append(self.fetchedBlocks, r) | ||||||
| 	self.lock.Unlock() | 	self.lock.Unlock() | ||||||
| 	self.removeWantBlock(r.blockNumber) | 	self.removeWantBlock(r.blockNumber) | ||||||
| } | } | ||||||
| @ -201,16 +156,16 @@ func (self *BlockFetcher) fetchBlockOpsFromNetwork(blockNum BlockNumber) *Fetche | |||||||
| 	} | 	} | ||||||
| 	r, err := self.api.GetOpsInBlock(blockNum) | 	r, err := self.api.GetOpsInBlock(blockNum) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		result.error = &err | 		result.error = err | ||||||
| 		return result | 		return result | ||||||
| 	} | 	} | ||||||
| 	bytes, err := json.Marshal(r) | 	bytes, err := json.Marshal(r) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		result.error = &err | 		result.error = err | ||||||
| 		return result | 		return result | ||||||
| 	} | 	} | ||||||
| 	count := len(*r) | 	count := len(*r) | ||||||
| 	log.Infof("got %d operations for block %d fetched from network", count, blockNum) | 	log.Infof("got %d operations for block %d", count, blockNum) | ||||||
| 	result.blockData = &bytes | 	result.blockData = &bytes | ||||||
| 	result.error = nil // make sure this is nil if it worked
 | 	result.error = nil // make sure this is nil if it worked
 | ||||||
| 	return result | 	return result | ||||||
|  | |||||||
| @ -1,53 +0,0 @@ | |||||||
| package main |  | ||||||
| 
 |  | ||||||
| import "testing" |  | ||||||
| import log "github.com/sirupsen/logrus" |  | ||||||
| 
 |  | ||||||
| /* |  | ||||||
| import "github.com/stretchr/testify/mock" |  | ||||||
| 
 |  | ||||||
| type MockedSteemAPI struct { |  | ||||||
| 	mock.Mock |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (m *MockedSteemAPI) DoSomething(number int) (bool, error) { |  | ||||||
| 	args := m.Called(number) |  | ||||||
| 	return args.Bool(0), args.Error(1) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| */ |  | ||||||
| 
 |  | ||||||
| func TestBlockfetcherInit(t *testing.T) { |  | ||||||
| 
 |  | ||||||
| 	log.SetLevel(log.DebugLevel) |  | ||||||
| 
 |  | ||||||
| 	bf := NewBlockFetcher(&BlockFetcherConfig{ |  | ||||||
| 		api:                   nil, |  | ||||||
| 		desiredFetcherThreads: 1, |  | ||||||
| 		startBlock:            10000, |  | ||||||
| 		endBlock:              10005, |  | ||||||
| 	}) |  | ||||||
| 
 |  | ||||||
| 	if bf == nil { |  | ||||||
| 		t.Errorf("could not instantiate blockfetcher") |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| //can't actually fetch yet until we mock the api
 |  | ||||||
| /* |  | ||||||
| func TestBlockfetcherFetch(t *testing.T) { |  | ||||||
| 
 |  | ||||||
| 	log.SetLevel(log.DebugLevel) |  | ||||||
| 
 |  | ||||||
| 	bf := NewBlockFetcher(&BlockFetcherConfig{ |  | ||||||
| 		api:                   nil, |  | ||||||
| 		desiredFetcherThreads: 1, |  | ||||||
| 		startBlock:            10000, |  | ||||||
| 		endBlock:              10005, |  | ||||||
| 	}) |  | ||||||
| 
 |  | ||||||
| 	bf.fetch() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| */ |  | ||||||
							
								
								
									
										61
									
								
								db.go
									
									
									
									
									
								
							
							
						
						
									
										61
									
								
								db.go
									
									
									
									
									
								
							| @ -10,11 +10,8 @@ import "strconv" | |||||||
| const appPrefix = "sbf" | const appPrefix = "sbf" | ||||||
| 
 | 
 | ||||||
| type SteemDataStorer interface { | type SteemDataStorer interface { | ||||||
| 	SetCurrentNetworkBlockHeight(BlockNumber) error | 	SetCurrentBlockHeight(BlockNumber) error | ||||||
| 	SetCurrentLocalBlockHeight(BlockNumber) error | 	CurrentBlockHeight() BlockNumber | ||||||
| 	UpdateCurrentLocalBlockHeight() |  | ||||||
| 	CurrentLocalBlockHeight() BlockNumber |  | ||||||
| 	CurrentNetworkBlockHeight() BlockNumber |  | ||||||
| 	HaveOpsForBlock(BlockNumber) bool | 	HaveOpsForBlock(BlockNumber) bool | ||||||
| 	StoreBlockOps(BlockNumber, *[]byte) error | 	StoreBlockOps(BlockNumber, *[]byte) error | ||||||
| } | } | ||||||
| @ -27,35 +24,11 @@ type SteemDataStore struct { | |||||||
| func NewSteemDataStore(hostname string) SteemDataStorer { | func NewSteemDataStore(hostname string) SteemDataStorer { | ||||||
| 	self := new(SteemDataStore) | 	self := new(SteemDataStore) | ||||||
| 	self.kv = NewRedisKVStore(hostname) | 	self.kv = NewRedisKVStore(hostname) | ||||||
| 	self.init() |  | ||||||
| 	return self | 	return self | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *SteemDataStore) init() { | func (self *SteemDataStore) SetCurrentBlockHeight(blockNum BlockNumber) error { | ||||||
| 	self.UpdateCurrentLocalBlockHeight() | 	keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix) | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (self *SteemDataStore) UpdateCurrentLocalBlockHeight() { |  | ||||||
| 	cur := self.CurrentLocalBlockHeight() |  | ||||||
| 	next := self.FindHighestContiguousBlockInDb(cur) |  | ||||||
| 	if next != cur { |  | ||||||
| 		err := self.SetCurrentLocalBlockHeight(next) |  | ||||||
| 		log.Infof("current highest contig block in db is now %d", next) |  | ||||||
| 		if err != nil { |  | ||||||
| 			log.Panic(err) |  | ||||||
| 		} |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (self *SteemDataStore) SetCurrentLocalBlockHeight(blockNum BlockNumber) error { |  | ||||||
| 	keyname := fmt.Sprintf("%s.meta.CurrentLocalBlockHeight", appPrefix) |  | ||||||
| 	value := fmt.Sprintf("%d", blockNum) |  | ||||||
| 	return self.kv.Put(&keyname, &value) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (self *SteemDataStore) SetCurrentNetworkBlockHeight(blockNum BlockNumber) error { |  | ||||||
| 	keyname := fmt.Sprintf("%s.meta.CurrentNetworkBlockHeight", appPrefix) |  | ||||||
| 	value := fmt.Sprintf("%d", blockNum) | 	value := fmt.Sprintf("%d", blockNum) | ||||||
| 	return self.kv.Put(&keyname, &value) | 	return self.kv.Put(&keyname, &value) | ||||||
| } | } | ||||||
| @ -69,10 +42,7 @@ func (self *SteemDataStore) FindHighestContiguousBlockInDb(from BlockNumber) Blo | |||||||
| 	for { | 	for { | ||||||
| 		try = BlockNumber(uint64(last) + 1) | 		try = BlockNumber(uint64(last) + 1) | ||||||
| 		keyname = fmt.Sprintf("%s.ops_in_block.%d", appPrefix, try) | 		keyname = fmt.Sprintf("%s.ops_in_block.%d", appPrefix, try) | ||||||
| 		exists, err := self.kv.Exists(&keyname) | 		exists, _ := self.kv.Exists(&keyname) | ||||||
| 		if err != nil { |  | ||||||
| 			log.Panic(err) |  | ||||||
| 		} |  | ||||||
| 		if exists == false { | 		if exists == false { | ||||||
| 			log.Debugf("cannot find block %d in db, highest found is %d", try, last) | 			log.Debugf("cannot find block %d in db, highest found is %d", try, last) | ||||||
| 			return last | 			return last | ||||||
| @ -94,27 +64,14 @@ func (self *SteemDataStore) HaveOpsForBlock(blockNum BlockNumber) bool { | |||||||
| 	return exists | 	return exists | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *SteemDataStore) CurrentNetworkBlockHeight() BlockNumber { | func (self *SteemDataStore) CurrentBlockHeight() BlockNumber { | ||||||
| 	keyname := fmt.Sprintf("%s.meta.CurrentNetworkBlockHeight", appPrefix) | 	keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix) | ||||||
| 	val, err := self.kv.Get(&keyname) | 	val, err := self.kv.Get(&keyname) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		// assume this is key not found, initialize key to default
 | 		// assume this is key not found, initialize key to default
 | ||||||
| 		self.SetCurrentNetworkBlockHeight(0) | 		self.SetCurrentBlockHeight(0) | ||||||
| 		// retry
 | 		// retry
 | ||||||
| 		return self.CurrentNetworkBlockHeight() | 		return self.CurrentBlockHeight() | ||||||
| 	} |  | ||||||
| 	intval, err := strconv.ParseUint(*val, 10, 64) |  | ||||||
| 	return BlockNumber(intval) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (self *SteemDataStore) CurrentLocalBlockHeight() BlockNumber { |  | ||||||
| 	keyname := fmt.Sprintf("%s.meta.CurrentLocalBlockHeight", appPrefix) |  | ||||||
| 	val, err := self.kv.Get(&keyname) |  | ||||||
| 	if err != nil { |  | ||||||
| 		// assume this is key not found, initialize key to default
 |  | ||||||
| 		self.SetCurrentLocalBlockHeight(0) |  | ||||||
| 		// retry
 |  | ||||||
| 		return self.CurrentLocalBlockHeight() |  | ||||||
| 	} | 	} | ||||||
| 	intval, err := strconv.ParseUint(*val, 10, 64) | 	intval, err := strconv.ParseUint(*val, 10, 64) | ||||||
| 	return BlockNumber(intval) | 	return BlockNumber(intval) | ||||||
|  | |||||||
| @ -11,7 +11,6 @@ import ( | |||||||
| 	"io" | 	"io" | ||||||
| 	"io/ioutil" | 	"io/ioutil" | ||||||
| 	"net/http" | 	"net/http" | ||||||
| 	"time" |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type httpRPCClient interface { | type httpRPCClient interface { | ||||||
| @ -48,14 +47,9 @@ type JSONRPC struct { | |||||||
| 
 | 
 | ||||||
| // New create new rpc client with given url
 | // New create new rpc client with given url
 | ||||||
| func NewJSONRPC(url string, options ...func(rpc *JSONRPC)) *JSONRPC { | func NewJSONRPC(url string, options ...func(rpc *JSONRPC)) *JSONRPC { | ||||||
| 
 |  | ||||||
| 	netClient := &http.Client{ |  | ||||||
| 		Timeout: time.Second * 20, |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	rpc := &JSONRPC{ | 	rpc := &JSONRPC{ | ||||||
| 		url:    url, | 		url:    url, | ||||||
| 		client: netClient, | 		client: http.DefaultClient, | ||||||
| 	} | 	} | ||||||
| 	for _, option := range options { | 	for _, option := range options { | ||||||
| 		option(rpc) | 		option(rpc) | ||||||
| @ -83,7 +77,6 @@ func (rpc *JSONRPC) Call(method string, params json.RawMessage) (json.RawMessage | |||||||
| 		defer response.Body.Close() | 		defer response.Body.Close() | ||||||
| 	} | 	} | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Infof("jsonrpc error: %v", err) |  | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -115,7 +115,7 @@ func (kv *BadgerKVStore) Put(key *string, value *string) error { | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Fatal(err) | 		log.Fatal(err) | ||||||
| 	} | 	} | ||||||
| 	err = txn.Commit() | 	err = txn.Commit(nil) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Fatal(err) | 		log.Fatal(err) | ||||||
| 	} | 	} | ||||||
|  | |||||||
							
								
								
									
										52
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										52
									
								
								main.go
									
									
									
									
									
								
							| @ -1,36 +1,36 @@ | |||||||
| package main | package main | ||||||
| 
 | 
 | ||||||
| import "github.com/spf13/viper" | //import "github.com/spf13/viper"
 | ||||||
|  | //import "encoding/json"
 | ||||||
| import log "github.com/sirupsen/logrus" | import log "github.com/sirupsen/logrus" | ||||||
| 
 | 
 | ||||||
| // STEEM_APIURL=https://api.steem.house ./steem-block-db
 | const steemAPIURL = "https://api.steemit.com" | ||||||
|  | const redisUrl = "localhost:6379" | ||||||
|  | 
 | ||||||
|  | //const steemAPIURL = "http://10.100.202.175:8090"
 | ||||||
|  | //const steemAPIURL = "http://las2.local:8090"
 | ||||||
| 
 | 
 | ||||||
| func main() { | func main() { | ||||||
| 	viper.SetConfigName("steem") | 	log.SetLevel(log.DebugLevel) | ||||||
| 	viper.AddConfigPath("/etc/steem") | 	var x *BlockFetcher | ||||||
| 	viper.AddConfigPath("$HOME/.config/steem") | 	x = NewBlockFetcher(&BlockFetcherConfig{ | ||||||
| 	viper.SetEnvPrefix("steem") | 		api:                   nil, | ||||||
| 	viper.BindEnv("debug") | 		desiredFetcherThreads: 40, | ||||||
| 	viper.BindEnv("redis") | 		startBlock:            10000, | ||||||
| 	viper.BindEnv("apiurl") | 		endBlock:              10005, | ||||||
| 	viper.ReadInConfig() // Find and read the config file if exists
 | 	}) | ||||||
| 	logLevel := log.InfoLevel | 	_ = x | ||||||
| 	if viper.GetBool("debug") == true { | } | ||||||
| 		logLevel = log.DebugLevel | 
 | ||||||
| 	} | /* | ||||||
| 	redis := "localhost:6379" | func mainx() { | ||||||
| 	if viper.Get("redis") != nil { |  | ||||||
| 		redis = viper.GetString("redis") |  | ||||||
| 	} |  | ||||||
| 	apiurl := "https://api.steemit.com" |  | ||||||
| 	if viper.Get("apiurl") != nil { |  | ||||||
| 		apiurl = viper.GetString("apiurl") |  | ||||||
| 	} |  | ||||||
| 	app := NewApp(&appconfig{ | 	app := NewApp(&appconfig{ | ||||||
| 		logLevel:              logLevel, | 		logLevel:       log.DebugLevel, | ||||||
| 		apiUrl:                apiurl, | 		apiUrl:         steemAPIURL, | ||||||
| 		redisUrl:              redis, | 		redisUrl:       redisUrl, | ||||||
| 		desiredFetcherThreads: 30, | 		fetcherThreads: 40, | ||||||
| 	}) | 	}) | ||||||
| 	app.main() | 	app.main() | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | */ | ||||||
|  | |||||||
							
								
								
									
										16
									
								
								steemapi.go
									
									
									
									
									
								
							
							
						
						
									
										16
									
								
								steemapi.go
									
									
									
									
									
								
							| @ -9,15 +9,10 @@ type SteemAPI struct { | |||||||
| 	rpc *JSONRPC | 	rpc *JSONRPC | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type SteemAPIShape interface { |  | ||||||
| 	GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesResponse, error) |  | ||||||
| 	GetOpsInBlock(blockNum BlockNumber) (GetOpsInBlockResponse, error) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| var EmptyParams = []string{} | var EmptyParams = []string{} | ||||||
| var EmptyParamsRaw, _ = json.Marshal(EmptyParams) | var EmptyParamsRaw, _ = json.Marshal(EmptyParams) | ||||||
| 
 | 
 | ||||||
| func NewSteemAPI(url string, options ...func(s SteemAPIShape)) *SteemAPI { | func NewSteemAPI(url string, options ...func(s *SteemAPI)) *SteemAPI { | ||||||
| 
 | 
 | ||||||
| 	rpc := NewJSONRPC(url, func(x *JSONRPC) {}) | 	rpc := NewJSONRPC(url, func(x *JSONRPC) {}) | ||||||
| 
 | 
 | ||||||
| @ -34,7 +29,7 @@ func NewSteemAPI(url string, options ...func(s SteemAPIShape)) *SteemAPI { | |||||||
| 
 | 
 | ||||||
| func (self *SteemAPI) GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesResponse, error) { | func (self *SteemAPI) GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesResponse, error) { | ||||||
| 	var resp DynamicGlobalProperties | 	var resp DynamicGlobalProperties | ||||||
| 	raw, err := self.rpc.Call("condenser_api.get_dynamic_global_properties", EmptyParamsRaw) | 	raw, err := self.rpc.Call("get_dynamic_global_properties", EmptyParamsRaw) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| @ -51,12 +46,9 @@ func (self *SteemAPI) GetOpsInBlock(blockNum BlockNumber) (GetOpsInBlockResponse | |||||||
| 	// api.steemit.com.
 | 	// api.steemit.com.
 | ||||||
| 	realOpsParams := &GetOpsInBlockRequestParams{BlockNum: blockNum, VirtualOps: false} | 	realOpsParams := &GetOpsInBlockRequestParams{BlockNum: blockNum, VirtualOps: false} | ||||||
| 	rop, err := realOpsParams.MarshalJSON() | 	rop, err := realOpsParams.MarshalJSON() | ||||||
| 	if err != nil { | 	realOpsResponse, err := self.rpc.Call("condenser_api.get_ops_in_block", rop) | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
| 	rawOpsResponse, err := self.rpc.Call("condenser_api.get_ops_in_block", rop) |  | ||||||
| 	var result []OperationObject | 	var result []OperationObject | ||||||
| 	err = json.Unmarshal(rawOpsResponse, &result) | 	err = json.Unmarshal(realOpsResponse, &result) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user