Skip to content

Commit 8ed1da1

Browse files
authored
Async object store support (#6)
* feat: Async support * fix: Fix Bytes output for returned get * Update object-store/src/lib.rs * fix: fix visibility
1 parent 445e9d7 commit 8ed1da1

File tree

6 files changed

+296
-12
lines changed

6 files changed

+296
-12
lines changed

Cargo.lock

+14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

+30-3
Original file line numberDiff line numberDiff line change
@@ -41,20 +41,20 @@ implementation, with some slight adjustments for ease of use in python programs.
4141
### `ObjectStore` api
4242

4343
```py
44-
from object_store import ObjectStore, ObjectMeta
44+
from object_store import ObjectStore, ObjectMeta, Path
4545

4646
# we use an in-memory store for demonstration purposes.
4747
# data will not be persisted and is not shared across store instances
4848
store = ObjectStore("memory://")
4949

50-
store.put("data", b"some data")
50+
store.put(Path("data"), b"some data")
5151

5252
data = store.get("data")
5353
assert data == b"some data"
5454

5555
blobs = store.list()
5656

57-
meta: ObjectMeta = store.head("data")
57+
meta = store.head("data")
5858

5959
range = store.get_range("data", start=0, length=4)
6060
assert range == b"some"
@@ -64,6 +64,33 @@ copied = store.get("copied")
6464
assert copied == data
6565
```
6666

67+
#### Async api
68+
69+
```py
70+
from object_store import ObjectStore, ObjectMeta, Path
71+
72+
# we use an in-memory store for demonstration purposes.
73+
# data will not be persisted and is not shared across store instances
74+
store = ObjectStore("memory://")
75+
76+
path = Path("data")
77+
await store.put_async(path, b"some data")
78+
79+
data = await store.get_async(path)
80+
assert data == b"some data"
81+
82+
blobs = await store.list_async()
83+
84+
meta = await store.head_async(path)
85+
86+
range = await store.get_range_async(path, start=0, length=4)
87+
assert range == b"some"
88+
89+
await store.copy_async(Path("data"), Path("copied"))
90+
copied = await store.get_async(Path("copied"))
91+
assert copied == data
92+
```
93+
6794
### Configuration
6895

6996
As much as possible we aim to make access to various storage backends dependent

object-store-internal/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ once_cell = "1.12.0"
1313
object_store = { version = "0.9", features = ["azure", "aws", "gcp"] }
1414
percent-encoding = "2"
1515
pyo3 = { version = "0.20", default-features = false, features = ["macros"] }
16+
pyo3-asyncio = { version = "0.20", features = ["tokio-runtime"] }
1617
thiserror = "1.0.34"
1718
tokio = { version = "1.0", features = [
1819
"macros",

object-store-internal/src/lib.rs

+201-7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ mod builder;
22
mod file;
33
mod utils;
44

5+
use std::borrow::Cow;
56
use std::collections::HashMap;
67
use std::fmt;
78
use std::sync::Arc;
@@ -19,7 +20,7 @@ use pyo3::exceptions::{
1920
PyException, PyFileExistsError, PyFileNotFoundError, PyNotImplementedError,
2021
};
2122
use pyo3::prelude::*;
22-
use pyo3::{types::PyBytes, PyErr};
23+
use pyo3::PyErr;
2324
use tokio::runtime::Runtime;
2425

2526
pub use builder::ObjectStoreBuilder;
@@ -514,29 +515,82 @@ impl PyObjectStore {
514515
Ok(())
515516
}
516517

518+
/// Save the provided bytes to the specified location.
519+
#[pyo3(text_signature = "($self, location, bytes)")]
520+
fn put_async<'a>(
521+
&'a self,
522+
py: Python<'a>,
523+
location: PyPath,
524+
bytes: Vec<u8>,
525+
) -> PyResult<&PyAny> {
526+
let inner = self.inner.clone();
527+
pyo3_asyncio::tokio::future_into_py(py, async move {
528+
inner
529+
.put(&location.into(), bytes.into())
530+
.await
531+
.map_err(ObjectStoreError::from)?;
532+
Ok(())
533+
})
534+
}
535+
517536
/// Return the bytes that are stored at the specified location.
518537
#[pyo3(text_signature = "($self, location)")]
519-
fn get(&self, location: PyPath) -> PyResult<Py<PyBytes>> {
538+
fn get(&self, location: PyPath) -> PyResult<Cow<[u8]>> {
520539
let obj = self
521540
.rt
522541
.block_on(get_bytes(self.inner.as_ref(), &location.into()))
523542
.map_err(ObjectStoreError::from)?;
524-
Python::with_gil(|py| Ok(PyBytes::new(py, &obj).into_py(py)))
543+
Ok(Cow::Owned(obj.to_vec()))
544+
}
545+
546+
/// Return the bytes that are stored at the specified location.
547+
#[pyo3(text_signature = "($self, location)")]
548+
fn get_async<'a>(&'a self, py: Python<'a>, location: PyPath) -> PyResult<&PyAny> {
549+
let inner = self.inner.clone();
550+
pyo3_asyncio::tokio::future_into_py(py, async move {
551+
let obj = get_bytes(inner.as_ref(), &location.into())
552+
.await
553+
.map_err(ObjectStoreError::from)?;
554+
Ok(Cow::<[u8]>::Owned(obj.to_vec()))
555+
})
525556
}
526557

527558
/// Return the bytes that are stored at the specified location in the given byte range
528559
#[pyo3(text_signature = "($self, location, start, length)")]
529-
fn get_range(&self, location: PyPath, start: usize, length: usize) -> PyResult<Py<PyBytes>> {
560+
fn get_range(&self, location: PyPath, start: usize, length: usize) -> PyResult<Cow<[u8]>> {
530561
let range = std::ops::Range {
531562
start,
532563
end: start + length,
533564
};
534565
let obj = self
535566
.rt
536567
.block_on(self.inner.get_range(&location.into(), range))
537-
.map_err(ObjectStoreError::from)?
538-
.to_vec();
539-
Python::with_gil(|py| Ok(PyBytes::new(py, &obj).into_py(py)))
568+
.map_err(ObjectStoreError::from)?;
569+
Ok(Cow::Owned(obj.to_vec()))
570+
}
571+
572+
/// Return the bytes that are stored at the specified location in the given byte range
573+
#[pyo3(text_signature = "($self, location, start, length)")]
574+
fn get_range_async<'a>(
575+
&'a self,
576+
py: Python<'a>,
577+
location: PyPath,
578+
start: usize,
579+
length: usize,
580+
) -> PyResult<&PyAny> {
581+
let inner = self.inner.clone();
582+
let range = std::ops::Range {
583+
start,
584+
end: start + length,
585+
};
586+
587+
pyo3_asyncio::tokio::future_into_py(py, async move {
588+
let obj = inner
589+
.get_range(&location.into(), range)
590+
.await
591+
.map_err(ObjectStoreError::from)?;
592+
Ok(Cow::<[u8]>::Owned(obj.to_vec()))
593+
})
540594
}
541595

542596
/// Return the metadata for the specified location
@@ -549,6 +603,19 @@ impl PyObjectStore {
549603
Ok(meta.into())
550604
}
551605

606+
/// Return the metadata for the specified location
607+
#[pyo3(text_signature = "($self, location)")]
608+
fn head_async<'a>(&'a self, py: Python<'a>, location: PyPath) -> PyResult<&PyAny> {
609+
let inner = self.inner.clone();
610+
pyo3_asyncio::tokio::future_into_py(py, async move {
611+
let meta = inner
612+
.head(&location.into())
613+
.await
614+
.map_err(ObjectStoreError::from)?;
615+
Ok(PyObjectMeta::from(meta))
616+
})
617+
}
618+
552619
/// Delete the object at the specified location.
553620
#[pyo3(text_signature = "($self, location)")]
554621
fn delete(&self, location: PyPath) -> PyResult<()> {
@@ -558,6 +625,19 @@ impl PyObjectStore {
558625
Ok(())
559626
}
560627

628+
/// Delete the object at the specified location.
629+
#[pyo3(text_signature = "($self, location)")]
630+
fn delete_async<'a>(&'a self, py: Python<'a>, location: PyPath) -> PyResult<&PyAny> {
631+
let inner = self.inner.clone();
632+
pyo3_asyncio::tokio::future_into_py(py, async move {
633+
inner
634+
.delete(&location.into())
635+
.await
636+
.map_err(ObjectStoreError::from)?;
637+
Ok(())
638+
})
639+
}
640+
561641
/// List all the objects with the given prefix.
562642
///
563643
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
@@ -576,6 +656,25 @@ impl PyObjectStore {
576656
.collect())
577657
}
578658

