Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
Merge pull request #48 from 8thlight/refactor-to-listener
Browse files Browse the repository at this point in the history
Refactor to use listener
  • Loading branch information
ericmeyer authored Nov 2, 2017
2 parents 323187c + 97020f1 commit 0757782
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 128 deletions.
20 changes: 10 additions & 10 deletions Gododir/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@ func parseIpcPath(context *do.Context) string {
}

func startBlockchainListener(config cfg.Config, ipcPath string) {
port := config.Database.Port
host := config.Database.Hostname
databaseName := config.Database.Name

var blockchain core.Blockchain = core.NewGethBlockchain(ipcPath)
blockchain.RegisterObserver(core.BlockchainLoggingObserver{})
pgConfig := fmt.Sprintf("host=%s port=%d dbname=%s sslmode=disable", host, port, databaseName)
db, err := sqlx.Connect("postgres", pgConfig)
blockchain := core.NewGethBlockchain(ipcPath)
loggingObserver := core.BlockchainLoggingObserver{}
connectString := cfg.DbConnectionString(cfg.Public().Database)
db, err := sqlx.Connect("postgres", connectString)
if err != nil {
log.Fatalf("Error connecting to DB: %v\n", err)
}
blockchain.RegisterObserver(core.BlockchainDBObserver{Db: db})
blockchain.SubscribeToEvents()
dbObserver := (core.BlockchainDBObserver{Db: db})
listener := core.NewBlockchainListener(blockchain, []core.BlockchainObserver{
loggingObserver,
dbObserver,
})
listener.Start()
}

func tasks(p *do.Project) {
Expand Down
4 changes: 2 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package core

type Blockchain interface {
RegisterObserver(observer BlockchainObserver)
SubscribeToEvents()
SubscribeToBlocks(blocks chan Block)
StartListening()
}
31 changes: 31 additions & 0 deletions core/blockchain_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package core

type BlockchainListener struct {
inputBlocks chan Block
blockchain Blockchain
observers []BlockchainObserver
}

func NewBlockchainListener(blockchain Blockchain, observers []BlockchainObserver) BlockchainListener {
inputBlocks := make(chan Block, 10)
blockchain.SubscribeToBlocks(inputBlocks)
listener := BlockchainListener{
inputBlocks: inputBlocks,
blockchain: blockchain,
observers: observers,
}
return listener
}

func (listener BlockchainListener) Start() {
go listener.blockchain.StartListening()
for block := range listener.inputBlocks {
listener.notifyObservers(block)
}
}

func (listener BlockchainListener) notifyObservers(block Block) {
for _, observer := range listener.observers {
observer.NotifyBlockAdded(block)
}
}
56 changes: 56 additions & 0 deletions core/blockchain_listener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package core_test

import (
"github.com/8thlight/vulcanizedb/core"
"github.com/8thlight/vulcanizedb/fakes"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("Blockchain listeners", func() {

It("starts with no blocks", func(done Done) {
observer := fakes.NewFakeBlockchainObserverTwo()
blockchain := &fakes.Blockchain{}

core.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer})

Expect(len(observer.CurrentBlocks)).To(Equal(0))
close(done)
}, 1)

It("sees when one block was added", func(done Done) {
observer := fakes.NewFakeBlockchainObserverTwo()
blockchain := &fakes.Blockchain{}
listener := core.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer})
go listener.Start()

go blockchain.AddBlock(core.Block{Number: 123})

wasObserverNotified := <-observer.WasNotified
Expect(wasObserverNotified).To(BeTrue())
Expect(len(observer.CurrentBlocks)).To(Equal(1))
addedBlock := observer.CurrentBlocks[0]
Expect(addedBlock.Number).To(Equal(int64(123)))
close(done)
}, 1)

It("sees a second block", func(done Done) {
observer := fakes.NewFakeBlockchainObserverTwo()
blockchain := &fakes.Blockchain{}
listener := core.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer})
go listener.Start()

go blockchain.AddBlock(core.Block{Number: 123})
<-observer.WasNotified
go blockchain.AddBlock(core.Block{Number: 456})
wasObserverNotified := <-observer.WasNotified

Expect(wasObserverNotified).To(BeTrue())
Expect(len(observer.CurrentBlocks)).To(Equal(2))
addedBlock := observer.CurrentBlocks[1]
Expect(addedBlock.Number).To(Equal(int64(456)))
close(done)
}, 1)

})
51 changes: 0 additions & 51 deletions core/fake_blockchain_test.go

This file was deleted.

35 changes: 14 additions & 21 deletions core/geth_blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@ package core

import (
"fmt"
"reflect"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"golang.org/x/net/context"
)

type GethBlockchain struct {
client *ethclient.Client
observers []BlockchainObserver
subscription ethereum.Subscription
client *ethclient.Client
readGethHeaders chan *types.Header
outputBlocks chan Block
}

