[WIP] support df.to_parquet and df.read_parquet() #165
```diff
@@ -31,6 +31,8 @@
 import fsspec.spec
 
 import obstore as obs
+from obstore import open_writer
+from obstore.store import AzureStore, GCSStore, S3Store
 
 
 class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
```
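For context, `open_writer` (imported above) is the obstore writer used later by `BufferedFileWrite`. Below is a minimal, illustrative sketch of how it behaves on its own, using an in-memory store so it runs without credentials; the exact API surface may differ between obstore versions, so treat this as an assumption rather than documentation.

```python
import obstore as obs
from obstore import open_writer
from obstore.store import MemoryStore

store = MemoryStore()

# open_writer returns a file-like object; the diff below relies only on its
# write()/flush()/close() methods.
writer = open_writer(store, "path/to/file.bin")
writer.write(b"hello parquet bytes")
writer.flush()
writer.close()

# Read the object back to confirm the write landed (illustrative check).
print(bytes(obs.get(store, "path/to/file.bin").bytes()))  # b"hello parquet bytes"
```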
```diff
@@ -81,6 +83,32 @@ def __init__(
             *args, asynchronous=asynchronous, loop=loop, batch_size=batch_size
         )
 
+    def _split_path(self, path: str) -> Tuple[str, str]:
+        """
+        Split bucket and file path
+
+        Args:
+            path (str): Input path, like `s3://mybucket/path/to/file`
+
+        Examples:
+            >>> split_path("s3://mybucket/path/to/file")
+            ['mybucket', 'path/to/file']
+        """
+
+        store_with_bucket = (S3Store, GCSStore, AzureStore)
+
+        if not isinstance(self.store, store_with_bucket):
+            # no bucket name in path
+            return "", path
+
+        if "/" not in path:
+            return path, ""
+        else:
+            path_li = path.split("/")
+            bucket = path_li[0]
+            file_path = "/".join(path_li[1:])
+            return (bucket, file_path)
```
Review thread on lines +107 to +110 of `_split_path`:

> Suggested change: … would do this; but what about the "://" when the protocol is included?

> I added the following code in #198 but haven't synced it here yet. This can solve the "://" issue:

```python
if path.startswith(self.protocol + "://"):
    path = path[len(self.protocol) + 3 :]
elif path.startswith(self.protocol + "::"):
    path = path[len(self.protocol) + 2 :]
path = path.rstrip("/")
```

> fsspec also allows "s3a" for the same thing - I don't know if you want to allow that. Also, I don't know how "::" can appear here - it exists to join path elements, not protocol to path.

> Oh, I actually copied this from fsspec and thought obstore might do similar stuff. It looks like obstore does not need this, so I'll take them out.
```diff
 
     async def _rm_file(self, path, **kwargs):
         return await obs.delete_async(self.store, path)
```
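To make the intended behavior concrete, here is a small, illustrative sketch (not the PR's code) of the path handling the thread converges on: strip an optional `<protocol>://` prefix, then split the bucket from the key. The function name and the default protocol are assumptions made for the example.

```python
def split_bucket(path: str, protocol: str = "s3") -> tuple[str, str]:
    """Strip an optional '<protocol>://' prefix, then split '<bucket>/<key>'."""
    if path.startswith(protocol + "://"):
        path = path[len(protocol) + 3:]
    path = path.rstrip("/")
    if "/" not in path:
        return path, ""
    bucket, _, key = path.partition("/")
    return bucket, key


assert split_bucket("s3://mybucket/path/to/file") == ("mybucket", "path/to/file")
assert split_bucket("mybucket/path/to/file") == ("mybucket", "path/to/file")
assert split_bucket("mybucket") == ("mybucket", "")
```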
```diff
@@ -148,17 +176,22 @@ async def _get_file(self, rpath, lpath, **kwargs):
                 f.write(buffer)
 
     async def _info(self, path, **kwargs):
-        head = await obs.head_async(self.store, path)
-        return {
-            # Required of `info`: (?)
-            "name": head["path"],
-            "size": head["size"],
-            "type": "directory" if head["path"].endswith("/") else "file",
-            # Implementation-specific keys
-            "e_tag": head["e_tag"],
-            "last_modified": head["last_modified"],
-            "version": head["version"],
-        }
+        try:
+            head = await obs.head_async(self.store, path)
+            return {
+                # Required of `info`: (?)
+                "name": head["path"],
+                "size": head["size"],
+                "type": "directory" if head["path"].endswith("/") else "file",
+                # Implementation-specific keys
+                "e_tag": head["e_tag"],
+                "last_modified": head["last_modified"],
+                "version": head["version"],
+            }
+        except FileNotFoundError:
+            # use info in fsspec.AbstractFileSystem
+            loop = asyncio.get_running_loop()
+            return await loop.run_in_executor(None, super().info, path, **kwargs)
 
     async def _ls(self, path, detail=True, **kwargs):
         result = await obs.list_with_delimiter_async(self.store, path)
```
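The try/except above matters because, on object stores, a "directory" is usually just a key prefix with no object behind it, so `head_async` raises `FileNotFoundError` and the generic `fsspec.AbstractFileSystem.info` (which lists the prefix) takes over. A hedged sketch of the expected behavior, with made-up bucket and key names:

```python
# Assumes `fs = AsyncFsspecStore(store)` for a bucket-scoped store; paths are illustrative.
file_info = fs.info("mybucket/data/part-0.parquet")   # object exists -> served by head_async
print(file_info["size"], file_info["e_tag"])

dir_info = fs.info("mybucket/data")                   # bare prefix -> head_async raises
print(dir_info["type"])                               # FileNotFoundError; the fsspec fallback
                                                      # is expected to report "directory"
```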
```diff
@@ -177,15 +210,56 @@ async def _ls(self, path, detail=True, **kwargs):
         else:
             return sorted([object["path"] for object in objects] + prefs)
 
-    def _open(self, path, mode="rb", **kwargs):
+    def _open(self, path: str, mode="rb", **kwargs):
         """Return raw bytes-mode file-like from the file-system"""
-        return BufferedFileSimple(self, path, mode, **kwargs)
+
+        _, path = self._split_path(path)
```
Review thread on the `_, path = self._split_path(path)` line in `_open`:

> We should assert that the bucket of the path matches the bucket of the store.

> Hi @kylebarron,

```
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'from_env', 'from_session', 'from_url']
```

> It seems like obstore does not provide any attribute to get the bucket name from the store instance. Maybe we should add something like this in Rust?

```rust
#[getter]
fn bucket(&self) -> Option<String> {
    self.config.bucket.clone()
}
```

> For the present release I think we could add a private method

> This is fixed in #210, so you can access the bucket information.
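A hypothetical sketch of the check the reviewer asks for, assuming the store exposes the bucket it was configured with (per the follow-up in #210). The `store_bucket` parameter and the function name stand in for that accessor and are not part of the PR:

```python
def check_bucket(path: str, store_bucket: str | None) -> tuple[str, str]:
    """Split '<bucket>/<key>' and verify the bucket matches the store's bucket."""
    bucket, _, key = path.partition("/")
    if store_bucket is not None and bucket != store_bucket:
        raise ValueError(
            f"path bucket {bucket!r} does not match store bucket {store_bucket!r}"
        )
    return bucket, key


assert check_bucket("mybucket/path/to/file", "mybucket") == ("mybucket", "path/to/file")
```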
```diff
+        if mode == "wb":
+            return BufferedFileWrite(self, path, mode, **kwargs)
+        if mode == "rb":
+            return BufferedFileRead(self, path, mode, **kwargs)
+        else:
+            raise ValueError(f"Only 'rb' and 'wb' mode is currently supported, got: {mode}")
```
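Since the point of the PR is to make `df.to_parquet()` and `pd.read_parquet()` work through this filesystem, here is a hedged end-to-end sketch. The store construction, bucket name, and the pandas `filesystem=` argument (available in recent pandas with the pyarrow engine) are assumptions, not part of this diff:

```python
import pandas as pd
from obstore.store import S3Store

# AsyncFsspecStore is the class from this module; its import path depends on
# where the integration lands. Store construction below is illustrative.
store = S3Store.from_env("mybucket")
fs = AsyncFsspecStore(store)

df = pd.DataFrame({"a": [1, 2, 3]})

# Writing routes through _open(..., mode="wb") -> BufferedFileWrite -> open_writer.
df.to_parquet("mybucket/data/example.parquet", filesystem=fs)

# Reading routes through _open(..., mode="rb") -> BufferedFileRead.
round_tripped = pd.read_parquet("mybucket/data/example.parquet", filesystem=fs)
assert round_tripped.equals(df)
```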
```diff
+class BufferedFileWrite(fsspec.spec.AbstractBufferedFile):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+    def _initiate_upload(self):
+        """
+        Called by AbstractBufferedFile flush() on the first flush
+        """
+        self._writer = open_writer(self.fs.store, self.path)
+
+    def _upload_chunk(self, final=False):
+        """
+        Called every time fsspec flushes the write buffer
+        """
+        if self.buffer and len(self.buffer.getbuffer()) > 0:
+            self.buffer.seek(0)
+            self._writer.write(self.buffer.read())
+            # flush all the data in buffer when closing
+            if final:
+                self._writer.flush()
+            return True
+        else:
+            return False
+
+    def close(self):
+        """Close file
+        Ensure flushing the buffer
+        """
+        if self.closed:
+            return
+        self.flush(force=True)
+        self._writer.close()
+        self.closed = True
```

Review thread on the `if self.buffer and len(self.buffer.getbuffer()) > 0:` check in `_upload_chunk`:

> It shouldn't be possible to get here without this condition being True.

> I think we can remove

Review comment on `close()`:

> Should also set
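For orientation, a sketch of the `fsspec.spec.AbstractBufferedFile` lifecycle this class plugs into: `write()` fills an in-memory buffer, fsspec calls `_initiate_upload()` on the first flush and `_upload_chunk()` on each flush, and `close()` forces the final flush. The path, block size, and data below are illustrative; `fs` is an `AsyncFsspecStore` instance as in the usage sketch above.

```python
f = fs.open("mybucket/big.bin", mode="wb", block_size=5 * 1024 * 1024)

f.write(b"x" * (12 * 1024 * 1024))  # buffered; once the buffer passes block_size,
                                    # fsspec calls _initiate_upload() then _upload_chunk()
f.close()                           # flush(force=True) -> _upload_chunk(final=True),
                                    # then the obstore writer is closed
```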
```diff
-class BufferedFileSimple(fsspec.spec.AbstractBufferedFile):
+class BufferedFileRead(fsspec.spec.AbstractBufferedFile):
     def __init__(self, fs, path, mode="rb", **kwargs):
         if mode != "rb":
             raise ValueError("Only 'rb' mode is currently supported")
 
         super().__init__(fs, path, mode, **kwargs)
 
     def read(self, length: int = -1):
```
Review thread on the `_split_path` method name:

> I would call this `_split_bucket` to avoid confusion with fsspec's `_split_protocol`.

> I named it `_split_path` to align with the naming in s3fs, as this function does the same thing as s3fs's `split_path`.