659+
/// List all the objects with the given prefix.
660+
///
661+
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
662+
/// of `foo/bar/x` but not of `foo/bar_baz/x`.
663+
#[pyo3(text_signature = "($self, prefix)")]
664+
fn list_async<'a>(&'a self, py: Python<'a>, prefix: Option<PyPath>) -> PyResult<&PyAny> {
665+
let inner = self.inner.clone();
666+
pyo3_asyncio::tokio::future_into_py(py, async move {
667+
let object_metas = flatten_list_stream(inner.as_ref(), prefix.map(Path::from).as_ref())
668+
.await
669+
.map_err(ObjectStoreError::from)?;
670+
let py_object_metas = object_metas
671+
.into_iter()
672+
.map(PyObjectMeta::from)
673+
.collect::<Vec<_>>();
674+
Ok(py_object_metas)
675+
})
676+
}
677+
579678
/// List objects with the given prefix and an implementation specific
580679
/// delimiter. Returns common prefixes (directories) in addition to object
581680
/// metadata.
@@ -594,6 +693,28 @@ impl PyObjectStore {
594693
Ok(list.into())
595694
}
596695

696+
/// List objects with the given prefix and an implementation specific
697+
/// delimiter. Returns common prefixes (directories) in addition to object
698+
/// metadata.
699+
///
700+
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix
701+
/// of `foo/bar/x` but not of `foo/bar_baz/x`.
702+
#[pyo3(text_signature = "($self, prefix)")]
703+
fn list_with_delimiter_async<'a>(
704+
&'a self,
705+
py: Python<'a>,
706+
prefix: Option<PyPath>,
707+
) -> PyResult<&PyAny> {
708+
let inner = self.inner.clone();
709+
pyo3_asyncio::tokio::future_into_py(py, async move {
710+
let list_result = inner
711+
.list_with_delimiter(prefix.map(Path::from).as_ref())
712+
.await
713+
.map_err(ObjectStoreError::from)?;
714+
Ok(PyListResult::from(list_result))
715+
})
716+
}
717+
597718
/// Copy an object from one path to another in the same object store.
598719
///
599720
/// If there exists an object at the destination, it will be overwritten.
@@ -605,6 +726,21 @@ impl PyObjectStore {
605726
Ok(())
606727
}
607728

