diff --git a/obstore/python/obstore/fsspec.py b/obstore/python/obstore/fsspec.py
index 61bfd599..cba86ef9 100644
--- a/obstore/python/obstore/fsspec.py
+++ b/obstore/python/obstore/fsspec.py
@@ -31,6 +31,8 @@
 import fsspec.spec
 
 import obstore as obs
+from obstore import open_writer
+from obstore.store import AzureStore, GCSStore, S3Store
 
 
 class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
@@ -81,6 +83,41 @@ def __init__(
             *args, asynchronous=asynchronous, loop=loop, batch_size=batch_size
         )
 
+    def _split_path(self, path: str) -> Tuple[str, str]:
+        """Split a path into its bucket name and the key inside that bucket.
+
+        Only S3Store, GCSStore and AzureStore paths are treated as carrying
+        a bucket name; for any other store the path is returned unchanged
+        alongside an empty bucket name.
+
+        Args:
+            path (str): Input path, like `s3://mybucket/path/to/file` or
+                `mybucket/path/to/file`.
+
+        Returns:
+            A `(bucket, file_path)` tuple; `file_path` is empty when `path`
+            names only a bucket.
+
+        Examples:
+            >>> fs._split_path("s3://mybucket/path/to/file")
+            ('mybucket', 'path/to/file')
+        """
+        store_with_bucket = (S3Store, GCSStore, AzureStore)
+
+        if not isinstance(self.store, store_with_bucket):
+            # no bucket name in path
+            return "", path
+
+        # Drop a leading "<scheme>://" so that the first segment really is
+        # the bucket; otherwise "s3://bucket/key" would yield "s3:".
+        scheme, sep, remainder = path.partition("://")
+        if sep and "/" not in scheme:
+            path = remainder
+
+        # partition() leaves file_path empty when there is no "/" at all.
+        bucket, _, file_path = path.partition("/")
+        return bucket, file_path
+
     async def _rm_file(self, path, **kwargs):
         return await obs.delete_async(self.store, path)
 
@@ -148,17 +185,32 @@ async def _get_file(self, rpath, lpath, **kwargs):
                 f.write(buffer)
 
     async def _info(self, path, **kwargs):
-        head = await obs.head_async(self.store, path)
+        """Return metadata for `path` as an fsspec-style info dict.
+
+        Falls back to the parent class's `info` when the store reports that
+        there is no object at `path`.
+        """
+        try:
+            head = await obs.head_async(self.store, path)
+        except FileNotFoundError:
+            # use info in fsspec.AbstractFileSystem
+            # run_in_executor() forwards positional arguments only, so the
+            # keyword arguments are bound in a closure before dispatching.
+            parent_info = super().info
+            loop = asyncio.get_running_loop()
+            return await loop.run_in_executor(
+                None, lambda: parent_info(path, **kwargs)
+            )
         return {
             # Required of `info`: (?)
             "name": head["path"],
             "size": head["size"],
             "type": "directory" if head["path"].endswith("/") else "file",
             # Implementation-specific keys
             "e_tag": head["e_tag"],
             "last_modified": head["last_modified"],
             "version": head["version"],
         }
 
     async def _ls(self, path, detail=True, **kwargs):
         result = await obs.list_with_delimiter_async(self.store, path)
@@ -177,15 +229,63 @@ async def _ls(self, path, detail=True, **kwargs):
         else:
             return sorted([object["path"] for object in objects] + prefs)
 
-    def _open(self, path, mode="rb", **kwargs):
+    def _open(self, path: str, mode="rb", **kwargs):
         """Return raw bytes-mode file-like from the file-system"""
-        return BufferedFileSimple(self, path, mode, **kwargs)
+        # NOTE(review): the bucket part is discarded here -- presumably
+        # self.store is already bound to that bucket; confirm with callers.
+        _, path = self._split_path(path)
+
+        if mode == "wb":
+            return BufferedFileWrite(self, path, mode, **kwargs)
+        if mode == "rb":
+            return BufferedFileRead(self, path, mode, **kwargs)
+        raise ValueError(
+            f"Only 'rb' and 'wb' mode is currently supported, got: {mode}"
+        )
+
+
+class BufferedFileWrite(fsspec.spec.AbstractBufferedFile):
+    """Write-mode file object that streams fsspec's buffer to an obstore writer."""
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+    def _initiate_upload(self):
+        """
+        Called by AbstractBufferedFile flush() on the first flush
+        """
+        self._writer = open_writer(self.fs.store, self.path)
+
+    def _upload_chunk(self, final=False):
+        """
+        Called every time fsspec flushes the write buffer
+
+        Always returns True once the buffered bytes have been handed to the
+        writer: AbstractBufferedFile.flush() only discards its buffer when
+        the result is not False, so reporting False after a write would
+        cause the same bytes to be written again on the next flush.
+        """
+        data = self.buffer.getvalue()
+        if data:
+            self._writer.write(data)
+        # flush all the data in buffer when closing
+        if final:
+            self._writer.flush()
+        return True
+
+    def close(self):
+        """Close file
+        Ensure flushing the buffer
+        """
+        if self.closed:
+            return
+        self.flush(force=True)
+        self._writer.close()
+        self.closed = True
 
 
-class BufferedFileSimple(fsspec.spec.AbstractBufferedFile):
+class BufferedFileRead(fsspec.spec.AbstractBufferedFile):
     def __init__(self, fs, path, mode="rb", **kwargs):
-        if mode != "rb":
-            raise ValueError("Only 'rb' mode is currently supported")
         super().__init__(fs, path, mode, **kwargs)
 
     def read(self, length: int = -1):