Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
(VDB-354) Queue unrecognized storage diffs
Browse files Browse the repository at this point in the history
- If we recognize a storage diff as coming from a watched contract but
  don't recognize the key, queue it for retrying later (after we've seen
  an event that might help us recognize the key)
- Remove unused errs and args
- Panic on unrecognized types (should not happen)
  • Loading branch information
rmulhol committed Feb 20, 2019
1 parent 238a674 commit 59bed23
Show file tree
Hide file tree
Showing 11 changed files with 238 additions and 78 deletions.
12 changes: 12 additions & 0 deletions db/migrations/20190219134901_create_queued_storage.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- +goose Up
CREATE TABLE public.queued_storage (
id SERIAL PRIMARY KEY,
block_height BIGINT,
block_hash BYTEA,
contract BYTEA,
storage_key BYTEA,
storage_value BYTEA
);

-- +goose Down
DROP TABLE public.queued_storage;
53 changes: 51 additions & 2 deletions db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
-- PostgreSQL database dump
--

-- Dumped from database version 10.5
-- Dumped by pg_dump version 10.5
-- Dumped from database version 10.6
-- Dumped by pg_dump version 10.6

SET statement_timeout = 0;
SET lock_timeout = 0;
Expand Down Expand Up @@ -2643,6 +2643,40 @@ CREATE SEQUENCE public.nodes_id_seq
ALTER SEQUENCE public.nodes_id_seq OWNED BY public.eth_nodes.id;


--
-- Name: queued_storage; Type: TABLE; Schema: public; Owner: -
--

CREATE TABLE public.queued_storage (
id integer NOT NULL,
block_height bigint,
block_hash bytea,
contract bytea,
storage_key bytea,
storage_value bytea
);


--
-- Name: queued_storage_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--

CREATE SEQUENCE public.queued_storage_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;


--
-- Name: queued_storage_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
--

ALTER SEQUENCE public.queued_storage_id_seq OWNED BY public.queued_storage.id;


--
-- Name: receipts; Type: TABLE; Schema: public; Owner: -
--
Expand Down Expand Up @@ -3321,6 +3355,13 @@ ALTER TABLE ONLY public.log_filters ALTER COLUMN id SET DEFAULT nextval('public.
ALTER TABLE ONLY public.logs ALTER COLUMN id SET DEFAULT nextval('public.logs_id_seq'::regclass);


--
-- Name: queued_storage id; Type: DEFAULT; Schema: public; Owner: -
--

ALTER TABLE ONLY public.queued_storage ALTER COLUMN id SET DEFAULT nextval('public.queued_storage_id_seq'::regclass);


--
-- Name: receipts id; Type: DEFAULT; Schema: public; Owner: -
--
Expand Down Expand Up @@ -4197,6 +4238,14 @@ ALTER TABLE ONLY public.eth_nodes
ADD CONSTRAINT nodes_pkey PRIMARY KEY (id);


--
-- Name: queued_storage queued_storage_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--

ALTER TABLE ONLY public.queued_storage
ADD CONSTRAINT queued_storage_pkey PRIMARY KEY (id);


--
-- Name: receipts receipts_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
Expand Down
26 changes: 26 additions & 0 deletions libraries/shared/storage_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package shared

import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/storage_diffs/shared"
)

type IStorageQueue interface {
Add(row shared.StorageDiffRow) error
}

type StorageQueue struct {
db *postgres.DB
}

func NewStorageQueue(db *postgres.DB) StorageQueue {
return StorageQueue{db: db}
}

func (queue StorageQueue) Add(row shared.StorageDiffRow) error {
_, err := queue.db.Exec(`INSERT INTO public.queued_storage (contract,
block_hash, block_height, storage_key, storage_value) VALUES
($1, $2, $3, $4, $5)`, row.Contract.Bytes(), row.BlockHash.Bytes(),
row.BlockHeight, row.StorageKey.Bytes(), row.StorageValue.Bytes())
return err
}
32 changes: 32 additions & 0 deletions libraries/shared/storage_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package shared_test

import (
"github.com/ethereum/go-ethereum/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
shared2 "github.com/vulcanize/vulcanizedb/libraries/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/storage_diffs/shared"
"github.com/vulcanize/vulcanizedb/test_config"
)

var _ = Describe("Storage queue", func() {
It("adds a storage row to the db", func() {
row := shared.StorageDiffRow{
Contract: common.HexToAddress("0x123456"),
BlockHash: common.HexToHash("0x678901"),
BlockHeight: 987,
StorageKey: common.HexToHash("0x654321"),
StorageValue: common.HexToHash("0x198765"),
}
db := test_config.NewTestDB(test_config.NewTestNode())
queue := shared2.NewStorageQueue(db)

addErr := queue.Add(row)

Expect(addErr).NotTo(HaveOccurred())
var result shared.StorageDiffRow
getErr := db.Get(&result, `SELECT contract, block_hash, block_height, storage_key, storage_value FROM public.queued_storage`)
Expect(getErr).NotTo(HaveOccurred())
Expect(result).To(Equal(row))
})
})
17 changes: 16 additions & 1 deletion libraries/shared/storage_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared/storage"
"github.com/vulcanize/vulcanizedb/pkg/transformers/storage_diffs/shared"
"reflect"
"strings"