729+
/// Copy an object from one path to another in the same object store.
730+
///
731+
/// If there exists an object at the destination, it will be overwritten.
732+
#[pyo3(text_signature = "($self, from, to)")]
733+
fn copy_async<'a>(&'a self, py: Python<'a>, from: PyPath, to: PyPath) -> PyResult<&PyAny> {
734+
let inner = self.inner.clone();
735+
pyo3_asyncio::tokio::future_into_py(py, async move {
736+
inner
737+
.copy(&from.into(), &to.into())
738+
.await
739+
.map_err(ObjectStoreError::from)?;
740+
Ok(())
741+
})
742+
}
743+
608744
/// Copy an object from one path to another, only if destination is empty.
609745
///
610746
/// Will return an error if the destination already has an object.
@@ -616,6 +752,26 @@ impl PyObjectStore {
616752
Ok(())
617753
}
618754

755+
/// Copy an object from one path to another, only if destination is empty.
756+
///
757+
/// Will return an error if the destination already has an object.
758+
#[pyo3(text_signature = "($self, from, to)")]
759+
fn copy_if_not_exists_async<'a>(
760+
&'a self,
761+
py: Python<'a>,
762+
from: PyPath,
763+
to: PyPath,
764+
) -> PyResult<&PyAny> {
765+
let inner = self.inner.clone();
766+
pyo3_asyncio::tokio::future_into_py(py, async move {
767+
inner
768+
.copy_if_not_exists(&from.into(), &to.into())
769+
.await
770+
.map_err(ObjectStoreError::from)?;
771+
Ok(())
772+
})
773+
}
774+
619775
/// Move an object from one path to another in the same object store.
620776
///
621777
/// By default, this is implemented as a copy and then delete source. It may not
@@ -630,6 +786,24 @@ impl PyObjectStore {
630786
Ok(())
631787
}
632788

789+
/// Move an object from one path to another in the same object store.
790+
///
791+
/// By default, this is implemented as a copy and then delete source. It may not
792+
/// check when deleting source that it was the same object that was originally copied.
793+
///
794+
/// If there exists an object at the destination, it will be overwritten.
795+
#[pyo3(text_signature = "($self, from, to)")]
796+
fn rename_async<'a>(&'a self, py: Python<'a>, from: PyPath, to: PyPath) -> PyResult<&PyAny> {
797+
let inner = self.inner.clone();
798+
pyo3_asyncio::tokio::future_into_py(py, async move {
799+
inner
800+
.rename(&from.into(), &to.into())
801+
.await
802+
.map_err(ObjectStoreError::from)?;
803+
Ok(())
804+
})
805+
}
806+
633807
/// Move an object from one path to another in the same object store.
634808
///
635809
/// Will return an error if the destination already has an object.
@@ -641,6 +815,26 @@ impl PyObjectStore {
641815
Ok(())
642816
}
643817

818+
/// Move an object from one path to another in the same object store.
819+
///
820+
/// Will return an error if the destination already has an object.
821+
#[pyo3(text_signature = "($self, from, to)")]
822+
fn rename_if_not_exists_async<'a>(
823+
&'a self,
824+
py: Python<'a>,
825+
from: PyPath,
826+
to: PyPath,
827+
) -> PyResult<&PyAny> {
828+
let inner = self.inner.clone();
829+
pyo3_asyncio::tokio::future_into_py(py, async move {
830+
inner
831+
.rename_if_not_exists(&from.into(), &to.into())
832+
.await
833+
.map_err(ObjectStoreError::from)?;
834+
Ok(())
835+
})
836+
}
837+
644838
pub fn __getnewargs__(&self) -> PyResult<(String, Option<HashMap<String, String>>)> {
645839
Ok((self.root_url.clone(), self.options.clone()))
646840
}

0 commit comments

Comments
 (0)