Skip to content

Commit

Permalink
Write/Read positions of the array elements in Portable Serialization …
Browse files Browse the repository at this point in the history
…[API-1619] (#895)

* write/read positions of the array elements of type Date, Time, Timestamp, TimestampWithZone, Decimal  in portable serialization

* remove redundant comments

* address review comments
  • Loading branch information
utku-caglayan authored and yuce committed Oct 5, 2022
1 parent 372ab9a commit 45e5c04
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 62 deletions.
37 changes: 32 additions & 5 deletions internal/serialization/default_portable_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (pr *DefaultPortableReader) ReadTimestampWithTimezone(fieldName string) (t
func (pr *DefaultPortableReader) ReadDateArray(fieldName string) (t []types.LocalDate) {
pos := pr.positionByField(fieldName, serialization.TypeDateArray)
pr.runAtPosition(pos, func() {
v := readArrayOfTime(pr.input, ReadPortableDate)
v := pr.readArrayOfTime(ReadPortableDate)
t = *(*[]types.LocalDate)(unsafe.Pointer(&v))
})
return
Expand All @@ -391,7 +391,7 @@ func (pr *DefaultPortableReader) ReadDateArray(fieldName string) (t []types.Loca
func (pr *DefaultPortableReader) ReadTimeArray(fieldName string) (t []types.LocalTime) {
pos := pr.positionByField(fieldName, serialization.TypeTimeArray)
pr.runAtPosition(pos, func() {
v := readArrayOfTime(pr.input, ReadPortableTime)
v := pr.readArrayOfTime(ReadPortableTime)
t = *(*[]types.LocalTime)(unsafe.Pointer(&v))
})
return
Expand All @@ -400,7 +400,7 @@ func (pr *DefaultPortableReader) ReadTimeArray(fieldName string) (t []types.Loca
func (pr *DefaultPortableReader) ReadTimestampArray(fieldName string) (t []types.LocalDateTime) {
pos := pr.positionByField(fieldName, serialization.TypeTimestampArray)
pr.runAtPosition(pos, func() {
v := readArrayOfTime(pr.input, ReadPortableTimestamp)
v := pr.readArrayOfTime(ReadPortableTimestamp)
t = *(*[]types.LocalDateTime)(unsafe.Pointer(&v))
})
return
Expand All @@ -409,7 +409,7 @@ func (pr *DefaultPortableReader) ReadTimestampArray(fieldName string) (t []types
func (pr *DefaultPortableReader) ReadTimestampWithTimezoneArray(fieldName string) (t []types.OffsetDateTime) {
pos := pr.positionByField(fieldName, serialization.TypeTimestampWithTimezoneArray)
pr.runAtPosition(pos, func() {
v := readArrayOfTime(pr.input, ReadPortableTimestampWithTimezone)
v := pr.readArrayOfTime(ReadPortableTimestampWithTimezone)
t = *(*[]types.OffsetDateTime)(unsafe.Pointer(&v))
})
return
Expand All @@ -426,7 +426,18 @@ func (pr *DefaultPortableReader) ReadDecimal(fieldName string) (d *types.Decimal
func (pr *DefaultPortableReader) ReadDecimalArray(fieldName string) (ds []types.Decimal) {
pos := pr.positionByField(fieldName, serialization.TypeDecimalArray)
pr.runAtPosition(pos, func() {
ds = ReadDecimalArray(pr.input)
l := pr.input.ReadInt32()
if l == nilArrayLength {
return
}
ds = make([]types.Decimal, l)
offset := pr.input.Position()
for i := int32(0); i < l; i++ {
pr.input.SetPosition(offset + i*Int32SizeInBytes)
pos := pr.input.ReadInt32()
pr.input.SetPosition(pos)
ds[i] = ReadDecimal(pr.input)
}
})
return
}
Expand Down Expand Up @@ -454,6 +465,22 @@ func (pr *DefaultPortableReader) runAtPosition(pos int32, f func()) {
pr.input.SetPosition(backup)
}

func (pr *DefaultPortableReader) readArrayOfTime(f func(input serialization.DataInput) time.Time) (ts []time.Time) {
l := pr.input.ReadInt32()
if l == nilArrayLength {
return
}
ts = make([]time.Time, l)
offset := pr.input.Position()
objInput := pr.input.(*ObjectDataInput)
for i := int32(0); i < l; i++ {
pos := objInput.ReadInt32AtPosition(offset + i*Int32SizeInBytes)
pr.input.SetPosition(pos)
ts[i] = f(pr.input)
}
return
}

func ReadPortableDate(i serialization.DataInput) time.Time {
y, m, d := readPortableDate(i)
return time.Date(y, m, d, 0, 0, 0, 0, time.Local)
Expand Down
50 changes: 41 additions & 9 deletions internal/serialization/default_portable_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,26 +215,26 @@ func (pw *DefaultPortableWriter) WriteTimestampWithTimezone(fieldName string, t

func (pw *DefaultPortableWriter) WriteDateArray(fieldName string, a []types.LocalDate) {
pw.setPosition(fieldName, int32(serialization.TypeDateArray))
ts := (*([]time.Time))(unsafe.Pointer(&a))
writeArrayOfTime(pw.output.ObjectDataOutput, *ts, WritePortableDate)
ts := (*[]time.Time)(unsafe.Pointer(&a))
pw.writeArrayOfTime(*ts, WritePortableDate)
}

func (pw *DefaultPortableWriter) WriteTimeArray(fieldName string, a []types.LocalTime) {
pw.setPosition(fieldName, int32(serialization.TypeTimeArray))
ts := (*([]time.Time))(unsafe.Pointer(&a))
writeArrayOfTime(pw.output.ObjectDataOutput, *ts, WritePortableTime)
ts := (*[]time.Time)(unsafe.Pointer(&a))
pw.writeArrayOfTime(*ts, WritePortableTime)
}

func (pw *DefaultPortableWriter) WriteTimestampArray(fieldName string, a []types.LocalDateTime) {
pw.setPosition(fieldName, int32(serialization.TypeTimestampArray))
ts := (*([]time.Time))(unsafe.Pointer(&a))
writeArrayOfTime(pw.output.ObjectDataOutput, *ts, WritePortableTimestamp)
ts := (*[]time.Time)(unsafe.Pointer(&a))
pw.writeArrayOfTime(*ts, WritePortableTimestamp)
}

func (pw *DefaultPortableWriter) WriteTimestampWithTimezoneArray(fieldName string, a []types.OffsetDateTime) {
pw.setPosition(fieldName, int32(serialization.TypeTimestampWithTimezoneArray))
ts := (*([]time.Time))(unsafe.Pointer(&a))
writeArrayOfTime(pw.output.ObjectDataOutput, *ts, WritePortableTimestampWithTimezone)
ts := (*[]time.Time)(unsafe.Pointer(&a))
pw.writeArrayOfTime(*ts, WritePortableTimestampWithTimezone)
}

func (pw *DefaultPortableWriter) WriteDecimal(fieldName string, d *types.Decimal) {
Expand All @@ -245,7 +245,21 @@ func (pw *DefaultPortableWriter) WriteDecimal(fieldName string, d *types.Decimal

func (pw *DefaultPortableWriter) WriteDecimalArray(fieldName string, ds []types.Decimal) {
pw.setPosition(fieldName, int32(serialization.TypeDecimalArray))
WriteDecimalArray(pw.output.ObjectDataOutput, ds)
arrLen := len(ds)
if ds == nil {
arrLen = nilArrayLength
}
pw.output.WriteInt32(int32(arrLen))
if arrLen < 1 {
return
}
offset := pw.output.Position()
pw.output.WriteZeroBytes(arrLen * Int32SizeInBytes)
for i, v := range ds {
pos := pw.output.Position()
pw.output.PWriteInt32(offset+int32(Int32SizeInBytes*i), pos)
WriteDecimal(pw.output.ObjectDataOutput, v)
}
}

func (pw *DefaultPortableWriter) GetRawDataOutput() serialization.DataOutput {
Expand Down Expand Up @@ -287,6 +301,24 @@ func (pw *DefaultPortableWriter) writeNullableField(fieldName string, fieldType
}
}

func (pw *DefaultPortableWriter) writeArrayOfTime(ts []time.Time, f func(o serialization.DataOutput, t time.Time)) {
arrLen := len(ts)
if ts == nil {
arrLen = nilArrayLength
}
pw.output.WriteInt32(int32(arrLen))
if arrLen < 1 {
return
}
offset := pw.output.Position()
pw.output.WriteZeroBytes(arrLen * Int32SizeInBytes)
for i, t := range ts {
pos := pw.output.Position()
pw.output.PWriteInt32(offset+int32(Int32SizeInBytes*i), pos)
f(pw.output.ObjectDataOutput, t)
}
}

func WritePortableDate(o serialization.DataOutput, t time.Time) {
y, m, d := t.Date()
o.WriteInt16(int16(y))
Expand Down
48 changes: 0 additions & 48 deletions internal/serialization/object_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,28 +793,6 @@ func WriteDecimal(o serialization.DataOutput, d types.Decimal) {
o.WriteInt32(int32(d.Scale()))
}

func WriteDecimalArray(o serialization.DataOutput, ds []types.Decimal) {
if len(ds) == 0 {
o.WriteInt32(nilArrayLength)
return
}
o.WriteInt32(int32(len(ds)))
for _, d := range ds {
WriteDecimal(o, d)
}
}

func writeArrayOfTime(o serialization.DataOutput, ts []time.Time, f func(o serialization.DataOutput, t time.Time)) {
if len(ts) == 0 {
o.WriteInt32(nilArrayLength)
return
}
o.WriteInt32(int32(len(ts)))
for _, t := range ts {
f(o, t)
}
}

func ReadDate(i serialization.DataInput) time.Time {
y, m, d := readDate(i)
return time.Date(y, m, d, 0, 0, 0, 0, time.Local)
Expand Down Expand Up @@ -852,19 +830,6 @@ func ReadDecimal(i serialization.DataInput) types.Decimal {
return types.NewDecimal(v, int(scale))
}

func ReadDecimalArray(i serialization.DataInput) []types.Decimal {
var ds []types.Decimal
l := i.ReadInt32()
if l == nilArrayLength {
return ds
}
ds = make([]types.Decimal, l)
for j := 0; j < int(l); j++ {
ds[j] = ReadDecimal(i)
}
return ds
}

func readDate(i serialization.DataInput) (y int, m time.Month, d int) {
y = int(i.ReadInt32())
m = time.Month(i.ReadByte())
Expand All @@ -879,16 +844,3 @@ func readTime(i serialization.DataInput) (h, m, s, nanos int) {
nanos = int(i.ReadInt32())
return
}

func readArrayOfTime(i serialization.DataInput, f func(i serialization.DataInput) time.Time) []time.Time {
var ts []time.Time
l := i.ReadInt32()
if l == nilArrayLength {
return ts
}
ts = make([]time.Time, l)
for j := 0; j < int(l); j++ {
ts[j] = f(i)
}
return ts
}

0 comments on commit 45e5c04

Please sign in to comment.