Merge pull request #139 from makerdao/staging
staging => prod
rmulhol authored Oct 7, 2020
2 parents a6bcf75 + 9b59437 commit b9393cd
Showing 82 changed files with 2,411 additions and 2,012 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
@@ -1,7 +1,7 @@
 dist: trusty
 language: go
 go:
-- 1.14
+- 1.15
 services:
 - postgresql
 - docker
@@ -20,7 +20,7 @@ deploy:
 - provider: script
   script: bash ./.travis/deploy.sh staging
   on:
-    branch: staging
+    branch: beta
 - provider: script
   script: bash ./.travis/deploy.sh prod
   on:
14 changes: 0 additions & 14 deletions .travis/deploy.sh
@@ -29,9 +29,6 @@ fi
 message BUILDING HEADER-SYNC
 docker build -f dockerfiles/header_sync/Dockerfile . -t makerdao/vdb-headersync:$TAG
 
-message BUILDING EXTRACT-DIFFS
-docker build -f dockerfiles/extract_diffs/Dockerfile . -t makerdao/vdb-extract-diffs:$TAG
-
 message BUILDING RESET-HEADER-CHECK
 docker build -f dockerfiles/reset_header_check_count/Dockerfile . -t makerdao/vdb-reset-header-check:$TAG
 
@@ -42,9 +39,6 @@ echo "$DOCKER_PASSWORD" | docker login --username "$DOCKER_USER" --password-stdin
 message PUSHING HEADER-SYNC
 docker push makerdao/vdb-headersync:$TAG
 
-message PUSHING EXTRACT-DIFFS
-docker push makerdao/vdb-extract-diffs:$TAG
-
 message PUSHING RESET-HEADER-CHECK
 docker push makerdao/vdb-reset-header-check:$TAG
 
@@ -53,17 +47,9 @@ if [ "$ENVIRONMENT" == "prod" ]; then
 message DEPLOYING HEADER-SYNC
 aws ecs update-service --cluster vdb-cluster-$ENVIRONMENT --service vdb-header-sync-$ENVIRONMENT --force-new-deployment --endpoint https://ecs.$PROD_REGION.amazonaws.com --region $PROD_REGION
 
-message DEPLOYING EXTRACT-DIFFS
-aws ecs update-service --cluster vdb-cluster-$ENVIRONMENT --service vdb-extract-diffs-$ENVIRONMENT --force-new-deployment --endpoint https://ecs.$PROD_REGION.amazonaws.com --region $PROD_REGION
 elif [ "$ENVIRONMENT" == "staging" ]; then
 message DEPLOYING HEADER-SYNC
 aws ecs update-service --cluster vdb-cluster-$ENVIRONMENT --service vdb-header-sync-$ENVIRONMENT --force-new-deployment --endpoint https://ecs.$STAGING_REGION.amazonaws.com --region $STAGING_REGION
-
-message DEPLOYING EXTRACT-DIFFS
-aws ecs update-service --cluster vdb-cluster-$ENVIRONMENT --service vdb-extract-diffs-$ENVIRONMENT --force-new-deployment --endpoint https://ecs.$STAGING_REGION.amazonaws.com --region $STAGING_REGION
-
-message DEPLOYING EXTRACT-DIFFS-NEW-GETH
-aws ecs update-service --cluster vdb-cluster-$ENVIRONMENT --service vdb-extract-diffs2-$ENVIRONMENT --force-new-deployment --endpoint https://ecs.$STAGING_REGION.amazonaws.com --region $STAGING_REGION
 else
 message UNKNOWN ENVIRONMENT
 fi
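
The extract-diffs images are no longer built, pushed, or deployed by this script. If one is still needed ad hoc, the generic `dockerbuild` target added to the Makefile below covers any directory under `dockerfiles/`; a minimal sketch, assuming the target Dockerfile directory exists:

```sh
# Build any image from its dockerfiles/ subdirectory, e.g. header_sync.
make dockerbuild IMAGE=header_sync

# The target runs the equivalent of:
docker build -t header_sync -f dockerfiles/header_sync/Dockerfile .
```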
69 changes: 35 additions & 34 deletions Makefile
@@ -9,10 +9,7 @@ $(BIN)/ginkgo:
 	go get -u github.com/onsi/ginkgo/ginkgo

 ## Migration tool
-GOOSE = $(BIN)/goose
-$(BIN)/goose:
-	go get -u -d github.com/pressly/goose/cmd/goose
-	go build -tags='no_mysql no_sqlite' -o $(BIN)/goose github.com/pressly/goose/cmd/goose
+GOOSE = go run -tags='no_mysql no_sqlite3 no_mssql no_redshift' github.com/pressly/goose/cmd/goose

 ## Source linter
 LINT = $(BIN)/golint
@@ -25,9 +22,8 @@ $(BIN)/gometalinter.v2:
 	go get -u gopkg.in/alecthomas/gometalinter.v2
 	$(METALINT) --install

-
 .PHONY: installtools
-installtools: | $(LINT) $(GOOSE) $(GINKGO)
+installtools: | $(LINT) $(GINKGO)
 	echo "Installing tools"

 .PHONY: metalint
@@ -65,6 +61,7 @@ test: | $(GINKGO) $(LINT)

 .PHONY: integrationtest
 integrationtest: | $(GINKGO) $(LINT)
+	test -n "$(CLIENT_IPCPATH)" # $$(CLIENT_IPCPATH)
 	go vet ./...
 	go fmt ./...
 	dropdb --if-exists $(TEST_DB)
@@ -100,30 +97,30 @@ checkmigname:
 # Migration operations
 ## Rollback the last migration
 .PHONY: rollback
-rollback: $(GOOSE) checkdbvars
+rollback: checkdbvars
 	$(GOOSE) -dir db/migrations postgres "$(CONNECT_STRING)" down
-	pg_dump -O -s $(CONNECT_STRING) > db/schema.sql
+	pg_dump -n 'public' -O -s $(CONNECT_STRING) > db/schema.sql


 ## Rollback to a select migration (id/timestamp)
 .PHONY: rollback_to
-rollback_to: $(GOOSE) checkmigration checkdbvars
+rollback_to: checkmigration checkdbvars
 	$(GOOSE) -dir db/migrations postgres "$(CONNECT_STRING)" down-to "$(MIGRATION)"

 ## Apply all migrations not already run
 .PHONY: migrate
-migrate: $(GOOSE) checkdbvars
+migrate: checkdbvars
 	$(GOOSE) -dir db/migrations postgres "$(CONNECT_STRING)" up
-	pg_dump -O -s $(CONNECT_STRING) > db/schema.sql
+	pg_dump -n 'public' -O -s $(CONNECT_STRING) > db/schema.sql

 ## Create a new migration file
 .PHONY: new_migration
-new_migration: $(GOOSE) checkmigname
+new_migration: checkmigname
 	$(GOOSE) -dir db/migrations create $(NAME) sql

 ## Check which migrations are applied at the moment
 .PHONY: migration_status
-migration_status: $(GOOSE) checkdbvars
+migration_status: checkdbvars
 	$(GOOSE) -dir db/migrations postgres "$(CONNECT_STRING)" status

 # Convert timestamped migrations to versioned (to be run in CI);
@@ -138,25 +135,29 @@ import:
 	test -n "$(NAME)" # $$NAME
 	psql $(NAME) < db/schema.sql


-# Docker actions
-## Rinkeby docker environment
-RINKEBY_COMPOSE_FILE=dockerfiles/rinkeby/docker-compose.yml
-
-.PHONY: rinkeby_env_up
-rinkeby_env_up:
-	docker-compose -f $(RINKEBY_COMPOSE_FILE) up -d geth
-	docker-compose -f $(RINKEBY_COMPOSE_FILE) up --build migrations
-	docker-compose -f $(RINKEBY_COMPOSE_FILE) up -d --build vulcanizedb
-
-.PHONY: rinkeby_env_deploy
-rinkeby_env_deploy:
-	docker-compose -f $(RINKEBY_COMPOSE_FILE) up -d --build vulcanizedb
-
-.PHONY: dev_env_migrate
-rinkeby_env_migrate:
-	docker-compose -f $(RINKEBY_COMPOSE_FILE) up --build migrations
-
-.PHONY: rinkeby_env_down
-rinkeby_env_down:
-	docker-compose -f $(RINKEBY_COMPOSE_FILE) down
+# Build any docker image in dockerfiles
+.PHONY: dockerbuild
+dockerbuild:
+	test -n "$(IMAGE)" # $$IMAGE
+	docker build -t $(IMAGE) -f dockerfiles/$(IMAGE)/Dockerfile .
+
+.PHONY: header_sync
+header_sync: STARTING_BLOCK_NUMBER ?= 10000000
+header_sync: HOST ?= host.docker.internal
+header_sync: DATABASE_PASSWORD ?= postgres
+header_sync: IMAGE_WITH_TAG ?= header_sync:latest
+header_sync:
+	test -n "$(NAME)" # $$(NAME) - Database Name
+	test -n "$(CLIENT_IPCPATH)" # $$(CLIENT_IPCPATH)
+	docker run \
+		-it \
+		-p "5432:5432" \
+		-e "STARTING_BLOCK_NUMBER=$(STARTING_BLOCK_NUMBER)" \
+		-e "DATABASE_NAME=$(NAME)" \
+		-e "DATABASE_HOSTNAME=$(HOST)" \
+		-e "DATABASE_PORT=$(PORT)" \
+		-e "DATABASE_USER=$(USER)" \
+		-e "DATABASE_PASSWORD=$(DATABASE_PASSWORD)" \
+		-e "CLIENT_IPCPATH=$(CLIENT_IPCPATH)" \
+		$(IMAGE_WITH_TAG)
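
A hypothetical invocation of the new `header_sync` target: `NAME` and `CLIENT_IPCPATH` are required by the `test -n` guards, `PORT` and `USER` carry no declared defaults (the values below are assumptions), and the remaining variables fall back to their `?=` defaults:

```sh
# Run the header_sync image against a local Postgres and a node IPC socket;
# every value here is illustrative except the two required variables.
make header_sync \
  NAME=vulcanizedb \
  CLIENT_IPCPATH=/var/lib/geth/geth.ipc \
  PORT=5432 \
  USER=postgres
```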
12 changes: 11 additions & 1 deletion cmd/backfillEvents.go
@@ -4,6 +4,7 @@ import (
 	"fmt"

 	"github.com/makerdao/vulcanizedb/libraries/shared/logs"
+	"github.com/makerdao/vulcanizedb/pkg/datastore/postgres/repositories"
 	"github.com/makerdao/vulcanizedb/utils"
 	"github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
@@ -42,10 +43,19 @@ func backFillEvents() error {
 		LogWithCommand.Fatalf("SubCommand %v: exporting transformers failed: %v", SubCommand, exportTransformersErr)
 	}

+	if len(ethEventInitializers) < 1 {
+		logrus.Warn("not back-filling events because no transformers configured for back-fill")
+		return nil
+	}
+
 	blockChain := getBlockChain()
 	db := utils.LoadPostgres(databaseConfig, blockChain.Node())

-	extractor := logs.NewLogExtractor(&db, blockChain)
+	repo, repoErr := repositories.NewCheckedHeadersRepository(&db, genConfig.Schema)
+	if repoErr != nil {
+		return fmt.Errorf("error creating checked headers repository %w for schema %s", repoErr, genConfig.Schema)
+	}
+	extractor := logs.NewLogExtractor(&db, blockChain, repo)

 	for _, initializer := range ethEventInitializers {
 		transformer := initializer(&db)
3 changes: 2 additions & 1 deletion cmd/backfillStorage.go
@@ -64,7 +64,8 @@ func backfillStorage() error {
 	}

 	if len(storageInitializers) == 0 {
-		return fmt.Errorf("SubCommand %v: no storage transformers found in the given config", SubCommand)
+		logrus.Warn("not back-filling storage because no contracts configured for back-fill")
+		return nil
 	}

 	var loader backfill.StorageValueLoader
1 change: 1 addition & 0 deletions cmd/compose.go
@@ -41,6 +41,7 @@ var composeCmd = &cobra.Command{
 [exporter]
 home = "github.com/makerdao/vulcanizedb"
 name = "exampleTransformerExporter"
+schema = "<plugin schema>"
 save = false
 transformerNames = [
     "transformer1",
27 changes: 20 additions & 7 deletions cmd/execute.go
@@ -26,6 +26,7 @@ import (
 	"github.com/makerdao/vulcanizedb/libraries/shared/logs"
 	"github.com/makerdao/vulcanizedb/libraries/shared/transformer"
 	"github.com/makerdao/vulcanizedb/libraries/shared/watcher"
+	"github.com/makerdao/vulcanizedb/pkg/datastore/postgres/repositories"
 	"github.com/makerdao/vulcanizedb/pkg/fs"
 	"github.com/makerdao/vulcanizedb/utils"
 	"github.com/sirupsen/logrus"
@@ -77,7 +78,8 @@ func init() {
 	executeCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events")
 	executeCmd.Flags().DurationVarP(&retryInterval, "retry-interval", "i", 7*time.Second, "interval duration between retries on execution error")
 	executeCmd.Flags().IntVarP(&maxUnexpectedErrors, "max-unexpected-errs", "m", 5, "maximum number of unexpected errors to allow (with retries) before exiting")
-	executeCmd.Flags().Int64VarP(&diffBlockFromHeadOfChain, "diff-blocks-from-head", "d", -1, "number of blocks from head of chain to start reprocessing diffs, defaults to -1 so all diffs are processed")
+	executeCmd.Flags().Int64VarP(&newDiffBlockFromHeadOfChain, "new-diff-blocks-from-head", "d", -1, "number of blocks from head of chain to start reprocessing new diffs, defaults to -1 so all diffs are processed")
+	executeCmd.Flags().Int64VarP(&unrecognizedDiffBlockFromHeadOfChain, "unrecognized-diff-blocks-from-head", "u", -1, "number of blocks from head of chain to start reprocessing unrecognized diffs, defaults to -1 so all diffs are processed")
 }

 func executeTransformers() {
@@ -95,7 +97,11 @@ func executeTransformers() {
 	// Use WaitGroup to wait on both goroutines
 	var wg sync.WaitGroup
 	if len(ethEventInitializers) > 0 {
-		extractor := logs.NewLogExtractor(&db, blockChain)
+		repo, repoErr := repositories.NewCheckedHeadersRepository(&db, genConfig.Schema)
+		if repoErr != nil {
+			LogWithCommand.Fatalf("failed to create checked headers repository %s for schema %s", repoErr.Error(), genConfig.Schema)
+		}
+		extractor := logs.NewLogExtractor(&db, blockChain, repo)
 		delegator := logs.NewLogDelegator(&db)
 		eventHealthCheckMessage := []byte("event watcher starting\n")
 		statusWriter := fs.NewStatusWriter(healthCheckFile, eventHealthCheckMessage)
@@ -109,12 +115,19 @@ func executeTransformers() {
 	}

 	if len(ethStorageInitializers) > 0 {
-		storageHealthCheckMessage := []byte("storage watcher starting\n")
-		statusWriter := fs.NewStatusWriter(healthCheckFile, storageHealthCheckMessage)
-		sw := watcher.NewStorageWatcher(&db, diffBlockFromHeadOfChain, statusWriter)
-		sw.AddTransformers(ethStorageInitializers)
+		newDiffStorageHealthCheckMessage := []byte("storage watcher for new diffs starting\n")
+		newDiffStatusWriter := fs.NewStatusWriter(healthCheckFile, newDiffStorageHealthCheckMessage)
+		newDiffStorageWatcher := watcher.NewStorageWatcher(&db, newDiffBlockFromHeadOfChain, newDiffStatusWriter, watcher.New)
+		newDiffStorageWatcher.AddTransformers(ethStorageInitializers)
 		wg.Add(1)
+		go watchEthStorage(&newDiffStorageWatcher, &wg)
+
+		unrecognizedDiffStorageHealthCheckMessage := []byte("storage watcher for unrecognized diffs starting\n")
+		unrecognizedDiffStatusWriter := fs.NewStatusWriter(healthCheckFile, unrecognizedDiffStorageHealthCheckMessage)
+		unrecognizedDiffStorageWatcher := watcher.NewStorageWatcher(&db, unrecognizedDiffBlockFromHeadOfChain, unrecognizedDiffStatusWriter, watcher.Unrecognized)
+		unrecognizedDiffStorageWatcher.AddTransformers(ethStorageInitializers)
+		wg.Add(1)
-		go watchEthStorage(&sw, &wg)
+		go watchEthStorage(&unrecognizedDiffStorageWatcher, &wg)
 	}

 	if len(ethContractInitializers) > 0 {
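
The single storage watcher is split into two (one for new diffs, one for previously unrecognized diffs), each with its own blocks-from-head flag. A sketch of an `execute` run using the renamed flags; the binary name and config path are placeholders:

```sh
# Reprocess new diffs starting 500 blocks behind head, and retry all
# unrecognized diffs with no block limit (-1, the default).
./vulcanizedb execute --config environments/example.toml \
  --new-diff-blocks-from-head 500 \
  --unrecognized-diff-blocks-from-head -1
```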
56 changes: 40 additions & 16 deletions cmd/extractDiffs.go
@@ -1,14 +1,19 @@
 package cmd

 import (
-	"github.com/ethereum/go-ethereum/statediff"
+	"strings"
+
+	"github.com/ethereum/go-ethereum"
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/eth/filters"
 	"github.com/makerdao/vulcanizedb/libraries/shared/storage"
 	"github.com/makerdao/vulcanizedb/libraries/shared/storage/fetcher"
 	"github.com/makerdao/vulcanizedb/libraries/shared/streamer"
 	"github.com/makerdao/vulcanizedb/pkg/fs"
 	"github.com/makerdao/vulcanizedb/utils"
 	"github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
+	"github.com/spf13/viper"
 )

 // extractDiffsCmd represents the extractDiffs command
@@ -29,34 +34,38 @@ func init() {
 	rootCmd.AddCommand(extractDiffsCmd)
 }

+func getContractAddresses() []string {
+	LogWithCommand.Info("Getting contract addresses from config file")
+	contracts := viper.GetStringMap("contract")
+	var addresses []string
+	for contractName := range contracts {
+		address := viper.GetStringMapString("contract." + contractName)["address"]
+		addresses = append(addresses, address)
+	}
+	return addresses
+}
+
 func extractDiffs() {
 	// Setup bc and db objects
 	blockChain := getBlockChain()
 	db := utils.LoadPostgres(databaseConfig, blockChain.Node())
+	addressesToWatch := getContractAddresses()

 	healthCheckFile := "/tmp/connection"
 	msg := []byte("geth storage fetcher connection established\n")
 	gethStatusWriter := fs.NewStatusWriter(healthCheckFile, msg)

 	// initialize fetcher
 	var storageFetcher fetcher.IStorageFetcher
+	logrus.Debug("fetching storage diffs from geth")
 	switch storageDiffsSource {
 	case "geth":
-		logrus.Info("Using original geth patch")
-		logrus.Debug("fetching storage diffs from geth pub sub")
-		rpcClient, _ := getClients()
-		stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient)
-		payloadChan := make(chan statediff.Payload)
-
-		storageFetcher = fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan, fetcher.OldGethPatch, gethStatusWriter)
-	case "new-geth":
-		logrus.Info("Using new geth patch")
-		logrus.Debug("fetching storage diffs from geth pub sub")
-		rpcClient, _ := getClients()
-		stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient)
-		payloadChan := make(chan statediff.Payload)
-
-		storageFetcher = fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan, fetcher.NewGethPatch, gethStatusWriter)
+		logrus.Info("Using new geth patch with filters event system")
+		_, ethClient := getClients()
+		filterQuery := createFilterQuery(addressesToWatch)
+		stateDiffStreamer := streamer.NewEthStateChangeStreamer(ethClient, filterQuery)
+		payloadChan := make(chan filters.Payload)
+		storageFetcher = fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan, gethStatusWriter)
 	default:
 		logrus.Debug("fetching storage diffs from csv")
 		tailer := fs.FileTailer{Path: storageDiffsPath}
@@ -73,3 +82,18 @@ func extractDiffs() {
 		LogWithCommand.Fatalf("extracting diffs failed: %s", err.Error())
 	}
 }
+
+func createFilterQuery(watchedAddresses []string) ethereum.FilterQuery {
+	logrus.Infof("Creating a filter query for %d watched addresses", len(watchedAddresses))
+	addressesToLog := strings.Join(watchedAddresses[:], ", ")
+	logrus.Infof("Watched addresses: %s", addressesToLog)
+
+	var addresses []common.Address
+	for _, addressString := range watchedAddresses {
+		addresses = append(addresses, common.HexToAddress(addressString))
+	}
+
+	return ethereum.FilterQuery{
+		Addresses: addresses,
+	}
+}
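
`getContractAddresses` collects one `address` per `[contract.<name>]` table in the viper config, and `createFilterQuery` narrows the state-change stream to those contracts. A minimal sketch of a matching config and run; the file name, contract name, zero address, and invocation are all hypothetical:

```sh
# Each watched contract needs an address entry under [contract.<name>].
cat > extract-diffs.toml <<'EOF'
[contract.example]
address = "0x0000000000000000000000000000000000000000"
EOF

./vulcanizedb extractDiffs --config extract-diffs.toml
```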
7 changes: 6 additions & 1 deletion cmd/resetHeaderCheckCount.go
@@ -64,6 +64,11 @@ func init() {
 func resetHeaderCount(blockNumber int64) error {
 	blockChain := getBlockChain()
 	db := utils.LoadPostgres(databaseConfig, blockChain.Node())
-	repo := repositories.NewCheckedHeadersRepository(&db)
+	repo, repoErr := repositories.NewCheckedHeadersRepository(&db, genConfig.Schema)
+
+	if repoErr != nil {
+		return fmt.Errorf("error creating checked headers repository %w", repoErr)
+	}
+
 	return repo.MarkSingleHeaderUnchecked(blockNumber)
 }