Skip to content

Commit

Permalink
Move back to DataPageHeader v1
Browse files Browse the repository at this point in the history
I must be doing something wrong when using DataPageHeaderV2
because external tools can't read the generated parquet files
  • Loading branch information
cswank committed Aug 7, 2019
1 parent a2fafae commit 4e4ad64
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 30 deletions.
9 changes: 5 additions & 4 deletions fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ func (f *RequiredField) DoRead(r io.ReadSeeker, pg Page) (io.Reader, []int, erro
return nil, nil, err
}

sizes = append(sizes, int(ph.DataPageHeaderV2.NumValues))
sizes = append(sizes, int(ph.DataPageHeader.NumValues))

data, err := pageData(r, ph, pg)
if err != nil {
return nil, nil, err
}

out = append(out, data...)
nRead += int(ph.DataPageHeaderV2.NumValues)
nRead += int(ph.DataPageHeader.NumValues)
}
return bytes.NewBuffer(out), sizes, nil
}
Expand Down Expand Up @@ -254,6 +254,7 @@ func (f *OptionalField) DoRead(r io.ReadSeeker, pg Page) (io.Reader, []int, erro
var out []byte
var sizes []int
var rc *readCounter

for nRead < pg.Size {
rc = &readCounter{r: r}
ph, err := PageHeader(rc)
Expand All @@ -271,14 +272,14 @@ func (f *OptionalField) DoRead(r io.ReadSeeker, pg Page) (io.Reader, []int, erro
return nil, nil, err
}

f.Defs = append(f.Defs, defs[:int(ph.DataPageHeaderV2.NumValues)]...)
f.Defs = append(f.Defs, defs[:int(ph.DataPageHeader.NumValues)]...)
if f.repeated {
reps, l2, err := readLevels(bytes.NewBuffer(data[l:]), int32(bits.Len(uint(f.MaxLevels.Rep))))
if err != nil {
return nil, nil, err
}
l += l2
f.Reps = append(f.Reps, reps[:int(ph.DataPageHeaderV2.NumValues)]...)
f.Reps = append(f.Reps, reps[:int(ph.DataPageHeader.NumValues)]...)
}

n := f.valsFromDefs(defs, uint8(f.MaxLevels.Def))
Expand Down
6 changes: 3 additions & 3 deletions internal/dremel/dremel_generated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ func (f *Int64OptionalField) Write(w io.Writer, meta *parquet.Metadata) error {
return err
}
}
return f.DoWrite(w, meta, buf.Bytes(), len(f.vals), f.stats)
return f.DoWrite(w, meta, buf.Bytes(), len(f.Defs), f.stats)
}

func (f *Int64OptionalField) Read(r io.ReadSeeker, pg parquet.Page) error {
Expand Down Expand Up @@ -847,7 +847,7 @@ func (f *StringOptionalField) Write(w io.Writer, meta *parquet.Metadata) error {
buf.Write([]byte(s))
}

return f.DoWrite(w, meta, buf.Bytes(), len(f.vals), f.stats)
return f.DoWrite(w, meta, buf.Bytes(), len(f.Defs), f.stats)
}

