Skip to content

Commit

Permalink
added comments
Browse files Browse the repository at this point in the history
  • Loading branch information
cswank committed Mar 13, 2019
1 parent f199c15 commit 748e2b2
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
19 changes: 19 additions & 0 deletions fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ func NewRequiredField(col string, opts ...func(*RequiredField)) RequiredField {
return r
}

// RequiredFieldSnappy sets the compression for a column to snappy
// It is an optional arg to NewRequiredField
func RequiredFieldSnappy(r *RequiredField) {
r.compression = sch.CompressionCodec_SNAPPY
}

// RequiredFieldUncompressed sets the compression to none
// It is an optional arg to NewRequiredField
func RequiredFieldUncompressed(r *RequiredField) {
r.compression = sch.CompressionCodec_UNCOMPRESSED
}
Expand Down Expand Up @@ -85,14 +89,20 @@ func NewOptionalField(col string, opts ...func(*OptionalField)) OptionalField {
return f
}

// OptionalFieldSnappy sets the compression for a column to snappy
// It is an optional arg to NewOptionalField
func OptionalFieldSnappy(r *OptionalField) {
r.compression = sch.CompressionCodec_SNAPPY
}

// OptionalFieldUncompressed sets the compression to none
// It is an optional arg to NewOptionalField
func OptionalFieldUncompressed(o *OptionalField) {
o.compression = sch.CompressionCodec_UNCOMPRESSED
}

// Values reads the definition levels and uses them
// to return the values from the page data.
func (f *OptionalField) Values() int {
return valsFromDefs(f.Defs)
}
Expand All @@ -107,6 +117,8 @@ func valsFromDefs(defs []int64) int {
return out
}

// DoWrite is called by all optional field types to write the definition levels
// and raw data to the io.Writer
func (f *OptionalField) DoWrite(w io.Writer, meta *Metadata, vals []byte, count int) error {
buf := bytes.Buffer{}
wc := &writeCounter{w: &buf}
Expand All @@ -125,6 +137,8 @@ func (f *OptionalField) DoWrite(w io.Writer, meta *Metadata, vals []byte, count
return err
}

// DoRead is called by all optional fields. It reads the definition levels and uses
// them to interpret the raw data.
func (f *OptionalField) DoRead(r io.ReadSeeker, meta *Metadata, pg Page) (io.Reader, []int, error) {
var nRead int
var out []byte
Expand Down Expand Up @@ -152,15 +166,20 @@ func (f *OptionalField) DoRead(r io.ReadSeeker, meta *Metadata, pg Page) (io.Rea
return bytes.NewBuffer(out), sizes, nil
}

// Name returns the column name of this field
func (f *OptionalField) Name() string {
return f.col
}

// writeCounter keeps track of the number of bytes written
// it is used for calls to binary.Write, which does not
// return the number of bytes written.
type writeCounter struct {
n int64
w io.Writer
}

// Write makes writeCounter an io.Writer
func (w *writeCounter) Write(p []byte) (int, error) {
n, err := w.w.Write(p)
w.n += int64(n)
Expand Down
19 changes: 18 additions & 1 deletion parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
sch "github.com/parsyl/parquet/generated"
)

// Field holds the type information for a parquet column
type Field struct {
Name string
Type FieldFunc
Expand All @@ -31,6 +32,9 @@ type schema struct {
lookup map[string]sch.SchemaElement
}

// Metadata keeps track of the things that need to
// be kept track of in order to write the FileMetaData
// at the end of the parquet file.
type Metadata struct {
ts *thrift.TSerializer
schema schema
Expand All @@ -40,6 +44,8 @@ type Metadata struct {
metadata *sch.FileMetaData
}

// New returns a Metadata struct and reads the first row group
// into memory.
func New(fields ...Field) *Metadata {
ts := thrift.NewTSerializer()
ts.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(ts.Transport)
Expand All @@ -52,13 +58,15 @@ func New(fields ...Field) *Metadata {
return m
}

// StartRowGroup is called when starting a new row group
func (m *Metadata) StartRowGroup(fields ...Field) {
m.rowGroups = append(m.rowGroups, RowGroup{
fields: schemaElements(fields),
columns: make(map[string]sch.ColumnChunk),
})
}

// RowGroups returns a summary of each schema.RowGroup
func (m *Metadata) RowGroups() []RowGroup {
rgs := make([]RowGroup, len(m.metadata.RowGroups))
for i, rg := range m.metadata.RowGroups {
Expand All @@ -70,7 +78,7 @@ func (m *Metadata) RowGroups() []RowGroup {
return rgs
}

// WritePageHeader indicates you are done writing this columns's chunk
// WritePageHeader is called when no more data is written to a column chunk
func (m *Metadata) WritePageHeader(w io.Writer, col string, dataLen, compressedLen, count int, comp sch.CompressionCodec) error {
m.rows += int64(count)
ph := &sch.PageHeader{
Expand Down Expand Up @@ -172,6 +180,9 @@ func (m *Metadata) Footer(w io.Writer) error {
return binary.Write(w, binary.LittleEndian, uint32(n))
}

// RowGroup wraps schema.RowGroup and adds accounting functions
// that are used to keep track of number of rows written, byte size,
// etc.
type RowGroup struct {
fields schema
rowGroup sch.RowGroup
Expand All @@ -181,6 +192,7 @@ type RowGroup struct {
Rows int64
}

// Columns returns the Columns of the row group.
func (r *RowGroup) Columns() []*sch.ColumnChunk {
return r.rowGroup.Columns
}
Expand Down Expand Up @@ -266,13 +278,15 @@ func (m *Metadata) Pages() (map[string][]Page, error) {
return out, nil
}

// PageHeader reads the page header from a column page
func (m *Metadata) PageHeader(r io.ReadSeeker) (*sch.PageHeader, error) {
p := thrift.NewTCompactProtocol(&thrift.StreamTransport{Reader: r})
pg := &sch.PageHeader{}
err := pg.Read(p)
return pg, err
}

// ReadMetaData reads the FileMetaData from the end of a parquet file
func ReadMetaData(r io.ReadSeeker) (*sch.FileMetaData, error) {
p := thrift.NewTCompactProtocol(&thrift.StreamTransport{Reader: r})
size, err := getMetaDataSize(r)
Expand All @@ -289,12 +303,14 @@ func ReadMetaData(r io.ReadSeeker) (*sch.FileMetaData, error) {
return m, m.Read(p)
}

// ReadFooter reads the parquet metadata
func (m *Metadata) ReadFooter(r io.ReadSeeker) error {
meta, err := ReadMetaData(r)
m.metadata = meta
return err
}

// FieldFunc is used to set some of the metadata for each column
type FieldFunc func(*sch.SchemaElement)

func RepetitionRequired(se *sch.SchemaElement) {
Expand Down Expand Up @@ -351,6 +367,7 @@ func StringType(se *sch.SchemaElement) {
se.Type = &t
}

// GetBools reads a byte array and turns each bit into a bool
func GetBools(r io.Reader, n int, pageSizes []int) ([]bool, error) {
var vals [8]bool
data, _ := ioutil.ReadAll(r)
Expand Down

0 comments on commit 748e2b2

Please sign in to comment.