Skip to content

Commit

Permalink
Merge pull request #5 from makerdao/synchronize-config
Browse files Browse the repository at this point in the history
(VDB-1016) Make config preparation synchronous
  • Loading branch information
rmulhol authored Nov 20, 2019
2 parents 885fe3b + 6e0f4f5 commit 4ec8f88
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 179 deletions.
11 changes: 5 additions & 6 deletions cmd/coldImport.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@
package cmd

import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/makerdao/vulcanizedb/pkg/crypto"
"github.com/makerdao/vulcanizedb/pkg/datastore/ethereum"
"github.com/makerdao/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/makerdao/vulcanizedb/pkg/eth/cold_import"
"github.com/makerdao/vulcanizedb/pkg/eth/converters/cold_db"
vulcCommon "github.com/makerdao/vulcanizedb/pkg/eth/converters/common"
"github.com/makerdao/vulcanizedb/pkg/eth/converters/common"
"github.com/makerdao/vulcanizedb/pkg/fs"
"github.com/makerdao/vulcanizedb/utils"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var coldImportCmd = &cobra.Command{
Expand All @@ -40,7 +39,7 @@ var coldImportCmd = &cobra.Command{
Geth must be synced over all of the desired blocks and must not be running in order to execute this command.`,
Run: func(cmd *cobra.Command, args []string) {
SubCommand = cmd.CalledAs()
LogWithCommand = *log.WithField("SubCommand", SubCommand)
LogWithCommand = *logrus.WithField("SubCommand", SubCommand)
coldImport()
},
}
Expand Down Expand Up @@ -86,7 +85,7 @@ func coldImport() {
blockRepository := repositories.NewBlockRepository(&pgDB)
receiptRepository := repositories.FullSyncReceiptRepository{DB: &pgDB}
transactionConverter := cold_db.NewColdDbTransactionConverter()
blockConverter := vulcCommon.NewBlockConverter(transactionConverter)
blockConverter := common.NewBlockConverter(transactionConverter)

// init and execute cold importer
coldImporter := cold_import.NewColdImporter(ethDB, blockRepository, receiptRepository, blockConverter)
Expand Down
93 changes: 16 additions & 77 deletions cmd/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,9 @@
package cmd

import (
"errors"
"fmt"
"strconv"

log "github.com/sirupsen/logrus"
"github.com/makerdao/vulcanizedb/pkg/plugin"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/makerdao/vulcanizedb/pkg/config"
p2 "github.com/makerdao/vulcanizedb/pkg/plugin"
)

// composeCmd represents the compose command
Expand Down Expand Up @@ -103,90 +96,36 @@ Specify config location when executing the command:
./vulcanizedb compose --config=./environments/config_name.toml`,
Run: func(cmd *cobra.Command, args []string) {
SubCommand = cmd.CalledAs()
LogWithCommand = *log.WithField("SubCommand", SubCommand)
LogWithCommand = *logrus.WithField("SubCommand", SubCommand)
compose()
},
}

func compose() {
// Build plugin generator config
prepConfig()
configErr := prepConfig()
if configErr != nil {
LogWithCommand.Fatalf("failed to prepare config: %s", configErr.Error())
}

// Generate code to build the plugin according to the config file
LogWithCommand.Info("generating plugin")
generator, err := p2.NewGenerator(genConfig, databaseConfig)
if err != nil {
LogWithCommand.Debug("initializing plugin generator failed")
LogWithCommand.Fatal(err)
generator, constructorErr := plugin.NewGenerator(genConfig, databaseConfig)
if constructorErr != nil {
LogWithCommand.Fatalf("initializing plugin generator failed: %s", constructorErr.Error())
}
err = generator.GenerateExporterPlugin()
if err != nil {
LogWithCommand.Debug("generating plugin failed")
LogWithCommand.Fatal(err)
generateErr := generator.GenerateExporterPlugin()
if generateErr != nil {
LogWithCommand.Fatalf("generating plugin failed: %s", generateErr.Error())
}
// TODO: Embed versioning info in the .so files so we know which version of vulcanizedb to run them with
_, pluginPath, err := genConfig.GetPluginPaths()
if err != nil {
LogWithCommand.Debug("getting plugin path failed")
LogWithCommand.Fatal(err)
_, pluginPath, pathErr := genConfig.GetPluginPaths()
if pathErr != nil {
LogWithCommand.Fatalf("getting plugin path failed: %s", pathErr.Error())
}
fmt.Printf("Composed plugin %s", pluginPath)
LogWithCommand.Info("plugin .so file output to ", pluginPath)
}

func init() {
rootCmd.AddCommand(composeCmd)
}

func prepConfig() {
LogWithCommand.Info("configuring plugin")
names := viper.GetStringSlice("exporter.transformerNames")
transformers := make(map[string]config.Transformer)
for _, name := range names {
transformer := viper.GetStringMapString("exporter." + name)
p, pOK := transformer["path"]
if !pOK || p == "" {
LogWithCommand.Fatal(name, " transformer config is missing `path` value")
}
r, rOK := transformer["repository"]
if !rOK || r == "" {
LogWithCommand.Fatal(name, " transformer config is missing `repository` value")
}
m, mOK := transformer["migrations"]
if !mOK || m == "" {
LogWithCommand.Fatal(name, " transformer config is missing `migrations` value")
}
mr, mrOK := transformer["rank"]
if !mrOK || mr == "" {
LogWithCommand.Fatal(name, " transformer config is missing `rank` value")
}
rank, err := strconv.ParseUint(mr, 10, 64)
if err != nil {
LogWithCommand.Fatal(name, " migration `rank` can't be converted to an unsigned integer")
}
t, tOK := transformer["type"]
if !tOK {
LogWithCommand.Fatal(name, " transformer config is missing `type` value")
}
transformerType := config.GetTransformerType(t)
if transformerType == config.UnknownTransformerType {
LogWithCommand.Fatal(errors.New(`unknown transformer type in exporter config accepted types are "eth_event", "eth_storage"`))
}

transformers[name] = config.Transformer{
Path: p,
Type: transformerType,
RepositoryPath: r,
MigrationPath: m,
MigrationRank: rank,
}
}

genConfig = config.Plugin{
Transformers: transformers,
FilePath: "$GOPATH/src/github.com/makerdao/vulcanizedb/plugins",
FileName: viper.GetString("exporter.name"),
Save: viper.GetBool("exporter.save"),
Home: viper.GetString("exporter.home"),
}
}
61 changes: 29 additions & 32 deletions cmd/composeAndExecute.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,20 @@
package cmd

import (
"github.com/ethereum/go-ethereum/statediff"
"github.com/makerdao/vulcanizedb/libraries/shared/fetcher"
"github.com/makerdao/vulcanizedb/libraries/shared/streamer"
"github.com/makerdao/vulcanizedb/pkg/fs"
"os"
"plugin"
syn "sync"
"time"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/ethereum/go-ethereum/statediff"
"github.com/makerdao/vulcanizedb/libraries/shared/fetcher"
"github.com/makerdao/vulcanizedb/libraries/shared/streamer"
"github.com/makerdao/vulcanizedb/libraries/shared/watcher"
"github.com/makerdao/vulcanizedb/pkg/fs"
p2 "github.com/makerdao/vulcanizedb/pkg/plugin"
"github.com/makerdao/vulcanizedb/pkg/plugin/helpers"
"github.com/makerdao/vulcanizedb/utils"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

// composeAndExecuteCmd represents the composeAndExecute command
Expand Down Expand Up @@ -109,55 +107,54 @@ Specify config location when executing the command:
./vulcanizedb composeAndExecute --config=./environments/config_name.toml`,
Run: func(cmd *cobra.Command, args []string) {
SubCommand = cmd.CalledAs()
LogWithCommand = *log.WithField("SubCommand", SubCommand)
LogWithCommand = *logrus.WithField("SubCommand", SubCommand)
composeAndExecute()
},
}

func composeAndExecute() {
// Build plugin generator config
prepConfig()
configErr := prepConfig()
if configErr != nil {
LogWithCommand.Fatalf("failed to prepare config: %s", configErr.Error())
}

// Generate code to build the plugin according to the config file
LogWithCommand.Info("generating plugin")
generator, err := p2.NewGenerator(genConfig, databaseConfig)
if err != nil {
LogWithCommand.Fatal(err)
generator, constructorErr := p2.NewGenerator(genConfig, databaseConfig)
if constructorErr != nil {
LogWithCommand.Fatalf("failed to initialize generator: %s", constructorErr.Error())
}
err = generator.GenerateExporterPlugin()
if err != nil {
LogWithCommand.Debug("generating plugin failed")
LogWithCommand.Fatal(err)
generateErr := generator.GenerateExporterPlugin()
if generateErr != nil {
LogWithCommand.Fatalf("generating plugin failed: %s", generateErr.Error())
}

// Get the plugin path and load the plugin
_, pluginPath, err := genConfig.GetPluginPaths()
if err != nil {
LogWithCommand.Fatal(err)
_, pluginPath, pathErr := genConfig.GetPluginPaths()
if pathErr != nil {
LogWithCommand.Fatalf("failed to get plugin paths: %s", pathErr.Error())
}
if !genConfig.Save {
defer helpers.ClearFiles(pluginPath)
}
LogWithCommand.Info("linking plugin ", pluginPath)
plug, err := plugin.Open(pluginPath)
if err != nil {
LogWithCommand.Debug("linking plugin failed")
LogWithCommand.Fatal(err)
plug, openErr := plugin.Open(pluginPath)
if openErr != nil {
LogWithCommand.Fatalf("linking plugin failed: %s", openErr.Error())
}

// Load the `Exporter` symbol from the plugin
LogWithCommand.Info("loading transformers from plugin")
symExporter, err := plug.Lookup("Exporter")
if err != nil {
LogWithCommand.Debug("loading Exporter symbol failed")
LogWithCommand.Fatal(err)
symExporter, lookupErr := plug.Lookup("Exporter")
if lookupErr != nil {
LogWithCommand.Fatalf("loading Exporter symbol failed: %s", lookupErr.Error())
}

// Assert that the symbol is of type Exporter
exporter, ok := symExporter.(Exporter)
if !ok {
LogWithCommand.Debug("plugged-in symbol not of type Exporter")
os.Exit(1)
LogWithCommand.Fatal("plugged-in symbol not of type Exporter")
}

// Use the Exporters export method to load the EventTransformerInitializer, StorageTransformerInitializer, and ContractTransformerInitializer sets
Expand All @@ -183,7 +180,7 @@ func composeAndExecute() {
if len(ethStorageInitializers) > 0 {
switch storageDiffsSource {
case "geth":
log.Debug("fetching storage diffs from geth pub sub")
logrus.Debug("fetching storage diffs from geth pub sub")
rpcClient, _ := getClients()
stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient)
payloadChan := make(chan statediff.Payload)
Expand All @@ -193,7 +190,7 @@ func composeAndExecute() {
wg.Add(1)
go watchEthStorage(&sw, &wg)
default:
log.Debug("fetching storage diffs from csv")
logrus.Debug("fetching storage diffs from csv")
tailer := fs.FileTailer{Path: storageDiffsPath}
storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer)
sw := watcher.NewStorageWatcher(storageFetcher, &db)
Expand Down
9 changes: 4 additions & 5 deletions cmd/contractWatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ package cmd

import (
"fmt"
"github.com/makerdao/vulcanizedb/pkg/config"
"time"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

st "github.com/makerdao/vulcanizedb/libraries/shared/transformer"
"github.com/makerdao/vulcanizedb/pkg/config"
ft "github.com/makerdao/vulcanizedb/pkg/contract_watcher/full/transformer"
ht "github.com/makerdao/vulcanizedb/pkg/contract_watcher/header/transformer"
"github.com/makerdao/vulcanizedb/utils"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

// contractWatcherCmd represents the contractWatcher command
Expand Down Expand Up @@ -80,7 +79,7 @@ Requires a .toml config file:
`,
Run: func(cmd *cobra.Command, args []string) {
SubCommand = cmd.CalledAs()
LogWithCommand = *log.WithField("SubCommand", SubCommand)
LogWithCommand = *logrus.WithField("SubCommand", SubCommand)
contractWatcher()
},
}
Expand Down
Loading

0 comments on commit 4ec8f88

Please sign in to comment.