Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
Merge pull request #116 from vulcanize/storage-diffs-over-rpc
Browse files Browse the repository at this point in the history
Storage diffs over rpc
  • Loading branch information
elizabethengelman authored Oct 2, 2019
2 parents 20ce0ab + f0d2741 commit b4e16c4
Show file tree
Hide file tree
Showing 37 changed files with 1,127 additions and 417 deletions.
32 changes: 24 additions & 8 deletions cmd/composeAndExecute.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
package cmd

import (
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
"github.com/vulcanize/vulcanizedb/pkg/fs"
"os"
"plugin"
syn "sync"
Expand All @@ -25,9 +29,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/fs"
p2 "github.com/vulcanize/vulcanizedb/pkg/plugin"
"github.com/vulcanize/vulcanizedb/pkg/plugin/helpers"
"github.com/vulcanize/vulcanizedb/utils"
Expand Down Expand Up @@ -179,12 +181,26 @@ func composeAndExecute() {
}

if len(ethStorageInitializers) > 0 {
tailer := fs.FileTailer{Path: storageDiffsPath}
storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer)
sw := watcher.NewStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers)
wg.Add(1)
go watchEthStorage(&sw, &wg)
switch storageDiffsSource {
case "geth":
log.Debug("fetching storage diffs from geth pub sub")
rpcClient, _ := getClients()
stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient)
payloadChan := make(chan statediff.Payload)
storageFetcher := fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan)
sw := watcher.NewStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers)
wg.Add(1)
go watchEthStorage(&sw, &wg)
default:
log.Debug("fetching storage diffs from csv")
tailer := fs.FileTailer{Path: storageDiffsPath}
storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer)
sw := watcher.NewStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers)
wg.Add(1)
go watchEthStorage(&sw, &wg)
}
}

if len(ethContractInitializers) > 0 {
Expand Down
38 changes: 27 additions & 11 deletions cmd/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ package cmd

import (
"fmt"
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
"github.com/vulcanize/vulcanizedb/pkg/fs"
"plugin"
syn "sync"
"time"
Expand All @@ -26,11 +30,9 @@ import (
"github.com/spf13/cobra"

"github.com/vulcanize/vulcanizedb/libraries/shared/constants"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
storageUtils "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
"github.com/vulcanize/vulcanizedb/pkg/fs"
"github.com/vulcanize/vulcanizedb/utils"
)

Expand Down Expand Up @@ -123,12 +125,26 @@ func execute() {
}

if len(ethStorageInitializers) > 0 {
tailer := fs.FileTailer{Path: storageDiffsPath}
storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer)
sw := watcher.NewStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers)
wg.Add(1)
go watchEthStorage(&sw, &wg)
switch storageDiffsSource {
case "geth":
log.Debug("fetching storage diffs from geth pub sub")
rpcClient, _ := getClients()
stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient)
payloadChan := make(chan statediff.Payload)
storageFetcher := fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan)
sw := watcher.NewStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers)
wg.Add(1)
go watchEthStorage(&sw, &wg)
default:
log.Debug("fetching storage diffs from csv")
tailer := fs.FileTailer{Path: storageDiffsPath}
storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer)
sw := watcher.NewStorageWatcher(storageFetcher, &db)
sw.AddTransformers(ethStorageInitializers)
wg.Add(1)
go watchEthStorage(&sw, &wg)
}
}

if len(ethContractInitializers) > 0 {
Expand Down Expand Up @@ -166,16 +182,16 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) {
}
}

func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) {
func watchEthStorage(w watcher.IStorageWatcher, wg *syn.WaitGroup) {
defer wg.Done()
// Execute over the StorageTransformerInitializer set using the storage watcher
LogWithCommand.Info("executing storage transformers")
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
for range ticker.C {
errs := make(chan error)
rows := make(chan storageUtils.StorageDiffRow)
w.Execute(rows, errs, queueRecheckInterval)
diffs := make(chan storageUtils.StorageDiff)
w.Execute(diffs, errs, queueRecheckInterval)
}
}

Expand Down
18 changes: 14 additions & 4 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
recheckHeadersArg bool
SubCommand string
LogWithCommand log.Entry
storageDiffsSource string
)

