
[WIP] support df.to_parquet and df.read_parquet() #165

Open · wants to merge 15 commits into base: main
Conversation

@machichima commented Jan 27, 2025

Add a write() function for BufferedFileSimple, used when calling fsspec.open().

def _open(self, path, mode="rb", **kwargs):
    """Return raw bytes-mode file-like from the file-system"""
    return BufferedFileSimple(self, path, mode, **kwargs)

class BufferedFileSimple(fsspec.spec.AbstractBufferedFile):
    def __init__(self, fs, path, mode="rb", **kwargs):
        if mode != "rb":
            raise ValueError("Only 'rb' mode is currently supported")
        super().__init__(fs, path, mode, **kwargs)

Related to issue #164

@@ -28,6 +28,7 @@
import fsspec.spec

import obstore as obs
from obstore import open_reader, open_writer
Member

If the fsspec classes are async, shouldn't we use the async reader and writer?

Author

Ah yes, I used this because the original BufferedFileSimple inherits from AbstractBufferedFile, which is not async. I will change it to AbstractAsyncStreamedFile and use the async methods.

Author

It seems that df.to_parquet() and df.read_parquet() do not support async; when trying to change to async, I get an error like RuntimeWarning: coroutine 'AbstractAsyncStreamedFile.write' was never awaited.

@machichima machichima changed the title [WIP] fsspec write method for open() [WIP] support df.to_parquet and df.read_parquet() Jan 30, 2025
@machichima (Author)

I found that there's also a bug in checking whether a parquet file exists in info(), so I renamed the title.

@machichima (Author)

Hi @kylebarron ,

I am wondering about the test here. Originally, fs.info("dir") on a directory raised FileNotFoundError, which caused an error when using df.to_parquet(). After fixing that, the line fs.cat("dir", recursive=True) raises FileNotFoundError for "dir": since fs.info("dir") no longer errors, "dir" itself gets processed.

def test_multi_file_ops(fs):
    data = {"dir/test1": b"test data1", "dir/test2": b"test data2"}
    fs.pipe(data)
    out = fs.cat(list(data))
    assert out == data
    out = fs.cat("dir", recursive=True)
    assert out == data
    fs.cp("dir", "dir2", recursive=True)
    out = fs.find("", detail=False)
    assert out == ["afile", "dir/test1", "dir/test2", "dir2/test1", "dir2/test2"]
    fs.rm(["dir", "dir2"], recursive=True)
    out = fs.find("", detail=False)
    assert out == ["afile"]

Should I try to make its output {"dir/test1": b"test data1", "dir/test2": b"test data2"} here? That requires overriding _cat() in fsspec as follows:

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(path, **kwargs) for path in paths if not self._isdir(path)]   # ignore dir for cat_file
        batch_size = batch_size or self.batch_size

Referring to fsspec, it simply gives FileNotFoundError when doing so. Maybe we can just remove this line, or assert that FileNotFoundError is raised?

@kylebarron (Member)

@martindurant wrote that test and is obviously more familiar with fsspec than I am... @martindurant do you have any suggestions here?

@martindurant (Contributor)

Keep pinging me until I have a chance to look at this :)

"""Return raw bytes-mode file-like from the file-system"""
assert mode in (
    "rb",
    "wb",
), f"Only 'rb' and 'wb' mode is currently supported, got: {mode}"

_, path = self._split_path(path)
Member

We should assert that the bucket of the path matches the bucket of the store.

Author

Hi @kylebarron ,
I ran print(dir(store)) and it outputs:

['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'from_env', 'from_session', 'from_url']

It seems like obstore does not provide any attribute to get the bucket name from the store instance. Maybe we should add something like this in Rust?

#[getter]
fn bucket(&self) -> Option<String> {
    self.config.bucket.clone()
}

Member

For the present release I think we could add a private method _bucket; however, bucket won't always be defined, such as when the store is created from a URL.

Member

This is fixed in #210, so you can now access the bucket information.

@martindurant (Contributor) left a comment

I made some comments on the code as it stands.

However, the outstanding issue is: how to construct these instances via fsspec.open(). It would mean

  • registering each of the expected protocols (s3, gs, ab) to override the fsspec default ones. Perhaps a top-level function in obstore would do this explicitly (I wouldn't do it implicitly on import).
  • writing a _get_kwargs_from_urls to create the right obstore instance for the given path(s), including the bucket. This would also be a way to stash the value of the bucket, for later asserting the paths are right.

The alternative way, annoying for the user, would be to explicitly pass a premade instance with filesystem= (sometimes fs=) to the given loading function.

@@ -81,6 +83,32 @@ def __init__(
*args, asynchronous=asynchronous, loop=loop, batch_size=batch_size
)

def _split_path(self, path: str) -> Tuple[str, str]:
Contributor

I would call this _split_bucket to avoid confusion with fsspec's _split_protocol.

Author

I named it _split_path to align with the naming in s3fs, as this function does the same thing as s3fs's split_path.

Comment on lines +107 to +110
path_li = path.split("/")
bucket = path_li[0]
file_path = "/".join(path_li[1:])
return (bucket, file_path)
Contributor

Suggested change
path_li = path.split("/")
bucket = path_li[0]
file_path = "/".join(path_li[1:])
return (bucket, file_path)
return path.split("/", 1)

would do this; but what about the "://" when the protocol is included?

Author

I added the following code in #198 but haven't synced it here yet. This solves the :// issue:

if path.startswith(self.protocol + "://"):
    path = path[len(self.protocol) + 3 :]
elif path.startswith(self.protocol + "::"):
    path = path[len(self.protocol) + 2 :]
path = path.rstrip("/")
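Putting the pieces together, a standalone sketch of the whole split (function name and edge-case behavior are illustrative, not the PR's actual _split_path):

```python
def split_bucket(path: str, protocol: str = "s3") -> tuple[str, str]:
    """Split "bucket/key" (optionally "s3://bucket/key") into its parts.

    Combines the `path.split("/", 1)` suggestion with protocol
    stripping; a sketch only, the PR's method may differ.
    """
    # strip an optional "protocol://" prefix
    if path.startswith(protocol + "://"):
        path = path[len(protocol) + 3 :]
    path = path.strip("/")
    if "/" not in path:
        # a bare bucket name has an empty key
        return path, ""
    bucket, key = path.split("/", 1)
    return bucket, key
```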

Contributor

fsspec also allows "s3a" for the same thing - I don't know if you want to allow that.

Also, I don't know how "::" can appear here - it exists to join path elements, not protocol to path.

Author

Oh, I actually copied this from fsspec and thought obstore might do something similar. It looks like obstore does not need this; I'll take it out then.

"version": head["version"],
}
except FileNotFoundError:
# try ls, refer to the info implementation in fsspec
Contributor

Why does this PR need the extra code? Are you trying open() with globs? I don't know the details of head_async, whether it might already achieve this.

Author

For code that stores the parquet as file.csv/00000, file.csv/00001, etc., reading file.csv/ from S3 makes info() raise FileNotFoundError. As I understand it, an S3 "folder" is not an object but a key prefix, which is what causes this error. So I added the fallback code for when head_async raises FileNotFoundError.
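The prefix-vs-object distinction can be shown with a toy in-memory store (names and structure are illustrative only, not the PR's code):

```python
# Toy in-memory "object store": S3 has no real directories, so
# "file.csv" below exists only as a shared key prefix.
objects = {
    "file.csv/00000": b"part 0",
    "file.csv/00001": b"part 1",
}

def head(key: str) -> dict:
    """HEAD succeeds only for real objects, never for bare prefixes."""
    if key not in objects:
        raise FileNotFoundError(key)
    return {"size": len(objects[key]), "type": "file"}

def info(key: str) -> dict:
    """head() with a listing fallback, mirroring the fix described above."""
    try:
        return {"name": key, **head(key)}
    except FileNotFoundError:
        # fall back to listing: any key under the prefix means "directory"
        prefix = key.rstrip("/") + "/"
        if any(k.startswith(prefix) for k in objects):
            return {"name": key, "size": 0, "type": "directory"}
        raise
```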

Contributor

OK, the same old "is it a folder" problem - I am well familiar with this.

obstore/python/obstore/fsspec.py (resolved comments)
else:
    return False

def close(self):
Contributor

Should also set self.closed = True
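A minimal standalone sketch of the close() contract being suggested (class name and buffer layout are illustrative, not the PR's code):

```python
class BufferedWriter:
    """Toy writer showing why close() should set self.closed = True."""

    def __init__(self):
        self.closed = False
        self._buffer = bytearray()
        self.committed = b""  # stands in for the uploaded object

    def write(self, data: bytes) -> int:
        if self.closed:
            raise ValueError("I/O operation on closed file")
        self._buffer += data
        return len(data)

    def flush(self):
        self.committed += bytes(self._buffer)
        self._buffer.clear()

    def close(self):
        if self.closed:      # idempotent: fsspec may call close() twice
            return
        self.flush()         # push any remaining buffered bytes
        self.closed = True   # later writes now fail fast
```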

obstore/python/obstore/fsspec.py (resolved comment)
"""
Called every time fsspec flushes the write buffer
"""
if self.buffer and len(self.buffer.getbuffer()) > 0:
Contributor

It shouldn't be possible to get here without this condition being True

Author

I think we can remove the self.buffer check here, but we might still need len(self.buffer.getbuffer()) > 0 for flush(force=True) on close, which does not guarantee the buffer contains data. See: https://github.com/fsspec/filesystem_spec/blob/f30bc759f30327dfb499f37e967648f175750fac/fsspec/spec.py#L2041

Co-authored-by: Martin Durant <martindurant@users.noreply.github.com>
@kylebarron (Member)

I made some comments on the code as it stands.

Thank you!

  • registering each of the expected protocols (s3, gs, ab) to override the fsspec default ones. Perhaps a top-level function in obstore would do this explicitly (I wouldn't do it implicitly on import).

I'm in favor of this approach. I definitely wouldn't do it implicitly on import, but I'd propose we have obstore.fsspec.register() which would register these protocols with fsspec's registry.

@machichima (Author)

machichima commented Feb 6, 2025

I made some comments on the code as it stands.

However, the outstanding issue is: how to construct these instances via fsspec.open(). It would mean

  • registering each of the expected protocols (s3, gs, ab) to override the fsspec default ones. Perhaps a top-level function in obstore would do this explicitly (I wouldn't do it implicitly on import).
  • writing a _get_kwargs_from_urls to create the right obstore instance for the given path(s), including the bucket. This would also be a way to stash the value of the bucket, for later asserting the paths are right.

The alternative way, annoying for the user, would be to explicitly pass a premade instance with filesystem= (sometimes fs=) to the given loading function.

Hi @martindurant ,

I've opened a new draft PR for this to ensure consistency in how instances are constructed across methods. My goal is to align the usage with fsspec.

With this PR, obstore can be registered as an fsspec storage backend using:

fsspec.register_implementation("s3", S3FsspecStore)

The bucket is extracted from the file path and used as a cache key when creating obstore objects. Here's an example usage that I would like to achieve:

fsspec.register_implementation("s3", S3FsspecStore)
fs: AsyncFsspecStore = fsspec.filesystem(
    "s3",
    config={
        "endpoint": "http://localhost:30002",
        "access_key_id": "minio",
        "secret_access_key": "miniostorage",
        "virtual_hosted_style_request": True,  # path contains bucket name
    },
    client_options={"timeout": "99999s", "allow_http": "true"},
    retry_config={
        "max_retries": 2,
        "backoff": {
            "base": 2,
            "init_backoff": timedelta(seconds=2),
            "max_backoff": timedelta(seconds=16),
        },
        "retry_timeout": timedelta(minutes=3),
    },
)

fs.cat_file("my-s3-bucket/test.txt")

Does this align with your expectations? Please let me know if you have any suggestions!
Thanks!

@machichima (Author)

I'm in favor of this approach. I definitely wouldn't do it explicitly on import, but I'd propose we have obstore.fsspec.register() which would register these protocols with fsspec's registry.

Hi @kylebarron ,

I think we can directly use fsspec's registration for this, used as fsspec.register_implementation("s3", AsyncFsspecStore). Or do you mean something like obstore.fsspec.register("s3"), so that we do not need to create more classes inheriting from AsyncFsspecStore?

"version": head["version"],
}
except FileNotFoundError:
# try ls, refer to the info implementation in fsspec
@kylebarron (Member) commented Feb 6, 2025

Do we need to duplicate this from upstream? Can we just call self.info, to call the upstream code without vendoring it here?

Author

I'll try this out

Author

It works; I updated it here:

except FileNotFoundError:
    # use info in fsspec.AbstractFileSystem
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, super().info, path, **kwargs)
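The same pattern in a runnable, self-contained form (blocking_info is a stand-in for fsspec's synchronous AbstractFileSystem.info, and the return value here is invented for illustration):

```python
import asyncio

def blocking_info(path: str) -> dict:
    # stand-in for the synchronous upstream AbstractFileSystem.info
    return {"name": path, "type": "directory", "size": 0}

async def info_with_fallback(path: str) -> dict:
    # Run the blocking call in a worker thread so the event loop
    # isn't stalled, mirroring the run_in_executor snippet above.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_info, path)

result = asyncio.run(info_with_fallback("dir"))
```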

@kylebarron (Member)

Or do you mean that we can do something like: obstore.fsspec.register("s3") so that we do not need to create more classes inherit from AsyncFsspecStore?

I like this because it means that our fsspec subclasses could potentially stay private. So in theory the only API exported from obstore.fsspec would be register(). In practice, that might not be enough for all fsspec use cases.

But overall I think having obstore.fsspec.register, even if that function is a one-liner that wraps fsspec.register, is useful for simplicity.
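A sketch of what such a one-liner wrapper could look like. The registry= parameter exists only so the sketch can be exercised without touching fsspec's global state, and the protocol list follows the "(s3, gs, ab)" set mentioned above; the real obstore.fsspec.register may look different:

```python
def register(cls, protocols=("s3", "gs", "ab"), *, registry=None):
    """Register `cls` as the fsspec implementation for each protocol.

    By default this calls fsspec.register_implementation(); passing a
    plain dict as `registry` lets the sketch run without fsspec.
    """
    for protocol in protocols:
        if registry is not None:
            registry[protocol] = cls
        else:
            import fsspec
            # clobber=True overrides fsspec's default implementations
            fsspec.register_implementation(protocol, cls, clobber=True)
```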

@martindurant (Contributor)

I think having obstore.fsspec.register, even if that function is a one-liner that wraps fsspec.register

Exactly what I was thinking: the user can call register themselves as in the example above, but it would be useful to provide a utility function that knows what to register, so the user only needs to call one thing once.

@machichima (Author)

I will continue on this once this PR is merged, so that we can use the new way to construct the obstore instance in open() too.
