diff --git a/setup.py b/setup.py
index 8c08b8e9..060c6174 100644
--- a/setup.py
+++ b/setup.py
@@ -41,8 +41,9 @@ def read(fname):
 azure_deps = ['azure-storage-blob', 'azure-common', 'azure-core']
 http_deps = ['requests']
 ssh_deps = ['paramiko']
+lakefs_deps = ['lakefs_client']

-all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps
+all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps + lakefs_deps
 tests_require = all_deps + [
     'moto[server]',
     'responses',
@@ -65,7 +66,7 @@ def read(fname):
     url='https://github.com/piskvorky/smart_open',
     download_url='http://pypi.python.org/pypi/smart_open',

-    keywords='file streaming, s3, hdfs, gcs, azure blob storage',
+    keywords='file streaming, s3, hdfs, gcs, azure blob storage, lakefs',

     license='MIT',
     platforms='any',
@@ -80,6 +81,7 @@ def read(fname):
         'http': http_deps,
         'webhdfs': http_deps,
         'ssh': ssh_deps,
+        'lakefs': lakefs_deps,
     },
     python_requires=">=3.6,<4.0",
diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py
new file mode 100644
index 00000000..d0843054
--- /dev/null
+++ b/smart_open/lakefs.py
@@ -0,0 +1,255 @@
+from __future__ import annotations
+
+import dataclasses
+import functools
+import io
+import logging
+import os
+import re
+from typing import IO
+
+try:
+    from lakefs_client import apis, configuration, models
+    from lakefs_client import client as lfs_client
+except ImportError:
+    MISSING_DEPS = True
+
+from smart_open import constants, utils
+
+SCHEME = "lakefs"
+
+URI_EXAMPLES = (
+    "lakefs://REPO/REF/file",
+    "lakefs://REPO/main/file.bz2",
+)
+
+DEFAULT_BUFFER_SIZE = 4 * 1024**2
+
+logger = logging.getLogger(__name__)
+
+
+@dataclasses.dataclass
+class ParsedURI:
+    scheme: str
+    repo: str
+    ref: str
+    key: str
+
+
+def parse_uri(uri_as_string: str) -> ParsedURI:
+    """Parse a lakefs protocol URI.
+
+    lakeFS uses a specific format for path URIs. The URI
+    `lakefs://<repo>/<ref>/<key>` is a path to objects in the given repo and
+    ref expression under key. This is used both for path prefixes and for
+    full paths. In similar fashion, `lakefs://<repo>/<ref>` identifies the
+    repository at a ref expression, and `lakefs://<repo>` identifies a repo.
+    """
+    sr = utils.safe_urlsplit(uri_as_string)
+    if sr.scheme != SCHEME:
+        raise ValueError(f"Scheme is not `lakefs` in {uri_as_string}")
+    _pattern = r"^/(?P<ref>[^/]+)/(?P<key>.+)"
+    _match = re.fullmatch(_pattern, sr.path)
+    if _match is None:
+        raise ValueError(
+            f"Missing `branch/commit` and `path` in {uri_as_string}. "
+            "The URI should have the format of `lakefs://<repo>/<ref>/<key>`"
+        )
+    ref, key = _match.groups()
+    return ParsedURI(scheme=sr.scheme, repo=sr.netloc, ref=ref, key=key)
+
+
+def open_uri(uri: str, mode: str, transport_params: dict) -> IO:
+    """Return a file-like object pointing to the URI.
+
+    :param str uri: The URI to open
+    :param str mode: Either "rb" or "wb".
+    :param dict transport_params: Any additional parameters to pass to `open`.
+
+    :returns: file-like object.
+    :rtype: file-like
+    """
+    parsed_uri = parse_uri(uri)
+    kwargs = utils.check_kwargs(open, transport_params)
+    return open(parsed_uri.repo, parsed_uri.ref, parsed_uri.key, mode, **kwargs)
+
+
+def open(
+    repo: str,
+    ref: str,
+    key: str,
+    mode: str,
+    client: lfs_client.LakeFSClient | None = None,
+    commit_message: str | None = None,
+    buffer_size: int = DEFAULT_BUFFER_SIZE,
+):
+    """Open a lakefs object for reading or writing.
+
+    Parameters
+    ----------
+    repo: str
+        The name of the repository this object resides in.
+    ref: str
+        The name of the branch or commit.
+    key: str
+        The path to the object for a given repo and branch.
+    mode: str
+        The mode for opening the object. Must be either "rb" or "wb".
+    client: lakefs_client.client.LakeFSClient
+        The lakefs client to use.
+    commit_message: str
+        Only used when writing. The message to include in the commit.
+    buffer_size: int, optional
+        The buffer size to use when performing I/O.
+    """
+    if client is None:
+        try:
+            conf = configuration.Configuration(
+                host=os.environ["LAKECTL_SERVER_ENDPOINT_URL"],
+                username=os.environ["LAKECTL_CREDENTIALS_ACCESS_KEY_ID"],
+                password=os.environ["LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY"],
+            )
+            client = lfs_client.LakeFSClient(conf)
+        except KeyError as e:
+            raise ValueError(
+                "Missing lakectl credentials. Please set "
+                "LAKECTL_SERVER_ENDPOINT_URL, LAKECTL_CREDENTIALS_ACCESS_KEY_ID, "
+                "and LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY"
+            ) from e
+
+    if mode == constants.READ_BINARY:
+        raw = _RawReader(client, repo, ref, key)
+        return io.BufferedReader(raw, buffer_size)
+    elif mode == constants.WRITE_BINARY:
+        raw_writer = _RawWriter(client, repo, ref, key, commit_message)
+        return io.BufferedWriter(raw_writer, buffer_size)
+    else:
+        raise NotImplementedError(f"Lakefs support for mode {mode} not implemented")
+
+
+class _RawReader(io.RawIOBase):
+    """Read a lakeFS object.
+
+    Provides low-level access to the underlying lakefs API.
+    High-level primitives are provided by wrapping this class in
+    io.BufferedReader.
+    """
+
+    def __init__(
+        self,
+        client: lfs_client.LakeFSClient,
+        repo: str,
+        ref: str,
+        key: str,
+    ):
+        self._client = client
+        self._repo = repo
+        self._ref = ref
+        self._path = key
+        self._position = 0
+        self.name = key
+
+    def seekable(self) -> bool:
+        return True
+
+    def readable(self) -> bool:
+        return True
+
+    @functools.cached_property
+    def content_length(self) -> int:
+        objects: apis.ObjectsApi = self._client.objects
+        obj_stats: models.ObjectStats = objects.stat_object(
+            self._repo, self._ref, self._path
+        )
+        return obj_stats.size_bytes
+
+    @property
+    def eof(self) -> bool:
+        return self._position == self.content_length
+
+    def seek(self, __offset: int, __whence: int = constants.WHENCE_START) -> int:
+        """Seek to the specified position.
+
+        :param int __offset: The byte offset.
+        :param int __whence: Where the offset is from.
+
+        :returns: The position after seeking.
+        :rtype: int
+        """
+        if __whence not in constants.WHENCE_CHOICES:
+            raise ValueError(
+                "invalid whence, expected one of %r" % constants.WHENCE_CHOICES
+            )
+
+        if __whence == constants.WHENCE_START:
+            start = max(0, __offset)
+        elif __whence == constants.WHENCE_CURRENT:
+            start = max(0, self._position + __offset)
+        elif __whence == constants.WHENCE_END:
+            start = max(0, self.content_length + __offset)
+
+        self._position = min(start, self.content_length)
+
+        return self._position
+
+    def readinto(self, __buffer) -> int | None:
+        """Read bytes into the pre-allocated, writable bytes-like object __buffer.
+
+        :param __buffer: the buffer to read into; at most len(__buffer) bytes
+            are fetched.
+
+        :returns: the number of bytes read from lakefs
+        :rtype: int
+        """
+        if self._position >= self.content_length:
+            return 0
+        size = len(__buffer)
+        start_range = self._position
+        end_range = min(self.content_length, (start_range + size)) - 1
+        # Fetch only the requested byte range instead of the whole object.
+        byte_range = f"bytes={start_range}-{end_range}"
+        objects: apis.ObjectsApi = self._client.objects
+        data = objects.get_object(
+            self._repo, self._ref, self._path, range=byte_range
+        ).read()
+        if not data:
+            return 0
+        self._position += len(data)
+        __buffer[: len(data)] = data
+        return len(data)
+
+
+class _RawWriter(io.RawIOBase):
+    """Write a lakefs object.
+
+    Provides low-level access to the underlying lakefs API.
+    High-level primitives are provided by wrapping this class in
+    io.BufferedWriter.
+
+    Bytes are staged in memory and uploaded in a single call when the stream
+    is closed, followed by a commit on the target branch. Uploading once on
+    close avoids overwriting the object with partial chunks when the buffered
+    writer flushes more than once.
+    """
+
+    def __init__(
+        self,
+        client: lfs_client.LakeFSClient,
+        repo: str,
+        ref: str,
+        key: str,
+        commit_message: str | None,
+    ):
+        self._client = client
+        self._repo = repo
+        self._ref = ref
+        self._path = key
+        if commit_message:
+            self._message = commit_message
+        else:
+            self._message = f"Update {self._path}."
+        self.name = key
+        self._buffer = io.BytesIO()
+
+    def writable(self) -> bool:
+        return True
+
+    def write(self, __b) -> int | None:
+        # Accumulate bytes locally; the upload happens on close().
+        return self._buffer.write(__b)
+
+    def close(self) -> None:
+        if self.closed:
+            return
+        objects: apis.ObjectsApi = self._client.objects
+        commits: apis.CommitsApi = self._client.commits
+        self._buffer.seek(0)
+        self._buffer.name = self._path
+        _ = objects.upload_object(
+            self._repo, self._ref, self._path, content=self._buffer
+        )
+        message = models.CommitCreation(message=self._message)
+        _ = commits.commit(self._repo, self._ref, message)
+        super().close()
diff --git a/smart_open/tests/docker-compose.yml b/smart_open/tests/docker-compose.yml
new file mode 100644
index 00000000..368a03cd
--- /dev/null
+++ b/smart_open/tests/docker-compose.yml
@@ -0,0 +1,31 @@
+version: '3'
+services:
+  lakefs:
+    image: "treeverse/lakefs:0.89.0"
+    ports:
+      - "8000:8000"
+    depends_on:
+      - "postgres"
+    environment:
+      - LAKEFS_AUTH_ENCRYPT_SECRET_KEY=${LAKEFS_AUTH_ENCRYPT_SECRET_KEY:-some random secret string}
+      - LAKEFS_DATABASE_TYPE=${LAKEFS_DATABASE_TYPE:-postgres}
+      - LAKEFS_DATABASE_POSTGRES_CONNECTION_STRING=${LAKEFS_DATABASE_POSTGRES_CONNECTION_STRING:-postgres://lakefs:lakefs@postgres/postgres?sslmode=disable}
+      - LAKEFS_BLOCKSTORE_TYPE=${LAKEFS_BLOCKSTORE_TYPE:-local}
+      - LAKEFS_BLOCKSTORE_LOCAL_PATH=${LAKEFS_BLOCKSTORE_LOCAL_PATH:-/home/lakefs}
+      - LAKEFS_GATEWAYS_S3_DOMAIN_NAME=${LAKEFS_GATEWAYS_S3_DOMAIN_NAME:-s3.local.lakefs.io:8000}
+      - LAKEFS_BLOCKSTORE_S3_CREDENTIALS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-}
+      - LAKEFS_BLOCKSTORE_S3_CREDENTIALS_ACCESS_SECRET_KEY=${AWS_SECRET_ACCESS_KEY:-}
+      - LAKEFS_LOGGING_LEVEL=${LAKEFS_LOGGING_LEVEL:-INFO}
+      - LAKEFS_STATS_ENABLED
+      - LAKEFS_BLOCKSTORE_S3_ENDPOINT
+      - LAKEFS_BLOCKSTORE_S3_FORCE_PATH_STYLE
+      - LAKEFS_COMMITTED_LOCAL_CACHE_DIR=${LAKEFS_COMMITTED_LOCAL_CACHE_DIR:-/home/lakefs/.local_tier}
+    entrypoint: ["/app/wait-for", "postgres:5432", "--", "/app/lakefs", "run"]
+  postgres:
+    image: "postgres"
+    command: "-c log_min_messages=FATAL"
+    environment:
+      POSTGRES_USER: lakefs
+      POSTGRES_PASSWORD: lakefs
+    # logging:
+    #   driver: none
diff --git a/smart_open/tests/test_lakefs.py b/smart_open/tests/test_lakefs.py
new file mode 100644
index 00000000..e9dec002
--- /dev/null
+++ b/smart_open/tests/test_lakefs.py
@@ -0,0 +1,341 @@
+# -*- coding: utf-8 -*-
+#
+# This code is distributed under the terms and conditions
+# from the MIT License (MIT).
+#
+"""These tests need docker compose to run lakeFS locally:
+https://docs.lakefs.io/quickstart/run.html
+
+    curl https://compose.lakefs.io | docker-compose -f - up
+"""
+from __future__ import annotations
+
+import logging
+from typing import Callable, Generator
+
+import pytest
+from lakefs_client import apis, configuration, exceptions, models
+from lakefs_client import client as lfs_client
+
+from smart_open.lakefs import open
+
+_LAKEFS_HOST = "http://localhost:8000/api/v1"
+
+logger = logging.getLogger(__name__)
+
+
+@pytest.fixture(scope="module")
+def lakefs() -> Generator[lfs_client.LakeFSClient, None, None]:
+    import os
+    import shlex
+    import subprocess
+    import time
+
+    from urllib3.exceptions import MaxRetryError
+
+    cwd = os.path.dirname(os.path.realpath(__file__))
+    subprocess.Popen(shlex.split("docker compose up -d"), cwd=cwd)
+
+    conf = configuration.Configuration(_LAKEFS_HOST)
+    client = lfs_client.LakeFSClient(conf)
+
+    healthcheck: apis.HealthCheckApi = client.healthcheck
+    api_available = False
+    while not api_available:
+        try:
+            healthcheck.health_check()
+            api_available = True
+        except (exceptions.ApiException, MaxRetryError):
+            # The server is still starting up; avoid a busy loop.
+            time.sleep(1)
+
+    comm_prefs = models.CommPrefsInput(
+        email="test@company.com",
+        feature_updates=True,
+        security_updates=True,
+    )
+    username = models.Setup(username="admin")
+    try:
+        config: apis.ConfigApi = client.config
+        _ = config.setup_comm_prefs(comm_prefs)
+        credentials: models.CredentialsWithSecret = config.setup(username)
+    except exceptions.ApiException as e:
+        raise Exception("Error setting up lakefs: %s\n" % e) from e
+    conf = configuration.Configuration(
+        host=_LAKEFS_HOST,
+        username=credentials.access_key_id,
+        password=credentials.secret_access_key,
+    )
+    os.environ["LAKECTL_SERVER_ENDPOINT_URL"] = _LAKEFS_HOST
+    os.environ["LAKECTL_CREDENTIALS_ACCESS_KEY_ID"] = credentials.access_key_id
+    os.environ["LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY"] = credentials.secret_access_key
+
+    client = lfs_client.LakeFSClient(conf)
+
+    repositories_api: apis.RepositoriesApi = client.repositories
+    new_repo = models.RepositoryCreation(
+        name="repo", storage_namespace="local:///home/lakefs/", default_branch="main"
+    )
+    try:
+        _: models.Repository = repositories_api.create_repository(new_repo)
+    except exceptions.ApiException as e:
+        raise Exception("Error creating repository: %s\n" % e) from e
+
+    yield client
+
+    subprocess.Popen(shlex.split("docker compose down"), cwd=cwd)
+
+
+@pytest.fixture(scope="module")
+def repo(lakefs) -> models.Repository:
+    repositories_api: apis.RepositoriesApi = lakefs.repositories
+    return repositories_api.list_repositories().results[0]
+
+
+@pytest.mark.parametrize(
+    "uri, parsed",
+    [
+        (
+            "lakefs://REPO/REF/file",
+            dict(scheme="lakefs", repo="REPO", ref="REF", key="file"),
+        ),
+        (
+            "lakefs://REPO/REF/1/file",
+            dict(scheme="lakefs", repo="REPO", ref="REF", key="1/file"),
+        ),
+        pytest.param(
+            "lakefs://REPO/REF/1/file",
+            dict(scheme="lakefs", repo="REPO", ref="REF/1", key="file"),
+            marks=pytest.mark.xfail,
+        ),
+    ],
+)
+def test_parse_uri(uri, parsed):
+    from dataclasses import asdict
+
+    from smart_open.lakefs import parse_uri
+
+    assert parsed == asdict(parse_uri(uri))
+
+
+class TestReader:
+    @pytest.fixture(scope="module")
+    def put_to_repo(
+        self,
+        lakefs,
+        repo: models.Repository,
+    ) -> Callable:
+        def _put_to_repo(
+            path: str,
+            content: bytes,
+            branch: str | None = None,
+        ) -> tuple[str, bytes]:
+            from io import BytesIO
+
+            objects: apis.ObjectsApi = lakefs.objects
+            _branch = branch if branch else repo.default_branch
+            stream = BytesIO(content)
+            stream.name = path
+            try:
+                _ = objects.upload_object(repo.id, _branch, path, content=stream)
+            except exceptions.ApiException as e:
+                raise Exception("Error uploading object: %s\n" % e) from e
+            return path, content
+
+        return _put_to_repo
+
+    @pytest.fixture(scope="module")
+    def file(self, put_to_repo) -> tuple[str, bytes]:
+        path = "test/file.txt"
+        content = "hello wořld\nhow are you?".encode("utf8")
+        return put_to_repo(path, content)
+
+    def test_iter(self, lakefs, repo, file):
+        path, content = file
+        fin = open(repo.id, repo.default_branch, path, "rb", lakefs)
+        output = [line.rstrip(b"\n") for line in fin]
+        assert output == content.split(b"\n")
+
+    def test_iter_context_manager(self, lakefs, repo, file):
+        path, content = file
+        with open(repo.id, repo.default_branch, path, "rb", lakefs) as fin:
+            output = [line.rstrip(b"\n") for line in fin]
+        assert output == content.split(b"\n")
+
+    def test_read(self, lakefs, repo, file):
+        path, content = file
+        fin = open(repo.id, repo.default_branch, path, "rb", lakefs)
+        assert content[:6] == fin.read(6)
+        assert content[6:6 + 8] == fin.read1(8)
+        assert content[6 + 8:] == fin.read()
+
+    def test_readinto(self, lakefs, repo, file):
+        path, content = file
+        fin = open(repo.id, repo.default_branch, path, "rb", lakefs)
+        b = bytearray(6)
+        assert len(b) == fin.readinto(b)
+        assert content[:6] == b
+        assert len(b) == fin.readinto1(b)
+        assert content[6:6 + 6] == b
+
+    def test_seek_beginning(self, lakefs, repo, file):
+        path, content = file
+        fin = open(repo.id, repo.default_branch, path, "rb", lakefs)
+        assert content[:6] == fin.read(6)
+        assert content[6:6 + 8] == fin.read(8)
+        fin.seek(0)
+        assert content == fin.read()
+        fin.seek(0)
+        assert content == fin.read(-1)
+
+    def test_seek_start(self, lakefs, repo, file):
+        path, _ = file
+        fin = open(repo.id, repo.default_branch, path, "rb", lakefs)
+        assert fin.seek(6) == 6
+        assert fin.tell() == 6
+        assert fin.read(6) == "wořld".encode("utf-8")
+
+    def test_seek_current(self, lakefs, repo, file):
+        from smart_open import constants
+
+        path, _ = file
+        fin = open(repo.id, repo.default_branch, path, "rb", lakefs)
+        assert fin.read(5) == b"hello"
+        assert fin.seek(1, constants.WHENCE_CURRENT) == 6
+        assert fin.read(6) == "wořld".encode("utf-8")
+
+    def test_seek_end(self, lakefs, repo, file):
+        from smart_open import constants
+
+        path, content = file
+        fin = open(repo.id, repo.default_branch, path, "rb", lakefs)
+        assert fin.seek(-4, constants.WHENCE_END) == len(content) - 4
+        assert fin.read() == b"you?"
+
+    def test_seek_past_end(self, lakefs, repo, file):
+        path, content = file
+        fin = open(repo.id, repo.default_branch, path, "rb", lakefs)
+        assert fin.seek(60) == len(content)
+
+    def test_detect_eof(self, lakefs, repo, file):
+        from smart_open import constants
+
+        path, content = file
+        fin = open(repo.id, repo.default_branch, path, "rb", lakefs)
+        fin.read()
+        eof = fin.tell()
+        assert eof == len(content)
+        fin.seek(0, constants.WHENCE_END)
+        assert fin.tell() == eof
+        fin.seek(eof)
+        assert fin.tell() == eof
+
+    def test_read_gzip(self, lakefs, repo, put_to_repo):
+        import gzip
+        from io import BytesIO
+
+        expected = "раcцветали яблони и груши, поплыли туманы над рекой...".encode(
+            "utf-8"
+        )
+        buf = BytesIO()
+        buf.close = lambda: None  # keep buffer open so that we can .getvalue()
+        with gzip.GzipFile(fileobj=buf, mode="w") as zipfile:
+            zipfile.write(expected)
+        path = "zip/file.zip"
+        _ = put_to_repo(path, buf.getvalue())
+
+        #
+        # Make sure we're reading things correctly.
+        #
+        with open(repo.id, repo.default_branch, path, "rb", lakefs) as fin:
+            assert fin.read() == buf.getvalue()
+
+        #
+        # Make sure the buffer we wrote is legitimate gzip.
+        #
+        sanity_buf = BytesIO(buf.getvalue())
+        with gzip.GzipFile(fileobj=sanity_buf) as zipfile:
+            assert zipfile.read() == expected
+
+        with open(repo.id, repo.default_branch, path, "rb", lakefs) as fin:
+            with gzip.GzipFile(fileobj=fin) as zipfile:
+                assert zipfile.read() == expected
+
+    def test_readline(self, lakefs, repo, put_to_repo):
+        content = b"englishman\nin\nnew\nyork\n"
+        path = "many_lines.txt"
+        _ = put_to_repo(path, content)
+        with open(repo.id, repo.default_branch, path, "rb", lakefs) as fin:
+            fin.readline()
+            assert fin.tell() == content.index(b"\n") + 1
+            fin.seek(0)
+            assert list(fin) == [b"englishman\n", b"in\n", b"new\n", b"york\n"]
+            assert fin.tell() == len(content)
+
+    def test_readline_tiny_buffer(self, lakefs, repo, put_to_repo):
+        content = b"englishman\nin\nnew\nyork\n"
+        path = "many_lines.txt"
+        _ = put_to_repo(path, content)
+        with open(
+            repo.id, repo.default_branch, path, "rb", lakefs, buffer_size=8
+        ) as fin:
+            assert list(fin) == [b"englishman\n", b"in\n", b"new\n", b"york\n"]
+            assert fin.tell() == len(content)
+
+    def test_read0_does_not_return_data(self, lakefs, repo, file):
+        path, _ = file
+        with open(repo.id, repo.default_branch, path, "rb", lakefs) as fin:
+            assert fin.read(0) == b""
+
+    def test_read_past_end(self, lakefs, repo, file):
+        path, content = file
+        with open(repo.id, repo.default_branch, path, "rb", lakefs) as fin:
+            assert fin.read(100) == content
+
+    def test_read_empty_file(self, lakefs, repo, put_to_repo):
+        content = b""
+        path = "empty_file.txt"
+        _ = put_to_repo(path, content)
+        with open(
+            repo.id, repo.default_branch, path, "rb", lakefs, buffer_size=8
+        ) as fin:
+            assert fin.read() == b""
+
+    def test_open_with_transport_params(self, lakefs, repo, file):
+        from smart_open import open
+
+        path, content = file
+        transport_params = {"client": lakefs}
+        uri = f"lakefs://{repo.id}/{repo.default_branch}/{path}"
+        with open(uri, transport_params=transport_params) as fin:
+            assert fin.read() == content.decode()
+
+    def test_open_with_envvar_credentials(self, lakefs, repo, file):
+        from smart_open import open
+
+        path, content = file
+        uri = f"lakefs://{repo.id}/{repo.default_branch}/{path}"
+        with open(uri) as fin:
+            assert fin.read() == content.decode()
+
+
+class TestWriter:
+    def commits(self, lakefs, repo):
+        refs: apis.RefsApi = lakefs.refs
+        commit_list = refs.log_commits(repo.id, repo.default_branch)
+        return commit_list.results
+
+    def test_write(self, lakefs, repo):
+        content = "ветер по морю гуляет...".encode("utf8")
+        path = "write/1.txt"
+        with open(repo.id, repo.default_branch, path, "wb", lakefs) as fout:
+            assert fout.write(content) == len(content)
+
+        with open(repo.id, repo.default_branch, path, "rb", lakefs) as fin:
+            assert fin.read() == content
+
+    def test_commit(self, lakefs, repo):
+        content = "ветер по морю гуляет...".encode("utf8")
+        path = "write/2.txt"
+        message = "Modify file."
+        with open(repo.id, repo.default_branch, path, "wb", lakefs, message) as fout:
+            assert fout.write(content) == len(content)
+        assert self.commits(lakefs, repo)[0].message == message
diff --git a/smart_open/transport.py b/smart_open/transport.py
index 086ea2b0..30edebf8 100644
--- a/smart_open/transport.py
+++ b/smart_open/transport.py
@@ -104,6 +104,7 @@ def get_transport(scheme):
 register_transport("smart_open.s3")
 register_transport("smart_open.ssh")
 register_transport("smart_open.webhdfs")
+register_transport("smart_open.lakefs")

 SUPPORTED_SCHEMES = tuple(sorted(_REGISTRY.keys()))
 """The transport schemes that the local installation of ``smart_open`` supports."""
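
Usage sketch (not part of the patch): reading and writing through the new transport via `smart_open.open`, based on the code above. The endpoint, credentials, repository, branch, and key are placeholder values; adjust them to your lakeFS deployment.

import os

import smart_open

# Placeholder endpoint and credentials; the transport reads these when no
# explicit client is passed.
os.environ["LAKECTL_SERVER_ENDPOINT_URL"] = "http://localhost:8000/api/v1"
os.environ["LAKECTL_CREDENTIALS_ACCESS_KEY_ID"] = "<access-key-id>"
os.environ["LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY"] = "<secret-access-key>"

# Writing uploads the object and commits it on the target branch when the
# stream is closed.
with smart_open.open(
    "lakefs://my-repo/main/data/hello.txt",
    "wb",
    transport_params={"commit_message": "Add hello.txt"},
) as fout:
    fout.write("hello wořld\n".encode("utf-8"))

# Reading fetches the object with ranged GETs through the buffered reader.
with smart_open.open("lakefs://my-repo/main/data/hello.txt", "rb") as fin:
    print(fin.read().decode("utf-8"))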
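
The client can also be constructed explicitly and passed through transport_params, as test_open_with_transport_params does above; this is useful when credentials should not live in environment variables. Host and credentials are again placeholders.

from lakefs_client import configuration
from lakefs_client import client as lfs_client

import smart_open

conf = configuration.Configuration(
    host="http://localhost:8000/api/v1",
    username="<access-key-id>",
    password="<secret-access-key>",
)
client = lfs_client.LakeFSClient(conf)

with smart_open.open(
    "lakefs://my-repo/main/data/hello.txt",
    "rb",
    transport_params={"client": client},
) as fin:
    data = fin.read()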