const (
Expand Down Expand Up @@ -80,6 +81,7 @@ func setViperConfigs() {
ipc = viper.GetString("client.ipcpath")
levelDbPath = viper.GetString("client.leveldbpath")
storageDiffsPath = viper.GetString("filesystem.storageDiffsPath")
storageDiffsSource = viper.GetString("storageDiffs.source")
databaseConfig = config.Database{
Name: viper.GetString("database.name"),
Hostname: viper.GetString("database.hostname"),
Expand Down Expand Up @@ -118,6 +120,7 @@ func init() {
rootCmd.PersistentFlags().String("client-ipcPath", "", "location of geth.ipc file")
rootCmd.PersistentFlags().String("client-levelDbPath", "", "location of levelDb chaindata")
rootCmd.PersistentFlags().String("filesystem-storageDiffsPath", "", "location of storage diffs csv file")
rootCmd.PersistentFlags().String("storageDiffs-source", "csv", "where to get the state diffs: csv or geth")
rootCmd.PersistentFlags().String("exporter-name", "exporter", "name of exporter plugin")
rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(), "Log level (trace, debug, info, warn, error, fatal, panic")

Expand All @@ -129,6 +132,7 @@ func init() {
viper.BindPFlag("client.ipcPath", rootCmd.PersistentFlags().Lookup("client-ipcPath"))
viper.BindPFlag("client.levelDbPath", rootCmd.PersistentFlags().Lookup("client-levelDbPath"))
viper.BindPFlag("filesystem.storageDiffsPath", rootCmd.PersistentFlags().Lookup("filesystem-storageDiffsPath"))
viper.BindPFlag("storageDiffs.source", rootCmd.PersistentFlags().Lookup("storageDiffs-source"))
viper.BindPFlag("exporter.fileName", rootCmd.PersistentFlags().Lookup("exporter-name"))
viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level"))
}
Expand All @@ -152,15 +156,21 @@ func initConfig() {
}

func getBlockChain() *geth.BlockChain {
rpcClient, ethClient := getClients()
vdbEthClient := client.NewEthClient(ethClient)
vdbNode := node.MakeNode(rpcClient)
transactionConverter := vRpc.NewRpcTransactionConverter(ethClient)
return geth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter)
}

func getClients() (client.RpcClient, *ethclient.Client) {
rawRpcClient, err := rpc.Dial(ipc)

if err != nil {
LogWithCommand.Fatal(err)
}
rpcClient := client.NewRpcClient(rawRpcClient, ipc)
ethClient := ethclient.NewClient(rawRpcClient)
vdbEthClient := client.NewEthClient(ethClient)
vdbNode := node.MakeNode(rpcClient)
transactionConverter := vRpc.NewRpcTransactionConverter(ethClient)
return geth.NewBlockChain(vdbEthClient, rpcClient, vdbNode, transactionConverter)

return rpcClient, ethClient
}
12 changes: 10 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ require (
github.com/ethereum/go-ethereum v1.9.5
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect
github.com/go-sql-driver/mysql v1.4.1 // indirect
github.com/golang/protobuf v1.3.2 // indirect
github.com/gorilla/websocket v1.4.1 // indirect
github.com/graph-gophers/graphql-go v0.0.0-20190724201507-010347b5f9e6 // indirect
github.com/hashicorp/golang-lru v0.5.1
github.com/hashicorp/golang-lru v0.5.3
github.com/howeyc/fsnotify v0.9.0 // indirect
github.com/hpcloud/tail v1.0.0
github.com/huin/goupnp v1.0.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.1 // indirect
Expand Down Expand Up @@ -42,8 +44,14 @@ require (
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/tyler-smith/go-bip39 v1.0.2 // indirect
github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 // indirect
golang.org/x/crypto v0.0.0-20190926114937-fa1a29108794 // indirect
golang.org/x/net v0.0.0-20190603091049-60506f45cf65
golang.org/x/sync v0.0.0-20190423024810-112230192c58
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190709231704-1e4459ed25ff // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7
gopkg.in/urfave/cli.v1 v1.0.0-00010101000000-000000000000 // indirect
)

replace github.com/ethereum/go-ethereum => github.com/vulcanize/go-ethereum v0.0.0-20190731183759-8e20673bd101

replace gopkg.in/urfave/cli.v1 => gopkg.in/urfave/cli.v1 v1.20.0
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ github.com/graph-gophers/graphql-go v0.0.0-20190724201507-010347b5f9e6/go.mod h1
github.com/hashicorp/golang-lru v0.0.0-20180201235237-0fb14efe8c47/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk=
github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/howeyc/fsnotify v0.9.0 h1:0gtV5JmOKH4A8SsFxG2BczSeXWWPvcMT0euZt5gDAxY=
Expand Down Expand Up @@ -241,6 +243,10 @@ github.com/tyler-smith/go-bip39 v1.0.0/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2
github.com/tyler-smith/go-bip39 v1.0.2 h1:+t3w+KwLXO6154GNJY+qUtIxLTmFjfUmpguQT1OlOT8=
github.com/tyler-smith/go-bip39 v1.0.2/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/vulcanize/go-ethereum v0.0.0-20190731183759-8e20673bd101 h1:fsHhBzscAwi4u7/F033SFJwTIz+46D8uDWMu2/ZdvzA=
github.com/vulcanize/go-ethereum v0.0.0-20190731183759-8e20673bd101 h1:fsHhBzscAwi4u7/F033SFJwTIz+46D8uDWMu2/ZdvzA=
github.com/vulcanize/go-ethereum v0.0.0-20190731183759-8e20673bd101/go.mod h1:9i0pGnKDUFFr8yC/n8xyrNBVfhYlpwE8J3Ge6ThKvug=
github.com/vulcanize/go-ethereum v0.0.0-20190731183759-8e20673bd101/go.mod h1:9i0pGnKDUFFr8yC/n8xyrNBVfhYlpwE8J3Ge6ThKvug=
github.com/vulcanize/vulcanizedb v0.0.5/go.mod h1:utXkheCL9VjTfmuivuvRiAAyHh54GSN9XRQNEbFCA8k=
github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 h1:1cngl9mPEoITZG8s8cVcUy5CeIBYhEESkOB7m6Gmkrk=
github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208/go.mod h1:IotVbo4F+mw0EzQ08zFqg7pK3FebNXpaMsRy2RT+Ees=
Expand All @@ -252,6 +258,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90Pveol
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190926114937-fa1a29108794 h1:4Yo9XtTfxfBCecLiBW8TYsFIdN7TkDhjGLWetFo4JSo=
golang.org/x/crypto v0.0.0-20190926114937-fa1a29108794/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a h1:gOpx8G595UYyvj8UK4+OFyY4rx037g3fmfhe5SasG3U=
Expand Down Expand Up @@ -281,6 +289,8 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c h1:+EXw7AwNOKzPFXMZ1yNjO40aW
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190922100055-0a153f010e69 h1:rOhMmluY6kLMhdnrivzec6lLgaVbMHMn2ISQXJeJ5EM=
golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
Expand All @@ -300,6 +310,7 @@ gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190709231704-1e4459ed25ff h1:uuol9OUzSv
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190709231704-1e4459ed25ff/go.mod h1:uAJfkITjFhyEEuUfm7bsmCZRbW5WRq8s9EY8HZ6hCns=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/urfave/cli.v1 v1.20.0 h1:NdAVW6RYxDif9DhDHaAortIu956m2c0v+09AZBPTbE0=
gopkg.in/urfave/cli.v1 v1.20.0/go.mod h1:vuBzUtMdQeixQj8LVd+/98pzhxNGQoyuPBlsXHOQNO0=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
19 changes: 9 additions & 10 deletions libraries/shared/factories/storage/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@ package storage

import (
"github.com/ethereum/go-ethereum/common"

"github.com/vulcanize/vulcanizedb/libraries/shared/storage"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)

type Transformer struct {
Address common.Address
Mappings storage.Mappings
Repository Repository
HashedAddress common.Hash
Mappings storage.Mappings
Repository Repository
}

func (transformer Transformer) NewTransformer(db *postgres.DB) transformer.StorageTransformer {
Expand All @@ -37,18 +36,18 @@ func (transformer Transformer) NewTransformer(db *postgres.DB) transformer.Stora
return transformer
}

func (transformer Transformer) ContractAddress() common.Address {
return transformer.Address
func (transformer Transformer) KeccakContractAddress() common.Hash {
return transformer.HashedAddress
}

func (transformer Transformer) Execute(row utils.StorageDiffRow) error {
metadata, lookupErr := transformer.Mappings.Lookup(row.StorageKey)
func (transformer Transformer) Execute(diff utils.StorageDiff) error {
metadata, lookupErr := transformer.Mappings.Lookup(diff.StorageKey)
if lookupErr != nil {
return lookupErr
}
value, decodeErr := utils.Decode(row, metadata)
value, decodeErr := utils.Decode(diff, metadata)
if decodeErr != nil {
return decodeErr
}
return transformer.Repository.Create(row.BlockHeight, row.BlockHash.Hex(), metadata, value)
return transformer.Repository.Create(diff.BlockHeight, diff.BlockHash.Hex(), metadata, value)
}
Loading

0 comments on commit b4e16c4

Please sign in to comment.