"github.com/vulcanize/vulcanizedb/pkg/fs"
Expand All @@ -30,14 +31,17 @@ import (
type StorageWatcher struct {
db *postgres.DB
tailer fs.Tailer
Queue IStorageQueue
Transformers map[common.Address]storage.Transformer
}

func NewStorageWatcher(tailer fs.Tailer, db *postgres.DB) StorageWatcher {
transformers := make(map[common.Address]storage.Transformer)
queue := NewStorageQueue(db)
return StorageWatcher{
db: db,
tailer: tailer,
Queue: queue,
Transformers: transformers,
}
}
Expand Down Expand Up @@ -66,9 +70,20 @@ func (watcher StorageWatcher) Execute() error {
}
executeErr := transformer.Execute(row)
if executeErr != nil {
logrus.Warn(executeErr.Error())
if isKeyNotFound(executeErr) {
queueErr := watcher.Queue.Add(row)
if queueErr != nil {
logrus.Warn(queueErr.Error())
}
} else {
logrus.Warn(executeErr.Error())
}
continue
}
}
return nil
}

func isKeyNotFound(executeErr error) bool {
return reflect.TypeOf(executeErr) == reflect.TypeOf(shared.ErrStorageKeyNotFound{})
}
123 changes: 80 additions & 43 deletions libraries/shared/storage_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,8 @@ var _ = Describe("Storage Watcher", func() {

It("logs error if no transformer can parse storage row", func() {
mockTailer := fakes.NewMockTailer()
line := &tail.Line{
Text: "12345,block_hash,123,storage_key,storage_value",
Time: time.Time{},
Err: nil,
}
address := common.HexToAddress("0x12345")
line := getFakeLine(address.Bytes())
watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
tempFile, err := ioutil.TempFile("", "log")
defer os.Remove(tempFile.Name())
Expand All @@ -88,22 +85,14 @@ var _ = Describe("Storage Watcher", func() {
Expect(err).NotTo(HaveOccurred())
logContent, readErr := ioutil.ReadFile(tempFile.Name())
Expect(readErr).NotTo(HaveOccurred())
Expect(string(logContent)).To(ContainSubstring(shared2.ErrContractNotFound{Contract: common.HexToAddress("0x12345").Hex()}.Error()))
Expect(string(logContent)).To(ContainSubstring(shared2.ErrContractNotFound{Contract: address.Hex()}.Error()))
}, watcher, mockTailer, []*tail.Line{line})
})

It("executes transformer with storage row", func() {
address := []byte{1, 2, 3}
blockHash := []byte{4, 5, 6}
blockHeight := int64(789)
storageKey := []byte{9, 8, 7}
storageValue := []byte{6, 5, 4}
line := getFakeLine(address)
mockTailer := fakes.NewMockTailer()
line := &tail.Line{
Text: fmt.Sprintf("%s,%s,%d,%s,%s", common.Bytes2Hex(address), common.Bytes2Hex(blockHash), blockHeight, common.Bytes2Hex(storageKey), common.Bytes2Hex(storageValue)),
Time: time.Time{},
Err: nil,
}
watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address)}
watcher.AddTransformers([]storage.TransformerInitializer{fakeTransformer.FakeTransformerInitializer})
Expand All @@ -116,40 +105,76 @@ var _ = Describe("Storage Watcher", func() {
}, watcher, mockTailer, []*tail.Line{line})
})