func NewGethBlockchain(ipcPath string) *GethBlockchain {
Expand All @@ -24,25 +22,20 @@ func NewGethBlockchain(ipcPath string) *GethBlockchain {
return &blockchain
}

func (blockchain GethBlockchain) notifyObservers(gethBlock *types.Block) {
block := GethBlockToCoreBlock(gethBlock)
for _, observer := range blockchain.observers {
observer.NotifyBlockAdded(block)
}
}

func (blockchain *GethBlockchain) RegisterObserver(observer BlockchainObserver) {
fmt.Printf("Registering observer: %v\n", reflect.TypeOf(observer))
blockchain.observers = append(blockchain.observers, observer)
func (blockchain *GethBlockchain) SubscribeToBlocks(blocks chan Block) {
blockchain.outputBlocks = blocks
fmt.Println("SubscribeToBlocks")
inputHeaders := make(chan *types.Header, 10)
myContext := context.Background()
blockchain.readGethHeaders = inputHeaders
blockchain.client.SubscribeNewHead(myContext, inputHeaders)
}

func (blockchain *GethBlockchain) SubscribeToEvents() {
headers := make(chan *types.Header, 10)
func (blockchain *GethBlockchain) StartListening() {
myContext := context.Background()
sub, _ := blockchain.client.SubscribeNewHead(myContext, headers)
blockchain.subscription = sub
for header := range headers {
for header := range blockchain.readGethHeaders {
gethBlock, _ := blockchain.client.BlockByNumber(myContext, header.Number)
blockchain.notifyObservers(gethBlock)
block := GethBlockToCoreBlock(gethBlock)
blockchain.outputBlocks <- block
}
}
18 changes: 7 additions & 11 deletions fakes/blockchain.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
package fakes

import (
"github.com/8thlight/vulcanizedb/core"
)
import "github.com/8thlight/vulcanizedb/core"

type Blockchain struct {
observers []core.BlockchainObserver
outputBlocks chan core.Block
}

func (blockchain *Blockchain) RegisterObserver(observer core.BlockchainObserver) {
blockchain.observers = append(blockchain.observers, observer)
func (blockchain *Blockchain) SubscribeToBlocks(outputBlocks chan core.Block) {
blockchain.outputBlocks = outputBlocks
}

func (blockchain *Blockchain) AddBlock(block core.Block) {
for _, observer := range blockchain.observers {
observer.NotifyBlockAdded(block)
}
func (blockchain Blockchain) AddBlock(block core.Block) {
blockchain.outputBlocks <- block
}

func (_ *Blockchain) SubscribeToEvents() {}
func (*Blockchain) StartListening() {}
24 changes: 12 additions & 12 deletions fakes/blockchain_observer.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
package fakes

import (
"github.com/8thlight/vulcanizedb/core"
)
import "github.com/8thlight/vulcanizedb/core"

type BlockchainObserver struct {
wasToldBlockAdded bool
blocks []core.Block
CurrentBlocks []core.Block
WasNotified chan bool
}

func (blockchainObserver *BlockchainObserver) WasToldBlockAdded() bool {
return blockchainObserver.wasToldBlockAdded
func (observer *BlockchainObserver) LastBlock() core.Block {
return observer.CurrentBlocks[len(observer.CurrentBlocks)-1]
}

func (blockchainObserver *BlockchainObserver) NotifyBlockAdded(block core.Block) {
blockchainObserver.blocks = append(blockchainObserver.blocks, block)
blockchainObserver.wasToldBlockAdded = true
func NewFakeBlockchainObserverTwo() *BlockchainObserver {
return &BlockchainObserver{
WasNotified: make(chan bool),
}
}

func (observer *BlockchainObserver) LastAddedBlock() core.Block {
return observer.blocks[len(observer.blocks)-1]
func (observer *BlockchainObserver) NotifyBlockAdded(block core.Block) {
observer.CurrentBlocks = append(observer.CurrentBlocks, block)
observer.WasNotified <- true
}
34 changes: 13 additions & 21 deletions integration_test/geth_blockchain_test.go
Original file line number Diff line number Diff line change
@@ -1,48 +1,40 @@
package integration_test

import (
"fmt"
"path"
"path/filepath"
"runtime"

"github.com/8thlight/vulcanizedb/core"
"github.com/8thlight/vulcanizedb/fakes"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var (
_, filename, _, _ = runtime.Caller(0)
basepath = filepath.Dir(filename)
)

func RunTimePath() string {
return path.Join(path.Dir(filename), "../")
}

type ObserverWithChannel struct {
blocks chan core.Block
}

func (observer *ObserverWithChannel) NotifyBlockAdded(block core.Block) {
fmt.Println("Block: ", block.Number)
observer.blocks <- block
}

var _ = Describe("Reading from the Geth blockchain", func() {

It("reads two blocks with incrementing numbers", func(done Done) {
addedBlock := make(chan core.Block, 10)
observer := &ObserverWithChannel{addedBlock}
It("reads two block with listener", func(done Done) {
observer := fakes.NewFakeBlockchainObserverTwo()
blockchain := core.NewGethBlockchain(RunTimePath() + "/test_data_dir/geth.ipc")
observers := []core.BlockchainObserver{observer}
listener := core.NewBlockchainListener(blockchain, observers)
go listener.Start()

var blockchain core.Blockchain = core.NewGethBlockchain(RunTimePath() + "/test_data_dir/geth.ipc")
blockchain.RegisterObserver(observer)
<-observer.WasNotified
firstBlock := observer.LastBlock()
Expect(firstBlock).NotTo(BeNil())

go blockchain.SubscribeToEvents()
<-observer.WasNotified
secondBlock := observer.LastBlock()
Expect(secondBlock).NotTo(BeNil())

firstBlock := <-addedBlock
Expect(firstBlock).ShouldNot(BeNil())
secondBlock := <-addedBlock
Expect(firstBlock.Number + 1).Should(Equal(secondBlock.Number))

close(done)
Expand Down

0 comments on commit 0757782

Please sign in to comment.