Compare commits
	
		
			7 Commits
		
	
	
		
			32874cca63
			...
			76d26a6f1e
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 76d26a6f1e | |||
| 3d6bf1e08f | |||
| 8ff07a4a0b | |||
| 43d43d3748 | |||
| ff9b67f543 | |||
| 25ccf2bc9e | |||
| c43df78d55 | 
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @ -0,0 +1 @@ | |||||||
|  | steem-block-db | ||||||
							
								
								
									
										30
									
								
								.gitlab-ci.yml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										30
									
								
								.gitlab-ci.yml
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,30 @@ | |||||||
|  | image: golang:1.11 | ||||||
|  | 
 | ||||||
|  | cache: | ||||||
|  |   paths: | ||||||
|  |     - /apt-cache | ||||||
|  |     - /go/src/github.com | ||||||
|  |     - /go/src/golang.org | ||||||
|  |     - /go/src/google.golang.org | ||||||
|  |     - /go/src/gopkg.in | ||||||
|  | 
 | ||||||
|  | stages: | ||||||
|  |   - test | ||||||
|  |   - build | ||||||
|  | 
 | ||||||
|  | before_script: | ||||||
|  |   - mkdir /go/src/steem-block-db | ||||||
|  |   - cp $CI_PROJECT_DIR/*.go /go/src/steem-block-db | ||||||
|  |   - cd /go/src/steem-block-db | ||||||
|  |   - GOPATH=/go go get | ||||||
|  | 
 | ||||||
|  | #unit_tests: | ||||||
|  | #  stage: test | ||||||
|  | #  script: | ||||||
|  | #    - make test | ||||||
|  | 
 | ||||||
|  | build: | ||||||
|  |   stage: build | ||||||
|  |   script: | ||||||
|  |     - cd /go/src/steem-block-db | ||||||
|  |     - GOPATH=/go go build | ||||||
							
								
								
									
										19
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										19
									
								
								Makefile
									
									
									
									
									
								
							| @ -1,4 +1,19 @@ | |||||||
| default: run | GOFILES := $(shell find . -type f -name '*.go' -not -name '*_test.go') | ||||||
|  | 
 | ||||||
|  | default: test | ||||||
|  | 
 | ||||||
|  | .PHONY: run build test | ||||||
| 
 | 
 | ||||||
| run: | run: | ||||||
| 	go run *.go | 	go run $(GOFILES) | ||||||
|  | 
 | ||||||
|  | build: steem-block-db | ||||||
|  | 
 | ||||||
|  | steem-block-db: *.go | ||||||
|  | 	go build | ||||||
|  | 
 | ||||||
|  | test: | ||||||
|  | 	go test | ||||||
|  | 
 | ||||||
|  | clean: | ||||||
|  | 	rm steem-block-db | ||||||
|  | |||||||
							
								
								
									
										153
									
								
								app.go
									
									
									
									
									
								
							
							
						
						
									
										153
									
								
								app.go
									
									
									
									
									
								
							| @ -1,14 +1,15 @@ | |||||||
| package main | package main | ||||||
| 
 | 
 | ||||||
| import "sync" | import "sync" | ||||||
|  | import "time" | ||||||
| import log "github.com/sirupsen/logrus" | import log "github.com/sirupsen/logrus" | ||||||
| import "encoding/json" | 
 | ||||||
|  | //import "encoding/json"
 | ||||||
| 
 | 
 | ||||||
| type App struct { | type App struct { | ||||||
| 	datastore                 SteemDataStorer | 	api    SteemAPIShape | ||||||
| 	api                       *SteemAPI | 	config *appconfig | ||||||
| 	currentNetworkBlockHeight BlockNumber | 	db     SteemDataStorer | ||||||
| 	currentLocalBlockHeight   BlockNumber |  | ||||||
| 	lock   *sync.Mutex | 	lock   *sync.Mutex | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -16,6 +17,7 @@ type appconfig struct { | |||||||
| 	logLevel              log.Level | 	logLevel              log.Level | ||||||
| 	apiUrl                string | 	apiUrl                string | ||||||
| 	redisUrl              string | 	redisUrl              string | ||||||
|  | 	desiredFetcherThreads uint | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewApp(config *appconfig) *App { | func NewApp(config *appconfig) *App { | ||||||
| @ -26,107 +28,80 @@ func NewApp(config *appconfig) *App { | |||||||
| 
 | 
 | ||||||
| func (self *App) init(config *appconfig) { | func (self *App) init(config *appconfig) { | ||||||
| 	log.SetLevel(config.logLevel) | 	log.SetLevel(config.logLevel) | ||||||
|  | 	log.SetFormatter(&log.TextFormatter{ | ||||||
|  | 		FullTimestamp: true, | ||||||
|  | 	}) | ||||||
|  | 	log.SetFormatter(&log.JSONFormatter{}) | ||||||
|  | 	//log.SetReportCaller(true)
 | ||||||
| 	self.api = NewSteemAPI(config.apiUrl) | 	self.api = NewSteemAPI(config.apiUrl) | ||||||
| 	self.datastore = NewSteemDataStore(config.redisUrl) | 	self.db = NewSteemDataStore(config.redisUrl) | ||||||
| 	self.lock = &sync.Mutex{} | 	self.lock = &sync.Mutex{} | ||||||
| } | 	self.config = config | ||||||
| 
 |  | ||||||
| func (self *App) updateCurrentBlockHeight() { |  | ||||||
| 	h := self.fetchCurrentBlockHeight() |  | ||||||
| 	if h > self.currentNetworkBlockHeight { |  | ||||||
| 		self.lock.Lock() |  | ||||||
| 		defer self.lock.Unlock() |  | ||||||
| 		self.currentNetworkBlockHeight = h |  | ||||||
| 		log.Infof("current block height is now %d", self.currentNetworkBlockHeight) |  | ||||||
| 	} |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *App) main() { | func (self *App) main() { | ||||||
| 	log.Infof("steem block data fetcher starting up...") | 	log.Infof("steem block data fetcher starting up...") | ||||||
| 	//self.mainloop()
 | 	self.mainloop() | ||||||
| } |  | ||||||
| 
 |  | ||||||
| /* |  | ||||||
| func (self *App) numFetchers() uint { |  | ||||||
| 	self.lock.Lock() |  | ||||||
| 	defer self.lock.Unlock() |  | ||||||
| 	return uint(len(*self.fetchingBlocks)) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (self *App) spawnNewFetcher(blockNum BlockNumber) { |  | ||||||
| 	log.Debugf("spawning fetcher for block %d", blockNum) |  | ||||||
| 	go func() { |  | ||||||
| 		// this is so hacky, make a queue like a grownup would you
 |  | ||||||
| 		time.Sleep(100 * time.Millisecond) |  | ||||||
| 		if self.datastore.HaveOpsForBlock(blockNum) { |  | ||||||
| 			log.Infof("already have ops for block %d, not re-fetching", blockNum) |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
| 		//self.incrFetchers()
 |  | ||||||
| 		self.storeBlockOps(blockNum, self.fetchBlockOps(blockNum)) |  | ||||||
| 		//self.decrFetchers()
 |  | ||||||
| 
 |  | ||||||
| 	}() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (self *App) storeBlockOps(blockNum BlockNumber, blockOps *[]byte) { |  | ||||||
| 	self.datastore.StoreBlockOps(blockNum, blockOps) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // note that parallelFetchAndStoreBlocks does not respect the
 |  | ||||||
| // limitation on number of desired fetchers, that is for the caller
 |  | ||||||
| func (self *App) parallelFetchAndStoreBlocks(start BlockNumber, end BlockNumber) { |  | ||||||
| 	var diff = uint64(end) - uint64(start) |  | ||||||
| 	var i uint64 |  | ||||||
| 	for i = 0; i < diff; i++ { |  | ||||||
| 		self.spawnNewFetcher(BlockNumber(uint64(start) + i)) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (self *App) populateFetchers() { |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *App) mainloop() { | func (self *App) mainloop() { | ||||||
| 
 | 	log.Infof("using %d fetching threads", self.config.desiredFetcherThreads) | ||||||
| 	log.Infof("using %d fetching threads", self.desiredFetchingThreads) | 	batchSize := uint(3000) | ||||||
| 
 | 	//batchSize := uint(10)
 | ||||||
| 	self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight() | 	var start BlockNumber | ||||||
| 
 | 	var end BlockNumber | ||||||
| 	for { | 	for { | ||||||
| 		self.spawnMoreFetchers() | 		self.db.SetCurrentNetworkBlockHeight(self.fetchCurrentBlockHeight()) | ||||||
| 		self.currentNetworkBlockHeight = self.fetchCurrentBlockHeight() | 		if self.db.CurrentNetworkBlockHeight() == self.db.CurrentLocalBlockHeight() { | ||||||
| 		time.Sleep(1000 * time.Millisecond) | 			//we are synced
 | ||||||
|  | 			time.Sleep(1 * time.Second) | ||||||
|  | 			continue | ||||||
| 		} | 		} | ||||||
| } | 		log.Infof("current network block height = %d", self.db.CurrentNetworkBlockHeight()) | ||||||
|  | 		log.Infof("current local block height = %d", self.db.CurrentLocalBlockHeight()) | ||||||
|  | 		// we are not synced
 | ||||||
| 
 | 
 | ||||||
| func (self *App) spawnMoreFetchers() { | 		// how far behind are we?
 | ||||||
| } | 		countMustFetch := uint(self.db.CurrentNetworkBlockHeight() - self.db.CurrentLocalBlockHeight()) | ||||||
| 
 | 		log.Infof("we are %d blocks behind", countMustFetch) | ||||||
| /* | 		start = self.db.CurrentLocalBlockHeight() + 1 | ||||||
| 	self.updateCurrentBlockHeight() | 		log.Infof("beginning fetch with start block %d", start) | ||||||
| 	log.Infof("current number of active fetchers: %d", self.numFetchers()) | 		if countMustFetch <= batchSize { | ||||||
| 	time.Sleep(1500 * time.Millisecond) | 			end = self.db.CurrentNetworkBlockHeight() | ||||||
| 	self.datastore.SetCurrentBlockHeight() | 			log.Infof("fetch is within our batch size (%d), end block for fetch batch is %d", batchSize, end) | ||||||
| 	localHeight := self.datastore.CurrentBlockHeight() |  | ||||||
| 	log.Infof("our highest fetched block height is %d", localHeight) |  | ||||||
| 	if localHeight < self.currentNetworkBlockHeight { |  | ||||||
| 		// we need to fetch some blocks from the network
 |  | ||||||
| 		avail := self.desiredFetchingThreads - self.numFetchers() |  | ||||||
| 		diff := uint64(self.currentNetworkBlockHeight) - uint64(localHeight) |  | ||||||
| 		log.Infof("we need to fetch %d blocks", diff) |  | ||||||
| 		if uint64(diff) > uint64(avail) { |  | ||||||
| 			self.parallelFetchAndStoreBlocks(BlockNumber(uint64(localHeight)+1), BlockNumber(uint64(localHeight)+1+uint64(avail))) |  | ||||||
| 		} else { | 		} else { | ||||||
| 			// just spawn fetchers for the blocks we don't have
 | 			end = BlockNumber(uint(start) + uint(batchSize) - uint(1)) | ||||||
| 			// spawning will update the number of running fetchers
 | 			log.Infof("fetch is too large for batch size (%d), end block for fetch batch is %d", batchSize, end) | ||||||
| 			self.parallelFetchAndStoreBlocks(localHeight+1, self.currentNetworkBlockHeight) |  | ||||||
| 		} | 		} | ||||||
| 	} |  | ||||||
| 	//needFetchers := self.desiredFetchingThreads - self.numFetchers()
 |  | ||||||
| 
 | 
 | ||||||
| */ | 		bf := NewBlockFetcher(&BlockFetcherConfig{ | ||||||
|  | 			api:                   self.api.(*SteemAPI), | ||||||
|  | 			desiredFetcherThreads: self.config.desiredFetcherThreads, | ||||||
|  | 			startBlock:            start, | ||||||
|  | 			endBlock:              end, | ||||||
|  | 		}) | ||||||
|  | 
 | ||||||