It("logs error if executing transformer fails", func() {
address := []byte{1, 2, 3}
blockHash := []byte{4, 5, 6}
blockHeight := int64(789)
storageKey := []byte{9, 8, 7}
storageValue := []byte{6, 5, 4}
mockTailer := fakes.NewMockTailer()
line := &tail.Line{
Text: fmt.Sprintf("%s,%s,%d,%s,%s", common.Bytes2Hex(address), common.Bytes2Hex(blockHash), blockHeight, common.Bytes2Hex(storageKey), common.Bytes2Hex(storageValue)),
Time: time.Time{},
Err: nil,
}
watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
executionError := errors.New("storage watcher failed attempting to execute transformer")
fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: executionError}
watcher.AddTransformers([]storage.TransformerInitializer{fakeTransformer.FakeTransformerInitializer})
tempFile, err := ioutil.TempFile("", "log")
defer os.Remove(tempFile.Name())
Expect(err).NotTo(HaveOccurred())
logrus.SetOutput(tempFile)

assert(func(err error) {
Describe("when executing transformer fails", func() {
It("queues row when error is storage key not found", func() {
address := []byte{1, 2, 3}
line := getFakeLine(address)
mockTailer := fakes.NewMockTailer()
watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
mockQueue := &mocks.MockStorageQueue{}
watcher.Queue = mockQueue
keyNotFoundError := shared2.ErrStorageKeyNotFound{Key: "unknown_storage_key"}
fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: keyNotFoundError}
watcher.AddTransformers([]storage.TransformerInitializer{fakeTransformer.FakeTransformerInitializer})

assert(func(err error) {
Expect(err).NotTo(HaveOccurred())
Expect(mockQueue.AddCalled).To(BeTrue())
}, watcher, mockTailer, []*tail.Line{line})
})

It("logs error if queuing row fails", func() {
address := []byte{1, 2, 3}
line := getFakeLine(address)
mockTailer := fakes.NewMockTailer()
watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
mockQueue := &mocks.MockStorageQueue{}
mockQueue.AddError = fakes.FakeError
watcher.Queue = mockQueue
keyNotFoundError := shared2.ErrStorageKeyNotFound{Key: "unknown_storage_key"}
fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: keyNotFoundError}
watcher.AddTransformers([]storage.TransformerInitializer{fakeTransformer.FakeTransformerInitializer})
tempFile, err := ioutil.TempFile("", "log")
defer os.Remove(tempFile.Name())
Expect(err).NotTo(HaveOccurred())
logContent, readErr := ioutil.ReadFile(tempFile.Name())
Expect(readErr).NotTo(HaveOccurred())
Expect(string(logContent)).To(ContainSubstring(executionError.Error()))
}, watcher, mockTailer, []*tail.Line{line})
logrus.SetOutput(tempFile)

assert(func(err error) {
Expect(err).NotTo(HaveOccurred())
Expect(mockQueue.AddCalled).To(BeTrue())
logContent, readErr := ioutil.ReadFile(tempFile.Name())
Expect(readErr).NotTo(HaveOccurred())
Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error()))
}, watcher, mockTailer, []*tail.Line{line})
})

It("logs any other error", func() {
address := []byte{1, 2, 3}
line := getFakeLine(address)
mockTailer := fakes.NewMockTailer()
watcher := shared.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
executionError := errors.New("storage watcher failed attempting to execute transformer")
fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: executionError}
watcher.AddTransformers([]storage.TransformerInitializer{fakeTransformer.FakeTransformerInitializer})
tempFile, err := ioutil.TempFile("", "log")
defer os.Remove(tempFile.Name())
Expect(err).NotTo(HaveOccurred())
logrus.SetOutput(tempFile)

assert(func(err error) {
Expect(err).NotTo(HaveOccurred())
logContent, readErr := ioutil.ReadFile(tempFile.Name())
Expect(readErr).NotTo(HaveOccurred())
Expect(string(logContent)).To(ContainSubstring(executionError.Error()))
}, watcher, mockTailer, []*tail.Line{line})
})
})
})

func assert(assertion func(err error), watcher shared.StorageWatcher, mockTailer *fakes.MockTailer, lines []*tail.Line) {
errs := make(chan error, 1)
done := make(chan bool, 1)
go execute(watcher, mockTailer, errs, done)
go execute(watcher, errs, done)
for _, line := range lines {
mockTailer.Lines <- line
}
Expand All @@ -165,11 +190,23 @@ func assert(assertion func(err error), watcher shared.StorageWatcher, mockTailer
}
}

func execute(watcher shared.StorageWatcher, tailer *fakes.MockTailer, errs chan error, done chan bool) {
func execute(watcher shared.StorageWatcher, errs chan error, done chan bool) {
err := watcher.Execute()
if err != nil {
errs <- err
} else {
done <- true
}
}

func getFakeLine(address []byte) *tail.Line {
blockHash := []byte{4, 5, 6}
blockHeight := int64(789)
storageKey := []byte{9, 8, 7}
storageValue := []byte{6, 5, 4}
return &tail.Line{
Text: fmt.Sprintf("%s,%s,%d,%s,%s", common.Bytes2Hex(address), common.Bytes2Hex(blockHash), blockHeight, common.Bytes2Hex(storageKey), common.Bytes2Hex(storageValue)),
Time: time.Time{},
Err: nil,
}
}
3 changes: 2 additions & 1 deletion pkg/transformers/storage_diffs/shared/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package shared

import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"math/big"
)
Expand All @@ -30,7 +31,7 @@ func Decode(row StorageDiffRow, metadata StorageValueMetadata) (interface{}, err
case Bytes32:
return row.StorageValue.Hex(), nil
default:
return nil, ErrTypeNotFound{}
panic(fmt.Sprintf("can't decode unknown type: %d", metadata.Type))
}
}

Expand Down
Loading

0 comments on commit 59bed23

Please sign in to comment.