Skip to content
This repository has been archived by the owner on Mar 31, 2019. It is now read-only.

Commit

Permalink
works with the Arrow buffers that come from Spark
Browse files Browse the repository at this point in the history
  • Loading branch information
jpivarski committed Jun 11, 2018
1 parent fd08d78 commit f3d15b3
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions oamap/backend/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,18 @@ def chop(self, name):

def frombuffer(self, chunk, bufferindex):
def truncate(array, length, offset=0):
- if length is None:
- return array
- else:
- return array[:length + offset]
+ return array[:length + offset]

def mask(index, length):
- unmasked = truncate(numpy.unpackbits(numpy.frombuffer(chunk.buffers()[index], dtype=numpy.uint8)).view(numpy.bool_), length)
- mask = numpy.empty(len(unmasked), dtype=oamap.generator.Masked.maskdtype)
- mask[unmasked] = numpy.arange(unmasked.sum(), dtype=mask.dtype)
- mask[~unmasked] = oamap.generator.Masked.maskedvalue
- return mask
+ buf = chunk.buffers()[index]
+ if buf is None:
+ return numpy.arange(length, dtype=oamap.generator.Masked.maskdtype)
+ else:
+ unmasked = truncate(numpy.unpackbits(numpy.frombuffer(buf, dtype=numpy.uint8)).view(numpy.bool_), length)
+ mask = numpy.empty(len(unmasked), dtype=oamap.generator.Masked.maskdtype)
+ mask[unmasked] = numpy.arange(unmasked.sum(), dtype=mask.dtype)
+ mask[~unmasked] = oamap.generator.Masked.maskedvalue
+ return mask

def recurse(tpe, index, length):
if isinstance(tpe, pyarrow.lib.ListType):
Expand Down Expand Up @@ -111,7 +112,7 @@ def recurse(tpe, index, length):
else:
raise NotImplementedError

- return recurse(chunk.type, 0, None)
+ return recurse(chunk.type, 0, len(chunk))

def getall(self, names):
out = {}
Expand Down

0 comments on commit f3d15b3

Please sign in to comment.