Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
VDB-919 Generalise converter (#152)
Browse files Browse the repository at this point in the history
* Generalise transformer stack to use InsertionModel

* Add tests for event repository

* Restrict accepted values in InsertionModel

* Add call to repository.SetDB

* Improve error propagation/clarity on GetABI()

* Remove maker references in example

* Please golint

* refactor rollback error handling in repository

* Cleaner errors in repository, refactor tests
  • Loading branch information
m0ar authored Oct 28, 2019
1 parent 6c055a9 commit f7c4a67
Show file tree
Hide file tree
Showing 13 changed files with 460 additions and 76 deletions.
6 changes: 6 additions & 0 deletions integration_test/integration_test_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package integration_test

import (
"github.com/sirupsen/logrus"
"io/ioutil"
"testing"

. "github.com/onsi/ginkgo"
Expand All @@ -27,3 +29,7 @@ func TestIntegrationTest(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "IntegrationTest Suite")
}

var _ = BeforeSuite(func() {
logrus.SetOutput(ioutil.Discard)
})
10 changes: 7 additions & 3 deletions libraries/shared/factories/event/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

package event

import "github.com/vulcanize/vulcanizedb/pkg/core"
import (
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)

// Converter transforms log data into general InsertionModels the Repository can persist__
type Converter interface {
ToEntities(contractAbi string, ethLog []core.HeaderSyncLog) ([]interface{}, error)
ToModels([]interface{}) ([]interface{}, error)
ToModels(contractAbi string, ethLog []core.HeaderSyncLog) ([]InsertionModel, error)
SetDB(db *postgres.DB)
}
158 changes: 156 additions & 2 deletions libraries/shared/factories/event/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,163 @@

package event

import "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
import (
"database/sql/driver"
"fmt"
"github.com/vulcanize/vulcanizedb/utils"
"strings"

"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)

const SetLogTransformedQuery = `UPDATE public.header_sync_logs SET transformed = true WHERE id = $1`

// Repository persists transformed values to the DB
type Repository interface {
Create(models []interface{}) error
Create(models []InsertionModel) error
SetDB(db *postgres.DB)
}

// LogFK is the name of log foreign key columns
const LogFK ColumnName = "log_id"

// AddressFK is the name of address foreign key columns
const AddressFK ColumnName = "address_id"

// HeaderFK is the name of header foreign key columns
const HeaderFK ColumnName = "header_id"

// SchemaName is the schema to work with
type SchemaName string

// TableName identifies the table for inserting the data
type TableName string

// ColumnName identifies columns on the given table
type ColumnName string

// ColumnValues maps a column to the value for insertion. This is restricted to []byte, bool, float64, int64, string, time.Time
type ColumnValues map[ColumnName]interface{}

// ErrUnsupportedValue is thrown when a model supplies a type of value the postgres driver cannot handle.
var ErrUnsupportedValue = func(value interface{}) error {
return fmt.Errorf("unsupported type of value supplied in model: %v (%T)", value, value)
}

// InsertionModel is the generalised data structure a converter returns, and contains everything the repository needs to
// persist the converted data.
type InsertionModel struct {
SchemaName SchemaName
TableName TableName
OrderedColumns []ColumnName // Defines the fields to insert, and in which order the table expects them
ColumnValues ColumnValues // Associated values for columns, restricted to []byte, bool, float64, int64, string, time.Time
}

// ModelToQuery stores memoised insertion queries to minimise computation
var ModelToQuery = map[string]string{}

// GetMemoizedQuery gets/creates a DB insertion query for the model
func GetMemoizedQuery(model InsertionModel) string {
// The schema and table name uniquely determines the insertion query, use that for memoization
queryKey := string(model.SchemaName) + string(model.TableName)
query, queryMemoized := ModelToQuery[queryKey]
if !queryMemoized {
query = GenerateInsertionQuery(model)
ModelToQuery[queryKey] = query
}
return query
}

// GenerateInsertionQuery creates an SQL insertion query from an insertion model.
// Should be called through GetMemoizedQuery, so the query is not generated on each call to Create.
func GenerateInsertionQuery(model InsertionModel) string {
var valuePlaceholders []string
var updateOnConflict []string
for i := 0; i < len(model.OrderedColumns); i++ {
valuePlaceholder := fmt.Sprintf("$%d", 1+i)
valuePlaceholders = append(valuePlaceholders, valuePlaceholder)
updateOnConflict = append(updateOnConflict,
fmt.Sprintf("%s = %s", model.OrderedColumns[i], valuePlaceholder))
}

baseQuery := `INSERT INTO %v.%v (%v) VALUES(%v)
ON CONFLICT (header_id, log_id) DO UPDATE SET %v;`

return fmt.Sprintf(baseQuery,
model.SchemaName,
model.TableName,
joinOrderedColumns(model.OrderedColumns),
strings.Join(valuePlaceholders, ", "),
strings.Join(updateOnConflict, ", "))
}

/*
Create generates an insertion query and persists to the DB, given a slice of InsertionModels.
ColumnValues are restricted to []byte, bool, float64, int64, string, time.Time.
testModel = shared.InsertionModel{
SchemaName: "public"
TableName: "testEvent",
OrderedColumns: []string{"header_id", "log_id", "variable1"},
ColumnValues: ColumnValues{
"header_id": 303
"log_id": "808",
"variable1": "value1",
},
}
*/
func Create(models []InsertionModel, db *postgres.DB) error {
if len(models) == 0 {
return fmt.Errorf("repository got empty model slice")
}

tx, dbErr := db.Beginx()
if dbErr != nil {
return dbErr
}

for _, model := range models {
// Maps can't be iterated over in a reliable manner, so we rely on OrderedColumns to define the order to insert
// tx.Exec is variadically typed in the args, so if we wrap in []interface{} we can apply them all automatically
var args []interface{}
for _, col := range model.OrderedColumns {
value := model.ColumnValues[col]
// Check whether or not PG can accept the type of value in the model
okPgValue := driver.IsValue(value)
if !okPgValue {
logrus.WithField("model", model).Errorf("PG cannot handle value of this type: %T", value)
return ErrUnsupportedValue(value)
}
args = append(args, value)
}

insertionQuery := GetMemoizedQuery(model)
_, execErr := tx.Exec(insertionQuery, args...) // couldn't pass varying types in bulk with args :: []string

if execErr != nil {
rollbackErr := tx.Rollback()
if rollbackErr != nil {
logrus.Error("failed to rollback ", rollbackErr)
}
return execErr
}

_, logErr := tx.Exec(SetLogTransformedQuery, model.ColumnValues[LogFK])

if logErr != nil {
utils.RollbackAndLogFailure(tx, logErr, "header_sync_logs.transformed")
return logErr
}
}

return tx.Commit()
}

func joinOrderedColumns(columns []ColumnName) string {
var stringColumns []string
for _, columnName := range columns {
stringColumns = append(stringColumns, string(columnName))
}
return strings.Join(stringColumns, ", ")
}
Loading

0 comments on commit f7c4a67

Please sign in to comment.