Skip to content

Commit

Permalink
Merge pull request #3 from parsyl/lookupschema
Browse files Browse the repository at this point in the history
Added a few tweaks to make reading work.
  • Loading branch information
cswank authored Jan 30, 2019
2 parents a75000e + 45752e9 commit f46a756
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 49 deletions.
52 changes: 43 additions & 9 deletions cmd/parquetgen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
var (
typ = flag.String("type", "", "type name")
pkg = flag.String("package", "", "package name")
imp = flag.String("import", "", "the type's import statement (only if it doesn't live in 'package')")
pth = flag.String("input", "", "path to the go file that defines -type")
)

Expand All @@ -21,17 +22,15 @@ func main() {
i := input{
Package: *pkg,
Type: *typ,
Import: *imp,
}

var err error
var imp string
i.Fields, imp, err = parse.Fields(*typ, *pth)
i.Fields, err = parse.Fields(*typ, *pth)
if err != nil {
log.Fatal(err)
}

i.Import = imp

tmpl, err := template.New("output").Parse(tpl)
if err != nil {
log.Fatal(err)
Expand All @@ -57,7 +56,7 @@ type input struct {
Fields []string
}

var tpl = `package main
var tpl = `package {{.Package}}
// This code is generated by github.com/parsyl/parquet.
Expand All @@ -69,7 +68,7 @@ import (
"github.com/golang/snappy"
"github.com/parsyl/parquet"
{{.Import}}
"{{.Import}}"
)
// ParquetWriter represents a row group
Expand Down Expand Up @@ -314,6 +313,9 @@ func (f *Uint32Field) Schema() parquet.Field {
}
func (f *Uint32Field) Scan(r *{{.Type}}) {
if len(f.vals) == 0 {
return
}
v := f.vals[0]
f.vals = f.vals[1:]
f.read(r, v)
Expand Down Expand Up @@ -455,6 +457,10 @@ func (f *Int32Field) Add(r {{.Type}}) {
}
func (f *Int32Field) Scan(r *{{.Type}}) {
if len(f.vals) == 0 {
return
}
v := f.vals[0]
f.vals = f.vals[1:]
f.read(r, v)
Expand Down Expand Up @@ -546,6 +552,10 @@ func (f *Int64Field) Schema() parquet.Field {
}
func (f *Int64Field) Scan(r *{{.Type}}) {
if len(f.vals) == 0 {
return
}
v := f.vals[0]
f.vals = f.vals[1:]
f.read(r, v)
Expand Down Expand Up @@ -685,6 +695,10 @@ func (f *Uint64Field) Read(r io.ReadSeeker, meta *parquet.Metadata, pos parquet.
}
func (f *Uint64Field) Scan(r *{{.Type}}) {
if len(f.vals) == 0 {
return
}
v := f.vals[0]
f.vals = f.vals[1:]
f.read(r, v)
Expand Down Expand Up @@ -802,6 +816,10 @@ func (f *Float32Field) Read(r io.ReadSeeker, meta *parquet.Metadata, pos parquet
}
func (f *Float32Field) Scan(r *{{.Type}}) {
if len(f.vals) == 0 {
return
}
v := f.vals[0]
f.vals = f.vals[1:]
f.read(r, v)
Expand Down Expand Up @@ -898,6 +916,10 @@ func (f *BoolField) Schema() parquet.Field {
}
func (f *BoolField) Scan(r *{{.Type}}) {
if len(f.vals) == 0 {
return
}
v := f.vals[0]
f.vals = f.vals[1:]
f.read(r, v)
Expand Down Expand Up @@ -1027,6 +1049,10 @@ func (f *StringField) Schema() parquet.Field {
}
func (f *StringField) Scan(r *{{.Type}}) {
if len(f.vals) == 0 {
return
}
v := f.vals[0]
f.vals = f.vals[1:]
f.read(r, v)
Expand Down Expand Up @@ -1144,7 +1170,7 @@ func (f *StringOptionalField) Read(r io.ReadSeeker, meta *parquet.Metadata, pos
}
for j := 0; j < pos.N; j++ {
if f.defs[0] == 0 {
if f.defs[j] == 0 {
continue
}
Expand All @@ -1153,7 +1179,7 @@ func (f *StringOptionalField) Read(r io.ReadSeeker, meta *parquet.Metadata, pos
return err
}
s := make([]byte, x)
if _, err := r.Read(s); err != nil {
if _, err := rr.Read(s); err != nil {
return err
}
Expand Down Expand Up @@ -1212,7 +1238,11 @@ func NewParquetReader(r io.ReadSeeker, opts ...func(*ParquetReader)) (*ParquetRe
return nil, err
}
pr.rows = pr.meta.Rows()
pr.offsets = pr.meta.Offsets()
var err error
pr.offsets, err = pr.meta.Offsets()
if err != nil {
return nil, err
}
}
_, err := r.Seek(4, io.SeekStart)
Expand Down Expand Up @@ -1261,6 +1291,10 @@ type ParquetReader struct {
meta *parquet.Metadata
}
func (p *ParquetReader) Rows() int64 {
return p.meta.Rows()
}
func (p *ParquetReader) Next() bool {
if p.cur >= p.rows {
return false
Expand Down
28 changes: 4 additions & 24 deletions internal/parse/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (f field) getFieldName() string {
// Fields gets the fields of the given struct.
// pth must be a go file that defines the typ struct.
// Any embedded structs must also be in that same file.
func Fields(typ, pth string) ([]string, string, error) {
func Fields(typ, pth string) ([]string, error) {
fullTyp := typ
typ = getType(fullTyp)

Expand All @@ -46,17 +46,12 @@ func Fields(typ, pth string) ([]string, string, error) {
ast.Walk(visitorFunc(f.findTypes), file)

if f.n == nil {
return nil, "", fmt.Errorf("could not find %s", typ)
return nil, fmt.Errorf("could not find %s", typ)
}

fields, err := doGetFields(f.n)
if err != nil {
return nil, "", err
}

imp, err := doGetImport(f.n)
if err != nil {
return nil, "", err
return nil, err
}

var out []field
Expand All @@ -72,7 +67,7 @@ func Fields(typ, pth string) ([]string, string, error) {
}
}

return formatFields(fullTyp, out), imp, nil
return formatFields(fullTyp, out), nil
}

func getType(typ string) string {
Expand Down Expand Up @@ -121,21 +116,6 @@ func doGetFields(n map[string]ast.Node) (map[string][]field, error) {
return fields, nil
}

func doGetImport(n map[string]ast.Node) (string, error) {
var imp string
for _, n := range n {
ast.Inspect(n, func(n ast.Node) bool {
switch x := n.(type) {
case *ast.ImportSpec:
imp = fmt.Sprintf("%s", x.Name)
return false
}
return true
})
}
return imp, nil
}

func getField(name string, x ast.Node) (field, error) {
var typ, tag string
var optional bool
Expand Down
29 changes: 18 additions & 11 deletions parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@ type Field struct {
RepetitionType FieldFunc
}

type schema struct {
schema []*sch.SchemaElement
lookup map[string]sch.SchemaElement
}

type Metadata struct {
ts *thrift.TSerializer
schema []*sch.SchemaElement
schema schema
rows int64
rowGroups []rowGroup

Expand Down Expand Up @@ -88,8 +93,8 @@ func (m *Metadata) updateRowGroup(col string, pos int64, dataLen, compressedLen,
return err
}

func columnType(col string, fields []*sch.SchemaElement) (sch.Type, error) {
for _, f := range fields {
func columnType(col string, fields schema) (sch.Type, error) {
for _, f := range fields.schema {
if f.Name == col {
return *f.Type, nil
}
Expand All @@ -105,7 +110,7 @@ func (m *Metadata) Rows() int64 {
func (m *Metadata) Footer(w io.Writer) error {
rgs := make([]*sch.RowGroup, len(m.rowGroups))
for i, rg := range m.rowGroups {
for _, col := range rg.fields {
for _, col := range rg.fields.schema {
if col.Name == "root" {
continue
}
Expand All @@ -119,14 +124,14 @@ func (m *Metadata) Footer(w io.Writer) error {
rg.rowGroup.Columns = append(rg.rowGroup.Columns, &ch)
}

rg.rowGroup.NumRows = rg.rowGroup.NumRows / int64(len(rg.fields)-1)
rg.rowGroup.NumRows = rg.rowGroup.NumRows / int64(len(rg.fields.schema)-1)
rgs[i] = &rg.rowGroup
}

f := &sch.FileMetaData{
Version: 1,
Schema: m.schema,
NumRows: m.rows / int64(len(m.schema)-1),
Schema: m.schema.schema,
NumRows: m.rows / int64(len(m.schema.schema)-1),
RowGroups: rgs,
}

Expand All @@ -144,13 +149,13 @@ func (m *Metadata) Footer(w io.Writer) error {
}

type rowGroup struct {
fields []*sch.SchemaElement
fields schema
rowGroup sch.RowGroup
columns map[string]sch.ColumnChunk
child *rowGroup
}

func (r *rowGroup) updateColumnChunk(col string, pos int64, dataLen, compressedLen, count int, fields []*sch.SchemaElement) error {
func (r *rowGroup) updateColumnChunk(col string, pos int64, dataLen, compressedLen, count int, fields schema) error {
ch, ok := r.columns[col]
if !ok {
t, err := columnType(col, fields)
Expand All @@ -177,8 +182,9 @@ func (r *rowGroup) updateColumnChunk(col string, pos int64, dataLen, compressedL
return nil
}

func schemaElements(fields []Field) []*sch.SchemaElement {
func schemaElements(fields []Field) schema {
out := make([]*sch.SchemaElement, len(fields)+1)
m := make(map[string]sch.SchemaElement)
l := int32(len(fields))
rt := sch.FieldRepetitionType_REQUIRED
out[0] = &sch.SchemaElement{
Expand All @@ -200,9 +206,10 @@ func schemaElements(fields []Field) []*sch.SchemaElement {
f.Type(se)
f.RepetitionType(se)
out[i+1] = se
m[f.Name] = *se
}

return out
return schema{schema: out, lookup: m}
}

type FieldFunc func(*sch.SchemaElement)
Expand Down
16 changes: 11 additions & 5 deletions read.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package parquet
import (
"encoding/binary"
"errors"
"fmt"
"io"
"io/ioutil"

Expand Down Expand Up @@ -33,15 +34,20 @@ type Position struct {
Offset int64
}

func (m *Metadata) Offsets() map[string][]Position {
func (m *Metadata) Offsets() (map[string][]Position, error) {
if len(m.metadata.RowGroups) == 0 {
return nil
return nil, nil
}

out := map[string][]Position{}
for _, rg := range m.metadata.RowGroups {
for i, ch := range rg.Columns {
se := m.schema[i+1] //skip root
for _, ch := range rg.Columns {
pth := ch.MetaData.PathInSchema
se, ok := m.schema.lookup[pth[len(pth)-1]]
if !ok {
return nil, fmt.Errorf("could not find schema for %v", pth)
}

pos := Position{
N: int(ch.MetaData.NumValues),
Offset: ch.FileOffset,
Expand All @@ -50,7 +56,7 @@ func (m *Metadata) Offsets() map[string][]Position {
out[se.Name] = append(out[se.Name], pos)
}
}
return out
return out, nil
}

func (m *Metadata) PageHeader(r io.ReadSeeker) (*sch.PageHeader, error) {
Expand Down

0 comments on commit f46a756

Please sign in to comment.