|  | 		blocks := bf.fetch() | ||||||
|  | 		log.Infof("blockfetcher has returned") | ||||||
|  | 		self.pushBlocks(blocks) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (self *App) pushBlocks(newBlocks *[]FetchedBlock) { | ||||||
|  | 	counter := 0 | ||||||
|  | 	for _, newBlock := range *newBlocks { | ||||||
|  | 		counter += 1 | ||||||
|  | 		err := self.db.StoreBlockOps(newBlock.blockNumber, newBlock.blockData) | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.Panic(err) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	log.Infof("pushed %d new blocks to db", counter) | ||||||
|  | 	self.db.UpdateCurrentLocalBlockHeight() | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
| func (self *App) fetchCurrentBlockHeight() BlockNumber { | func (self *App) fetchCurrentBlockHeight() BlockNumber { | ||||||
|  | 
 | ||||||
| 	r, err := self.api.GetDynamicGlobalProperties() | 	r, err := self.api.GetDynamicGlobalProperties() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Panicf("can't fetch global properties, bailing. err: %s", err) | 		log.Panicf("can't fetch global properties, bailing. err: %s", err) | ||||||
|  | |||||||
| @ -1,6 +1,8 @@ | |||||||
| package main | package main | ||||||
| 
 | 
 | ||||||
|  | import "encoding/json" | ||||||
| import "sync" | import "sync" | ||||||
|  | import "time" | ||||||
| import log "github.com/sirupsen/logrus" | import log "github.com/sirupsen/logrus" | ||||||
| 
 | 
 | ||||||
| type FetchedBlock struct { | type FetchedBlock struct { | ||||||
| @ -14,7 +16,7 @@ type BlockFetcher struct { | |||||||
| 	desiredFetcherThreads uint | 	desiredFetcherThreads uint | ||||||
| 	wantBlocks            map[BlockNumber]bool | 	wantBlocks            map[BlockNumber]bool | ||||||
| 	fetchingBlocks        map[BlockNumber]bool | 	fetchingBlocks        map[BlockNumber]bool | ||||||
| 	fetchedBlocks         *[]FetchedBlock | 	fetchedBlocks         map[BlockNumber]*FetchedBlock | ||||||
| 	lock                  *sync.Mutex | 	lock                  *sync.Mutex | ||||||
| 	workChannel           chan BlockNumber | 	workChannel           chan BlockNumber | ||||||
| 	resultsChannel        chan *FetchedBlock | 	resultsChannel        chan *FetchedBlock | ||||||
| @ -43,47 +45,52 @@ func (self *BlockFetcher) init(config *BlockFetcherConfig) { | |||||||
| 	self.lock = &sync.Mutex{} | 	self.lock = &sync.Mutex{} | ||||||
| 	self.api = config.api | 	self.api = config.api | ||||||
| 	self.desiredFetcherThreads = config.desiredFetcherThreads | 	self.desiredFetcherThreads = config.desiredFetcherThreads | ||||||
| 
 |  | ||||||
| 	self.workChannel = make(chan BlockNumber) | 	self.workChannel = make(chan BlockNumber) | ||||||
| 	self.resultsChannel = make(chan *FetchedBlock) | 	self.resultsChannel = make(chan *FetchedBlock) | ||||||
|  | 	self.wantBlocks = make(map[BlockNumber]bool) | ||||||
|  | 	self.fetchedBlocks = make(map[BlockNumber]*FetchedBlock) | ||||||
|  | 
 | ||||||
| 	diff := int(uint(config.endBlock) - uint(config.startBlock)) | 	diff := int(uint(config.endBlock) - uint(config.startBlock)) | ||||||
| 	log.Debugf("diff is %d", diff) |  | ||||||
| 	for i := 0; i <= diff; i++ { | 	for i := 0; i <= diff; i++ { | ||||||
| 		self.wantBlocks[BlockNumber(uint(config.startBlock)+uint(i))] = true | 		self.wantBlocks[BlockNumber(uint(config.startBlock)+uint(i))] = true | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
| 	log.Debugf("wantblocks[] is now %v", self.wantBlocks) | 	log.Debugf("wantblocks[] is now %v", self.wantBlocks) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) { | func (self *BlockFetcher) removeWantBlock(blockNum BlockNumber) { | ||||||
|  | 	self.lock.Lock() | ||||||
|  | 	defer self.lock.Unlock() | ||||||
| 	if self.wantBlocks[blockNum] == false { | 	if self.wantBlocks[blockNum] == false { | ||||||
| 		log.Panicf("shouldn't happen") | 		log.Panicf("shouldn't happen") | ||||||
| 	} | 	} | ||||||
| 	self.lock.Lock() |  | ||||||
| 	defer self.lock.Unlock() |  | ||||||
| 	delete(self.wantBlocks, blockNum) | 	delete(self.wantBlocks, blockNum) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) { | func (self *BlockFetcher) removeFetchingBlock(blockNum BlockNumber) { | ||||||
|  | 	self.lock.Lock() | ||||||
|  | 	defer self.lock.Unlock() | ||||||
| 	if self.fetchingBlocks[blockNum] == false { | 	if self.fetchingBlocks[blockNum] == false { | ||||||
| 		log.Panicf("shouldn't happen") | 		log.Panicf("shouldn't happen") | ||||||
| 	} | 	} | ||||||
| 	self.lock.Lock() |  | ||||||
| 	defer self.lock.Unlock() |  | ||||||
| 	delete(self.fetchingBlocks, blockNum) | 	delete(self.fetchingBlocks, blockNum) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) { | func (self *BlockFetcher) addFetchingBlock(blockNum BlockNumber) { | ||||||
|  | 	self.lock.Lock() | ||||||
|  | 	defer self.lock.Unlock() | ||||||
| 	if self.fetchingBlocks[blockNum] == true { | 	if self.fetchingBlocks[blockNum] == true { | ||||||
| 		log.Panicf("shouldn't happen") | 		log.Panicf("shouldn't happen") | ||||||
| 	} | 	} | ||||||
| 	self.lock.Lock() | 	if self.fetchingBlocks == nil { | ||||||
| 	defer self.lock.Unlock() | 		self.fetchingBlocks = make(map[BlockNumber]bool) | ||||||
|  | 	} | ||||||
| 	if self.fetchingBlocks[blockNum] == false { | 	if self.fetchingBlocks[blockNum] == false { | ||||||
| 		self.fetchingBlocks[blockNum] = true | 		self.fetchingBlocks[blockNum] = true | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *BlockFetcher) fetcher(index int) { | func (self *BlockFetcher) fetcher(index uint) { | ||||||
| 	log.Debugf("fetcher thread %d starting", index) | 	log.Debugf("fetcher thread %d starting", index) | ||||||
| 
 | 
 | ||||||
| WorkLoop: | WorkLoop: | ||||||
| @ -100,50 +107,88 @@ WorkLoop: | |||||||
| 			} else { | 			} else { | ||||||
| 				// wait a sec and try again
 | 				// wait a sec and try again
 | ||||||
| 				time.Sleep(1 * time.Second) | 				time.Sleep(1 * time.Second) | ||||||
|  | 				log.Infof("error fetching block %d, retry %d (%v)", blockNum, i+1, r.error) | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	log.Debugf("fetcher thread %d ending", index) | 	log.Debugf("fetcher thread %d ending", index) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (self *BlockFetcher) sendWork(b BlockNumber) { | ||||||
|  | 	go func() { | ||||||
|  | 		// yay cheap goroutines, let them block on the unbuffered channel
 | ||||||
|  | 		log.Debugf("waiting to send blockNum %d into the work channel", b) | ||||||
|  | 		self.workChannel <- b | ||||||
|  | 		log.Debugf("sent blockNum %d into the work channel", b) | ||||||
|  | 	}() | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (self *BlockFetcher) fetch() *[]FetchedBlock { | func (self *BlockFetcher) fetch() *[]FetchedBlock { | ||||||
| 	for i := 1; i < self.desiredFetcherThreads+1; i++ { | 	log.Debugf("blockfetcher beginning fetch") | ||||||
|  | 	startTime := time.Now() | ||||||
|  | 	for i := uint(1); i < self.desiredFetcherThreads+uint(1); i++ { | ||||||
| 		go self.fetcher(i) | 		go self.fetcher(i) | ||||||
| 	} | 	} | ||||||
| 	for blockNum, _ := range self.wantBlocks { | 
 | ||||||
| 		// yay cheap goroutines, let them block on the unbuffered channel
 | 	self.lock.Lock() | ||||||
| 		go func() { | 	for blockNum := range self.wantBlocks { | ||||||
| 			log.Debugf("waiting to send blockNum %d into the work channel", blockNum) | 		self.sendWork(blockNum) | ||||||
| 			self.workChannel <- blockNum |  | ||||||
| 			log.Debugf("sent blockNum %d into the work channel", blockNum) |  | ||||||
| 		}() |  | ||||||
| 	} | 	} | ||||||
|  | 	self.lock.Unlock() | ||||||
| 
 | 
 | ||||||
| 	// now we have to start reading from the unbuffered resultsChannel
 | 	// now we have to start reading from the unbuffered resultsChannel
 | ||||||
| 	// otherwise the workers will block when returning results
 | 	// otherwise the workers will block when returning results
 | ||||||
|  | 	for { | ||||||
| 		select { | 		select { | ||||||
| 		case result := <-self.resultsChannel: | 		case result := <-self.resultsChannel: | ||||||
| 			self.receiveResult(result) | 			self.receiveResult(result) | ||||||
| 		default: | 		default: | ||||||
|  | 
 | ||||||
|  | 			if startTime.Add(30 * time.Second).Before(time.Now()) { | ||||||
|  | 				// fetch took too long, return anyway
 | ||||||
|  | 				log.Infof("30s fetcher batch timeout reached, but not done fetching") | ||||||
|  | 				// return what we have
 | ||||||
|  | 				var final []FetchedBlock | ||||||
|  | 				self.lock.Lock() | ||||||
|  | 				for _, value := range self.fetchedBlocks { | ||||||
|  | 					final = append(final, *value) | ||||||
|  | 				} | ||||||
|  | 				self.lock.Unlock() | ||||||
|  | 				self = nil //this BlockFetcher is now finished.
 | ||||||
|  | 				return &final | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
| 			if self.Done() == true { | 			if self.Done() == true { | ||||||
|  | 				log.Infof("blockfetcher %+v considers itself Done()", self) | ||||||
| 				// if we get here, it's because workList is now empty and there
 | 				// if we get here, it's because workList is now empty and there
 | ||||||
| 				// are no more results in the results channel.
 | 				// are no more results in the results channel.
 | ||||||
| 				close(self.workChannel) // shut down the workers
 | 				close(self.workChannel) // shut down the workers
 | ||||||
| 			result := self.fetchedBlocks | 				var final []FetchedBlock | ||||||
| 			self = nil //this BlockFetcher is now finished.
 | 				self.lock.Lock() | ||||||
| 			return result | 				for _, value := range self.fetchedBlocks { | ||||||
|  | 					final = append(final, *value) | ||||||
| 				} | 				} | ||||||
|  | 				self.lock.Unlock() | ||||||
|  | 				self = nil //this BlockFetcher is now finished.
 | ||||||
|  | 				return &final | ||||||
|  | 			} | ||||||
|  | 			// in this case we are not done but got nothing from the result
 | ||||||
|  | 			// channel so just wait a little bit to get more results and
 | ||||||
|  | 			// check the channel again
 | ||||||
| 			time.Sleep(10 * time.Millisecond) | 			time.Sleep(10 * time.Millisecond) | ||||||
| 			//FIXME(sneak) we maybe need to handle a case here where wantBlocks never
 | 			//FIXME(sneak) we maybe need to handle a case here where wantBlocks never
 | ||||||
| 			//empties but workers need to be re-dispatched..
 | 			//empties but workers need to be re-dispatched..
 | ||||||
| 		} | 		} | ||||||
|  | 	} | ||||||
|  | 	log.Panicf("this shouldn't happen") | ||||||
|  | 	return nil //shouldn't happen, return should happen from above
 | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *BlockFetcher) receiveResult(r *FetchedBlock) { | func (self *BlockFetcher) receiveResult(r *FetchedBlock) { | ||||||
| 	log.Debugf("got result for blocknum %d", r.blockNumber) | 	log.Debugf("got result for blocknum %d", r.blockNumber) | ||||||
| 	self.removeFetchingBlock(r.blockNumber) | 	self.removeFetchingBlock(r.blockNumber) | ||||||
| 	self.lock.Lock() | 	self.lock.Lock() | ||||||
| 	self.fetchedBlocks = append(self.fetchedBlocks, r) | 	self.fetchedBlocks[r.blockNumber] = r | ||||||
| 	self.lock.Unlock() | 	self.lock.Unlock() | ||||||
| 	self.removeWantBlock(r.blockNumber) | 	self.removeWantBlock(r.blockNumber) | ||||||
| } | } | ||||||
| @ -156,16 +201,16 @@ func (self *BlockFetcher) fetchBlockOpsFromNetwork(blockNum BlockNumber) *Fetche | |||||||
| 	} | 	} | ||||||
| 	r, err := self.api.GetOpsInBlock(blockNum) | 	r, err := self.api.GetOpsInBlock(blockNum) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		result.error = err | 		result.error = &err | ||||||
| 		return result | 		return result | ||||||
| 	} | 	} | ||||||
| 	bytes, err := json.Marshal(r) | 	bytes, err := json.Marshal(r) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		result.error = err | 		result.error = &err | ||||||
| 		return result | 		return result | ||||||
| 	} | 	} | ||||||
| 	count := len(*r) | 	count := len(*r) | ||||||
| 	log.Infof("got %d operations for block %d", count, blockNum) | 	log.Infof("got %d operations for block %d fetched from network", count, blockNum) | ||||||
| 	result.blockData = &bytes | 	result.blockData = &bytes | ||||||
| 	result.error = nil // make sure this is nil if it worked
 | 	result.error = nil // make sure this is nil if it worked
 | ||||||
| 	return result | 	return result | ||||||
|  | |||||||
							
								
								
									
										53
									
								
								blockfetcher_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										53
									
								
								blockfetcher_test.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,53 @@ | |||||||
|  | package main | ||||||
|  | 
 | ||||||
|  | import "testing" | ||||||
|  | import log "github.com/sirupsen/logrus" | ||||||
|  | 
 | ||||||
|  | /* | ||||||
|  | import "github.com/stretchr/testify/mock" | ||||||
|  | 
 | ||||||
|  | type MockedSteemAPI struct { | ||||||
|  | 	mock.Mock | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (m *MockedSteemAPI) DoSomething(number int) (bool, error) { | ||||||
|  | 	args := m.Called(number) | ||||||
|  | 	return args.Bool(0), args.Error(1) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | */ | ||||||
|  | 
 | ||||||
|  | func TestBlockfetcherInit(t *testing.T) { | ||||||
|  | 
 | ||||||
|  | 	log.SetLevel(log.DebugLevel) | ||||||
|  | 
 | ||||||
|  | 	bf := NewBlockFetcher(&BlockFetcherConfig{ | ||||||
|  | 		api:                   nil, | ||||||
|  | 		desiredFetcherThreads: 1, | ||||||
|  | 		startBlock:            10000, | ||||||
|  | 		endBlock:              10005, | ||||||
|  | 	}) | ||||||
|  | 
 | ||||||
|  | 	if bf == nil { | ||||||
|  | 		t.Errorf("could not instantiate blockfetcher") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | //can't actually fetch yet until we mock the api
 | ||||||
|  | /* | ||||||
|  | func TestBlockfetcherFetch(t *testing.T) { | ||||||
|  | 
 | ||||||
|  | 	log.SetLevel(log.DebugLevel) | ||||||
|  | 
 | ||||||
|  | 	bf := NewBlockFetcher(&BlockFetcherConfig{ | ||||||
|  | 		api:                   nil, | ||||||
|  | 		desiredFetcherThreads: 1, | ||||||
|  | 		startBlock:            10000, | ||||||
|  | 		endBlock:              10005, | ||||||
|  | 	}) | ||||||
|  | 
 | ||||||
|  | 	bf.fetch() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | */ | ||||||
							
								
								
									
										61
									
								
								db.go
									
									
									
									
									
								
							
							
						
						
									
										61
									
								
								db.go
									
									
									
									
									
								
							| @ -10,8 +10,11 @@ import "strconv" | |||||||
| const appPrefix = "sbf" | const appPrefix = "sbf" | ||||||
| 
 | 
 | ||||||
| type SteemDataStorer interface { | type SteemDataStorer interface { | ||||||
| 	SetCurrentBlockHeight(BlockNumber) error | 	SetCurrentNetworkBlockHeight(BlockNumber) error | ||||||
| 	CurrentBlockHeight() BlockNumber | 	SetCurrentLocalBlockHeight(BlockNumber) error | ||||||
|  | 	UpdateCurrentLocalBlockHeight() | ||||||
|  | 	CurrentLocalBlockHeight() BlockNumber | ||||||
|  | 	CurrentNetworkBlockHeight() BlockNumber | ||||||
| 	HaveOpsForBlock(BlockNumber) bool | 	HaveOpsForBlock(BlockNumber) bool | ||||||
| 	StoreBlockOps(BlockNumber, *[]byte) error | 	StoreBlockOps(BlockNumber, *[]byte) error | ||||||
| } | } | ||||||
| @ -24,11 +27,35 @@ type SteemDataStore struct { | |||||||
| func NewSteemDataStore(hostname string) SteemDataStorer { | func NewSteemDataStore(hostname string) SteemDataStorer { | ||||||
| 	self := new(SteemDataStore) | 	self := new(SteemDataStore) | ||||||
| 	self.kv = NewRedisKVStore(hostname) | 	self.kv = NewRedisKVStore(hostname) | ||||||
|  | 	self.init() | ||||||
| 	return self | 	return self | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *SteemDataStore) SetCurrentBlockHeight(blockNum BlockNumber) error { | func (self *SteemDataStore) init() { | ||||||
| 	keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix) | 	self.UpdateCurrentLocalBlockHeight() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (self *SteemDataStore) UpdateCurrentLocalBlockHeight() { | ||||||
|  | 	cur := self.CurrentLocalBlockHeight() | ||||||
|  | 	next := self.FindHighestContiguousBlockInDb(cur) | ||||||
|  | 	if next != cur { | ||||||
|  | 		err := self.SetCurrentLocalBlockHeight(next) | ||||||
|  | 		log.Infof("current highest contig block in db is now %d", next) | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.Panic(err) | ||||||
|  | 		} | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (self *SteemDataStore) SetCurrentLocalBlockHeight(blockNum BlockNumber) error { | ||||||
|  | 	keyname := fmt.Sprintf("%s.meta.CurrentLocalBlockHeight", appPrefix) | ||||||
|  | 	value := fmt.Sprintf("%d", blockNum) | ||||||
|  | 	return self.kv.Put(&keyname, &value) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (self *SteemDataStore) SetCurrentNetworkBlockHeight(blockNum BlockNumber) error { | ||||||
|  | 	keyname := fmt.Sprintf("%s.meta.CurrentNetworkBlockHeight", appPrefix) | ||||||
| 	value := fmt.Sprintf("%d", blockNum) | 	value := fmt.Sprintf("%d", blockNum) | ||||||
| 	return self.kv.Put(&keyname, &value) | 	return self.kv.Put(&keyname, &value) | ||||||
| } | } | ||||||
| @ -42,7 +69,10 @@ func (self *SteemDataStore) FindHighestContiguousBlockInDb(from BlockNumber) Blo | |||||||
| 	for { | 	for { | ||||||
| 		try = BlockNumber(uint64(last) + 1) | 		try = BlockNumber(uint64(last) + 1) | ||||||
| 		keyname = fmt.Sprintf("%s.ops_in_block.%d", appPrefix, try) | 		keyname = fmt.Sprintf("%s.ops_in_block.%d", appPrefix, try) | ||||||
| 		exists, _ := self.kv.Exists(&keyname) | 		exists, err := self.kv.Exists(&keyname) | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.Panic(err) | ||||||
|  | 		} | ||||||
| 		if exists == false { | 		if exists == false { | ||||||
| 			log.Debugf("cannot find block %d in db, highest found is %d", try, last) | 			log.Debugf("cannot find block %d in db, highest found is %d", try, last) | ||||||
| 			return last | 			return last | ||||||
| @ -64,14 +94,27 @@ func (self *SteemDataStore) HaveOpsForBlock(blockNum BlockNumber) bool { | |||||||
| 	return exists | 	return exists | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *SteemDataStore) CurrentBlockHeight() BlockNumber { | func (self *SteemDataStore) CurrentNetworkBlockHeight() BlockNumber { | ||||||
| 	keyname := fmt.Sprintf("%s.meta.CurrentBlockHeight", appPrefix) | 	keyname := fmt.Sprintf("%s.meta.CurrentNetworkBlockHeight", appPrefix) | ||||||
| 	val, err := self.kv.Get(&keyname) | 	val, err := self.kv.Get(&keyname) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		// assume this is key not found, initialize key to default
 | 		// assume this is key not found, initialize key to default
 | ||||||
| 		self.SetCurrentBlockHeight(0) | 		self.SetCurrentNetworkBlockHeight(0) | ||||||
| 		// retry
 | 		// retry
 | ||||||
| 		return self.CurrentBlockHeight() | 		return self.CurrentNetworkBlockHeight() | ||||||
|  | 	} | ||||||
|  | 	intval, err := strconv.ParseUint(*val, 10, 64) | ||||||
|  | 	return BlockNumber(intval) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (self *SteemDataStore) CurrentLocalBlockHeight() BlockNumber { | ||||||
|  | 	keyname := fmt.Sprintf("%s.meta.CurrentLocalBlockHeight", appPrefix) | ||||||
|  | 	val, err := self.kv.Get(&keyname) | ||||||
|  | 	if err != nil { | ||||||
|  | 		// assume this is key not found, initialize key to default
 | ||||||
|  | 		self.SetCurrentLocalBlockHeight(0) | ||||||
|  | 		// retry
 | ||||||
|  | 		return self.CurrentLocalBlockHeight() | ||||||
| 	} | 	} | ||||||
| 	intval, err := strconv.ParseUint(*val, 10, 64) | 	intval, err := strconv.ParseUint(*val, 10, 64) | ||||||
| 	return BlockNumber(intval) | 	return BlockNumber(intval) | ||||||
|  | |||||||
| @ -11,6 +11,7 @@ import ( | |||||||
| 	"io" | 	"io" | ||||||
| 	"io/ioutil" | 	"io/ioutil" | ||||||
| 	"net/http" | 	"net/http" | ||||||
|  | 	"time" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| type httpRPCClient interface { | type httpRPCClient interface { | ||||||
| @ -47,9 +48,14 @@ type JSONRPC struct { | |||||||
| 
 | 
 | ||||||
| // New create new rpc client with given url
 | // New create new rpc client with given url
 | ||||||
| func NewJSONRPC(url string, options ...func(rpc *JSONRPC)) *JSONRPC { | func NewJSONRPC(url string, options ...func(rpc *JSONRPC)) *JSONRPC { | ||||||
|  | 
 | ||||||
|  | 	netClient := &http.Client{ | ||||||
|  | 		Timeout: time.Second * 20, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	rpc := &JSONRPC{ | 	rpc := &JSONRPC{ | ||||||
| 		url:    url, | 		url:    url, | ||||||
| 		client: http.DefaultClient, | 		client: netClient, | ||||||
| 	} | 	} | ||||||
| 	for _, option := range options { | 	for _, option := range options { | ||||||
| 		option(rpc) | 		option(rpc) | ||||||
| @ -77,6 +83,7 @@ func (rpc *JSONRPC) Call(method string, params json.RawMessage) (json.RawMessage | |||||||
| 		defer response.Body.Close() | 		defer response.Body.Close() | ||||||
| 	} | 	} | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|  | 		log.Infof("jsonrpc error: %v", err) | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -115,7 +115,7 @@ func (kv *BadgerKVStore) Put(key *string, value *string) error { | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Fatal(err) | 		log.Fatal(err) | ||||||
| 	} | 	} | ||||||
| 	err = txn.Commit(nil) | 	err = txn.Commit() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Fatal(err) | 		log.Fatal(err) | ||||||
| 	} | 	} | ||||||
|  | |||||||
							
								
								
									
										52
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										52
									
								
								main.go
									
									
									
									
									
								
							| @ -1,36 +1,36 @@ | |||||||
| package main | package main | ||||||
| 
 | 
 | ||||||
| //import "github.com/spf13/viper"
 | import "github.com/spf13/viper" | ||||||
| //import "encoding/json"
 |  | ||||||
| import log "github.com/sirupsen/logrus" | import log "github.com/sirupsen/logrus" | ||||||
| 
 | 
 | ||||||
| const steemAPIURL = "https://api.steemit.com" | // STEEM_APIURL=https://api.steem.house ./steem-block-db
 | ||||||
| const redisUrl = "localhost:6379" |  | ||||||
| 
 |  | ||||||
| //const steemAPIURL = "http://10.100.202.175:8090"
 |  | ||||||
| //const steemAPIURL = "http://las2.local:8090"
 |  | ||||||
| 
 | 
 | ||||||
| func main() { | func main() { | ||||||
| 	log.SetLevel(log.DebugLevel) | 	viper.SetConfigName("steem") | ||||||
| 	var x *BlockFetcher | 	viper.AddConfigPath("/etc/steem") | ||||||
| 	x = NewBlockFetcher(&BlockFetcherConfig{ | 	viper.AddConfigPath("$HOME/.config/steem") | ||||||
| 		api:                   nil, | 	viper.SetEnvPrefix("steem") | ||||||
| 		desiredFetcherThreads: 40, | 	viper.BindEnv("debug") | ||||||
| 		startBlock:            10000, | 	viper.BindEnv("redis") | ||||||
| 		endBlock:              10005, | 	viper.BindEnv("apiurl") | ||||||
| 	}) | 	viper.ReadInConfig() // Find and read the config file if exists
 | ||||||
| 	_ = x | 	logLevel := log.InfoLevel | ||||||
| } | 	if viper.GetBool("debug") == true { | ||||||
| 
 | 		logLevel = log.DebugLevel | ||||||
| /* | 	} | ||||||
| func mainx() { | 	redis := "localhost:6379" | ||||||
|  | 	if viper.Get("redis") != nil { | ||||||
|  | 		redis = viper.GetString("redis") | ||||||
|  | 	} | ||||||
|  | 	apiurl := "https://api.steemit.com" | ||||||
|  | 	if viper.Get("apiurl") != nil { | ||||||
|  | 		apiurl = viper.GetString("apiurl") | ||||||
|  | 	} | ||||||
| 	app := NewApp(&appconfig{ | 	app := NewApp(&appconfig{ | ||||||
| 		logLevel:       log.DebugLevel, | 		logLevel:              logLevel, | ||||||
| 		apiUrl:         steemAPIURL, | 		apiUrl:                apiurl, | ||||||
| 		redisUrl:       redisUrl, | 		redisUrl:              redis, | ||||||
| 		fetcherThreads: 40, | 		desiredFetcherThreads: 30, | ||||||
| 	}) | 	}) | ||||||
| 	app.main() | 	app.main() | ||||||
| } | } | ||||||
| 
 |  | ||||||
| */ |  | ||||||
|  | |||||||
							
								
								
									
										16
									
								
								steemapi.go
									
									
									
									
									
								
							
							
						
						
									
										16
									
								
								steemapi.go
									
									
									
									
									
								
							| @ -9,10 +9,15 @@ type SteemAPI struct { | |||||||
| 	rpc *JSONRPC | 	rpc *JSONRPC | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | type SteemAPIShape interface { | ||||||
|  | 	GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesResponse, error) | ||||||
|  | 	GetOpsInBlock(blockNum BlockNumber) (GetOpsInBlockResponse, error) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| var EmptyParams = []string{} | var EmptyParams = []string{} | ||||||
| var EmptyParamsRaw, _ = json.Marshal(EmptyParams) | var EmptyParamsRaw, _ = json.Marshal(EmptyParams) | ||||||
| 
 | 
 | ||||||
| func NewSteemAPI(url string, options ...func(s *SteemAPI)) *SteemAPI { | func NewSteemAPI(url string, options ...func(s SteemAPIShape)) *SteemAPI { | ||||||
| 
 | 
 | ||||||
| 	rpc := NewJSONRPC(url, func(x *JSONRPC) {}) | 	rpc := NewJSONRPC(url, func(x *JSONRPC) {}) | ||||||
| 
 | 
 | ||||||
| @ -29,7 +34,7 @@ func NewSteemAPI(url string, options ...func(s *SteemAPI)) *SteemAPI { | |||||||
| 
 | 
 | ||||||
| func (self *SteemAPI) GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesResponse, error) { | func (self *SteemAPI) GetDynamicGlobalProperties() (GetDynamicGlobalPropertiesResponse, error) { | ||||||
| 	var resp DynamicGlobalProperties | 	var resp DynamicGlobalProperties | ||||||
| 	raw, err := self.rpc.Call("get_dynamic_global_properties", EmptyParamsRaw) | 	raw, err := self.rpc.Call("condenser_api.get_dynamic_global_properties", EmptyParamsRaw) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| @ -46,9 +51,12 @@ func (self *SteemAPI) GetOpsInBlock(blockNum BlockNumber) (GetOpsInBlockResponse | |||||||
| 	// api.steemit.com.
 | 	// api.steemit.com.
 | ||||||
| 	realOpsParams := &GetOpsInBlockRequestParams{BlockNum: blockNum, VirtualOps: false} | 	realOpsParams := &GetOpsInBlockRequestParams{BlockNum: blockNum, VirtualOps: false} | ||||||
| 	rop, err := realOpsParams.MarshalJSON() | 	rop, err := realOpsParams.MarshalJSON() | ||||||
| 	realOpsResponse, err := self.rpc.Call("condenser_api.get_ops_in_block", rop) | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	rawOpsResponse, err := self.rpc.Call("condenser_api.get_ops_in_block", rop) | ||||||
| 	var result []OperationObject | 	var result []OperationObject | ||||||
| 	err = json.Unmarshal(realOpsResponse, &result) | 	err = json.Unmarshal(rawOpsResponse, &result) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user