func (f *StringOptionalField) Read(r io.ReadSeeker, pg parquet.Page) error {
Expand All @@ -856,7 +856,7 @@ func (f *StringOptionalField) Read(r io.ReadSeeker, pg parquet.Page) error {
return err
}

for j := 0; j < pg.N; j++ {
for j := 0; j < f.Values(); j++ {
var x int32
if err := binary.Read(rr, binary.LittleEndian, &x); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/gen/template_bool_optional.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (f *BoolOptionalField) Write(w io.Writer, meta *parquet.Metadata) error {
}
}
return f.DoWrite(w, meta, rawBuf, len(f.vals), f.stats)
return f.DoWrite(w, meta, rawBuf, len(f.Defs), f.stats)
}
func (f *BoolOptionalField) Levels() ([]uint8, []uint8) {
Expand Down
2 changes: 1 addition & 1 deletion internal/gen/template_optional.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (f *{{.FieldType}}) Write(w io.Writer, meta *parquet.Metadata) error {
return err
}
}
return f.DoWrite(w, meta, buf.Bytes(), len(f.vals), f.stats)
return f.DoWrite(w, meta, buf.Bytes(), len(f.Defs), f.stats)
}
func (f *{{.FieldType}}) Read(r io.ReadSeeker, pg parquet.Page) error {
Expand Down
4 changes: 2 additions & 2 deletions internal/gen/template_string_optional.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (f *StringOptionalField) Write(w io.Writer, meta *parquet.Metadata) error {
buf.Write([]byte(s))
}
return f.DoWrite(w, meta, buf.Bytes(), len(f.vals), f.stats)
return f.DoWrite(w, meta, buf.Bytes(), len(f.Defs), f.stats)
}
func (f *StringOptionalField) Read(r io.ReadSeeker, pg parquet.Page) error {
Expand All @@ -62,7 +62,7 @@ func (f *StringOptionalField) Read(r io.ReadSeeker, pg parquet.Page) error {
return err
}
for j := 0; j < pg.N; j++ {
for j := 0; j < f.Values(); j++ {
var x int32
if err := binary.Read(rr, binary.LittleEndian, &x); err != nil {
return err
Expand Down
14 changes: 6 additions & 8 deletions parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,11 @@ func (m *Metadata) WritePageHeader(w io.Writer, pth []string, dataLen, compresse
Type: sch.PageType_DATA_PAGE,
UncompressedPageSize: int32(dataLen),
CompressedPageSize: int32(compressedLen),
DataPageHeaderV2: &sch.DataPageHeaderV2{
NumValues: int32(defCount),
NumNulls: int32(defCount - count),
NumRows: int32(defCount),
DefinitionLevelsByteLength: int32(defLen),
RepetitionLevelsByteLength: int32(repLen),
Encoding: sch.Encoding_PLAIN,
DataPageHeader: &sch.DataPageHeader{
NumValues: int32(count),
Encoding: sch.Encoding_PLAIN,
DefinitionLevelEncoding: sch.Encoding_RLE,
RepetitionLevelEncoding: sch.Encoding_RLE,
Statistics: &sch.Statistics{
NullCount: stats.NullCount(),
DistinctCount: stats.DistinctCount(),
Expand Down Expand Up @@ -425,7 +423,7 @@ func PageHeadersAtOffset(r io.ReadSeeker, o, n int64) ([]sch.PageHeader, error)
return nil, fmt.Errorf("unable to seek to next page: %s", err)
}

nRead += int64(ph.DataPageHeaderV2.NumValues)
nRead += int64(ph.DataPageHeader.NumValues)
}
return out, nil
}
Expand Down
14 changes: 7 additions & 7 deletions parquet_generated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ func (f *Int32OptionalField) Write(w io.Writer, meta *parquet.Metadata) error {
return err
}
}
return f.DoWrite(w, meta, buf.Bytes(), len(f.vals), f.stats)
return f.DoWrite(w, meta, buf.Bytes(), len(f.Defs), f.stats)
}

func (f *Int32OptionalField) Read(r io.ReadSeeker, pg parquet.Page) error {
Expand Down Expand Up @@ -830,7 +830,7 @@ func (f *Int64OptionalField) Write(w io.Writer, meta *parquet.Metadata) error {
return err
}
}
return f.DoWrite(w, meta, buf.Bytes(), len(f.vals), f.stats)
return f.DoWrite(w, meta, buf.Bytes(), len(f.Defs), f.stats)
}

func (f *Int64OptionalField) Read(r io.ReadSeeker, pg parquet.Page) error {
Expand Down Expand Up @@ -922,7 +922,7 @@ func (f *StringOptionalField) Write(w io.Writer, meta *parquet.Metadata) error {
buf.Write([]byte(s))
}

return f.DoWrite(w, meta, buf.Bytes(), len(f.vals), f.stats)
return f.DoWrite(w, meta, buf.Bytes(), len(f.Defs), f.stats)
}

func (f *StringOptionalField) Read(r io.ReadSeeker, pg parquet.Page) error {
Expand All @@ -931,7 +931,7 @@ func (f *StringOptionalField) Read(r io.ReadSeeker, pg parquet.Page) error {
return err
}

for j := 0; j < pg.N; j++ {
for j := 0; j < f.Values(); j++ {
var x int32
if err := binary.Read(rr, binary.LittleEndian, &x); err != nil {
return err
Expand Down Expand Up @@ -1102,7 +1102,7 @@ func (f *Float32OptionalField) Write(w io.Writer, meta *parquet.Metadata) error
return err
}
}
return f.DoWrite(w, meta, buf.Bytes(), len(f.vals), f.stats)
return f.DoWrite(w, meta, buf.Bytes(), len(f.Defs), f.stats)
}

func (f *Float32OptionalField) Read(r io.ReadSeeker, pg parquet.Page) error {
Expand Down Expand Up @@ -1206,7 +1206,7 @@ func (f *BoolOptionalField) Write(w io.Writer, meta *parquet.Metadata) error {
}
}

return f.DoWrite(w, meta, rawBuf, len(f.vals), f.stats)
return f.DoWrite(w, meta, rawBuf, len(f.Defs), f.stats)
}

func (f *BoolOptionalField) Levels() ([]uint8, []uint8) {
Expand Down Expand Up @@ -1303,7 +1303,7 @@ func (f *Uint64OptionalField) Write(w io.Writer, meta *parquet.Metadata) error {
return err
}
}
return f.DoWrite(w, meta, buf.Bytes(), len(f.vals), f.stats)
return f.DoWrite(w, meta, buf.Bytes(), len(f.Defs), f.stats)
}

func (f *Uint64OptionalField) Read(r io.ReadSeeker, pg parquet.Page) error {
Expand Down
8 changes: 4 additions & 4 deletions parquet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,12 +657,12 @@ func TestStats(t *testing.T) {
}
for i, st := range tc.stats {
ph := pages[i]
assert.Equal(t, st.min, ph.DataPageHeaderV2.Statistics.MinValue)
assert.Equal(t, st.max, ph.DataPageHeaderV2.Statistics.MaxValue)
assert.Equal(t, st.min, ph.DataPageHeader.Statistics.MinValue)
assert.Equal(t, st.max, ph.DataPageHeader.Statistics.MaxValue)
if st.nilCount == nil {
assert.Equal(t, st.nilCount, ph.DataPageHeaderV2.Statistics.NullCount)
assert.Equal(t, st.nilCount, ph.DataPageHeader.Statistics.NullCount)
} else {
assert.Equal(t, *st.nilCount, *ph.DataPageHeaderV2.Statistics.NullCount)
assert.Equal(t, *st.nilCount, *ph.DataPageHeader.Statistics.NullCount)
}
}
})
Expand Down

0 comments on commit 4e4ad64

Please sign in to comment.