From be92d4cc688ed70b6112d2908418effc6b4bcb60 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 19 Oct 2020 10:29:40 -0500 Subject: [PATCH] publish code and codehash consumed from the updated statediff payload --- .../00016_remove_deployment_from_tx_cids.sql | 7 ++ db/schema.sql | 50 +++++++++- go.mod | 2 +- go.sum | 2 + pkg/eth/converter.go | 13 +-- pkg/eth/indexer.go | 12 +-- pkg/eth/mocks/test_data.go | 97 ++++++++++--------- pkg/eth/models.go | 19 ++-- pkg/eth/publisher.go | 6 -- pkg/eth/transformer.go | 47 +++++---- pkg/eth/transformer_test.go | 10 ++ pkg/prom/prom.go | 23 +++-- pkg/shared/functions.go | 17 ++++ 13 files changed, 201 insertions(+), 104 deletions(-) create mode 100644 db/migrations/00016_remove_deployment_from_tx_cids.sql diff --git a/db/migrations/00016_remove_deployment_from_tx_cids.sql b/db/migrations/00016_remove_deployment_from_tx_cids.sql new file mode 100644 index 000000000..c155b9b0a --- /dev/null +++ b/db/migrations/00016_remove_deployment_from_tx_cids.sql @@ -0,0 +1,7 @@ +-- +goose Up +ALTER TABLE eth.transaction_cids +DROP COLUMN deployment; + +-- +goose Down +ALTER TABLE eth.transaction_cids +ADD COLUMN deployment BOOL NOT NULL DEFAULT FALSE; \ No newline at end of file diff --git a/db/schema.sql b/db/schema.sql index fcaa900cb..5a10d391a 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -51,6 +51,55 @@ end; $_$; +-- +-- Name: canonical_header(bigint); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.canonical_header(height bigint) RETURNS integer + LANGUAGE plpgsql + AS $$ +DECLARE + current_weight INT; + heaviest_weight INT DEFAULT 0; + heaviest_id INT; + r eth.header_cids%ROWTYPE; +BEGIN + FOR r IN SELECT * FROM eth.header_cids + WHERE block_number = height + LOOP + SELECT INTO current_weight * FROM header_weight(r.block_hash); + IF current_weight > heaviest_weight THEN + heaviest_weight := current_weight; + heaviest_id := r.id; + END IF; + END LOOP; + RETURN heaviest_id; +END +$$; + + +-- +-- Name: header_weight(character varying); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.header_weight(hash character varying) RETURNS bigint + LANGUAGE sql + AS $$ + WITH RECURSIVE validator AS ( + SELECT block_hash, parent_hash, block_number + FROM eth.header_cids + WHERE block_hash = hash + UNION + SELECT eth.header_cids.block_hash, eth.header_cids.parent_hash, eth.header_cids.block_number + FROM eth.header_cids + INNER JOIN validator + ON eth.header_cids.parent_hash = validator.block_hash + AND eth.header_cids.block_number = validator.block_number + 1 + ) + SELECT COUNT(*) FROM validator; +$$; + + SET default_tablespace = ''; SET default_table_access_method = heap; @@ -271,7 +320,6 @@ CREATE TABLE eth.transaction_cids ( mh_key text NOT NULL, dst character varying(66) NOT NULL, src character varying(66) NOT NULL, - deployment boolean NOT NULL, tx_data bytea ); diff --git a/go.mod b/go.mod index 118167b6a..630ea14a3 100644 --- a/go.mod +++ b/go.mod @@ -29,4 +29,4 @@ require ( golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a // indirect ) -replace github.com/ethereum/go-ethereum v1.9.11 => github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.5 +replace github.com/ethereum/go-ethereum v1.9.11 => github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.8 diff --git a/go.sum b/go.sum index ffc26c0d3..0b8a08fbf 100644 --- a/go.sum +++ b/go.sum @@ -935,6 +935,8 @@ github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49u github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.5 h1:U+BqhjRLR22e9OEm8cgWC3Eq3bh8G6azjNpXeenfCG4= github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.5/go.mod h1:7oC0Ni6dosMv5pxMigm6s0hN8g4haJMBnqmmo0D9YfQ= +github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.8 h1:7TK52k55uvSl+1SCKYYFelzH1NrpvEcDrpeU9nUDIpI= +github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.8/go.mod h1:7oC0Ni6dosMv5pxMigm6s0hN8g4haJMBnqmmo0D9YfQ= github.com/vulcanize/pg-ipfs-ethdb v0.0.1-alpha/go.mod h1:OuqE4r2LGWAtDVx3s1yaAzDcwy+LEAqrWaE1L8UfrGY= github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE= github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= diff --git a/pkg/eth/converter.go b/pkg/eth/converter.go index af74d55f5..a42f322e9 100644 --- a/pkg/eth/converter.go +++ b/pkg/eth/converter.go @@ -99,9 +99,7 @@ func (pc *PayloadConverter) Convert(payload statediff.Payload) (*ConvertedPayloa // This is the contract address if this receipt is for a contract creation tx contract := shared.HandleZeroAddr(receipt.ContractAddress) var contractHash string - deployment := false if contract != "" { - deployment = true contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String() } // receipt and rctMeta will have same indexes @@ -123,12 +121,11 @@ func (pc *PayloadConverter) Convert(payload statediff.Payload) (*ConvertedPayloa } // txMeta will have same index as its corresponding trx in the convertedPayload.BlockBody convertedPayload.TxMetaData = append(convertedPayload.TxMetaData, TxModel{ - Dst: shared.HandleZeroAddrPointer(trx.To()), - Src: shared.HandleZeroAddr(from), - TxHash: trx.Hash().String(), - Index: int64(i), - Data: trx.Data(), - Deployment: deployment, + Dst: shared.HandleZeroAddrPointer(trx.To()), + Src: shared.HandleZeroAddr(from), + TxHash: trx.Hash().String(), + Index: int64(i), + Data: trx.Data(), }) } diff --git a/pkg/eth/indexer.go b/pkg/eth/indexer.go index 952f6634a..d029081fd 100644 --- a/pkg/eth/indexer.go +++ b/pkg/eth/indexer.go @@ -111,10 +111,10 @@ func (in *CIDIndexer) indexUncleCID(tx *sqlx.Tx, uncle UncleModel, headerID int6 func (in *CIDIndexer) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload CIDPayload, headerID int64) error { for _, trxCidMeta := range payload.TransactionCIDs { var txID int64 - err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, deployment) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data, deployment) = ($3, $4, $5, $6, $7, $8, $9) + err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data) = ($3, $4, $5, $6, $7, $8) RETURNING id`, - headerID, trxCidMeta.TxHash, trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src, trxCidMeta.Index, trxCidMeta.MhKey, trxCidMeta.Data, trxCidMeta.Deployment).Scan(&txID) + headerID, trxCidMeta.TxHash, trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src, trxCidMeta.Index, trxCidMeta.MhKey, trxCidMeta.Data).Scan(&txID) if err != nil { return err } @@ -131,10 +131,10 @@ func (in *CIDIndexer) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload CIDPay func (in *CIDIndexer) indexTransactionCID(tx *sqlx.Tx, transaction TxModel, headerID int64) (int64, error) { var txID int64 - err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, deployment) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data, deployment) = ($3, $4, $5, $6, $7, $8, $9) + err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data) = ($3, $4, $5, $6, $7, $8) RETURNING id`, - headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Deployment).Scan(&txID) + headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data).Scan(&txID) if err == nil { prom.TransactionInc() } diff --git a/pkg/eth/mocks/test_data.go b/pkg/eth/mocks/test_data.go index 800ece54c..abb174fee 100644 --- a/pkg/eth/mocks/test_data.go +++ b/pkg/eth/mocks/test_data.go @@ -63,6 +63,7 @@ var ( ContractAddress = crypto.CreateAddress(SenderAddr, MockTransactions[2].Nonce()) ContractHash = crypto.Keccak256Hash(ContractAddress.Bytes()).String() MockContractByteCode = []byte{0, 1, 2, 3, 4, 5} + MockCodeHash = crypto.Keccak256Hash(MockContractByteCode) mockTopic11 = common.HexToHash("0x04") mockTopic12 = common.HexToHash("0x06") mockTopic21 = common.HexToHash("0x05") @@ -99,66 +100,60 @@ var ( StorageMhKey = shared.MultihashKeyFromCID(StorageCID) MockTrxMeta = []eth.TxModel{ { - CID: "", // This is empty until we go to publish to ipfs - MhKey: "", - Src: SenderAddr.Hex(), - Dst: Address.String(), - Index: 0, - TxHash: MockTransactions[0].Hash().String(), - Data: []byte{}, - Deployment: false, + CID: "", // This is empty until we go to publish to ipfs + MhKey: "", + Src: SenderAddr.Hex(), + Dst: Address.String(), + Index: 0, + TxHash: MockTransactions[0].Hash().String(), + Data: []byte{}, }, { - CID: "", - MhKey: "", - Src: SenderAddr.Hex(), - Dst: AnotherAddress.String(), - Index: 1, - TxHash: MockTransactions[1].Hash().String(), - Data: []byte{}, - Deployment: false, + CID: "", + MhKey: "", + Src: SenderAddr.Hex(), + Dst: AnotherAddress.String(), + Index: 1, + TxHash: MockTransactions[1].Hash().String(), + Data: []byte{}, }, { - CID: "", - MhKey: "", - Src: SenderAddr.Hex(), - Dst: "", - Index: 2, - TxHash: MockTransactions[2].Hash().String(), - Data: MockContractByteCode, - Deployment: true, + CID: "", + MhKey: "", + Src: SenderAddr.Hex(), + Dst: "", + Index: 2, + TxHash: MockTransactions[2].Hash().String(), + Data: MockContractByteCode, }, } MockTrxMetaPostPublsh = []eth.TxModel{ { - CID: Trx1CID.String(), // This is empty until we go to publish to ipfs - MhKey: Trx1MhKey, - Src: SenderAddr.Hex(), - Dst: Address.String(), - Index: 0, - TxHash: MockTransactions[0].Hash().String(), - Data: []byte{}, - Deployment: false, + CID: Trx1CID.String(), // This is empty until we go to publish to ipfs + MhKey: Trx1MhKey, + Src: SenderAddr.Hex(), + Dst: Address.String(), + Index: 0, + TxHash: MockTransactions[0].Hash().String(), + Data: []byte{}, }, { - CID: Trx2CID.String(), - MhKey: Trx2MhKey, - Src: SenderAddr.Hex(), - Dst: AnotherAddress.String(), - Index: 1, - TxHash: MockTransactions[1].Hash().String(), - Data: []byte{}, - Deployment: false, + CID: Trx2CID.String(), + MhKey: Trx2MhKey, + Src: SenderAddr.Hex(), + Dst: AnotherAddress.String(), + Index: 1, + TxHash: MockTransactions[1].Hash().String(), + Data: []byte{}, }, { - CID: Trx3CID.String(), - MhKey: Trx3MhKey, - Src: SenderAddr.Hex(), - Dst: "", - Index: 2, - TxHash: MockTransactions[2].Hash().String(), - Data: MockContractByteCode, - Deployment: true, + CID: Trx3CID.String(), + MhKey: Trx3MhKey, + Src: SenderAddr.Hex(), + Dst: "", + Index: 2, + TxHash: MockTransactions[2].Hash().String(), + Data: MockContractByteCode, }, } MockRctMeta = []eth.ReceiptModel{ @@ -313,6 +308,12 @@ var ( BlockNumber: new(big.Int).Set(BlockNumber), BlockHash: MockBlock.Hash(), Nodes: StateDiffs, + CodeAndCodeHashes: []statediff.CodeAndCodeHash{ + { + Code: MockContractByteCode, + Hash: MockCodeHash, + }, + }, } MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff) MockStateNodes = []eth.TrieNode{ diff --git a/pkg/eth/models.go b/pkg/eth/models.go index 6ded3a672..bc464df57 100644 --- a/pkg/eth/models.go +++ b/pkg/eth/models.go @@ -51,16 +51,15 @@ type UncleModel struct { // TxModel is the db model for eth.transaction_cids type TxModel struct { - ID int64 `db:"id"` - HeaderID int64 `db:"header_id"` - Index int64 `db:"index"` - TxHash string `db:"tx_hash"` - CID string `db:"cid"` - MhKey string `db:"mh_key"` - Dst string `db:"dst"` - Src string `db:"src"` - Data []byte `db:"tx_data"` - Deployment bool `db:"deployment"` + ID int64 `db:"id"` + HeaderID int64 `db:"header_id"` + Index int64 `db:"index"` + TxHash string `db:"tx_hash"` + CID string `db:"cid"` + MhKey string `db:"mh_key"` + Dst string `db:"dst"` + Src string `db:"src"` + Data []byte `db:"tx_data"` } // ReceiptModel is the db model for eth.receipt_cids diff --git a/pkg/eth/publisher.go b/pkg/eth/publisher.go index 97db11d8b..2e52aff76 100644 --- a/pkg/eth/publisher.go +++ b/pkg/eth/publisher.go @@ -145,12 +145,6 @@ func (pub *IPLDPublisher) Publish(payload ConvertedPayload) error { if err != nil { return err } - // If tx is a contract deployment, publish the data (code) - if txModel.Deployment { // codec doesn't matter in this case sine we are not interested in the cid and the db key is multihash-derived - if _, err = shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, txModel.Data); err != nil { - return err - } - } rctModel := payload.ReceiptMetaData[i] rctModel.CID = rctNode.Cid().String() rctModel.MhKey = shared.MultihashKeyFromCID(rctNode.Cid()) diff --git a/pkg/eth/transformer.go b/pkg/eth/transformer.go index 842dc5c03..570f63f0c 100644 --- a/pkg/eth/transformer.go +++ b/pkg/eth/transformer.go @@ -168,6 +168,13 @@ func (sdt *StateDiffTransformer) Transform(workerID int, payload statediff.Paylo prom.SetTimeMetric("t_state_store_processing", tDiff) traceMsg += fmt.Sprintf("state and storage processing time: %s\r\n", tDiff.String()) t = time.Now() + if err := sdt.processCodeAndCodeHashes(tx, stateDiff.CodeAndCodeHashes); err != nil { + return 0, err + } + tDiff = time.Now().Sub(t) + prom.SetTimeMetric("t_code_codehash_processing", tDiff) + traceMsg += fmt.Sprintf("code and codehash processing time: %s\r\n", tDiff.String()) + t = time.Now() return height, err // return error explicity so that the defer() assigns to it } @@ -276,27 +283,18 @@ func (sdt *StateDiffTransformer) processReceiptsAndTxs(tx *sqlx.Tx, args process // this is the contract address if this receipt is for a contract creation tx contract := shared.HandleZeroAddr(receipt.ContractAddress) var contractHash string - isDeployment := contract != "" - if isDeployment { + if contract != "" { contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String() - // if tx is a contract deployment, publish the data (code) - // codec doesn't matter in this case sine we are not interested in the cid and the db key is multihash-derived - // TODO: THE DATA IS NOT DIRECTLY THE CONTRACT CODE; THERE IS A MISSING PROCESSING STEP HERE - // the contractHash => contract code is not currently correct - if _, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, trx.Data()); err != nil { - return err - } } // index tx first so that the receipt can reference it by FK txModel := TxModel{ - Dst: shared.HandleZeroAddrPointer(trx.To()), - Src: shared.HandleZeroAddr(from), - TxHash: trx.Hash().String(), - Index: int64(i), - Data: trx.Data(), - Deployment: isDeployment, - CID: txNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(txNode.Cid()), + Dst: shared.HandleZeroAddrPointer(trx.To()), + Src: shared.HandleZeroAddr(from), + TxHash: trx.Hash().String(), + Index: int64(i), + Data: trx.Data(), + CID: txNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(txNode.Cid()), } txID, err := sdt.indexer.indexTransactionCID(tx, txModel, args.headerID) if err != nil { @@ -386,3 +384,18 @@ func (sdt *StateDiffTransformer) processStateAndStorage(tx *sqlx.Tx, headerID in } return nil } + +// processCodeAndCodeHashes publishes code and codehash pairs to the ipld database +func (sdt *StateDiffTransformer) processCodeAndCodeHashes(tx *sqlx.Tx, codeAndCodeHashes []statediff.CodeAndCodeHash) error { + for _, c := range codeAndCodeHashes { + // codec doesn't matter since db key is multihash-based + mhKey, err := shared.MultihashKeyFromKeccak256(c.Hash) + if err != nil { + return err + } + if err := shared.PublishDirect(tx, mhKey, c.Code); err != nil { + return err + } + } + return nil +} diff --git a/pkg/eth/transformer_test.go b/pkg/eth/transformer_test.go index a6daef768..aa79b7184 100644 --- a/pkg/eth/transformer_test.go +++ b/pkg/eth/transformer_test.go @@ -224,5 +224,15 @@ var _ = Describe("PublishAndIndexer", func() { Expect(err).ToNot(HaveOccurred()) Expect(data).To(Equal(mocks.StorageLeafNode)) }) + + It("Publishes code and codehash", func() { + code := make([]byte, 0) + key, err := shared.MultihashKeyFromKeccak256(mocks.MockCodeHash) + Expect(err).ToNot(HaveOccurred()) + pgStr := `SELECT data FROM public.blocks WHERE key = $1` + err = db.Get(&code, pgStr, key) + Expect(err).ToNot(HaveOccurred()) + Expect(code).To(Equal(mocks.MockContractByteCode)) + }) }) }) diff --git a/pkg/prom/prom.go b/pkg/prom/prom.go index a93a4d438..f50a59bc7 100644 --- a/pkg/prom/prom.go +++ b/pkg/prom/prom.go @@ -19,13 +19,14 @@ var ( lenPayloadChan prometheus.Gauge - tPayloadDecode prometheus.Histogram - tFreePostgres prometheus.Histogram - tPostgresCommit prometheus.Histogram - tHeaderProcessing prometheus.Histogram - tUncleProcessing prometheus.Histogram - tTxAndRecProcessing prometheus.Histogram - tStateAndStoreProcessing prometheus.Histogram + tPayloadDecode prometheus.Histogram + tFreePostgres prometheus.Histogram + tPostgresCommit prometheus.Histogram + tHeaderProcessing prometheus.Histogram + tUncleProcessing prometheus.Histogram + tTxAndRecProcessing prometheus.Histogram + tStateAndStoreProcessing prometheus.Histogram + tCodeAndCodeHashProcessing prometheus.Histogram ) // Init module initialization @@ -96,6 +97,12 @@ func Init() { Name: "t_state_store_processing", Help: "State and storage processing time", }) + tCodeAndCodeHashProcessing = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: statsSubsystem, + Name: "t_code_codehash_processing", + Help: "Code and codehash processing time", + }) } // RegisterDBCollector create metric colletor for given connection @@ -154,5 +161,7 @@ func SetTimeMetric(name string, t time.Duration) { tTxAndRecProcessing.Observe(tAsF64) case "t_state_store_processing": tStateAndStoreProcessing.Observe(tAsF64) + case "t_code_codehash_processing": + tCodeAndCodeHashProcessing.Observe(tAsF64) } } diff --git a/pkg/shared/functions.go b/pkg/shared/functions.go index 3a51e53a6..cb63108b5 100644 --- a/pkg/shared/functions.go +++ b/pkg/shared/functions.go @@ -23,6 +23,7 @@ import ( "github.com/ipfs/go-ipfs-ds-help" node "github.com/ipfs/go-ipld-format" "github.com/jmoiron/sqlx" + "github.com/multiformats/go-multihash" "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-eth-indexer/pkg/ipfs/ipld" ) @@ -104,3 +105,19 @@ func PublishRaw(tx *sqlx.Tx, codec, mh uint64, raw []byte) (string, error) { _, err = tx.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, prefixedKey, raw) return c.String(), err } + +// MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash db key string +func MultihashKeyFromKeccak256(hash common.Hash) (string, error) { + mh, err := multihash.Encode(hash.Bytes(), multihash.KECCAK_256) + if err != nil { + return "", err + } + dbKey := dshelp.MultihashToDsKey(mh) + return blockstore.BlockPrefix.String() + dbKey.String(), nil +} + +// PublishDirect diretly writes a previously derived mhkey => value pair to the ipld database +func PublishDirect(tx *sqlx.Tx, key string, value []byte) error { + _, err := tx.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, key, value) + return err +}