-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] support df.to_parquet and df.read_parquet() #165
base: main
Are you sure you want to change the base?
Conversation
212fe05
to
75d3734
Compare
obstore/python/obstore/fsspec.py
Outdated
@@ -28,6 +28,7 @@ | |||
import fsspec.spec | |||
|
|||
import obstore as obs | |||
from obstore import open_reader, open_writer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the fsspec classes are async shouldn't we use the async reader and writer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes, I used this as original BufferedFileSimple inherit from AbstractBufferedFile
, which is not async. I will change it to AbstractAsyncStreamedFile
and use async method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like df.to_parquet()
and df.read_parquet()
does not support async, as I got error like RuntimeWarning: coroutine 'AbstractAsyncStreamedFile.write' was never awaited
if trying to change to async
I found that there's also some bug in checking if parquet file exists in info() so I rename the title |
Hi @kylebarron , I am wondering about the test here. Originally, Lines 47 to 59 in 428a66d
Should I try to make its output as async def _cat(
self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
):
paths = await self._expand_path(path, recursive=recursive)
coros = [self._cat_file(path, **kwargs) for path in paths if not self._isdir(path)] # ignore dir for cat_file
batch_size = batch_size or self.batch_size Refer to fsspec, it simply gives |
@martindurant wrote that test and is obviously more familiar with fsspec than I am... @martindurant do you have any suggestions here? |
Keep pinging me until I have a chance to look at this :) |
"""Return raw bytes-mode file-like from the file-system""" | ||
assert mode in ( | ||
"rb", | ||
"wb", | ||
), f"Only 'rb' and 'wb' mode is currently supported, got: {mode}" | ||
|
||
_, path = self._split_path(path) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should assert that the bucket of the path matches the bucket of the store.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @kylebarron ,
I run print(dir(store))
and it outputs:
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'from_env', 'from_session', 'from_url']
It seems like obstore does not provide any attribute to get the bucket name from the store instance. Maybe we should add something like this in Rust?
#[getter]
fn bucket(&self) -> Option<String> {
self.config.bucket.clone()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the present release I think we could add a private method _bucket
, however bucket won't always be defined, such as when the store is created by url.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is fixed in #210, so you can access bucket information out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made some comments on the code as it stands.
However, the outstanding issues is: how to construct these instances via fsspec.open(). It would mean
- registering each of the expected protocols (s3, gs, ab) to override the fsspec default ones. Perhaps a top-level function in obstore would do this explicitly (I wouldn't do it implicitly on import).
- writing a _get_kwargs_from_urls to create the right obstore instance for the given path(s), including the bucket. This would also be a way to stash the value of the bucket, for later asserting the paths are right.
The alternative way, annoying for the user, would be to explicitly pass a premade instance with filesystem= (sometimes fs=) to the given loading function.
@@ -81,6 +83,32 @@ def __init__( | |||
*args, asynchronous=asynchronous, loop=loop, batch_size=batch_size | |||
) | |||
|
|||
def _split_path(self, path: str) -> Tuple[str, str]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wold call this _split_bucket to avoid confusion with fsspec's _split_protocol
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I named it as _split_path
to align with the naming in s3fs here, as this function is doing the same thing as s3fs's split_path
path_li = path.split("/") | ||
bucket = path_li[0] | ||
file_path = "/".join(path_li[1:]) | ||
return (bucket, file_path) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
path_li = path.split("/") | |
bucket = path_li[0] | |
file_path = "/".join(path_li[1:]) | |
return (bucket, file_path) | |
return path.split("/", 1) |
would do this; but what about the "://" when the protocol is included?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added following code in #198 but haven't sync here yet. This can solve the :// issue
if path.startswith(self.protocol + "://"):
path = path[len(self.protocol) + 3 :]
elif path.startswith(self.protocol + "::"):
path = path[len(self.protocol) + 2 :]
path = path.rstrip("/")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fsspec also allows "s3a" for the same thing - I don't know if you want to allow that.
Also, I don't know how "::" can appear here - it exists to join path elements, not protocol to path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I actually copied this from fsspec and thought obstore may do the similar stuff. It looks like obstore does not need this, I'll take them out then.
obstore/python/obstore/fsspec.py
Outdated
"version": head["version"], | ||
} | ||
except FileNotFoundError: | ||
# try ls, refer to the info implementation in fsspec |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this PR need the extra code? Are you trying open() with globs? I don't know the details on head_async, whether it might already achieve this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the code that stores the parquet as: file.csv/00000
, file.csv/00001
, ...etc, when reading the file.csv/
from s3, the info()
will give FileNotFoundError
. As I known, s3's folder is not an object but a prefix, which cause this error from happending. So I add the code if getting FileNotFoundError
in head_async
to solve it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, the same old "is it a folder" problem - I am well familiar with this.
else: | ||
return False | ||
|
||
def close(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should also set self.closed = True
""" | ||
Called every time fsspec flushes the write buffer | ||
""" | ||
if self.buffer and len(self.buffer.getbuffer()) > 0: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It shouldn't be possible to get here without this condition being True
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can remove self.buffer
here, but for len(self.buffer.getbuffer()) > 0
, I think we might need this when flush(force=True)
when closing? Which does not ensure buffer contains data. See: https://github.com/fsspec/filesystem_spec/blob/f30bc759f30327dfb499f37e967648f175750fac/fsspec/spec.py#L2041
Co-authored-by: Martin Durant <martindurant@users.noreply.github.com>
Thank you!
I'm in favor of this approach. I definitely wouldn't do it explicitly on import, but I'd propose we have |
Hi @martindurant , I've opened a new draft PR for this to ensure consistency in how instances are constructed across methods. My goal is to align the usage with fsspec. With this PR, obstore can be registered as an fsspec storage backend using: fsspec.register_implementation("s3", S3FsspecStore) The bucket is extracted from the file path and used as a cache key when creating obstore objects. Here's an example usage that I would like to achieve: fsspec.register_implementation("s3", S3FsspecStore)
fs: AsyncFsspecStore = fsspec.filesystem(
"s3",
config={
"endpoint": "http://localhost:30002",
"access_key_id": "minio",
"secret_access_key": "miniostorage",
"virtual_hosted_style_request": True, # path contain bucket name
},
client_options={"timeout": "99999s", "allow_http": "true"},
retry_config={
"max_retries": 2,
"backoff": {
"base": 2,
"init_backoff": timedelta(seconds=2),
"max_backoff": timedelta(seconds=16),
},
"retry_timeout": timedelta(minutes=3),
},
)
fs.cat_file("my-s3-bucket/test.txt") Does this align with your expectations? Please let me know if you have any suggestions! |
Hi @kylebarron , I think we can directly use |
obstore/python/obstore/fsspec.py
Outdated
"version": head["version"], | ||
} | ||
except FileNotFoundError: | ||
# try ls, refer to the info implementation in fsspec |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to duplicate this from upstream? Can we just call self.info
, to call the upstream code without vendoring it here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll try this out
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It works, I updated here:
obstore/obstore/python/obstore/fsspec.py
Lines 191 to 194 in ff5d6bd
except FileNotFoundError: | |
# use info in fsspec.AbstractFileSystem | |
loop = asyncio.get_running_loop() | |
return await loop.run_in_executor(None, super().info, path, **kwargs) |
I like this because it means that our fsspec subclasses could potentially stay private. So in theory the only API exported from But overall I think having |
Exactly what I was thinking - the user can call register themselves as int he example above, but it would be useful to provide a utility function that knows what to register, so the user only needs to call one thing once. |
I will continue on this once this PR is merge, so that we can use the new way to construct the obstore insntance in |
Add write() function for
BufferedFileSimple
used whan calling fsspec.open().obstore/obstore/python/obstore/fsspec.py
Lines 177 to 186 in b40d59b
Related to issue #164