diff --git a/cmd/bitpackgen/main.go b/cmd/bitpackgen/main.go index d3a939a..dd582cb 100644 --- a/cmd/bitpackgen/main.go +++ b/cmd/bitpackgen/main.go @@ -159,18 +159,20 @@ var ( // Code generated by github.com/parsyl/parquet. DO NOT EDIT. -func Pack(width int, vals []uint8) []byte { +const MaxSize = {{.Max}} + +func Pack(b []byte, width int, vals []uint8) []byte { switch width { {{range $i := N 1 .Max }}case {{$i}}: - return pack{{$i}}(vals) + return pack{{$i}}(b, vals) {{end}}default: - return []byte{} + return b } } {{range $i := N 1 .Max}} -func pack{{$i}}(vals []uint8) []byte { -return []byte{ {{template "bytes" $i}} } +func pack{{$i}}(b []byte, vals []uint8) []byte { +return append(b, {{template "bytes" $i}} ) } {{end}} diff --git a/cmd/parquetgen/dremel/read.go b/cmd/parquetgen/dremel/read.go index b461b44..6c4b549 100644 --- a/cmd/parquetgen/dremel/read.go +++ b/cmd/parquetgen/dremel/read.go @@ -18,7 +18,8 @@ func readOptional(f fields.Field) string { n := f.MaxDef() for def := 0; def < n; def++ { out += fmt.Sprintf(`case x.%s == nil: - return nil, []uint8{%d}, nil + defs = append(defs, %d) + return vals, defs, reps `, nilField(def, f), def) } @@ -29,13 +30,15 @@ func readOptional(f fields.Field) string { } out += fmt.Sprintf(` default: - return []%s{%sx.%s}, []uint8{%d}, nil`, cleanTypeName(f.Type), ptr, nilField(n, f), n) + vals = append(vals, %sx.%s) + defs = append(defs, %d) + return vals, defs, reps`, ptr, nilField(n, f), n) - return fmt.Sprintf(`func read%s(x %s) ([]%s, []uint8, []uint8) { + return fmt.Sprintf(`func read%s(x %s, vals []%s, defs, reps []uint8) ([]%s, []uint8, []uint8) { switch { %s } - }`, strings.Join(f.FieldNames(), ""), f.StructType(), cleanTypeName(f.Type), out) + }`, strings.Join(f.FieldNames(), ""), f.StructType(), cleanTypeName(f.Type), cleanTypeName(f.Type), out) } func cleanTypeName(s string) string { diff --git a/cmd/parquetgen/dremel/read_repeated.go b/cmd/parquetgen/dremel/read_repeated.go index ccd77f3..3eb15b6 100644 --- a/cmd/parquetgen/dremel/read_repeated.go +++ b/cmd/parquetgen/dremel/read_repeated.go @@ -60,9 +60,7 @@ type readClause struct { } func readRepeated(f fields.Field) string { - return fmt.Sprintf(`func read%s(x %s) ([]%s, []uint8, []uint8) { - var vals []%s - var defs, reps []uint8 + return fmt.Sprintf(`func read%s(x %s, vals []%s, defs, reps []uint8) ([]%s, []uint8, []uint8) { var lastRep uint8 %s diff --git a/cmd/parquetgen/dremel/read_test.go b/cmd/parquetgen/dremel/read_test.go index bcfd710..c3c39dc 100644 --- a/cmd/parquetgen/dremel/read_test.go +++ b/cmd/parquetgen/dremel/read_test.go @@ -32,12 +32,15 @@ func TestRead(t *testing.T) { f: fields.Field{ Type: "int32", Name: "ID", RepetitionType: fields.Optional, }, - result: `func readID(x Person) ([]int32, []uint8, []uint8) { + result: `func readID(x Person, vals []int32, defs, reps []uint8) ([]int32, []uint8, []uint8) { switch { case x.ID == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []int32{*x.ID}, []uint8{1}, nil + vals = append(vals, *x.ID) + defs = append(defs, 1) + return vals, defs, reps } }`, }, @@ -61,14 +64,18 @@ func TestRead(t *testing.T) { {Type: "int32", Name: "Difficulty", RepetitionType: fields.Optional}, }, }, - result: `func readHobbyDifficulty(x Person) ([]int32, []uint8, []uint8) { + result: `func readHobbyDifficulty(x Person, vals []int32, defs, reps []uint8) ([]int32, []uint8, []uint8) { switch { case x.Hobby == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps case x.Hobby.Difficulty == nil: - return nil, []uint8{1}, nil + defs = append(defs, 1) + return vals, defs, reps default: - return []int32{*x.Hobby.Difficulty}, []uint8{2}, nil + vals = append(vals, *x.Hobby.Difficulty) + defs = append(defs, 2) + return vals, defs, reps } }`, }, @@ -79,12 +86,15 @@ func TestRead(t *testing.T) { {Type: "string", Name: "Name", RepetitionType: fields.Required}, }, }, - result: `func readHobbyName(x Person) ([]string, []uint8, []uint8) { + result: `func readHobbyName(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { switch { case x.Hobby == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []string{x.Hobby.Name}, []uint8{1}, nil + vals = append(vals, x.Hobby.Name) + defs = append(defs, 1) + return vals, defs, reps } }`, }, @@ -95,12 +105,15 @@ func TestRead(t *testing.T) { {Type: "string", Name: "Name", RepetitionType: fields.Optional}, }, }, - result: `func readHobbyName(x Person) ([]string, []uint8, []uint8) { + result: `func readHobbyName(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { switch { case x.Hobby.Name == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []string{*x.Hobby.Name}, []uint8{1}, nil + vals = append(vals, *x.Hobby.Name) + defs = append(defs, 1) + return vals, defs, reps } }`, }, @@ -113,14 +126,18 @@ func TestRead(t *testing.T) { }}, }, }, - result: `func readFriendHobbyName(x Person) ([]string, []uint8, []uint8) { + result: `func readFriendHobbyName(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { switch { case x.Friend == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps case x.Friend.Hobby.Name == nil: - return nil, []uint8{1}, nil + defs = append(defs, 1) + return vals, defs, reps default: - return []string{*x.Friend.Hobby.Name}, []uint8{2}, nil + vals = append(vals, *x.Friend.Hobby.Name) + defs = append(defs, 2) + return vals, defs, reps } }`, }, @@ -133,14 +150,18 @@ func TestRead(t *testing.T) { }}, }, }, - result: `func readFriendHobbyName(x Person) ([]string, []uint8, []uint8) { + result: `func readFriendHobbyName(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { switch { case x.Friend.Hobby == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps case x.Friend.Hobby.Name == nil: - return nil, []uint8{1}, nil + defs = append(defs, 1) + return vals, defs, reps default: - return []string{*x.Friend.Hobby.Name}, []uint8{2}, nil + vals = append(vals, *x.Friend.Hobby.Name) + defs = append(defs, 2) + return vals, defs, reps } }`, }, @@ -153,14 +174,18 @@ func TestRead(t *testing.T) { }}, }, }, - result: `func readFriendHobbyName(x Person) ([]string, []uint8, []uint8) { + result: `func readFriendHobbyName(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { switch { case x.Friend == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps case x.Friend.Hobby == nil: - return nil, []uint8{1}, nil + defs = append(defs, 1) + return vals, defs, reps default: - return []string{x.Friend.Hobby.Name}, []uint8{2}, nil + vals = append(vals, x.Friend.Hobby.Name) + defs = append(defs, 2) + return vals, defs, reps } }`, }, @@ -173,16 +198,21 @@ func TestRead(t *testing.T) { }}, }, }, - result: `func readFriendHobbyName(x Person) ([]string, []uint8, []uint8) { + result: `func readFriendHobbyName(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { switch { case x.Friend == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps case x.Friend.Hobby == nil: - return nil, []uint8{1}, nil + defs = append(defs, 1) + return vals, defs, reps case x.Friend.Hobby.Name == nil: - return nil, []uint8{2}, nil + defs = append(defs, 2) + return vals, defs, reps default: - return []string{*x.Friend.Hobby.Name}, []uint8{3}, nil + vals = append(vals, *x.Friend.Hobby.Name) + defs = append(defs, 3) + return vals, defs, reps } }`, }, @@ -197,18 +227,24 @@ func TestRead(t *testing.T) { }}, }, }, - result: `func readFriendHobbyNameFirst(x Person) ([]string, []uint8, []uint8) { + result: `func readFriendHobbyNameFirst(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { switch { case x.Friend == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps case x.Friend.Hobby == nil: - return nil, []uint8{1}, nil + defs = append(defs, 1) + return vals, defs, reps case x.Friend.Hobby.Name == nil: - return nil, []uint8{2}, nil + defs = append(defs, 2) + return vals, defs, reps case x.Friend.Hobby.Name.First == nil: - return nil, []uint8{3}, nil + defs = append(defs, 3) + return vals, defs, reps default: - return []string{*x.Friend.Hobby.Name.First}, []uint8{4}, nil + vals = append(vals, *x.Friend.Hobby.Name.First) + defs = append(defs, 4) + return vals, defs, reps } }`, }, @@ -223,16 +259,21 @@ func TestRead(t *testing.T) { }}, }, }, - result: `func readFriendHobbyNameFirst(x Person) ([]string, []uint8, []uint8) { + result: `func readFriendHobbyNameFirst(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { switch { case x.Friend.Hobby == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps case x.Friend.Hobby.Name == nil: - return nil, []uint8{1}, nil + defs = append(defs, 1) + return vals, defs, reps case x.Friend.Hobby.Name.First == nil: - return nil, []uint8{2}, nil + defs = append(defs, 2) + return vals, defs, reps default: - return []string{*x.Friend.Hobby.Name.First}, []uint8{3}, nil + vals = append(vals, *x.Friend.Hobby.Name.First) + defs = append(defs, 3) + return vals, defs, reps } }`, }, @@ -247,16 +288,21 @@ func TestRead(t *testing.T) { }}, }, }, - result: `func readFriendHobbyNameFirst(x Person) ([]string, []uint8, []uint8) { + result: `func readFriendHobbyNameFirst(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { switch { case x.Friend == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps case x.Friend.Hobby == nil: - return nil, []uint8{1}, nil + defs = append(defs, 1) + return vals, defs, reps case x.Friend.Hobby.Name == nil: - return nil, []uint8{2}, nil + defs = append(defs, 2) + return vals, defs, reps default: - return []string{x.Friend.Hobby.Name.First}, []uint8{3}, nil + vals = append(vals, x.Friend.Hobby.Name.First) + defs = append(defs, 3) + return vals, defs, reps } }`, }, @@ -265,9 +311,7 @@ func TestRead(t *testing.T) { f: fields.Field{ Type: "string", Name: "Friends", RepetitionType: fields.Repeated, }, - result: `func readFriends(x Person) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 + result: `func readFriends(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if len(x.Friends) == 0 { @@ -295,9 +339,7 @@ func TestRead(t *testing.T) { {Type: "int64", Name: "Forward", RepetitionType: fields.Repeated}, }, }, - result: `func readLinkForward(x Document) ([]int64, []uint8, []uint8) { - var vals []int64 - var defs, reps []uint8 + result: `func readLinkForward(x Document, vals []int64, defs, reps []uint8) ([]int64, []uint8, []uint8) { var lastRep uint8 if x.Link == nil { @@ -332,9 +374,7 @@ func TestRead(t *testing.T) { }}, }, }, - result: `func readNamesLanguagesCode(x Document) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 + result: `func readNamesLanguagesCode(x Document, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if len(x.Names) == 0 { @@ -374,9 +414,7 @@ func TestRead(t *testing.T) { }}, }, }, - result: `func readNamesLanguagesCountry(x Document) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 + result: `func readNamesLanguagesCountry(x Document, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if len(x.Names) == 0 { @@ -419,9 +457,7 @@ func TestRead(t *testing.T) { {Type: "string", Name: "URL", RepetitionType: fields.Optional}, }, }, - result: `func readNamesURL(x Document) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 + result: `func readNamesURL(x Document, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if len(x.Names) == 0 { @@ -456,9 +492,7 @@ func TestRead(t *testing.T) { }}, }, }, - result: `func readFriendsNameLast(x Document) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 + result: `func readFriendsNameLast(x Document, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if len(x.Friends) == 0 { @@ -488,9 +522,7 @@ func TestRead(t *testing.T) { }}, }, }, - result: `func readFriendNameAliases(x Document) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 + result: `func readFriendNameAliases(x Document, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if len(x.Friend.Name.Aliases) == 0 { @@ -522,9 +554,7 @@ func TestRead(t *testing.T) { }}, }, }, - result: `func readOtherFriendsNameMiddle(x Document) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 + result: `func readOtherFriendsNameMiddle(x Document, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if x.Other == nil { diff --git a/cmd/parquetgen/dremel/testcases/doc/generated.go b/cmd/parquetgen/dremel/testcases/doc/generated.go index 3b860a6..ccd857b 100644 --- a/cmd/parquetgen/dremel/testcases/doc/generated.go +++ b/cmd/parquetgen/dremel/testcases/doc/generated.go @@ -63,9 +63,7 @@ func writeDocID(x *Document, vals []int64) { x.DocID = vals[0] } -func readLinksBackward(x Document) ([]int64, []uint8, []uint8) { - var vals []int64 - var defs, reps []uint8 +func readLinksBackward(x Document, vals []int64, defs, reps []uint8) ([]int64, []uint8, []uint8) { var lastRep uint8 if x.Links == nil { @@ -121,9 +119,7 @@ func writeLinksBackward(x *Document, vals []int64, defs, reps []uint8) (int, int return nVals, nLevels } -func readLinksForward(x Document) ([]int64, []uint8, []uint8) { - var vals []int64 - var defs, reps []uint8 +func readLinksForward(x Document, vals []int64, defs, reps []uint8) ([]int64, []uint8, []uint8) { var lastRep uint8 if x.Links == nil { @@ -172,9 +168,7 @@ func writeLinksForward(x *Document, vals []int64, defs, reps []uint8) (int, int) return nVals, nLevels } -func readNamesLanguagesCode(x Document) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 +func readNamesLanguagesCode(x Document, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if len(x.Names) == 0 { @@ -235,9 +229,7 @@ func writeNamesLanguagesCode(x *Document, vals []string, defs, reps []uint8) (in return nVals, nLevels } -func readNamesLanguagesCountry(x Document) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 +func readNamesLanguagesCountry(x Document, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if len(x.Names) == 0 { @@ -296,9 +288,7 @@ func writeNamesLanguagesCountry(x *Document, vals []string, defs, reps []uint8) return nVals, nLevels } -func readNamesURL(x Document) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 +func readNamesURL(x Document, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if len(x.Names) == 0 { @@ -411,8 +401,10 @@ func MaxPageSize(m int) func(*ParquetWriter) error { } } +var par1 = []byte("PAR1") + func begin(p *ParquetWriter) error { - _, err := p.w.Write([]byte("PAR1")) + _, err := p.w.Write(par1) return err } @@ -475,7 +467,7 @@ func (p *ParquetWriter) Close() error { return err } - _, err := p.w.Write([]byte("PAR1")) + _, err := p.w.Write(par1) return err } @@ -734,12 +726,12 @@ func (f *Int64Field) Levels() ([]uint8, []uint8) { type Int64OptionalField struct { parquet.OptionalField vals []int64 - read func(r Document) ([]int64, []uint8, []uint8) - write func(r *Document, vals []int64, def, rep []uint8) (int, int) + read func(r Document, vals []int64, defs, reps []uint8) ([]int64, []uint8, []uint8) + write func(r *Document, vals []int64, defs, reps []uint8) (int, int) stats *int64optionalStats } -func NewInt64OptionalField(read func(r Document) ([]int64, []uint8, []uint8), write func(r *Document, vals []int64, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Int64OptionalField { +func NewInt64OptionalField(read func(r Document, vals []int64, defs, reps []uint8) ([]int64, []uint8, []uint8), write func(r *Document, vals []int64, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Int64OptionalField { return &Int64OptionalField{ read: read, write: write, @@ -779,11 +771,11 @@ func (f *Int64OptionalField) Read(r io.ReadSeeker, pg parquet.Page) error { } func (f *Int64OptionalField) Add(r Document) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *Int64OptionalField) Scan(r *Document) { @@ -806,12 +798,12 @@ func (f *Int64OptionalField) Levels() ([]uint8, []uint8) { type StringOptionalField struct { parquet.OptionalField vals []string - read func(r Document) ([]string, []uint8, []uint8) + read func(r Document, vals []string, def, rep []uint8) ([]string, []uint8, []uint8) write func(r *Document, vals []string, def, rep []uint8) (int, int) stats *stringOptionalStats } -func NewStringOptionalField(read func(r Document) ([]string, []uint8, []uint8), write func(r *Document, vals []string, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *StringOptionalField { +func NewStringOptionalField(read func(r Document, vals []string, def, rep []uint8) ([]string, []uint8, []uint8), write func(r *Document, vals []string, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *StringOptionalField { return &StringOptionalField{ read: read, write: write, @@ -825,11 +817,11 @@ func (f *StringOptionalField) Schema() parquet.Field { } func (f *StringOptionalField) Add(r Document) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *StringOptionalField) Scan(r *Document) { diff --git a/cmd/parquetgen/dremel/testcases/person/generated.go b/cmd/parquetgen/dremel/testcases/person/generated.go index 01a7e84..8182687 100644 --- a/cmd/parquetgen/dremel/testcases/person/generated.go +++ b/cmd/parquetgen/dremel/testcases/person/generated.go @@ -62,12 +62,15 @@ func writeName(x *Person, vals []string) { x.Name = vals[0] } -func readHobbyName(x Person) ([]string, []uint8, []uint8) { +func readHobbyName(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { switch { case x.Hobby == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []string{x.Hobby.Name}, []uint8{1}, nil + vals = append(vals, x.Hobby.Name) + defs = append(defs, 1) + return vals, defs, reps } } @@ -82,14 +85,18 @@ func writeHobbyName(x *Person, vals []string, defs, reps []uint8) (int, int) { return 0, 1 } -func readHobbyDifficulty(x Person) ([]int32, []uint8, []uint8) { +func readHobbyDifficulty(x Person, vals []int32, defs, reps []uint8) ([]int32, []uint8, []uint8) { switch { case x.Hobby == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps case x.Hobby.Difficulty == nil: - return nil, []uint8{1}, nil + defs = append(defs, 1) + return vals, defs, reps default: - return []int32{*x.Hobby.Difficulty}, []uint8{2}, nil + vals = append(vals, *x.Hobby.Difficulty) + defs = append(defs, 2) + return vals, defs, reps } } @@ -104,9 +111,7 @@ func writeHobbyDifficulty(x *Person, vals []int32, defs, reps []uint8) (int, int return 0, 1 } -func readHobbySkillsName(x Person) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 +func readHobbySkillsName(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if x.Hobby == nil { @@ -155,9 +160,7 @@ func writeHobbySkillsName(x *Person, vals []string, defs, reps []uint8) (int, in return nVals, nLevels } -func readHobbySkillsDifficulty(x Person) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 +func readHobbySkillsDifficulty(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if x.Hobby == nil { @@ -270,8 +273,10 @@ func MaxPageSize(m int) func(*ParquetWriter) error { } } +var par1 = []byte("PAR1") + func begin(p *ParquetWriter) error { - _, err := p.w.Write([]byte("PAR1")) + _, err := p.w.Write(par1) return err } @@ -334,7 +339,7 @@ func (p *ParquetWriter) Close() error { return err } - _, err := p.w.Write([]byte("PAR1")) + _, err := p.w.Write(par1) return err } @@ -604,12 +609,12 @@ func (f *StringField) Levels() ([]uint8, []uint8) { type StringOptionalField struct { parquet.OptionalField vals []string - read func(r Person) ([]string, []uint8, []uint8) + read func(r Person, vals []string, def, rep []uint8) ([]string, []uint8, []uint8) write func(r *Person, vals []string, def, rep []uint8) (int, int) stats *stringOptionalStats } -func NewStringOptionalField(read func(r Person) ([]string, []uint8, []uint8), write func(r *Person, vals []string, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *StringOptionalField { +func NewStringOptionalField(read func(r Person, vals []string, def, rep []uint8) ([]string, []uint8, []uint8), write func(r *Person, vals []string, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *StringOptionalField { return &StringOptionalField{ read: read, write: write, @@ -623,11 +628,11 @@ func (f *StringOptionalField) Schema() parquet.Field { } func (f *StringOptionalField) Add(r Person) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *StringOptionalField) Scan(r *Person) { @@ -687,12 +692,12 @@ func (f *StringOptionalField) Levels() ([]uint8, []uint8) { type Int32OptionalField struct { parquet.OptionalField vals []int32 - read func(r Person) ([]int32, []uint8, []uint8) - write func(r *Person, vals []int32, def, rep []uint8) (int, int) + read func(r Person, vals []int32, defs, reps []uint8) ([]int32, []uint8, []uint8) + write func(r *Person, vals []int32, defs, reps []uint8) (int, int) stats *int32optionalStats } -func NewInt32OptionalField(read func(r Person) ([]int32, []uint8, []uint8), write func(r *Person, vals []int32, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Int32OptionalField { +func NewInt32OptionalField(read func(r Person, vals []int32, defs, reps []uint8) ([]int32, []uint8, []uint8), write func(r *Person, vals []int32, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Int32OptionalField { return &Int32OptionalField{ read: read, write: write, @@ -732,11 +737,11 @@ func (f *Int32OptionalField) Read(r io.ReadSeeker, pg parquet.Page) error { } func (f *Int32OptionalField) Add(r Person) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *Int32OptionalField) Scan(r *Person) { diff --git a/cmd/parquetgen/dremel/testcases/repetition/generated.go b/cmd/parquetgen/dremel/testcases/repetition/generated.go index bcb84f8..89f098a 100644 --- a/cmd/parquetgen/dremel/testcases/repetition/generated.go +++ b/cmd/parquetgen/dremel/testcases/repetition/generated.go @@ -53,9 +53,7 @@ func Fields(compression compression) []Field { } } -func readLinksBackwardCodes(x Document) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 +func readLinksBackwardCodes(x Document, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if len(x.Links) == 0 { @@ -135,9 +133,7 @@ func writeLinksBackwardCodes(x *Document, vals []string, defs, reps []uint8) (in return nVals, nLevels } -func readLinksBackwardURL(x Document) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 +func readLinksBackwardURL(x Document, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if len(x.Links) == 0 { @@ -196,9 +192,7 @@ func writeLinksBackwardURL(x *Document, vals []string, defs, reps []uint8) (int, return nVals, nLevels } -func readLinksBackwardCountries(x Document) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 +func readLinksBackwardCountries(x Document, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if len(x.Links) == 0 { @@ -262,9 +256,7 @@ func writeLinksBackwardCountries(x *Document, vals []string, defs, reps []uint8) return nVals, nLevels } -func readLinksForwardCodes(x Document) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 +func readLinksForwardCodes(x Document, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if len(x.Links) == 0 { @@ -335,9 +327,7 @@ func writeLinksForwardCodes(x *Document, vals []string, defs, reps []uint8) (int return nVals, nLevels } -func readLinksForwardURL(x Document) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 +func readLinksForwardURL(x Document, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if len(x.Links) == 0 { @@ -396,9 +386,7 @@ func writeLinksForwardURL(x *Document, vals []string, defs, reps []uint8) (int, return nVals, nLevels } -func readLinksForwardCountries(x Document) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 +func readLinksForwardCountries(x Document, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if len(x.Links) == 0 { @@ -526,8 +514,10 @@ func MaxPageSize(m int) func(*ParquetWriter) error { } } +var par1 = []byte("PAR1") + func begin(p *ParquetWriter) error { - _, err := p.w.Write([]byte("PAR1")) + _, err := p.w.Write(par1) return err } @@ -590,7 +580,7 @@ func (p *ParquetWriter) Close() error { return err } - _, err := p.w.Write([]byte("PAR1")) + _, err := p.w.Write(par1) return err } @@ -783,12 +773,12 @@ func (p *ParquetReader) Scan(x *Document) { type StringOptionalField struct { parquet.OptionalField vals []string - read func(r Document) ([]string, []uint8, []uint8) + read func(r Document, vals []string, def, rep []uint8) ([]string, []uint8, []uint8) write func(r *Document, vals []string, def, rep []uint8) (int, int) stats *stringOptionalStats } -func NewStringOptionalField(read func(r Document) ([]string, []uint8, []uint8), write func(r *Document, vals []string, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *StringOptionalField { +func NewStringOptionalField(read func(r Document, vals []string, def, rep []uint8) ([]string, []uint8, []uint8), write func(r *Document, vals []string, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *StringOptionalField { return &StringOptionalField{ read: read, write: write, @@ -802,11 +792,11 @@ func (f *StringOptionalField) Schema() parquet.Field { } func (f *StringOptionalField) Add(r Document) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *StringOptionalField) Scan(r *Document) { diff --git a/cmd/parquetgen/gen/template.go b/cmd/parquetgen/gen/template.go index 672d2ac..bade652 100644 --- a/cmd/parquetgen/gen/template.go +++ b/cmd/parquetgen/gen/template.go @@ -125,8 +125,10 @@ func MaxPageSize(m int) func(*ParquetWriter) error { } } +var par1 = []byte("PAR1") + func begin(p *ParquetWriter) error { - _, err := p.w.Write([]byte("PAR1")) + _, err := p.w.Write(par1) return err } @@ -189,7 +191,7 @@ func (p *ParquetWriter) Close() error { return err } - _, err := p.w.Write([]byte("PAR1")) + _, err := p.w.Write(par1) return err } @@ -282,7 +284,7 @@ type ParquetReader struct { rows int64 rowGroupCursor int64 rowGroupCount int64 - pages map[string][]parquet.Page + pages map[string][]parquet.Page meta *parquet.Metadata err error diff --git a/cmd/parquetgen/gen/template_bool.go b/cmd/parquetgen/gen/template_bool.go index 33bce48..96c5035 100644 --- a/cmd/parquetgen/gen/template_bool.go +++ b/cmd/parquetgen/gen/template_bool.go @@ -49,7 +49,7 @@ func (f *BoolField) Scan(r *{{.StructType}}) { if len(f.vals) == 0 { return } - + f.write(r, f.vals) f.vals = f.vals[1:] } diff --git a/cmd/parquetgen/gen/template_bool_optional.go b/cmd/parquetgen/gen/template_bool_optional.go index ed3777e..c5a8a31 100644 --- a/cmd/parquetgen/gen/template_bool_optional.go +++ b/cmd/parquetgen/gen/template_bool_optional.go @@ -3,12 +3,12 @@ package gen var boolOptionalTpl = `{{define "boolOptionalField"}}type BoolOptionalField struct { parquet.OptionalField vals []bool - read func(r {{.StructType}}) ([]{{removeStar .TypeName}}, []uint8, []uint8) + read func(r {{.StructType}}, vals []{{removeStar .TypeName}}, defs, reps []uint8) ([]{{removeStar .TypeName}}, []uint8, []uint8) write func(r *{{.StructType}}, vals []{{removeStar .TypeName}}, defs, reps []uint8) (int, int) stats *boolOptionalStats } -func NewBoolOptionalField(read func(r {{.StructType}}) ([]{{removeStar .TypeName}}, []uint8, []uint8), write func(r *{{.StructType}}, vals []{{removeStar .TypeName}}, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *BoolOptionalField { +func NewBoolOptionalField(read func(r {{.StructType}}, vals []{{removeStar .TypeName}}, defs, reps []uint8) ([]{{removeStar .TypeName}}, []uint8, []uint8), write func(r *{{.StructType}}, vals []{{removeStar .TypeName}}, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *BoolOptionalField { return &BoolOptionalField{ read: read, write: write, @@ -46,11 +46,11 @@ func (f *BoolOptionalField) Scan(r *{{.StructType}}) { } func (f *BoolOptionalField) Add(r {{.StructType}}) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *BoolOptionalField) Write(w io.Writer, meta *parquet.Metadata) error { diff --git a/cmd/parquetgen/gen/template_optional.go b/cmd/parquetgen/gen/template_optional.go index 1a6a1e0..666f779 100644 --- a/cmd/parquetgen/gen/template_optional.go +++ b/cmd/parquetgen/gen/template_optional.go @@ -7,12 +7,12 @@ var optionalNumericTpl = `{{define "optionalField"}} type {{.FieldType}} struct { parquet.OptionalField vals []{{removeStar .TypeName}} - read func(r {{.StructType}}) ([]{{removeStar .TypeName}}, []uint8, []uint8) - write func(r *{{.StructType}}, vals []{{removeStar .TypeName}}, def, rep []uint8) (int, int) + read func(r {{.StructType}}, vals []{{removeStar .TypeName}}, defs, reps []uint8) ([]{{removeStar .TypeName}}, []uint8, []uint8) + write func(r *{{.StructType}}, vals []{{removeStar .TypeName}}, defs, reps []uint8) (int, int) stats *{{removeStar .TypeName}}optionalStats } -func New{{.FieldType}}(read func(r {{.StructType}}) ([]{{removeStar .TypeName}}, []uint8, []uint8), write func(r *{{.StructType}}, vals []{{removeStar .TypeName}}, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *{{.FieldType}} { +func New{{.FieldType}}(read func(r {{.StructType}}, vals []{{removeStar .TypeName}}, defs, reps []uint8) ([]{{removeStar .TypeName}}, []uint8, []uint8), write func(r *{{.StructType}}, vals []{{removeStar .TypeName}}, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *{{.FieldType}} { return &{{.FieldType}}{ read: read, write: write, @@ -52,11 +52,11 @@ func (f *{{.FieldType}}) Read(r io.ReadSeeker, pg parquet.Page) error { } func (f *{{.FieldType}}) Add(r {{.StructType}}) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *{{.FieldType}}) Scan(r *{{.StructType}}) { @@ -108,7 +108,7 @@ func (f *{{removeStar .TypeName}}optionalStats) add(vals []{{removeStar .TypeNam } if val > f.max { f.max = val - } + } } } } diff --git a/cmd/parquetgen/gen/template_string_optional.go b/cmd/parquetgen/gen/template_string_optional.go index 47e3e23..9a16f64 100644 --- a/cmd/parquetgen/gen/template_string_optional.go +++ b/cmd/parquetgen/gen/template_string_optional.go @@ -4,12 +4,12 @@ var stringOptionalTpl = `{{define "stringOptionalField"}} type StringOptionalField struct { parquet.OptionalField vals []string - read func(r {{.StructType}}) ([]{{removeStar .TypeName}}, []uint8, []uint8) + read func(r {{.StructType}}, vals []{{removeStar .TypeName}}, def, rep []uint8) ([]{{removeStar .TypeName}}, []uint8, []uint8) write func(r *{{.StructType}}, vals []{{removeStar .TypeName}}, def, rep []uint8) (int, int) stats *stringOptionalStats } -func NewStringOptionalField(read func(r {{.StructType}}) ([]{{removeStar .TypeName}}, []uint8, []uint8), write func(r *{{.StructType}}, vals []{{removeStar .TypeName}}, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *StringOptionalField { +func NewStringOptionalField(read func(r {{.StructType}}, vals []{{removeStar .TypeName}}, def, rep []uint8) ([]{{removeStar .TypeName}}, []uint8, []uint8), write func(r *{{.StructType}}, vals []{{removeStar .TypeName}}, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *StringOptionalField { return &StringOptionalField{ read: read, write: write, @@ -23,11 +23,11 @@ func (f *StringOptionalField) Schema() parquet.Field { } func (f *StringOptionalField) Add(r {{.StructType}}) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *StringOptionalField) Scan(r *{{.StructType}}) { diff --git a/fields.go b/fields.go index 9959961..63ec69e 100644 --- a/fields.go +++ b/fields.go @@ -3,10 +3,11 @@ package parquet import ( "bytes" "compress/gzip" - "github.com/valyala/bytebufferpool" "math/bits" "strings" + "github.com/valyala/bytebufferpool" + "fmt" "io" @@ -28,8 +29,7 @@ const ( ) var ( - buffpool = bytebufferpool.Pool{} - compresspool = bytebufferpool.Pool{} + buffpool = bytebufferpool.Pool{} ) type RepetitionTypes []RepetitionType @@ -94,8 +94,8 @@ func RequiredFieldUncompressed(r *RequiredField) { // DoWrite writes the actual raw data. func (f *RequiredField) DoWrite(w io.Writer, meta *Metadata, vals []byte, count int, stats Stats) error { - buff := compresspool.Get() - defer compresspool.Put(buff) + buff := buffpool.Get() + defer buffpool.Put(buff) l, cl, vals, err := compress(f.compression, buff, vals) if err != nil { @@ -227,7 +227,6 @@ func (f *OptionalField) valsFromDefs(defs []uint8, max uint8) int { return out } - // DoWrite is called by all optional field types to write the definition levels // and raw data to the io.Writer func (f *OptionalField) DoWrite(w io.Writer, meta *Metadata, vals []byte, count int, stats Stats) error { @@ -256,10 +255,10 @@ func (f *OptionalField) DoWrite(w io.Writer, meta *Metadata, vals []byte, count return err } - compresBuf := compresspool.Get() - defer compresspool.Put(compresBuf) + compressed := buffpool.Get() + defer buffpool.Put(compressed) - l, cl, vals, err := compress(f.compression, compresBuf, buf.Bytes()) + l, cl, vals, err := compress(f.compression, compressed, buf.Bytes()) if err != nil { return err } diff --git a/go.mod b/go.mod index 8511a82..5b26a49 100644 --- a/go.mod +++ b/go.mod @@ -7,5 +7,5 @@ require ( github.com/bxcodec/faker/v3 v3.6.0 github.com/golang/snappy v0.0.2 github.com/stretchr/testify v1.7.0 - github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/bytebufferpool v1.0.0 ) diff --git a/internal/bitpack/bitpack.go b/internal/bitpack/bitpack.go index 088d493..a04fdd5 100644 --- a/internal/bitpack/bitpack.go +++ b/internal/bitpack/bitpack.go @@ -2,23 +2,25 @@ package bitpack // Code generated by github.com/parsyl/parquet. DO NOT EDIT. -func Pack(width int, vals []uint8) []byte { +const MaxSize = 4 + +func Pack(b []byte, width int, vals []uint8) []byte { switch width { case 1: - return pack1(vals) + return pack1(b, vals) case 2: - return pack2(vals) + return pack2(b, vals) case 3: - return pack3(vals) + return pack3(b, vals) case 4: - return pack4(vals) + return pack4(b, vals) default: - return []byte{} + return b } } -func pack1(vals []uint8) []byte { - return []byte{ +func pack1(b []byte, vals []uint8) []byte { + return append(b, (byte((vals[0]&1)<<0) | byte((vals[1]&1)<<1) | byte((vals[2]&1)<<2) | @@ -27,11 +29,11 @@ func pack1(vals []uint8) []byte { byte((vals[5]&1)<<5) | byte((vals[6]&1)<<6) | byte((vals[7]&1)<<7)), - } + ) } -func pack2(vals []uint8) []byte { - return []byte{ +func pack2(b []byte, vals []uint8) []byte { + return append(b, (byte((vals[0]&3)<<0) | byte((vals[1]&3)<<2) | byte((vals[2]&3)<<4) | @@ -40,11 +42,11 @@ func pack2(vals []uint8) []byte { byte((vals[5]&3)<<2) | byte((vals[6]&3)<<4) | byte((vals[7]&3)<<6)), - } + ) } -func pack3(vals []uint8) []byte { - return []byte{ +func pack3(b []byte, vals []uint8) []byte { + return append(b, (byte((vals[0]&7)<<0) | byte((vals[1]&7)<<3) | byte((vals[2]&3)<<6)), @@ -55,11 +57,11 @@ func pack3(vals []uint8) []byte { (byte((vals[5]&6)>>1) | byte((vals[6]&7)<<2) | byte((vals[7]&7)<<5)), - } + ) } -func pack4(vals []uint8) []byte { - return []byte{ +func pack4(b []byte, vals []uint8) []byte { + return append(b, (byte((vals[0]&15)<<0) | byte((vals[1]&15)<<4)), (byte((vals[2]&15)<<0) | @@ -68,7 +70,7 @@ func pack4(vals []uint8) []byte { byte((vals[5]&15)<<4)), (byte((vals[6]&15)<<0) | byte((vals[7]&15)<<4)), - } + ) } func Unpack(width int, vals []byte) []uint8 { diff --git a/internal/bitpack/bitpack_test.go b/internal/bitpack/bitpack_test.go index 983b5d7..72f693f 100644 --- a/internal/bitpack/bitpack_test.go +++ b/internal/bitpack/bitpack_test.go @@ -45,7 +45,8 @@ func TestPackAndUnpack(t *testing.T) { for i, tc := range testCases { t.Run(fmt.Sprintf("%d %s", i, tc.name), func(t *testing.T) { - b := bitpack.Pack(tc.width, tc.ints) + b := make([]byte, 0, bitpack.MaxSize) + b = bitpack.Pack(b, tc.width, tc.ints) if len(tc.bytes) > 0 { assert.Equal(t, tc.bytes, b) } diff --git a/internal/rle/rle.go b/internal/rle/rle.go index f5890cd..74f54e0 100644 --- a/internal/rle/rle.go +++ b/internal/rle/rle.go @@ -76,7 +76,9 @@ func (r *RLE) writeOrAppendBitPackedRun() { r.headerPointer = r.out.size() - 1 } - r.out.write(bitpack.Pack(int(r.bitWidth), r.valBuf)) + tmp := make([]byte, 0, bitpack.MaxSize) + tmp = bitpack.Pack(tmp, int(r.bitWidth), r.valBuf) + r.out.write(tmp) r.bufCount = 0 r.repeatCount = 0 r.groupCount++ diff --git a/parquet_generated_test.go b/parquet_generated_test.go index 0051b06..9ee7204 100644 --- a/parquet_generated_test.go +++ b/parquet_generated_test.go @@ -87,12 +87,15 @@ func writeName(x *Person, vals []string) { x.Name = vals[0] } -func readAge(x Person) ([]int32, []uint8, []uint8) { +func readAge(x Person, vals []int32, defs, reps []uint8) ([]int32, []uint8, []uint8) { switch { case x.Age == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []int32{*x.Age}, []uint8{1}, nil + vals = append(vals, *x.Age) + defs = append(defs, 1) + return vals, defs, reps } } @@ -115,12 +118,15 @@ func writeHappiness(x *Person, vals []int64) { x.Happiness = vals[0] } -func readSadness(x Person) ([]int64, []uint8, []uint8) { +func readSadness(x Person, vals []int64, defs, reps []uint8) ([]int64, []uint8, []uint8) { switch { case x.Sadness == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []int64{*x.Sadness}, []uint8{1}, nil + vals = append(vals, *x.Sadness) + defs = append(defs, 1) + return vals, defs, reps } } @@ -135,12 +141,15 @@ func writeSadness(x *Person, vals []int64, defs, reps []uint8) (int, int) { return 0, 1 } -func readCode(x Person) ([]string, []uint8, []uint8) { +func readCode(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { switch { case x.Code == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []string{*x.Code}, []uint8{1}, nil + vals = append(vals, *x.Code) + defs = append(defs, 1) + return vals, defs, reps } } @@ -171,12 +180,15 @@ func writeBoldness(x *Person, vals []float64) { x.Boldness = vals[0] } -func readLameness(x Person) ([]float32, []uint8, []uint8) { +func readLameness(x Person, vals []float32, defs, reps []uint8) ([]float32, []uint8, []uint8) { switch { case x.Lameness == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []float32{*x.Lameness}, []uint8{1}, nil + vals = append(vals, *x.Lameness) + defs = append(defs, 1) + return vals, defs, reps } } @@ -191,12 +203,15 @@ func writeLameness(x *Person, vals []float32, defs, reps []uint8) (int, int) { return 0, 1 } -func readKeen(x Person) ([]bool, []uint8, []uint8) { +func readKeen(x Person, vals []bool, defs, reps []uint8) ([]bool, []uint8, []uint8) { switch { case x.Keen == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []bool{*x.Keen}, []uint8{1}, nil + vals = append(vals, *x.Keen) + defs = append(defs, 1) + return vals, defs, reps } } @@ -219,12 +234,15 @@ func writeBirthday(x *Person, vals []uint32) { x.Birthday = vals[0] } -func readAnniversary(x Person) ([]uint64, []uint8, []uint8) { +func readAnniversary(x Person, vals []uint64, defs, reps []uint8) ([]uint64, []uint8, []uint8) { switch { case x.Anniversary == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []uint64{*x.Anniversary}, []uint8{1}, nil + vals = append(vals, *x.Anniversary) + defs = append(defs, 1) + return vals, defs, reps } } @@ -255,12 +273,15 @@ func writeHungry(x *Person, vals []bool) { x.Hungry = vals[0] } -func readHobbyName(x Person) ([]string, []uint8, []uint8) { +func readHobbyName(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { switch { case x.Hobby == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []string{x.Hobby.Name}, []uint8{1}, nil + vals = append(vals, x.Hobby.Name) + defs = append(defs, 1) + return vals, defs, reps } } @@ -275,14 +296,18 @@ func writeHobbyName(x *Person, vals []string, defs, reps []uint8) (int, int) { return 0, 1 } -func readHobbyDifficulty(x Person) ([]int32, []uint8, []uint8) { +func readHobbyDifficulty(x Person, vals []int32, defs, reps []uint8) ([]int32, []uint8, []uint8) { switch { case x.Hobby == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps case x.Hobby.Difficulty == nil: - return nil, []uint8{1}, nil + defs = append(defs, 1) + return vals, defs, reps default: - return []int32{*x.Hobby.Difficulty}, []uint8{2}, nil + vals = append(vals, *x.Hobby.Difficulty) + defs = append(defs, 2) + return vals, defs, reps } } @@ -297,9 +322,7 @@ func writeHobbyDifficulty(x *Person, vals []int32, defs, reps []uint8) (int, int return 0, 1 } -func readHobbySkillsName(x Person) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 +func readHobbySkillsName(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if x.Hobby == nil { @@ -348,9 +371,7 @@ func writeHobbySkillsName(x *Person, vals []string, defs, reps []uint8) (int, in return nVals, nLevels } -func readHobbySkillsDifficulty(x Person) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 +func readHobbySkillsDifficulty(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if x.Hobby == nil { @@ -399,9 +420,7 @@ func writeHobbySkillsDifficulty(x *Person, vals []string, defs, reps []uint8) (i return nVals, nLevels } -func readFriendsID(x Person) ([]int32, []uint8, []uint8) { - var vals []int32 - var defs, reps []uint8 +func readFriendsID(x Person, vals []int32, defs, reps []uint8) ([]int32, []uint8, []uint8) { var lastRep uint8 if len(x.Friends) == 0 { @@ -445,9 +464,7 @@ func writeFriendsID(x *Person, vals []int32, defs, reps []uint8) (int, int) { return nVals, nLevels } -func readFriendsName(x Person) ([]string, []uint8, []uint8) { - var vals []string - var defs, reps []uint8 +func readFriendsName(x Person, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { var lastRep uint8 if len(x.Friends) == 0 { @@ -491,9 +508,7 @@ func writeFriendsName(x *Person, vals []string, defs, reps []uint8) (int, int) { return nVals, nLevels } -func readFriendsAge(x Person) ([]int32, []uint8, []uint8) { - var vals []int32 - var defs, reps []uint8 +func readFriendsAge(x Person, vals []int32, defs, reps []uint8) ([]int32, []uint8, []uint8) { var lastRep uint8 if len(x.Friends) == 0 { @@ -614,8 +629,10 @@ func MaxPageSize(m int) func(*ParquetWriter) error { } } +var par1 = []byte("PAR1") + func begin(p *ParquetWriter) error { - _, err := p.w.Write([]byte("PAR1")) + _, err := p.w.Write(par1) return err } @@ -678,7 +695,7 @@ func (p *ParquetWriter) Close() error { return err } - _, err := p.w.Write([]byte("PAR1")) + _, err := p.w.Write(par1) return err } @@ -1014,12 +1031,12 @@ func (f *StringField) Levels() ([]uint8, []uint8) { type Int32OptionalField struct { parquet.OptionalField vals []int32 - read func(r Person) ([]int32, []uint8, []uint8) - write func(r *Person, vals []int32, def, rep []uint8) (int, int) + read func(r Person, vals []int32, defs, reps []uint8) ([]int32, []uint8, []uint8) + write func(r *Person, vals []int32, defs, reps []uint8) (int, int) stats *int32optionalStats } -func NewInt32OptionalField(read func(r Person) ([]int32, []uint8, []uint8), write func(r *Person, vals []int32, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Int32OptionalField { +func NewInt32OptionalField(read func(r Person, vals []int32, defs, reps []uint8) ([]int32, []uint8, []uint8), write func(r *Person, vals []int32, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Int32OptionalField { return &Int32OptionalField{ read: read, write: write, @@ -1059,11 +1076,11 @@ func (f *Int32OptionalField) Read(r io.ReadSeeker, pg parquet.Page) error { } func (f *Int32OptionalField) Add(r Person) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *Int32OptionalField) Scan(r *Person) { @@ -1152,12 +1169,12 @@ func (f *Int64Field) Levels() ([]uint8, []uint8) { type Int64OptionalField struct { parquet.OptionalField vals []int64 - read func(r Person) ([]int64, []uint8, []uint8) - write func(r *Person, vals []int64, def, rep []uint8) (int, int) + read func(r Person, vals []int64, defs, reps []uint8) ([]int64, []uint8, []uint8) + write func(r *Person, vals []int64, defs, reps []uint8) (int, int) stats *int64optionalStats } -func NewInt64OptionalField(read func(r Person) ([]int64, []uint8, []uint8), write func(r *Person, vals []int64, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Int64OptionalField { +func NewInt64OptionalField(read func(r Person, vals []int64, defs, reps []uint8) ([]int64, []uint8, []uint8), write func(r *Person, vals []int64, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Int64OptionalField { return &Int64OptionalField{ read: read, write: write, @@ -1197,11 +1214,11 @@ func (f *Int64OptionalField) Read(r io.ReadSeeker, pg parquet.Page) error { } func (f *Int64OptionalField) Add(r Person) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *Int64OptionalField) Scan(r *Person) { @@ -1224,12 +1241,12 @@ func (f *Int64OptionalField) Levels() ([]uint8, []uint8) { type StringOptionalField struct { parquet.OptionalField vals []string - read func(r Person) ([]string, []uint8, []uint8) + read func(r Person, vals []string, def, rep []uint8) ([]string, []uint8, []uint8) write func(r *Person, vals []string, def, rep []uint8) (int, int) stats *stringOptionalStats } -func NewStringOptionalField(read func(r Person) ([]string, []uint8, []uint8), write func(r *Person, vals []string, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *StringOptionalField { +func NewStringOptionalField(read func(r Person, vals []string, def, rep []uint8) ([]string, []uint8, []uint8), write func(r *Person, vals []string, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *StringOptionalField { return &StringOptionalField{ read: read, write: write, @@ -1243,11 +1260,11 @@ func (f *StringOptionalField) Schema() parquet.Field { } func (f *StringOptionalField) Add(r Person) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *StringOptionalField) Scan(r *Person) { @@ -1439,12 +1456,12 @@ func (f *Float64Field) Levels() ([]uint8, []uint8) { type Float32OptionalField struct { parquet.OptionalField vals []float32 - read func(r Person) ([]float32, []uint8, []uint8) - write func(r *Person, vals []float32, def, rep []uint8) (int, int) + read func(r Person, vals []float32, defs, reps []uint8) ([]float32, []uint8, []uint8) + write func(r *Person, vals []float32, defs, reps []uint8) (int, int) stats *float32optionalStats } -func NewFloat32OptionalField(read func(r Person) ([]float32, []uint8, []uint8), write func(r *Person, vals []float32, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Float32OptionalField { +func NewFloat32OptionalField(read func(r Person, vals []float32, defs, reps []uint8) ([]float32, []uint8, []uint8), write func(r *Person, vals []float32, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Float32OptionalField { return &Float32OptionalField{ read: read, write: write, @@ -1484,11 +1501,11 @@ func (f *Float32OptionalField) Read(r io.ReadSeeker, pg parquet.Page) error { } func (f *Float32OptionalField) Add(r Person) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *Float32OptionalField) Scan(r *Person) { @@ -1511,12 +1528,12 @@ func (f *Float32OptionalField) Levels() ([]uint8, []uint8) { type BoolOptionalField struct { parquet.OptionalField vals []bool - read func(r Person) ([]bool, []uint8, []uint8) + read func(r Person, vals []bool, defs, reps []uint8) ([]bool, []uint8, []uint8) write func(r *Person, vals []bool, defs, reps []uint8) (int, int) stats *boolOptionalStats } -func NewBoolOptionalField(read func(r Person) ([]bool, []uint8, []uint8), write func(r *Person, vals []bool, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *BoolOptionalField { +func NewBoolOptionalField(read func(r Person, vals []bool, defs, reps []uint8) ([]bool, []uint8, []uint8), write func(r *Person, vals []bool, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *BoolOptionalField { return &BoolOptionalField{ read: read, write: write, @@ -1554,11 +1571,11 @@ func (f *BoolOptionalField) Scan(r *Person) { } func (f *BoolOptionalField) Add(r Person) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *BoolOptionalField) Write(w io.Writer, meta *parquet.Metadata) error { @@ -1648,12 +1665,12 @@ func (f *Uint32Field) Levels() ([]uint8, []uint8) { type Uint64OptionalField struct { parquet.OptionalField vals []uint64 - read func(r Person) ([]uint64, []uint8, []uint8) - write func(r *Person, vals []uint64, def, rep []uint8) (int, int) + read func(r Person, vals []uint64, defs, reps []uint8) ([]uint64, []uint8, []uint8) + write func(r *Person, vals []uint64, defs, reps []uint8) (int, int) stats *uint64optionalStats } -func NewUint64OptionalField(read func(r Person) ([]uint64, []uint8, []uint8), write func(r *Person, vals []uint64, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Uint64OptionalField { +func NewUint64OptionalField(read func(r Person, vals []uint64, defs, reps []uint8) ([]uint64, []uint8, []uint8), write func(r *Person, vals []uint64, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Uint64OptionalField { return &Uint64OptionalField{ read: read, write: write, @@ -1693,11 +1710,11 @@ func (f *Uint64OptionalField) Read(r io.ReadSeeker, pg parquet.Page) error { } func (f *Uint64OptionalField) Add(r Person) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *Uint64OptionalField) Scan(r *Person) { @@ -2066,14 +2083,14 @@ func (s *stringOptionalStats) add(vals []string, defs []uint8) { s.nils++ } else { val := vals[i] - if s.min == nilString { + if s.min == nilOptString { s.min = val } else { if val < s.min { s.min = val } } - if s.max == nilString { + if s.max == nilOptString { s.max = val } else { if val > s.max { diff --git a/performance/base/parquet.go b/performance/base/parquet.go index b2c6fe8..a7436ef 100644 --- a/performance/base/parquet.go +++ b/performance/base/parquet.go @@ -3,7 +3,6 @@ package base // Code generated by github.com/parsyl/parquet. DO NOT EDIT. import ( - "bytes" "encoding/binary" "fmt" "io" @@ -12,8 +11,8 @@ import ( "github.com/parsyl/parquet" . "github.com/parsyl/parquet/performance/message" sch "github.com/parsyl/parquet/schema" + "github.com/valyala/bytebufferpool" "math" - "sort" ) type compression int @@ -25,6 +24,8 @@ const ( compressionUnknown compression = -1 ) +var buffpool = bytebufferpool.Pool{} + // ParquetWriter reprents a row group type ParquetWriter struct { fields []Field @@ -1033,13 +1034,16 @@ func (f *StringOptionalField) Scan(r *Message) { } func (f *StringOptionalField) Write(w io.Writer, meta *parquet.Metadata) error { - buf := bytes.Buffer{} + buf := buffpool.Get() + defer buffpool.Put(buf) + bs := make([]byte, 4) for _, s := range f.vals { - if err := binary.Write(&buf, binary.LittleEndian, int32(len(s))); err != nil { + binary.LittleEndian.PutUint32(bs, uint32(len(s))) + if _, err := buf.Write(bs); err != nil { return err } - buf.Write([]byte(s)) + buf.WriteString(s) } return f.DoWrite(w, meta, buf.Bytes(), len(f.Defs), f.stats) @@ -1092,13 +1096,16 @@ func (f *StringField) Schema() parquet.Field { } func (f *StringField) Write(w io.Writer, meta *parquet.Metadata) error { - buf := bytes.Buffer{} + buf := buffpool.Get() + defer buffpool.Put(buf) + bs := make([]byte, 4) for _, s := range f.vals { - if err := binary.Write(&buf, binary.LittleEndian, int32(len(s))); err != nil { + binary.LittleEndian.PutUint32(bs, uint32(len(s))) + if _, err := buf.Write(bs); err != nil { return err } - buf.Write([]byte(s)) + buf.WriteString(s) } return f.DoWrite(w, meta, buf.Bytes(), len(f.vals), f.stats) @@ -1166,9 +1173,13 @@ func (f *Int64OptionalField) Schema() parquet.Field { } func (f *Int64OptionalField) Write(w io.Writer, meta *parquet.Metadata) error { - var buf bytes.Buffer + buf := buffpool.Get() + defer buffpool.Put(buf) + + bs := make([]byte, 8) for _, v := range f.vals { - if err := binary.Write(&buf, binary.LittleEndian, v); err != nil { + binary.LittleEndian.PutUint64(bs, uint64(v)) + if _, err := buf.Write(bs); err != nil { return err } } @@ -1246,9 +1257,13 @@ func (f *Int64Field) Read(r io.ReadSeeker, pg parquet.Page) error { } func (f *Int64Field) Write(w io.Writer, meta *parquet.Metadata) error { - var buf bytes.Buffer + buf := buffpool.Get() + defer buffpool.Put(buf) + + bs := make([]byte, 8) for _, v := range f.vals { - if err := binary.Write(&buf, binary.LittleEndian, v); err != nil { + binary.LittleEndian.PutUint64(bs, uint64(v)) + if _, err := buf.Write(bs); err != nil { return err } } @@ -1296,9 +1311,13 @@ func (f *Int32OptionalField) Schema() parquet.Field { } func (f *Int32OptionalField) Write(w io.Writer, meta *parquet.Metadata) error { - var buf bytes.Buffer + buf := buffpool.Get() + defer buffpool.Put(buf) + + bs := make([]byte, 4) for _, v := range f.vals { - if err := binary.Write(&buf, binary.LittleEndian, v); err != nil { + binary.LittleEndian.PutUint32(bs, uint32(v)) + if _, err := buf.Write(bs); err != nil { return err } } @@ -1376,9 +1395,13 @@ func (f *Int32Field) Read(r io.ReadSeeker, pg parquet.Page) error { } func (f *Int32Field) Write(w io.Writer, meta *parquet.Metadata) error { - var buf bytes.Buffer + buf := buffpool.Get() + defer buffpool.Put(buf) + + bs := make([]byte, 4) for _, v := range f.vals { - if err := binary.Write(&buf, binary.LittleEndian, v); err != nil { + binary.LittleEndian.PutUint32(bs, uint32(v)) + if _, err := buf.Write(bs); err != nil { return err } } @@ -1426,9 +1449,13 @@ func (f *Float64OptionalField) Schema() parquet.Field { } func (f *Float64OptionalField) Write(w io.Writer, meta *parquet.Metadata) error { - var buf bytes.Buffer + buf := buffpool.Get() + defer buffpool.Put(buf) + + bs := make([]byte, 8) for _, v := range f.vals { - if err := binary.Write(&buf, binary.LittleEndian, v); err != nil { + binary.LittleEndian.PutUint64(bs, math.Float64bits(v)) + if _, err := buf.Write(bs); err != nil { return err } } @@ -1506,9 +1533,13 @@ func (f *Float64Field) Read(r io.ReadSeeker, pg parquet.Page) error { } func (f *Float64Field) Write(w io.Writer, meta *parquet.Metadata) error { - var buf bytes.Buffer + buf := buffpool.Get() + defer buffpool.Put(buf) + + bs := make([]byte, 8) for _, v := range f.vals { - if err := binary.Write(&buf, binary.LittleEndian, v); err != nil { + binary.LittleEndian.PutUint64(bs, math.Float64bits(v)) + if _, err := buf.Write(bs); err != nil { return err } } @@ -1556,9 +1587,13 @@ func (f *Float32OptionalField) Schema() parquet.Field { } func (f *Float32OptionalField) Write(w io.Writer, meta *parquet.Metadata) error { - var buf bytes.Buffer + buf := buffpool.Get() + defer buffpool.Put(buf) + + bs := make([]byte, 4) for _, v := range f.vals { - if err := binary.Write(&buf, binary.LittleEndian, v); err != nil { + binary.LittleEndian.PutUint32(bs, math.Float32bits(v)) + if _, err := buf.Write(bs); err != nil { return err } } @@ -1636,9 +1671,13 @@ func (f *Float32Field) Read(r io.ReadSeeker, pg parquet.Page) error { } func (f *Float32Field) Write(w io.Writer, meta *parquet.Metadata) error { - var buf bytes.Buffer + buf := buffpool.Get() + defer buffpool.Put(buf) + + bs := make([]byte, 4) for _, v := range f.vals { - if err := binary.Write(&buf, binary.LittleEndian, v); err != nil { + binary.LittleEndian.PutUint32(bs, math.Float32bits(v)) + if _, err := buf.Write(bs); err != nil { return err } } @@ -1797,16 +1836,21 @@ func (f *BoolField) Levels() ([]uint8, []uint8) { return nil, nil } +const nilOptString = "__#NIL#__" + type stringOptionalStats struct { - vals []string - min []byte - max []byte + min string + max string nils int64 maxDef uint8 } func newStringOptionalStats(d uint8) *stringOptionalStats { - return &stringOptionalStats{maxDef: d} + return &stringOptionalStats{ + min: nilOptString, + max: nilOptString, + maxDef: d, + } } func (s *stringOptionalStats) add(vals []string, defs []uint8) { @@ -1815,7 +1859,21 @@ func (s *stringOptionalStats) add(vals []string, defs []uint8) { if def < s.maxDef { s.nils++ } else { - s.vals = append(s.vals, vals[i]) + val := vals[i] + if s.min == nilOptString { + s.min = val + } else { + if val < s.min { + s.min = val + } + } + if s.max == nilOptString { + s.max = val + } else { + if val > s.max { + s.max = val + } + } i++ } } @@ -1830,43 +1888,48 @@ func (s *stringOptionalStats) DistinctCount() *int64 { } func (s *stringOptionalStats) Min() []byte { - if s.min == nil { - s.minMax() + if s.min == nilOptString { + return nil } - return s.min + return []byte(s.min) } func (s *stringOptionalStats) Max() []byte { - if s.max == nil { - s.minMax() + if s.max == nilOptString { + return nil } - return s.max + return []byte(s.max) } -func (s *stringOptionalStats) minMax() { - if len(s.vals) == 0 { - return - } - - tmp := make([]string, len(s.vals)) - copy(tmp, s.vals) - sort.Strings(tmp) - s.min = []byte(tmp[0]) - s.max = []byte(tmp[len(tmp)-1]) -} +const nilString = "__#NIL#__" type stringStats struct { - vals []string - min []byte - max []byte + min string + max string } func newStringStats() *stringStats { - return &stringStats{} + return &stringStats{ + min: nilString, + max: nilString, + } } func (s *stringStats) add(val string) { - s.vals = append(s.vals, val) + if s.min == nilString { + s.min = val + } else { + if val < s.min { + s.min = val + } + } + if s.max == nilString { + s.max = val + } else { + if val > s.max { + s.max = val + } + } } func (s *stringStats) NullCount() *int64 { @@ -1878,29 +1941,17 @@ func (s *stringStats) DistinctCount() *int64 { } func (s *stringStats) Min() []byte { - if s.min == nil { - s.minMax() + if s.min == nilString { + return nil } - return s.min + return []byte(s.min) } func (s *stringStats) Max() []byte { - if s.max == nil { - s.minMax() - } - return s.max -} - -func (s *stringStats) minMax() { - if len(s.vals) == 0 { - return + if s.max == nilString { + return nil } - - tmp := make([]string, len(s.vals)) - copy(tmp, s.vals) - sort.Strings(tmp) - s.min = []byte(tmp[0]) - s.max = []byte(tmp[len(tmp)-1]) + return []byte(s.max) } type int64optionalStats struct { @@ -1938,10 +1989,10 @@ func (f *int64optionalStats) add(vals []int64, defs []uint8) { } } -func (f *int64optionalStats) bytes(val int64) []byte { - var buf bytes.Buffer - binary.Write(&buf, binary.LittleEndian, val) - return buf.Bytes() +func (f *int64optionalStats) bytes(v int64) []byte { + bs := make([]byte, 8) + binary.LittleEndian.PutUint64(bs, uint64(v)) + return bs } func (f *int64optionalStats) NullCount() *int64 { @@ -1986,10 +2037,10 @@ func (i *int64stats) add(val int64) { } } -func (f *int64stats) bytes(val int64) []byte { - var buf bytes.Buffer - binary.Write(&buf, binary.LittleEndian, val) - return buf.Bytes() +func (f *int64stats) bytes(v int64) []byte { + bs := make([]byte, 8) + binary.LittleEndian.PutUint64(bs, uint64(v)) + return bs } func (f *int64stats) NullCount() *int64 { @@ -2043,10 +2094,10 @@ func (f *int32optionalStats) add(vals []int32, defs []uint8) { } } -func (f *int32optionalStats) bytes(val int32) []byte { - var buf bytes.Buffer - binary.Write(&buf, binary.LittleEndian, val) - return buf.Bytes() +func (f *int32optionalStats) bytes(v int32) []byte { + bs := make([]byte, 4) + binary.LittleEndian.PutUint32(bs, uint32(v)) + return bs } func (f *int32optionalStats) NullCount() *int64 { @@ -2091,10 +2142,10 @@ func (i *int32stats) add(val int32) { } } -func (f *int32stats) bytes(val int32) []byte { - var buf bytes.Buffer - binary.Write(&buf, binary.LittleEndian, val) - return buf.Bytes() +func (f *int32stats) bytes(v int32) []byte { + bs := make([]byte, 4) + binary.LittleEndian.PutUint32(bs, uint32(v)) + return bs } func (f *int32stats) NullCount() *int64 { @@ -2148,10 +2199,10 @@ func (f *float64optionalStats) add(vals []float64, defs []uint8) { } } -func (f *float64optionalStats) bytes(val float64) []byte { - var buf bytes.Buffer - binary.Write(&buf, binary.LittleEndian, val) - return buf.Bytes() +func (f *float64optionalStats) bytes(v float64) []byte { + bs := make([]byte, 8) + binary.LittleEndian.PutUint64(bs, math.Float64bits(v)) + return bs } func (f *float64optionalStats) NullCount() *int64 { @@ -2196,10 +2247,10 @@ func (i *float64stats) add(val float64) { } } -func (f *float64stats) bytes(val float64) []byte { - var buf bytes.Buffer - binary.Write(&buf, binary.LittleEndian, val) - return buf.Bytes() +func (f *float64stats) bytes(v float64) []byte { + bs := make([]byte, 8) + binary.LittleEndian.PutUint64(bs, math.Float64bits(v)) + return bs } func (f *float64stats) NullCount() *int64 { @@ -2253,10 +2304,10 @@ func (f *float32optionalStats) add(vals []float32, defs []uint8) { } } -func (f *float32optionalStats) bytes(val float32) []byte { - var buf bytes.Buffer - binary.Write(&buf, binary.LittleEndian, val) - return buf.Bytes() +func (f *float32optionalStats) bytes(v float32) []byte { + bs := make([]byte, 4) + binary.LittleEndian.PutUint32(bs, math.Float32bits(v)) + return bs } func (f *float32optionalStats) NullCount() *int64 { @@ -2301,10 +2352,10 @@ func (i *float32stats) add(val float32) { } } -func (f *float32stats) bytes(val float32) []byte { - var buf bytes.Buffer - binary.Write(&buf, binary.LittleEndian, val) - return buf.Bytes() +func (f *float32stats) bytes(v float32) []byte { + bs := make([]byte, 4) + binary.LittleEndian.PutUint32(bs, math.Float32bits(v)) + return bs } func (f *float32stats) NullCount() *int64 { diff --git a/performance/parquet.go b/performance/parquet.go index f4c4a48..3a7a1e5 100644 --- a/performance/parquet.go +++ b/performance/parquet.go @@ -89,12 +89,15 @@ func Fields(compression compression) []Field { } } -func readColStr0(x Message) ([]string, []uint8, []uint8) { +func readColStr0(x Message, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { switch { case x.ColStr0 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []string{*x.ColStr0}, []uint8{1}, nil + vals = append(vals, *x.ColStr0) + defs = append(defs, 1) + return vals, defs, reps } } @@ -117,12 +120,15 @@ func writeColStr1(x *Message, vals []string) { x.ColStr1 = vals[0] } -func readColStr2(x Message) ([]string, []uint8, []uint8) { +func readColStr2(x Message, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { switch { case x.ColStr2 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []string{*x.ColStr2}, []uint8{1}, nil + vals = append(vals, *x.ColStr2) + defs = append(defs, 1) + return vals, defs, reps } } @@ -145,12 +151,15 @@ func writeColStr3(x *Message, vals []string) { x.ColStr3 = vals[0] } -func readColStr4(x Message) ([]string, []uint8, []uint8) { +func readColStr4(x Message, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { switch { case x.ColStr4 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []string{*x.ColStr4}, []uint8{1}, nil + vals = append(vals, *x.ColStr4) + defs = append(defs, 1) + return vals, defs, reps } } @@ -173,12 +182,15 @@ func writeColStr5(x *Message, vals []string) { x.ColStr5 = vals[0] } -func readColStr6(x Message) ([]string, []uint8, []uint8) { +func readColStr6(x Message, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { switch { case x.ColStr6 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []string{*x.ColStr6}, []uint8{1}, nil + vals = append(vals, *x.ColStr6) + defs = append(defs, 1) + return vals, defs, reps } } @@ -201,12 +213,15 @@ func writeColStr7(x *Message, vals []string) { x.ColStr7 = vals[0] } -func readColStr8(x Message) ([]string, []uint8, []uint8) { +func readColStr8(x Message, vals []string, defs, reps []uint8) ([]string, []uint8, []uint8) { switch { case x.ColStr8 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []string{*x.ColStr8}, []uint8{1}, nil + vals = append(vals, *x.ColStr8) + defs = append(defs, 1) + return vals, defs, reps } } @@ -229,12 +244,15 @@ func writeColStr9(x *Message, vals []string) { x.ColStr9 = vals[0] } -func readColInt0(x Message) ([]int64, []uint8, []uint8) { +func readColInt0(x Message, vals []int64, defs, reps []uint8) ([]int64, []uint8, []uint8) { switch { case x.ColInt0 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []int64{*x.ColInt0}, []uint8{1}, nil + vals = append(vals, *x.ColInt0) + defs = append(defs, 1) + return vals, defs, reps } } @@ -257,12 +275,15 @@ func writeColInt1(x *Message, vals []int64) { x.ColInt1 = vals[0] } -func readColInt2(x Message) ([]int64, []uint8, []uint8) { +func readColInt2(x Message, vals []int64, defs, reps []uint8) ([]int64, []uint8, []uint8) { switch { case x.ColInt2 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []int64{*x.ColInt2}, []uint8{1}, nil + vals = append(vals, *x.ColInt2) + defs = append(defs, 1) + return vals, defs, reps } } @@ -285,12 +306,15 @@ func writeColInt3(x *Message, vals []int64) { x.ColInt3 = vals[0] } -func readColInt4(x Message) ([]int64, []uint8, []uint8) { +func readColInt4(x Message, vals []int64, defs, reps []uint8) ([]int64, []uint8, []uint8) { switch { case x.ColInt4 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []int64{*x.ColInt4}, []uint8{1}, nil + vals = append(vals, *x.ColInt4) + defs = append(defs, 1) + return vals, defs, reps } } @@ -305,12 +329,15 @@ func writeColInt4(x *Message, vals []int64, defs, reps []uint8) (int, int) { return 0, 1 } -func readColInt32_0(x Message) ([]int32, []uint8, []uint8) { +func readColInt32_0(x Message, vals []int32, defs, reps []uint8) ([]int32, []uint8, []uint8) { switch { case x.ColInt32_0 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []int32{*x.ColInt32_0}, []uint8{1}, nil + vals = append(vals, *x.ColInt32_0) + defs = append(defs, 1) + return vals, defs, reps } } @@ -333,12 +360,15 @@ func writeColInt32_1(x *Message, vals []int32) { x.ColInt32_1 = vals[0] } -func readColInt32_2(x Message) ([]int32, []uint8, []uint8) { +func readColInt32_2(x Message, vals []int32, defs, reps []uint8) ([]int32, []uint8, []uint8) { switch { case x.ColInt32_2 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []int32{*x.ColInt32_2}, []uint8{1}, nil + vals = append(vals, *x.ColInt32_2) + defs = append(defs, 1) + return vals, defs, reps } } @@ -361,12 +391,15 @@ func writeColInt32_3(x *Message, vals []int32) { x.ColInt32_3 = vals[0] } -func readColInt32_4(x Message) ([]int32, []uint8, []uint8) { +func readColInt32_4(x Message, vals []int32, defs, reps []uint8) ([]int32, []uint8, []uint8) { switch { case x.ColInt32_4 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []int32{*x.ColInt32_4}, []uint8{1}, nil + vals = append(vals, *x.ColInt32_4) + defs = append(defs, 1) + return vals, defs, reps } } @@ -381,12 +414,15 @@ func writeColInt32_4(x *Message, vals []int32, defs, reps []uint8) (int, int) { return 0, 1 } -func readColFloat0(x Message) ([]float64, []uint8, []uint8) { +func readColFloat0(x Message, vals []float64, defs, reps []uint8) ([]float64, []uint8, []uint8) { switch { case x.ColFloat0 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []float64{*x.ColFloat0}, []uint8{1}, nil + vals = append(vals, *x.ColFloat0) + defs = append(defs, 1) + return vals, defs, reps } } @@ -409,12 +445,15 @@ func writeColFloat1(x *Message, vals []float64) { x.ColFloat1 = vals[0] } -func readColFloat2(x Message) ([]float64, []uint8, []uint8) { +func readColFloat2(x Message, vals []float64, defs, reps []uint8) ([]float64, []uint8, []uint8) { switch { case x.ColFloat2 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []float64{*x.ColFloat2}, []uint8{1}, nil + vals = append(vals, *x.ColFloat2) + defs = append(defs, 1) + return vals, defs, reps } } @@ -437,12 +476,15 @@ func writeColFloat3(x *Message, vals []float64) { x.ColFloat3 = vals[0] } -func readColFloat4(x Message) ([]float64, []uint8, []uint8) { +func readColFloat4(x Message, vals []float64, defs, reps []uint8) ([]float64, []uint8, []uint8) { switch { case x.ColFloat4 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []float64{*x.ColFloat4}, []uint8{1}, nil + vals = append(vals, *x.ColFloat4) + defs = append(defs, 1) + return vals, defs, reps } } @@ -457,12 +499,15 @@ func writeColFloat4(x *Message, vals []float64, defs, reps []uint8) (int, int) { return 0, 1 } -func readColFloat32_0(x Message) ([]float32, []uint8, []uint8) { +func readColFloat32_0(x Message, vals []float32, defs, reps []uint8) ([]float32, []uint8, []uint8) { switch { case x.ColFloat32_0 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []float32{*x.ColFloat32_0}, []uint8{1}, nil + vals = append(vals, *x.ColFloat32_0) + defs = append(defs, 1) + return vals, defs, reps } } @@ -485,12 +530,15 @@ func writeColFloat32_1(x *Message, vals []float32) { x.ColFloat32_1 = vals[0] } -func readColFloat32_2(x Message) ([]float32, []uint8, []uint8) { +func readColFloat32_2(x Message, vals []float32, defs, reps []uint8) ([]float32, []uint8, []uint8) { switch { case x.ColFloat32_2 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []float32{*x.ColFloat32_2}, []uint8{1}, nil + vals = append(vals, *x.ColFloat32_2) + defs = append(defs, 1) + return vals, defs, reps } } @@ -513,12 +561,15 @@ func writeColFloat32_3(x *Message, vals []float32) { x.ColFloat32_3 = vals[0] } -func readColFloat32_4(x Message) ([]float32, []uint8, []uint8) { +func readColFloat32_4(x Message, vals []float32, defs, reps []uint8) ([]float32, []uint8, []uint8) { switch { case x.ColFloat32_4 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []float32{*x.ColFloat32_4}, []uint8{1}, nil + vals = append(vals, *x.ColFloat32_4) + defs = append(defs, 1) + return vals, defs, reps } } @@ -533,12 +584,15 @@ func writeColFloat32_4(x *Message, vals []float32, defs, reps []uint8) (int, int return 0, 1 } -func readColBool0(x Message) ([]bool, []uint8, []uint8) { +func readColBool0(x Message, vals []bool, defs, reps []uint8) ([]bool, []uint8, []uint8) { switch { case x.ColBool0 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []bool{*x.ColBool0}, []uint8{1}, nil + vals = append(vals, *x.ColBool0) + defs = append(defs, 1) + return vals, defs, reps } } @@ -561,12 +615,15 @@ func writeColBool1(x *Message, vals []bool) { x.ColBool1 = vals[0] } -func readColBool2(x Message) ([]bool, []uint8, []uint8) { +func readColBool2(x Message, vals []bool, defs, reps []uint8) ([]bool, []uint8, []uint8) { switch { case x.ColBool2 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []bool{*x.ColBool2}, []uint8{1}, nil + vals = append(vals, *x.ColBool2) + defs = append(defs, 1) + return vals, defs, reps } } @@ -589,12 +646,15 @@ func writeColBool3(x *Message, vals []bool) { x.ColBool3 = vals[0] } -func readColBool4(x Message) ([]bool, []uint8, []uint8) { +func readColBool4(x Message, vals []bool, defs, reps []uint8) ([]bool, []uint8, []uint8) { switch { case x.ColBool4 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []bool{*x.ColBool4}, []uint8{1}, nil + vals = append(vals, *x.ColBool4) + defs = append(defs, 1) + return vals, defs, reps } } @@ -617,12 +677,15 @@ func writeColBool5(x *Message, vals []bool) { x.ColBool5 = vals[0] } -func readColBool6(x Message) ([]bool, []uint8, []uint8) { +func readColBool6(x Message, vals []bool, defs, reps []uint8) ([]bool, []uint8, []uint8) { switch { case x.ColBool6 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []bool{*x.ColBool6}, []uint8{1}, nil + vals = append(vals, *x.ColBool6) + defs = append(defs, 1) + return vals, defs, reps } } @@ -645,12 +708,15 @@ func writeColBool7(x *Message, vals []bool) { x.ColBool7 = vals[0] } -func readColBool8(x Message) ([]bool, []uint8, []uint8) { +func readColBool8(x Message, vals []bool, defs, reps []uint8) ([]bool, []uint8, []uint8) { switch { case x.ColBool8 == nil: - return nil, []uint8{0}, nil + defs = append(defs, 0) + return vals, defs, reps default: - return []bool{*x.ColBool8}, []uint8{1}, nil + vals = append(vals, *x.ColBool8) + defs = append(defs, 1) + return vals, defs, reps } } @@ -737,8 +803,10 @@ func MaxPageSize(m int) func(*ParquetWriter) error { } } +var par1 = []byte("PAR1") + func begin(p *ParquetWriter) error { - _, err := p.w.Write([]byte("PAR1")) + _, err := p.w.Write(par1) return err } @@ -801,7 +869,7 @@ func (p *ParquetWriter) Close() error { return err } - _, err := p.w.Write([]byte("PAR1")) + _, err := p.w.Write(par1) return err } @@ -994,12 +1062,12 @@ func (p *ParquetReader) Scan(x *Message) { type StringOptionalField struct { parquet.OptionalField vals []string - read func(r Message) ([]string, []uint8, []uint8) + read func(r Message, vals []string, def, rep []uint8) ([]string, []uint8, []uint8) write func(r *Message, vals []string, def, rep []uint8) (int, int) stats *stringOptionalStats } -func NewStringOptionalField(read func(r Message) ([]string, []uint8, []uint8), write func(r *Message, vals []string, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *StringOptionalField { +func NewStringOptionalField(read func(r Message, vals []string, def, rep []uint8) ([]string, []uint8, []uint8), write func(r *Message, vals []string, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *StringOptionalField { return &StringOptionalField{ read: read, write: write, @@ -1013,11 +1081,11 @@ func (f *StringOptionalField) Schema() parquet.Field { } func (f *StringOptionalField) Add(r Message) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *StringOptionalField) Scan(r *Message) { @@ -1154,12 +1222,12 @@ func (f *StringField) Levels() ([]uint8, []uint8) { type Int64OptionalField struct { parquet.OptionalField vals []int64 - read func(r Message) ([]int64, []uint8, []uint8) - write func(r *Message, vals []int64, def, rep []uint8) (int, int) + read func(r Message, vals []int64, defs, reps []uint8) ([]int64, []uint8, []uint8) + write func(r *Message, vals []int64, defs, reps []uint8) (int, int) stats *int64optionalStats } -func NewInt64OptionalField(read func(r Message) ([]int64, []uint8, []uint8), write func(r *Message, vals []int64, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Int64OptionalField { +func NewInt64OptionalField(read func(r Message, vals []int64, defs, reps []uint8) ([]int64, []uint8, []uint8), write func(r *Message, vals []int64, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Int64OptionalField { return &Int64OptionalField{ read: read, write: write, @@ -1199,11 +1267,11 @@ func (f *Int64OptionalField) Read(r io.ReadSeeker, pg parquet.Page) error { } func (f *Int64OptionalField) Add(r Message) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *Int64OptionalField) Scan(r *Message) { @@ -1292,12 +1360,12 @@ func (f *Int64Field) Levels() ([]uint8, []uint8) { type Int32OptionalField struct { parquet.OptionalField vals []int32 - read func(r Message) ([]int32, []uint8, []uint8) - write func(r *Message, vals []int32, def, rep []uint8) (int, int) + read func(r Message, vals []int32, defs, reps []uint8) ([]int32, []uint8, []uint8) + write func(r *Message, vals []int32, defs, reps []uint8) (int, int) stats *int32optionalStats } -func NewInt32OptionalField(read func(r Message) ([]int32, []uint8, []uint8), write func(r *Message, vals []int32, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Int32OptionalField { +func NewInt32OptionalField(read func(r Message, vals []int32, defs, reps []uint8) ([]int32, []uint8, []uint8), write func(r *Message, vals []int32, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Int32OptionalField { return &Int32OptionalField{ read: read, write: write, @@ -1337,11 +1405,11 @@ func (f *Int32OptionalField) Read(r io.ReadSeeker, pg parquet.Page) error { } func (f *Int32OptionalField) Add(r Message) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *Int32OptionalField) Scan(r *Message) { @@ -1430,12 +1498,12 @@ func (f *Int32Field) Levels() ([]uint8, []uint8) { type Float64OptionalField struct { parquet.OptionalField vals []float64 - read func(r Message) ([]float64, []uint8, []uint8) - write func(r *Message, vals []float64, def, rep []uint8) (int, int) + read func(r Message, vals []float64, defs, reps []uint8) ([]float64, []uint8, []uint8) + write func(r *Message, vals []float64, defs, reps []uint8) (int, int) stats *float64optionalStats } -func NewFloat64OptionalField(read func(r Message) ([]float64, []uint8, []uint8), write func(r *Message, vals []float64, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Float64OptionalField { +func NewFloat64OptionalField(read func(r Message, vals []float64, defs, reps []uint8) ([]float64, []uint8, []uint8), write func(r *Message, vals []float64, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Float64OptionalField { return &Float64OptionalField{ read: read, write: write, @@ -1475,11 +1543,11 @@ func (f *Float64OptionalField) Read(r io.ReadSeeker, pg parquet.Page) error { } func (f *Float64OptionalField) Add(r Message) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *Float64OptionalField) Scan(r *Message) { @@ -1568,12 +1636,12 @@ func (f *Float64Field) Levels() ([]uint8, []uint8) { type Float32OptionalField struct { parquet.OptionalField vals []float32 - read func(r Message) ([]float32, []uint8, []uint8) - write func(r *Message, vals []float32, def, rep []uint8) (int, int) + read func(r Message, vals []float32, defs, reps []uint8) ([]float32, []uint8, []uint8) + write func(r *Message, vals []float32, defs, reps []uint8) (int, int) stats *float32optionalStats } -func NewFloat32OptionalField(read func(r Message) ([]float32, []uint8, []uint8), write func(r *Message, vals []float32, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Float32OptionalField { +func NewFloat32OptionalField(read func(r Message, vals []float32, defs, reps []uint8) ([]float32, []uint8, []uint8), write func(r *Message, vals []float32, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *Float32OptionalField { return &Float32OptionalField{ read: read, write: write, @@ -1613,11 +1681,11 @@ func (f *Float32OptionalField) Read(r io.ReadSeeker, pg parquet.Page) error { } func (f *Float32OptionalField) Add(r Message) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *Float32OptionalField) Scan(r *Message) { @@ -1706,12 +1774,12 @@ func (f *Float32Field) Levels() ([]uint8, []uint8) { type BoolOptionalField struct { parquet.OptionalField vals []bool - read func(r Message) ([]bool, []uint8, []uint8) + read func(r Message, vals []bool, defs, reps []uint8) ([]bool, []uint8, []uint8) write func(r *Message, vals []bool, defs, reps []uint8) (int, int) stats *boolOptionalStats } -func NewBoolOptionalField(read func(r Message) ([]bool, []uint8, []uint8), write func(r *Message, vals []bool, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *BoolOptionalField { +func NewBoolOptionalField(read func(r Message, vals []bool, defs, reps []uint8) ([]bool, []uint8, []uint8), write func(r *Message, vals []bool, defs, reps []uint8) (int, int), path []string, types []int, opts ...func(*parquet.OptionalField)) *BoolOptionalField { return &BoolOptionalField{ read: read, write: write, @@ -1749,11 +1817,11 @@ func (f *BoolOptionalField) Scan(r *Message) { } func (f *BoolOptionalField) Add(r Message) { - vals, defs, reps := f.read(r) - f.stats.add(vals, defs) - f.vals = append(f.vals, vals...) - f.Defs = append(f.Defs, defs...) - f.Reps = append(f.Reps, reps...) + vals, defs, reps := f.read(r, f.vals, f.Defs, f.Reps) + f.stats.add(vals[len(f.vals):], defs[len(f.Defs):]) + f.vals = vals + f.Defs = defs + f.Reps = reps } func (f *BoolOptionalField) Write(w io.Writer, meta *parquet.Metadata) error { diff --git a/performance/parquet_performance_test.go b/performance/parquet_performance_test.go index c5b46bc..6e5e421 100644 --- a/performance/parquet_performance_test.go +++ b/performance/parquet_performance_test.go @@ -2,11 +2,12 @@ package performance import ( "bytes" + "math/rand" + "testing" + "github.com/bxcodec/faker/v3" "github.com/parsyl/parquet/performance/base" "github.com/parsyl/parquet/performance/message" - "math/rand" - "testing" ) const (