Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
Merge pull request #39 from vulcanize/omni_update
Browse files Browse the repository at this point in the history
Refactoring omni pkg code + PR72 from public branch
  • Loading branch information
i-norden authored Mar 22, 2019
2 parents e74e839 + 69836d9 commit 2506991
Show file tree
Hide file tree
Showing 83 changed files with 1,861 additions and 1,394 deletions.
11 changes: 4 additions & 7 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,25 @@ go:
services:
- postgresql
addons:
postgresql: "9.6"

postgresql: '9.6'
go_import_path: github.com/vulcanize/vulcanizedb

before_install:
# ginkgo golint dep goose
- make installtools
- bash ./scripts/install-postgres-10.sh
- curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | sudo apt-key add -
- echo "deb https://dl.yarnpkg.com/debian/ stable main" | sudo tee /etc/apt/sources.list.d/yarn.list
- sudo apt-get update && sudo apt-get install yarn

before_script:
- sudo -u postgres createdb vulcanize_private
- make migrate NAME=vulcanize_private
- cd postgraphile && yarn

script:
- yarn test
- cd ../
- make test
- make integrationtest

notifications:
email: false
env:
matrix:
secure: hz6YPkTm59QhOQ2+05K+AWHw0wOoPjz9wqfUKZcuUi+ICdcuClXMt+hVpUDmwrGOCp8B5y8hyxr/iq7P+3MLwz1/2JYxc9S9MT1O0WXU8ZuWZR0LI1e2ZhZCF3E/JWq6c4atxFIEhcv7roBUkbUcRA8cpRdZmEmpSM8JyP0z76CezIG/HeyUdVW34DLjTBNB0TJdOZyfOh0aCvzgXR/kfKaSYNBhJY7j2UK7x0qK0UlQ/n7RHCrtjWoNWpuwl9bw1F5plMHOD9bq0oDG6gs1SFBaybfEMN71Hp0QxhD/u+1tVuHfGooYhzVgxStPSCpSkgQ7vgSZI766ErqPc3B6Wv9K+s5exPLgCykEiLorW6qI8A+mdiPIiIzLBRMcbF/kCo7gFh0kDIYbSTjS5COfjNw/fKsp59upXF4VtCDgVgjAemY6XT4lziZiVQwiK1Oyln8HrIux1aJEWgRGEQpQqwVeCUHClHus5Paf/N0Ci5f9NHh2zbkZvDuUF2uQu6Wc58wXHcVsloyfQibJrH1q2sQRqdiPfN5Y9l0igFzXILFd4itXMyMnSDQh6+GD6V0YY/hGufAs42UymjGYbEwZQP/gn8/bQpilngbmlJ7bB2nva70kXgqhuNiZM5XuFuQMIP3U2Tbm87FHEFUCoKPv2if9Ft74YkAuzVljAMo2YLQ=
187 changes: 135 additions & 52 deletions README.md

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions cmd/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var composeCmd = &cobra.Command{
port = 5432
[client]
ipcPath = "http://kovan0.vulcanize.io:8545"
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
[exporter]
home = "github.com/vulcanize/vulcanizedb"
Expand All @@ -62,7 +62,7 @@ var composeCmd = &cobra.Command{
rank = "0"
[exporter.transformer2]
path = "path/to/transformer2"
type = "eth_event"
type = "eth_contract"
repository = "github.com/account/repo"
migrations = "db/migrations"
rank = "0"
Expand Down Expand Up @@ -91,7 +91,9 @@ from it and loaded into and executed over by the appropriate watcher.
The type of watcher that the transformer works with is specified using the
type variable for each transformer in the config. Currently there are watchers
of event data from an eth node (eth_event) and storage data from an eth node
(eth_storage).
(eth_storage), and a more generic interface for accepting contract_watcher pkg
based transformers which can perform both event watching and public method
polling (eth_contract).
Transformers of different types can be ran together in the same command using a
single config file or in separate command instances using different config files
Expand Down
19 changes: 14 additions & 5 deletions cmd/composeAndExecute.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var composeAndExecuteCmd = &cobra.Command{
port = 5432
[client]
ipcPath = "http://kovan0.vulcanize.io:8545"
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
[exporter]
home = "github.com/vulcanize/vulcanizedb"
Expand All @@ -62,7 +62,7 @@ var composeAndExecuteCmd = &cobra.Command{
rank = "0"
[exporter.transformer2]
path = "path/to/transformer2"
type = "eth_event"
type = "eth_contract"
repository = "github.com/account/repo"
migrations = "db/migrations"
rank = "2"
Expand Down Expand Up @@ -91,7 +91,9 @@ from it and loaded into and executed over by the appropriate watcher.
The type of watcher that the transformer works with is specified using the
type variable for each transformer in the config. Currently there are watchers
of event data from an eth node (eth_event) and storage data from an eth node
(eth_storage).
(eth_storage), and a more generic interface for accepting contract_watcher pkg
based transformers which can perform both event watching and public method
polling (eth_contract).
Transformers of different types can be ran together in the same command using a
single config file or in separate command instances using different config files
Expand Down Expand Up @@ -149,8 +151,8 @@ func composeAndExecute() {
os.Exit(1)
}

// Use the Exporters export method to load the EventTransformerInitializer and StorageTransformerInitializer sets
ethEventInitializers, ethStorageInitializers := exporter.Export()
// Use the Exporters export method to load the EventTransformerInitializer, StorageTransformerInitializer, and ContractTransformerInitializer sets
ethEventInitializers, ethStorageInitializers, ethContractInitializers := exporter.Export()

// Setup bc and db objects
blockChain := getBlockChain()
Expand All @@ -173,6 +175,13 @@ func composeAndExecute() {
wg.Add(1)
go watchEthStorage(&sw, &wg)
}

if len(ethContractInitializers) > 0 {
gw := watcher.NewContractWatcher(&db, blockChain)
gw.AddTransformers(ethContractInitializers)
wg.Add(1)
go watchEthContract(&gw, &wg)
}
wg.Wait()
}

Expand Down
125 changes: 125 additions & 0 deletions cmd/contractWatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.

// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/config"
"time"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

st "github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
ft "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/transformer"
lt "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/light/transformer"
"github.com/vulcanize/vulcanizedb/utils"
)

// contractWatcherCmd represents the contractWatcher command
var contractWatcherCmd = &cobra.Command{
Use: "contractWatcher",
Short: "Watches events at the provided contract address using fully synced vDB",
Long: `Uses input contract address and event filters to watch events
Expects an ethereum node to be running
Expects an archival node synced into vulcanizeDB
Requires a .toml config file:
[database]
name = "vulcanize_public"
hostname = "localhost"
port = 5432
[client]
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
[contract]
network = ""
addresses = [
"contractAddress1",
"contractAddress2"
]
[contract.contractAddress1]
abi = 'ABI for contract 1'
startingBlock = 982463
[contract.contractAddress2]
abi = 'ABI for contract 2'
events = [
"event1",
"event2"
]
eventArgs = [
"arg1",
"arg2"
]
methods = [
"method1",
"method2"
]
methodArgs = [
"arg1",
"arg2"
]
startingBlock = 4448566
piping = true
`,
Run: func(cmd *cobra.Command, args []string) {
contractWatcher()
},
}

var (
mode string
)

func contractWatcher() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

blockChain := getBlockChain()
db := utils.LoadPostgres(databaseConfig, blockChain.Node())

var t st.ContractTransformer
con := config.ContractConfig{}
con.PrepConfig()
switch mode {
case "light":
t = lt.NewTransformer(con, blockChain, &db)
case "full":
t = ft.NewTransformer(con, blockChain, &db)
default:
log.Fatal("Invalid mode")
}

err := t.Init()
if err != nil {
log.Fatal(fmt.Sprintf("Failed to initialized transformer\r\nerr: %v\r\n", err))
}

for range ticker.C {
err = t.Execute()
if err != nil {
log.Error("Execution error for transformer:", t.GetConfig().Name, err)
}
}
}

func init() {
rootCmd.AddCommand(contractWatcherCmd)
contractWatcherCmd.Flags().StringVarP(&mode, "mode", "o", "light", "'light' or 'full' mode to work with either light synced or fully synced vDB (default is light)")
}
38 changes: 25 additions & 13 deletions cmd/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var executeCmd = &cobra.Command{
port = 5432
[client]
ipcPath = "http://kovan0.vulcanize.io:8545"
ipcPath = "/Users/user/Library/Ethereum/geth.ipc"
[exporter]
name = "exampleTransformerExporter"
Expand Down Expand Up @@ -99,8 +99,8 @@ func execute() {
os.Exit(1)
}

// Use the Exporters export method to load the EventTransformerInitializer and StorageTransformerInitializer sets
ethEventInitializers, ethStorageInitializers := exporter.Export()
// Use the Exporters export method to load the EventTransformerInitializer, StorageTransformerInitializer, and ContractTransformerInitializer sets
ethEventInitializers, ethStorageInitializers, ethContractInitializers := exporter.Export()

// Setup bc and db objects
blockChain := getBlockChain()
Expand All @@ -123,6 +123,13 @@ func execute() {
wg.Add(1)
go watchEthStorage(&sw, &wg)
}

if len(ethContractInitializers) > 0 {
gw := watcher.NewContractWatcher(&db, blockChain)
gw.AddTransformers(ethContractInitializers)
wg.Add(1)
go watchEthContract(&gw, &wg)
}
wg.Wait()
}

Expand All @@ -132,7 +139,7 @@ func init() {
}

type Exporter interface {
Export() ([]transformer.EventTransformerInitializer, []transformer.StorageTransformerInitializer)
Export() ([]transformer.EventTransformerInitializer, []transformer.StorageTransformerInitializer, []transformer.ContractTransformerInitializer)
}

func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) {
Expand All @@ -148,23 +155,28 @@ func watchEthEvents(w *watcher.EventWatcher, wg *syn.WaitGroup) {
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
for range ticker.C {
err := w.Execute(recheck)
if err != nil {
// TODO Handle watcher errors in execute
}
w.Execute(recheck)
}
}

func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) {
defer wg.Done()
// Execute over the StorageTransformerInitializer set using the watcher
// Execute over the StorageTransformerInitializer set using the storage watcher
log.Info("executing storage transformers")
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
for range ticker.C {
err := w.Execute()
if err != nil {
// TODO Handle watcher errors in execute
}
w.Execute()
}
}

func watchEthContract(w *watcher.ContractWatcher, wg *syn.WaitGroup) {
defer wg.Done()
// Execute over the ContractTransformerInitializer set using the contract watcher
log.Info("executing contract_watcher transformers")
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
for range ticker.C {
w.Execute()
}
}
Loading

0 comments on commit 2506991

Please sign in to comment.