Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] support df.to_parquet and df.read_parquet() #165

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 90 additions & 16 deletions obstore/python/obstore/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import fsspec.spec

import obstore as obs
from obstore import open_writer
from obstore.store import AzureStore, GCSStore, S3Store


class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
Expand Down Expand Up @@ -81,6 +83,32 @@ def __init__(
*args, asynchronous=asynchronous, loop=loop, batch_size=batch_size
)

def _split_path(self, path: str) -> Tuple[str, str]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would call this _split_bucket to avoid confusion with fsspec's _split_protocol

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I named it as _split_path to align with the naming in s3fs here, as this function is doing the same thing as s3fs's split_path

"""
Split bucket and file path

Args:
path (str): Input path, like `s3://mybucket/path/to/file`

Examples:
>>> split_path("s3://mybucket/path/to/file")
['mybucket', 'path/to/file']
"""

store_with_bucket = (S3Store, GCSStore, AzureStore)

if not isinstance(self.store, store_with_bucket):
# no bucket name in path
return "", path

if "/" not in path:
return path, ""
else:
path_li = path.split("/")
bucket = path_li[0]
file_path = "/".join(path_li[1:])
return (bucket, file_path)
Comment on lines +107 to +110
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
path_li = path.split("/")
bucket = path_li[0]
file_path = "/".join(path_li[1:])
return (bucket, file_path)
return path.split("/", 1)

would do this; but what about the "://" when the protocol is included?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added following code in #198 but haven't sync here yet. This can solve the :// issue

if path.startswith(self.protocol + "://"):
    path = path[len(self.protocol) + 3 :]
elif path.startswith(self.protocol + "::"):
    path = path[len(self.protocol) + 2 :]
path = path.rstrip("/")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fsspec also allows "s3a" for the same thing - I don't know if you want to allow that.

Also, I don't know how "::" can appear here - it exists to join path elements, not protocol to path.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I actually copied this from fsspec and thought obstore may do the similar stuff. It looks like obstore does not need this, I'll take them out then.


async def _rm_file(self, path, **kwargs):
    """Delete the single object at ``path`` from the wrapped obstore store."""
    outcome = await obs.delete_async(self.store, path)
    return outcome

Expand Down Expand Up @@ -148,17 +176,22 @@ async def _get_file(self, rpath, lpath, **kwargs):
f.write(buffer)

async def _info(self, path, **kwargs):
head = await obs.head_async(self.store, path)
return {
# Required of `info`: (?)
"name": head["path"],
"size": head["size"],
"type": "directory" if head["path"].endswith("/") else "file",
# Implementation-specific keys
"e_tag": head["e_tag"],
"last_modified": head["last_modified"],
"version": head["version"],
}
try:
head = await obs.head_async(self.store, path)
return {
# Required of `info`: (?)
"name": head["path"],
"size": head["size"],
"type": "directory" if head["path"].endswith("/") else "file",
# Implementation-specific keys
"e_tag": head["e_tag"],
"last_modified": head["last_modified"],
"version": head["version"],
}
except FileNotFoundError:
# use info in fsspec.AbstractFileSystem
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, super().info, path, **kwargs)

async def _ls(self, path, detail=True, **kwargs):
result = await obs.list_with_delimiter_async(self.store, path)
Expand All @@ -177,15 +210,56 @@ async def _ls(self, path, detail=True, **kwargs):
else:
return sorted([object["path"] for object in objects] + prefs)

def _open(self, path, mode="rb", **kwargs):
def _open(self, path: str, mode="rb", **kwargs):
"""Return raw bytes-mode file-like from the file-system"""
return BufferedFileSimple(self, path, mode, **kwargs)
kylebarron marked this conversation as resolved.
Show resolved Hide resolved

_, path = self._split_path(path)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should assert that the bucket of the path matches the bucket of the store.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @kylebarron ,
I run print(dir(store)) and it outputs:

['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'from_env', 'from_session', 'from_url']

It seems like obstore does not provide any attribute to get the bucket name from the store instance. Maybe we should add something like this in Rust?

#[getter]
fn bucket(&self) -> Option<String> {
    self.config.bucket.clone()
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the present release I think we could add a private method _bucket, however bucket won't always be defined, such as when the store is created by url.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fixed in #210, so you can access bucket information out.


if mode == "wb":
return BufferedFileWrite(self, path, mode, **kwargs)
if mode == "rb":
return BufferedFileRead(self, path, mode, **kwargs)
else:
raise ValueError(f"Only 'rb' and 'wb' mode is currently supported, got: {mode}")

kylebarron marked this conversation as resolved.
Show resolved Hide resolved

class BufferedFileWrite(fsspec.spec.AbstractBufferedFile):
def __init__(self, *args, **kwargs):
    # NOTE(review): pure pass-through — adds nothing beyond the
    # fsspec AbstractBufferedFile constructor and could be deleted.
    super().__init__(*args, **kwargs)

def _initiate_upload(self):
    """Open the obstore writer backing this buffered file.

    Called by ``AbstractBufferedFile.flush()`` on the first flush; the
    resulting ``self._writer`` is reused by later ``_upload_chunk`` calls.
    """
    self._writer = open_writer(self.fs.store, self.path)

def _upload_chunk(self, final=False):
"""
Called every time fsspec flushes the write buffer
"""
if self.buffer and len(self.buffer.getbuffer()) > 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't be possible to get here without this condition being True

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove the `self.buffer` check here, but we might still need `len(self.buffer.getbuffer()) > 0` for `flush(force=True)` during close, which does not guarantee the buffer contains data. See: https://github.com/fsspec/filesystem_spec/blob/f30bc759f30327dfb499f37e967648f175750fac/fsspec/spec.py#L2041

self.buffer.seek(0)
self._writer.write(self.buffer.read())
# flush all the data in buffer when closing
if final:
self._writer.flush()
return True
else:
return False

def close(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should also set self.closed = True

"""Close file
Ensure flushing the buffer
"""
if self.closed:
return
self.flush(force=True)
self._writer.close()
self.closed = True


class BufferedFileSimple(fsspec.spec.AbstractBufferedFile):
class BufferedFileRead(fsspec.spec.AbstractBufferedFile):
def __init__(self, fs, path, mode="rb", **kwargs):
if mode != "rb":
raise ValueError("Only 'rb' mode is currently supported")
kylebarron marked this conversation as resolved.
Show resolved Hide resolved
super().__init__(fs, path, mode, **kwargs)

def read(self, length: int = -1):
Expand Down
Loading