-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
173 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
OBJECT_STORE_OSS = "OSS" | ||
OBJECT_STORE_MINIO = "MINIO" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
import io | ||
import os | ||
from abc import ABC | ||
from pathlib import Path | ||
|
||
import minio | ||
from minio import credentials, S3Error | ||
|
||
from omnistore.objstore.objstore import ObjStore | ||
|
||
|
||
class MinIO(ObjStore): | ||
def __init__(self, endpoint: str, bucket: str): | ||
""" | ||
Construct a new client to communicate with the provider. | ||
""" | ||
auth = credentials.EnvMinioProvider() | ||
self.client = minio.Minio(endpoint, credentials=auth,secure=False) | ||
self.bucket_name = bucket | ||
|
||
# Make sure the bucket exists | ||
if not self.client.bucket_exists(bucket): | ||
self.client.make_bucket(bucket) | ||
|
||
def create_dir(self, dirname: str): | ||
if not dirname.endswith("/"): | ||
dirname += "/" | ||
empty_stream = io.BytesIO(b"") | ||
self.client.put_object(self.bucket_name, dirname, empty_stream, 0) | ||
|
||
def delete_dir(self, dirname: str): | ||
if not dirname.endswith("/"): | ||
dirname += "/" | ||
objects = self.client.list_objects( | ||
self.bucket_name, prefix=dirname, recursive=True | ||
) | ||
for obj in objects: | ||
self.client.remove_object(self.bucket_name, obj.object_name) | ||
|
||
def upload(self, src: str, dest: str): | ||
self.client.fput_object(self.bucket_name, dest, src) | ||
|
||
def upload_dir(self, src_dir: str, dest_dir: str): | ||
for file in Path(src_dir).rglob("*"): | ||
if file.is_file(): | ||
dest_path = f"{dest_dir}/{file.relative_to(src_dir)}" | ||
self.upload(str(file), dest_path) | ||
elif file.is_dir(): | ||
self.create_dir(f"{dest_dir}/{file.relative_to(src_dir)}/") | ||
|
||
def download(self, src: str, dest: str): | ||
self.client.fget_object(self.bucket_name, src, dest) | ||
|
||
def download_dir(self, src_dir: str, dest_dir: str): | ||
if not src_dir.endswith("/"): | ||
src_dir += "/" | ||
path = Path(dest_dir) | ||
if not path.exists(): | ||
path.mkdir(parents=True) | ||
objects = self.client.list_objects( | ||
self.bucket_name, prefix=src_dir, recursive=True | ||
) | ||
for obj in objects: | ||
file_path = Path(dest_dir, Path(obj.object_name).relative_to(src_dir)) | ||
if not file_path.parent.exists(): | ||
file_path.parent.mkdir(parents=True, exist_ok=True) | ||
if obj.object_name.endswith("/"): | ||
continue | ||
self.download(obj.object_name, str(file_path)) | ||
|
||
def delete(self, filename: str): | ||
self.client.remove_object(self.bucket_name, filename) | ||
|
||
def exists(self, filename: str): | ||
try: | ||
self.client.stat_object(self.bucket_name, filename) | ||
return True | ||
except S3Error as e: | ||
if e.code == "NoSuchKey": | ||
return False | ||
else: | ||
raise e |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
import os | ||
import shutil | ||
|
||
import pytest | ||
from dotenv import load_dotenv | ||
|
||
from omnistore.objstore import StoreFactory | ||
from omnistore.objstore.constant import OBJECT_STORE_MINIO | ||
|
||
load_dotenv() | ||
|
||
class TestMinio: | ||
@pytest.fixture(scope="module", autouse=True) | ||
def setup_and_teardown(self): | ||
print("Setting up the test environment.") | ||
try: | ||
os.makedirs("./test-tmp", exist_ok=True) | ||
except Exception as e: | ||
print(f"An error occurred: {e}") | ||
|
||
yield | ||
|
||
print("Tearing down the test environment.") | ||
shutil.rmtree("./test-tmp") | ||
|
||
def test_upload_and_download_files(self): | ||
endpoint = os.getenv("ENDPOINT") | ||
bucket = os.getenv("BUCKET") | ||
|
||
client = StoreFactory.new_client( | ||
provider=OBJECT_STORE_MINIO, endpoint=endpoint, bucket=bucket | ||
) | ||
assert False == client.exists("foo.txt") | ||
|
||
with open("./test-tmp/foo.txt", "w") as file: | ||
file.write("test") | ||
|
||
client.upload("./test-tmp/foo.txt", "foo.txt") | ||
assert True == client.exists("foo.txt") | ||
|
||
client.download("foo.txt", "./test-tmp/bar.txt") | ||
assert True == os.path.exists("./test-tmp/bar.txt") | ||
|
||
client.delete("foo.txt") | ||
assert False == client.exists("foo.txt") | ||
|
||
def test_upload_and_download_dir(self): | ||
endpoint = os.getenv("ENDPOINT") | ||
bucket = os.getenv("BUCKET") | ||
|
||
client = StoreFactory.new_client( | ||
provider=OBJECT_STORE_MINIO, endpoint=endpoint, bucket=bucket | ||
) | ||
assert False == client.exists("/test/foo.txt") | ||
|
||
os.makedirs("./test-tmp/test/111", exist_ok=True) | ||
with open("./test-tmp/test/111/foo.txt", "w") as file: | ||
file.write("test") | ||
|
||
client.upload_dir("./test-tmp/test", "test") | ||
assert True == client.exists("test/111/foo.txt") | ||
|
||
client.download_dir("test", "./test-tmp/test1") | ||
assert True == os.path.exists("./test-tmp/test1/111/foo.txt") | ||
|
||
client.delete_dir("test") | ||
assert False == client.exists("test/foo.txt") |