From de458e0df1e4218f1deb37c2c5d358eaf32b4fd8 Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Mon, 9 Jan 2023 19:46:12 +0100 Subject: [PATCH 01/23] Add test lakefs --- smart_open/tests/test_lakefs.py | 141 ++++++++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 smart_open/tests/test_lakefs.py diff --git a/smart_open/tests/test_lakefs.py b/smart_open/tests/test_lakefs.py new file mode 100644 index 00000000..d283b523 --- /dev/null +++ b/smart_open/tests/test_lakefs.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- +# +# This code is distributed under the terms and conditions +# from the MIT License (MIT). +# +import pytest +import lakefs_client +from lakefs_client import client +from lakefs_client import models +from lakefs_client import apis +import logging + + +"""It needs docker compose to run lakefs locally: +https://docs.lakefs.io/quickstart/run.html + +curl https://compose.lakefs.io | docker-compose -f - up +""" +_LAKEFS_HOST = "http://localhost:8000/api/v1" + +logger = logging.getLogger(__name__) + + +def api_available(lfs_client: client.LakeFSClient): + from urllib3.exceptions import MaxRetryError + + healthcheck: apis.HealthCheckApi = lfs_client.healthcheck + try: + healthcheck.health_check() + return True + except (lakefs_client.ApiException, MaxRetryError): + return False + + +@pytest.fixture(scope="module") +def lakefs(): + import shlex + import subprocess + from time import sleep + + compose = subprocess.Popen( + shlex.split("curl https://compose.lakefs.io"), stdout=subprocess.PIPE + ) + subprocess.Popen(shlex.split("docker-compose -f - up -d"), stdin=compose.stdout) + compose.stdout.close() + + configuration = lakefs_client.Configuration(_LAKEFS_HOST) + lfs_client = client.LakeFSClient(configuration) + + while not api_available(lfs_client): + sleep(1) + + comm_prefs = models.CommPrefsInput( + email="test@company.com", + feature_updates=True, + security_updates=True, + ) + username = models.Setup(username="admin") + try: + config: apis.ConfigApi = lfs_client.config + _ = config.setup_comm_prefs(comm_prefs) + credentials: models.CredentialsWithSecret = config.setup(username) + except lakefs_client.ApiException as e: + raise Exception( + "Error setting up lakefs: %s\n" % e + ) from lakefs_client.ApiException + configuration = lakefs_client.Configuration( + host=_LAKEFS_HOST, + username=credentials.access_key_id, + password=credentials.secret_access_key, + ) + yield client.LakeFSClient(configuration) + + compose = subprocess.Popen( + shlex.split("curl https://compose.lakefs.io"), stdout=subprocess.PIPE + ) + subprocess.Popen(shlex.split("docker-compose -f - down"), stdin=compose.stdout) + compose.stdout.close() + + +def create_repo(lfs_client: client.LakeFSClient, repo_name: str) -> models.Repository: + new_repo = models.RepositoryCreation( + name=repo_name, storage_namespace="local:///home/lakefs/", default_branch="main" + ) + try: + repositories: apis.RepositoriesApi = lfs_client.repositories + repository: models.Repository = repositories.create_repository(new_repo) + except lakefs_client.ApiException as e: + raise Exception("Error creating repository: %s\n" % e) from e + return repository + + +def put_to_repo( + lfs_client: client.LakeFSClient, + repo: models.Repository, + path: str, + content: bytes, + branch: str | None = None, +): + from io import BytesIO + + objects: apis.ObjectsApi = lfs_client.objects + _branch = branch if branch else repo.default_branch + stream = BytesIO(content) + stream.name = path + try: + obj_stats = objects.upload_object(repo.id, _branch, path, content=stream) + except lakefs_client.ApiException as e: + raise Exception("Error uploading object: %s\n" % e) from e + return obj_stats + + +class TestReader: + def test_read(self, lakefs: client.LakeFSClient): + from smart_open.lakefs import Reader + + content = "hello wořld\nhow are you?".encode("utf8") + path = "test.txt" + repo_name = "test" + repo = create_repo(lfs_client=lakefs, repo_name=repo_name) + put_to_repo(lakefs, repo, path, content) + logger.debug("content: %r, len: %r", content, len(content)) + + fin = Reader(client=lakefs, repo=repo_name, ref=repo.default_branch, path=path) + assert content[:6] == fin.read(6) + assert content[6:14] == fin.read(8) + assert content[14:] == fin.read() + + def test_iter(self, lakefs: client.LakeFSClient): + from smart_open.lakefs import Reader + + content = "hello wořld\nhow are you?".encode("utf8") + path = "test_iter.txt" + repo_name = "test-iter" + repo = create_repo(lfs_client=lakefs, repo_name=repo_name) + put_to_repo(lakefs, repo, path, content) + + # connect to fake Azure Blob Storage and read from the fake key we filled above + fin = Reader(client=lakefs, repo=repo_name, ref=repo.default_branch, path=path) + output = [line.rstrip(b"\n") for line in fin] + assert output == content.split(b"\n") From 8a9f5de26dec060047b634c85eb7a0722c41e671 Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Mon, 9 Jan 2023 19:46:22 +0100 Subject: [PATCH 02/23] Add lakefs reader --- smart_open/lakefs.py | 271 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 271 insertions(+) create mode 100644 smart_open/lakefs.py diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py new file mode 100644 index 00000000..2e556edc --- /dev/null +++ b/smart_open/lakefs.py @@ -0,0 +1,271 @@ +import io +import re +import logging +import urllib.parse + +try: + from lakefs_client import client + from lakefs_client import apis + from lakefs_client import models +except ImportError: + MISSING_DEPS = True + +from smart_open import bytebuffer, constants +import smart_open.utils + +SCHEME = "lakefs" + +URI_EXAMPLES = ( + "lakefs://REPO/REF/file", + "lakefs:///REPO/main/file.bz2", +) + +DEFAULT_BUFFER_SIZE = 4 * 1024**2 +"""Default buffer size is 256MB.""" + +DEFAULT_MAX_CONCURRENCY = 1 +"""Default number of parallel connections with which to download.""" + + +logger = logging.getLogger(__name__) + + +def parse_uri(uri_as_string): + """lakefs protocol URIs. + + lakeFS uses a specific format for path URIs. The URI lakefs://// + is a path to objects in the given repo and ref expression under key. This is used + both for path prefixes and for full paths. In similar fashion, lakefs:/// + identifies the repository at a ref expression, and lakefs:// identifes a repo. + """ + sr = urllib.parse.urlsplit(uri_as_string, allow_fragments=False) + assert sr.scheme == SCHEME + repo = sr.netloc + _pattern = r"^/(?P[^/]+)/(?P.+)" + _match = re.fullmatch(_pattern, sr.path) + if _match: + ref = _match.group("ref") + key = _match.group("key") + else: + ref = None + key = None + return dict(scheme=SCHEME, repo=repo, ref=ref, key=key) + + +def open_uri(uri, mode, transport_params): + parsed_uri = parse_uri(uri) + kwargs = smart_open.utils.check_kwargs(open, transport_params) + return open( + parsed_uri["repo"], parsed_uri["ref"], parsed_uri["key"], mode, **kwargs + ) + + +def open( + repo, + ref, + key, + client=None, + buffer_size=DEFAULT_BUFFER_SIZE, + max_concurrency=DEFAULT_MAX_CONCURRENCY, + client_kwargs=None, + writebuffer=None, +): + pass + + +class _RawReader(object): + """Read a lakeFS object.""" + + def __init__( + self, + client: client.LakeFSClient, + repo: str, + ref: str, + path: str, + ): + self._client = client + self._repo = repo + self._ref = ref + self._path = path + + self._content_length = self._get_content_length() + self._position = 0 + + def _get_content_length(self): + objects: apis.ObjectsApi = self._client.objects + obj_stats: models.ObjectStats = objects.stat_object( + self._repo, self._ref, self._path + ) + return obj_stats.size_bytes + + def seek(self, offset, whence=constants.WHENCE_START): + """Seek to the specified position. + + :param int offset: The offset in bytes. + :param int whence: Where the offset is from. + + :returns: the position after seeking. + :rtype: int + """ + if whence not in constants.WHENCE_CHOICES: + raise ValueError( + "invalid whence, expected one of %r" % constants.WHENCE_CHOICES + ) + + if whence == constants.WHENCE_START: + start = max(0, offset) + elif whence == constants.WHENCE_CURRENT: + start = max(0, self._position + offset) + elif whence == constants.WHENCE_END: + start = max(0, self._content_length + offset) + + self._position = min(start, self._content_length) + + return self._position + + def read(self, size=-1): + if self._position >= self._content_length: + return b"" + _size = max(-1, size) + objects: apis.ObjectsApi = self._client.objects + start_range = self._position + end_range = self._content_length if _size == -1 else (start_range + _size) + range = f"bytes={start_range}-{end_range}" + binary = objects.get_object( + self._repo, self._ref, self._path, range=range + ).read() + self._position += len(binary) + return binary + + +class Reader(io.BufferedIOBase): + def __init__( + self, + client: client.LakeFSClient, + repo: str, + ref: str, + path: str, + buffer_size=DEFAULT_BUFFER_SIZE, + line_terminator=smart_open.constants.BINARY_NEWLINE, + ): + self._repo = repo + self._ref = ref + self._path = path + self._raw_reader = _RawReader(client, repo, ref, path) + self._position = 0 + self._eof = False + self._buffer_size = buffer_size + self._buffer = bytebuffer.ByteBuffer(buffer_size) + self._line_terminator = line_terminator + self.raw = None + + # + # io.BufferedIOBase methods. + # + + def close(self): + """Flush and close this stream.""" + pass + + def readable(self): + """Return True if the stream can be read from.""" + return True + + def read(self, size=-1): + """Read up to size bytes from the object and return them.""" + if size == 0: + return b"" + elif size < 0: + out = self._read_from_buffer() + self._raw_reader.read() + self._position = self._raw_reader._content_length + return out + + if len(self._buffer) >= size: + return self._read_from_buffer(size) + + if self._eof: + return self._read_from_buffer() + + self._fill_buffer(size) + return self._read_from_buffer(size) + + def read1(self, size=-1): + """This is the same as read().""" + return self.read(size=size) + + def readinto(self, b): + """Read up to len(b) bytes into b, and return the number of bytes read.""" + data = self.read(len(b)) + if not data: + return 0 + b[: len(data)] = data + return len(data) + + def readline(self, limit=-1): + """Read up to and including the next newline. Returns the bytes read.""" + if limit != -1: + raise NotImplementedError("limits other than -1 not implemented yet") + + line = io.BytesIO() + while not (self._eof and len(self._buffer) == 0): + line_part = self._buffer.readline(self._line_terminator) + line.write(line_part) + self._position += len(line_part) + + if line_part.endswith(self._line_terminator): + break + else: + self._fill_buffer() + + return line.getvalue() + + def seekable(self): + """If False, seek(), tell() and truncate() will raise IOError. + + We offer only seek support, and no truncate support.""" + return True + + def detach(self): + """Unsupported.""" + raise io.UnsupportedOperation + + def tell(self): + """Return the current position within the file.""" + return self._position + + def truncate(self, size=None): + """Unsupported.""" + raise io.UnsupportedOperation + + # + # Internal methods. + # + def _read_from_buffer(self, size=-1): + size = size if size >= 0 else len(self._buffer) + part = self._buffer.read(size) + self._position += len(part) + return part + + def _fill_buffer(self, size=-1): + size = max(size, self._buffer._chunk_size) + while len(self._buffer) < size and not self._eof: + bytes_read = self._buffer.fill(self._raw_reader) + if bytes_read == 0: + logger.debug("%s: reached EOF while filling buffer", self) + self._eof = True + + def __str__(self): + return "smart_open.lakefs.Reader(%r, %r, %r)" % ( + self._repo, + self._ref, + self._path, + ) + + def __repr__(self): + return ( + "smart_open.lakefs.Reader(" + "repo=%r, " + "ref=%r, " + "path=%r, " + "buffer_size=%r" + ) % (self._repo, self._ref, self._path, self._buffer_size) From 3b3d5adb41b1eb22247cf2932bdcd9aeb0b1b62f Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Mon, 6 Mar 2023 20:11:07 +0100 Subject: [PATCH 03/23] Add test for lakefs reader --- smart_open/lakefs.py | 49 +++++- smart_open/tests/test_lakefs.py | 271 ++++++++++++++++++++++++-------- 2 files changed, 246 insertions(+), 74 deletions(-) diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py index 2e556edc..dac967c2 100644 --- a/smart_open/lakefs.py +++ b/smart_open/lakefs.py @@ -1,12 +1,10 @@ +import typing import io import re import logging -import urllib.parse try: - from lakefs_client import client - from lakefs_client import apis - from lakefs_client import models + from lakefs_client import client, apis, models except ImportError: MISSING_DEPS = True @@ -38,7 +36,8 @@ def parse_uri(uri_as_string): both for path prefixes and for full paths. In similar fashion, lakefs:/// identifies the repository at a ref expression, and lakefs:// identifes a repo. """ - sr = urllib.parse.urlsplit(uri_as_string, allow_fragments=False) + # sr = urllib.parse.urlsplit(uri_as_string, allow_fragments=False) + sr = smart_open.utils.safe_urlsplit(uri_as_string) assert sr.scheme == SCHEME repo = sr.netloc _pattern = r"^/(?P[^/]+)/(?P.+)" @@ -52,7 +51,17 @@ def parse_uri(uri_as_string): return dict(scheme=SCHEME, repo=repo, ref=ref, key=key) -def open_uri(uri, mode, transport_params): +def open_uri(uri: str, mode: str, transport_params: dict) -> typing.IO: + """Return a file-like object pointing to the URI. + + Args: + uri: The URI to open + mode: Either "rb" or "wb". + transport_params: Any additional parameters to pass to the `open` function (see below). + + Returns: + file-like object. + """ parsed_uri = parse_uri(uri) kwargs = smart_open.utils.check_kwargs(open, transport_params) return open( @@ -165,7 +174,9 @@ def __init__( def close(self): """Flush and close this stream.""" - pass + # todo: check what to close + # self._raw_reader. + self._buffer.empty() def readable(self): """Return True if the stream can be read from.""" @@ -229,6 +240,30 @@ def detach(self): """Unsupported.""" raise io.UnsupportedOperation + def seek(self, offset, whence=smart_open.constants.WHENCE_START): + """Seek to the specified position. + + :param int offset: The offset in bytes. + :param int whence: Where the offset is from. + + Returns the position after seeking.""" + logger.debug('seeking to offset: %r whence: %r', offset, whence) + if whence not in smart_open.constants.WHENCE_CHOICES: + raise ValueError('invalid whence %i, expected one of %r' % (whence, + smart_open.constants.WHENCE_CHOICES)) + + # Convert relative offset to absolute, since self._raw_reader + # doesn't know our current position. + if whence == constants.WHENCE_CURRENT: + whence = constants.WHENCE_START + offset += self._position + + self._position = self._raw_reader.seek(offset, whence) + self._buffer.empty() + self._eof = self._position == self._raw_reader._content_length + logger.debug('current_pos: %r', self._position) + return self._position + def tell(self): """Return the current position within the file.""" return self._position diff --git a/smart_open/tests/test_lakefs.py b/smart_open/tests/test_lakefs.py index d283b523..4b62756c 100644 --- a/smart_open/tests/test_lakefs.py +++ b/smart_open/tests/test_lakefs.py @@ -3,12 +3,12 @@ # This code is distributed under the terms and conditions # from the MIT License (MIT). # +import typing import pytest import lakefs_client -from lakefs_client import client -from lakefs_client import models -from lakefs_client import apis +from lakefs_client import client, models, apis import logging +from smart_open.lakefs import Reader """It needs docker compose to run lakefs locally: @@ -21,22 +21,11 @@ logger = logging.getLogger(__name__) -def api_available(lfs_client: client.LakeFSClient): - from urllib3.exceptions import MaxRetryError - - healthcheck: apis.HealthCheckApi = lfs_client.healthcheck - try: - healthcheck.health_check() - return True - except (lakefs_client.ApiException, MaxRetryError): - return False - - @pytest.fixture(scope="module") def lakefs(): import shlex import subprocess - from time import sleep + from urllib3.exceptions import MaxRetryError compose = subprocess.Popen( shlex.split("curl https://compose.lakefs.io"), stdout=subprocess.PIPE @@ -47,8 +36,14 @@ def lakefs(): configuration = lakefs_client.Configuration(_LAKEFS_HOST) lfs_client = client.LakeFSClient(configuration) - while not api_available(lfs_client): - sleep(1) + healthcheck: apis.HealthCheckApi = lfs_client.healthcheck + api_available = False + while not api_available: + try: + healthcheck.health_check() + api_available = True + except (lakefs_client.ApiException, MaxRetryError): + continue comm_prefs = models.CommPrefsInput( email="test@company.com", @@ -69,7 +64,18 @@ def lakefs(): username=credentials.access_key_id, password=credentials.secret_access_key, ) - yield client.LakeFSClient(configuration) + lfs_client = client.LakeFSClient(configuration) + + repositories_api: apis.RepositoriesApi = lfs_client.repositories + new_repo = models.RepositoryCreation( + name="repo", storage_namespace="local:///home/lakefs/", default_branch="main" + ) + try: + _: models.Repository = repositories_api.create_repository(new_repo) + except lakefs_client.ApiException as e: + raise Exception("Error creating repository: %s\n" % e) from e + + yield lfs_client compose = subprocess.Popen( shlex.split("curl https://compose.lakefs.io"), stdout=subprocess.PIPE @@ -78,64 +84,195 @@ def lakefs(): compose.stdout.close() -def create_repo(lfs_client: client.LakeFSClient, repo_name: str) -> models.Repository: - new_repo = models.RepositoryCreation( - name=repo_name, storage_namespace="local:///home/lakefs/", default_branch="main" +@pytest.mark.parametrize( + "uri, parsed", + [ + ("lakefs://REPO/REF/file", dict(scheme='lakefs', repo='REPO', ref='REF', key='file')), + ("lakefs://REPO/REF/1/file", dict(scheme='lakefs', repo='REPO', ref='REF', key='1/file')), + pytest.param( + "lakefs://REPO/REF/1/file", dict(scheme='lakefs', repo='REPO', ref='REF/1', key='file'), + marks=pytest.mark.xfail), + ] ) - try: - repositories: apis.RepositoriesApi = lfs_client.repositories - repository: models.Repository = repositories.create_repository(new_repo) - except lakefs_client.ApiException as e: - raise Exception("Error creating repository: %s\n" % e) from e - return repository - - -def put_to_repo( - lfs_client: client.LakeFSClient, - repo: models.Repository, - path: str, - content: bytes, - branch: str | None = None, -): - from io import BytesIO - - objects: apis.ObjectsApi = lfs_client.objects - _branch = branch if branch else repo.default_branch - stream = BytesIO(content) - stream.name = path - try: - obj_stats = objects.upload_object(repo.id, _branch, path, content=stream) - except lakefs_client.ApiException as e: - raise Exception("Error uploading object: %s\n" % e) from e - return obj_stats +def test_parse_uri(uri, parsed): + from smart_open.lakefs import parse_uri + assert parsed == parse_uri(uri) class TestReader: - def test_read(self, lakefs: client.LakeFSClient): - from smart_open.lakefs import Reader - content = "hello wořld\nhow are you?".encode("utf8") - path = "test.txt" - repo_name = "test" - repo = create_repo(lfs_client=lakefs, repo_name=repo_name) - put_to_repo(lakefs, repo, path, content) - logger.debug("content: %r, len: %r", content, len(content)) + @pytest.fixture(scope="module") + def repo(self, lakefs) -> models.Repository: + repositories_api: apis.RepositoriesApi = lakefs.repositories + return repositories_api.list_repositories().results[0] - fin = Reader(client=lakefs, repo=repo_name, ref=repo.default_branch, path=path) - assert content[:6] == fin.read(6) - assert content[6:14] == fin.read(8) - assert content[14:] == fin.read() + @pytest.fixture(scope="module") + def put_to_repo( + self, + lakefs, + repo: models.Repository, + ) -> typing.Callable: + + def _put_to_repo( + path: str, + content: bytes, + branch: str | None = None, + ) -> typing.IO: + from io import BytesIO + + objects: apis.ObjectsApi = lakefs.objects + _branch = branch if branch else repo.default_branch + stream = BytesIO(content) + stream.name = path + try: + _ = objects.upload_object(repo.id, _branch, path, content=stream) + except lakefs_client.ApiException as e: + raise Exception("Error uploading object: %s\n" % e) from e + return path, content - def test_iter(self, lakefs: client.LakeFSClient): - from smart_open.lakefs import Reader + return _put_to_repo + @pytest.fixture(scope="module") + def file(self, put_to_repo) -> typing.IO: + path = "test/file.txt" content = "hello wořld\nhow are you?".encode("utf8") - path = "test_iter.txt" - repo_name = "test-iter" - repo = create_repo(lfs_client=lakefs, repo_name=repo_name) - put_to_repo(lakefs, repo, path, content) + return put_to_repo(path, content) - # connect to fake Azure Blob Storage and read from the fake key we filled above - fin = Reader(client=lakefs, repo=repo_name, ref=repo.default_branch, path=path) + + def test_iter(self, lakefs, repo, file): + path, content = file + fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) output = [line.rstrip(b"\n") for line in fin] assert output == content.split(b"\n") + + def test_iter_context_manager(self, lakefs, repo, file): + path, content = file + with Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) as fin: + output = [line.rstrip(b"\n") for line in fin] + assert output == content.split(b"\n") + + def test_read(self, lakefs, repo, file): + path, content = file + fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) + assert content[:6] == fin.read(6) + assert content[6:6+8] == fin.read(8) + assert content[6+8:] == fin.read() + + def test_seek_beginning(self, lakefs, repo, file): + path, content = file + fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) + assert content[:6] == fin.read(6) + assert content[6:6+8] == fin.read(8) + fin.seek(0) + assert content == fin.read() + fin.seek(0) + assert content == fin.read(-1) + + def test_seek_start(self, lakefs, repo, file): + path, _ = file + fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) + assert fin.seek(6) == 6 + assert fin.tell() == 6 + assert fin.read(6) == u'wořld'.encode('utf-8') + + def test_seek_current(self, lakefs, repo, file): + from smart_open import constants + + path, _ = file + fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) + assert fin.read(5) == b'hello' + assert fin.seek(1, whence=constants.WHENCE_CURRENT) == 6 + assert fin.read(6) == u'wořld'.encode('utf-8') + + + def test_seek_end(self, lakefs, repo, file): + from smart_open import constants + + path, content = file + fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) + assert fin.seek(-4, whence=constants.WHENCE_END) == len(content) - 4 + assert fin.read() == b'you?' + + def test_seek_past_end(self, lakefs, repo, file): + path, content = file + fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) + assert fin.seek(60) == len(content) + + def test_detect_eof(self, lakefs, repo, file): + from smart_open import constants + + path, content = file + fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) + fin.read() + eof = fin.tell() + assert eof == len(content) + fin.seek(0, whence=constants.WHENCE_END) + assert fin.tell() == eof + fin.seek(eof) + assert fin.tell() == eof + + def test_read_gzip(self, lakefs, repo, put_to_repo): + from io import BytesIO + import gzip + + expected = u'раcцветали яблони и груши, поплыли туманы над рекой...'.encode('utf-8') + buf = BytesIO() + buf.close = lambda: None # keep buffer open so that we can .getvalue() + with gzip.GzipFile(fileobj=buf, mode='w') as zipfile: + zipfile.write(expected) + path = "zip/file.zip" + _ = put_to_repo(path, buf.getvalue()) + + # + # Make sure we're reading things correctly. + # + with Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) as fin: + assert fin.read() == buf.getvalue() + + # + # Make sure the buffer we wrote is legitimate gzip. + # + sanity_buf = BytesIO(buf.getvalue()) + with gzip.GzipFile(fileobj=sanity_buf) as zipfile: + assert zipfile.read() == expected + + logger.debug('starting actual test') + with Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) as fin: + with gzip.GzipFile(fileobj=fin) as zipfile: + assert zipfile.read() == expected + + def test_readline(self, lakefs, repo, put_to_repo): + content = b'englishman\nin\nnew\nyork\n' + path = "many_lines.txt" + _ = put_to_repo(path, content) + with Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) as fin: + fin.readline() + assert fin.tell() == content.index(b'\n')+1 + fin.seek(0) + assert list(fin) == [b'englishman\n', b'in\n', b'new\n', b'york\n'] + assert fin.tell() == len(content) + + def test_readline_tiny_buffer(self, lakefs, repo, put_to_repo): + content = b'englishman\nin\nnew\nyork\n' + path = "many_lines.txt" + _ = put_to_repo(path, content) + with Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path, buffer_size=8) as fin: + assert list(fin) == [b'englishman\n', b'in\n', b'new\n', b'york\n'] + assert fin.tell() == len(content) + + def test_read0_does_not_return_data(self, lakefs, repo, file): + path, _ = file + with Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) as fin: + assert fin.read(0) == b'' + + def test_read_past_end(self, lakefs, repo, file): + path, content = file + with Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) as fin: + assert fin.read(100) == content + + def test_read_empty_file(self, lakefs, repo, put_to_repo): + content = b'' + path = "empty_file.txt" + _ = put_to_repo(path, content) + with Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path, buffer_size=8) as fin: + assert fin.read() == b'' From 2698b1120cfc51a02b127b2899252c92940020c6 Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Sat, 11 Mar 2023 00:51:04 +0100 Subject: [PATCH 04/23] Fix docstrings and type hints --- smart_open/lakefs.py | 139 +++++++++++++++++--------------- smart_open/tests/test_lakefs.py | 1 - 2 files changed, 74 insertions(+), 66 deletions(-) diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py index dac967c2..776cbf4e 100644 --- a/smart_open/lakefs.py +++ b/smart_open/lakefs.py @@ -2,6 +2,7 @@ import io import re import logging +import functools try: from lakefs_client import client, apis, models @@ -18,12 +19,8 @@ "lakefs:///REPO/main/file.bz2", ) -DEFAULT_BUFFER_SIZE = 4 * 1024**2 """Default buffer size is 256MB.""" - -DEFAULT_MAX_CONCURRENCY = 1 -"""Default number of parallel connections with which to download.""" - +DEFAULT_BUFFER_SIZE = 4 * 1024**2 logger = logging.getLogger(__name__) @@ -36,7 +33,6 @@ def parse_uri(uri_as_string): both for path prefixes and for full paths. In similar fashion, lakefs:/// identifies the repository at a ref expression, and lakefs:// identifes a repo. """ - # sr = urllib.parse.urlsplit(uri_as_string, allow_fragments=False) sr = smart_open.utils.safe_urlsplit(uri_as_string) assert sr.scheme == SCHEME repo = sr.netloc @@ -54,13 +50,12 @@ def parse_uri(uri_as_string): def open_uri(uri: str, mode: str, transport_params: dict) -> typing.IO: """Return a file-like object pointing to the URI. - Args: - uri: The URI to open - mode: Either "rb" or "wb". - transport_params: Any additional parameters to pass to the `open` function (see below). + :param str uri: The URI to open + :param str mode: Either "rb" or "wb". + :param dict transport_params: Any additional parameters to pass to the `open` function (see below). - Returns: - file-like object. + :returns: file-like object. + :rtype: file-like """ parsed_uri = parse_uri(uri) kwargs = smart_open.utils.check_kwargs(open, transport_params) @@ -73,16 +68,16 @@ def open( repo, ref, key, + mode, client=None, buffer_size=DEFAULT_BUFFER_SIZE, - max_concurrency=DEFAULT_MAX_CONCURRENCY, client_kwargs=None, writebuffer=None, ): pass -class _RawReader(object): +class _RawReader: """Read a lakeFS object.""" def __init__( @@ -97,23 +92,23 @@ def __init__( self._ref = ref self._path = path - self._content_length = self._get_content_length() self._position = 0 - def _get_content_length(self): + @functools.cached_property + def _content_length(self) -> int: objects: apis.ObjectsApi = self._client.objects obj_stats: models.ObjectStats = objects.stat_object( self._repo, self._ref, self._path ) return obj_stats.size_bytes - def seek(self, offset, whence=constants.WHENCE_START): + def seek(self, offset: int, whence: int = constants.WHENCE_START) -> int: """Seek to the specified position. - :param int offset: The offset in bytes. + :param int offset: The byte offset. :param int whence: Where the offset is from. - :returns: the position after seeking. + :returns: The position after seeking. :rtype: int """ if whence not in constants.WHENCE_CHOICES: @@ -132,14 +127,21 @@ def seek(self, offset, whence=constants.WHENCE_START): return self._position - def read(self, size=-1): + def read(self, size: int = -1) -> bytes: + """Read from lakefs using the objects api. + + :param int size: number of bytes to read. + + :returns: the bytes read from lakefs + :rtype: bytes + """ if self._position >= self._content_length: return b"" _size = max(-1, size) - objects: apis.ObjectsApi = self._client.objects start_range = self._position end_range = self._content_length if _size == -1 else (start_range + _size) range = f"bytes={start_range}-{end_range}" + objects: apis.ObjectsApi = self._client.objects binary = objects.get_object( self._repo, self._ref, self._path, range=range ).read() @@ -148,6 +150,10 @@ def read(self, size=-1): class Reader(io.BufferedIOBase): + """Reads bytes from a lakefs object. + + Implements the io.BufferedIOBase interface of the standard library. + """ def __init__( self, client: client.LakeFSClient, @@ -163,57 +169,64 @@ def __init__( self._raw_reader = _RawReader(client, repo, ref, path) self._position = 0 self._eof = False - self._buffer_size = buffer_size self._buffer = bytebuffer.ByteBuffer(buffer_size) self._line_terminator = line_terminator - self.raw = None - - # - # io.BufferedIOBase methods. - # - def close(self): + def close(self) -> None: """Flush and close this stream.""" - # todo: check what to close - # self._raw_reader. self._buffer.empty() - def readable(self): + def readable(self) -> bool: """Return True if the stream can be read from.""" return True - def read(self, size=-1): - """Read up to size bytes from the object and return them.""" + def read(self, size: int = -1) -> bytes: + """Read and return up to size bytes. + + :param int size: + + :returns: read bytes + :rtype: bytes + """ if size == 0: return b"" elif size < 0: out = self._read_from_buffer() + self._raw_reader.read() self._position = self._raw_reader._content_length return out - - if len(self._buffer) >= size: + elif size <= len(self._buffer): return self._read_from_buffer(size) - - if self._eof: + elif self._eof: return self._read_from_buffer() + else: + self._fill_buffer(size) + return self._read_from_buffer(size) - self._fill_buffer(size) - return self._read_from_buffer(size) - - def read1(self, size=-1): - """This is the same as read().""" + def read1(self, size: int = -1): return self.read(size=size) - def readinto(self, b): - """Read up to len(b) bytes into b, and return the number of bytes read.""" + def readinto(self, b: bytes) -> int: + """Read bytes into b. + + :param bytes b: pre-allocated, writable bytes-like object. + + :returns: the number of bytes read. + :rtype: int + """ data = self.read(len(b)) if not data: return 0 b[: len(data)] = data return len(data) - def readline(self, limit=-1): - """Read up to and including the next newline. Returns the bytes read.""" + def readline(self, limit=-1) -> bytes: + """Read up to and including the next newline. + + :param int limit: + + :returns: bytes read + :rtype: bytes + """ if limit != -1: raise NotImplementedError("limits other than -1 not implemented yet") @@ -222,31 +235,33 @@ def readline(self, limit=-1): line_part = self._buffer.readline(self._line_terminator) line.write(line_part) self._position += len(line_part) - if line_part.endswith(self._line_terminator): break else: self._fill_buffer() - return line.getvalue() def seekable(self): - """If False, seek(), tell() and truncate() will raise IOError. - - We offer only seek support, and no truncate support.""" + """If the stream supports random access or not.""" return True def detach(self): """Unsupported.""" raise io.UnsupportedOperation - def seek(self, offset, whence=smart_open.constants.WHENCE_START): + def truncate(self, size=None): + """Unsupported.""" + raise io.UnsupportedOperation + + def seek(self, offset: int, whence: int = smart_open.constants.WHENCE_START): """Seek to the specified position. :param int offset: The offset in bytes. :param int whence: Where the offset is from. - Returns the position after seeking.""" + :returns: the position after seeking. + :r + """ logger.debug('seeking to offset: %r whence: %r', offset, whence) if whence not in smart_open.constants.WHENCE_CHOICES: raise ValueError('invalid whence %i, expected one of %r' % (whence, @@ -265,23 +280,17 @@ def seek(self, offset, whence=smart_open.constants.WHENCE_START): return self._position def tell(self): - """Return the current position within the file.""" + """Return the current stream position.""" return self._position - def truncate(self, size=None): - """Unsupported.""" - raise io.UnsupportedOperation - - # - # Internal methods. - # - def _read_from_buffer(self, size=-1): - size = size if size >= 0 else len(self._buffer) + def _read_from_buffer(self, size: int = -1) -> bytes: + """Reads from buffer and updates position.""" part = self._buffer.read(size) self._position += len(part) return part - def _fill_buffer(self, size=-1): + def _fill_buffer(self, size: int = -1) -> None: + """Fills the buffer with either the default buffer size or size.""" size = max(size, self._buffer._chunk_size) while len(self._buffer) < size and not self._eof: bytes_read = self._buffer.fill(self._raw_reader) @@ -303,4 +312,4 @@ def __repr__(self): "ref=%r, " "path=%r, " "buffer_size=%r" - ) % (self._repo, self._ref, self._path, self._buffer_size) + ) % (self._repo, self._ref, self._path, self._buffer._chunk_size) diff --git a/smart_open/tests/test_lakefs.py b/smart_open/tests/test_lakefs.py index 4b62756c..7d3a6397 100644 --- a/smart_open/tests/test_lakefs.py +++ b/smart_open/tests/test_lakefs.py @@ -184,7 +184,6 @@ def test_seek_current(self, lakefs, repo, file): assert fin.seek(1, whence=constants.WHENCE_CURRENT) == 6 assert fin.read(6) == u'wořld'.encode('utf-8') - def test_seek_end(self, lakefs, repo, file): from smart_open import constants From 185437e7b72ad018eee4e020fd0c7da961efd491 Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Sun, 12 Mar 2023 20:24:43 +0100 Subject: [PATCH 05/23] Add readinto and read1 test --- smart_open/tests/test_lakefs.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/smart_open/tests/test_lakefs.py b/smart_open/tests/test_lakefs.py index 7d3a6397..c8f5533b 100644 --- a/smart_open/tests/test_lakefs.py +++ b/smart_open/tests/test_lakefs.py @@ -155,9 +155,18 @@ def test_read(self, lakefs, repo, file): path, content = file fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) assert content[:6] == fin.read(6) - assert content[6:6+8] == fin.read(8) + assert content[6:6+8] == fin.read1(8) assert content[6+8:] == fin.read() + def test_readinto(self, lakefs, repo, file): + path, content = file + fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) + b = bytearray(6) + assert len(b) == fin.readinto(b) + assert content[:6] == b + assert len(b) == fin.readinto1(b) + assert content[6:6+6] == b + def test_seek_beginning(self, lakefs, repo, file): path, content = file fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) From 9e0d2ab9bc3c23bb47f0ffb565e2c3b5b749032d Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Sun, 12 Mar 2023 20:25:19 +0100 Subject: [PATCH 06/23] Subclass rawiobase --- smart_open/lakefs.py | 108 +++++++++++++++++++++++++------------------ 1 file changed, 63 insertions(+), 45 deletions(-) diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py index 776cbf4e..db6cedd5 100644 --- a/smart_open/lakefs.py +++ b/smart_open/lakefs.py @@ -77,7 +77,7 @@ def open( pass -class _RawReader: +class _RawReader(io.RawIOBase): """Read a lakeFS object.""" def __init__( @@ -91,17 +91,26 @@ def __init__( self._repo = repo self._ref = ref self._path = path - self._position = 0 + def seekable(self) -> bool: + return True + + def readable(self) -> bool: + return True + @functools.cached_property - def _content_length(self) -> int: + def content_length(self) -> int: objects: apis.ObjectsApi = self._client.objects obj_stats: models.ObjectStats = objects.stat_object( self._repo, self._ref, self._path ) return obj_stats.size_bytes + @property + def eof(self) -> bool: + return self._position == self.content_length + def seek(self, offset: int, whence: int = constants.WHENCE_START) -> int: """Seek to the specified position. @@ -121,32 +130,35 @@ def seek(self, offset: int, whence: int = constants.WHENCE_START) -> int: elif whence == constants.WHENCE_CURRENT: start = max(0, self._position + offset) elif whence == constants.WHENCE_END: - start = max(0, self._content_length + offset) + start = max(0, self.content_length + offset) - self._position = min(start, self._content_length) + self._position = min(start, self.content_length) return self._position - def read(self, size: int = -1) -> bytes: - """Read from lakefs using the objects api. + def readinto(self, __buffer: bytes) -> int | None: + """Read bytes into a pre-allocated bytes-like object __buffer. :param int size: number of bytes to read. - :returns: the bytes read from lakefs - :rtype: bytes + :returns: the number of bytes read from lakefs + :rtype: int """ - if self._position >= self._content_length: - return b"" - _size = max(-1, size) + if self._position >= self.content_length: + return 0 + size = len(__buffer) start_range = self._position - end_range = self._content_length if _size == -1 else (start_range + _size) + end_range = max(self.content_length, (start_range + size)) range = f"bytes={start_range}-{end_range}" objects: apis.ObjectsApi = self._client.objects - binary = objects.get_object( + data = objects.get_object( self._repo, self._ref, self._path, range=range ).read() - self._position += len(binary) - return binary + if not data: + return 0 + self._position += len(data) + __buffer[: len(data)] = data + return len(data) class Reader(io.BufferedIOBase): @@ -166,12 +178,15 @@ def __init__( self._repo = repo self._ref = ref self._path = path - self._raw_reader = _RawReader(client, repo, ref, path) + self.raw = _RawReader(client, repo, ref, path) self._position = 0 - self._eof = False self._buffer = bytebuffer.ByteBuffer(buffer_size) self._line_terminator = line_terminator + @property + def bytes_buffered(self) -> int: + return len(self._buffer) + def close(self) -> None: """Flush and close this stream.""" self._buffer.empty() @@ -191,33 +206,37 @@ def read(self, size: int = -1) -> bytes: if size == 0: return b"" elif size < 0: - out = self._read_from_buffer() + self._raw_reader.read() - self._position = self._raw_reader._content_length + out = self._read_from_buffer() + self.raw.read() + self._position = self.raw.content_length return out - elif size <= len(self._buffer): + elif size <= self.bytes_buffered: + # Fast path: the data to read is fully buffered. return self._read_from_buffer(size) - elif self._eof: - return self._read_from_buffer() - else: + if not self.raw.eof: self._fill_buffer(size) - return self._read_from_buffer(size) + return self._read_from_buffer(size) def read1(self, size: int = -1): - return self.read(size=size) - - def readinto(self, b: bytes) -> int: - """Read bytes into b. - - :param bytes b: pre-allocated, writable bytes-like object. + """Read and return up to size bytes. - :returns: the number of bytes read. - :rtype: int + with at most one call to the underlying raw stream readinto(). + This can be useful if you are implementing your own buffering + on top of a BufferedIOBase object. """ - data = self.read(len(b)) - if not data: - return 0 - b[: len(data)] = data - return len(data) + if size == 0: + return b"" + elif size < 0: + out = self._read_from_buffer() + self.raw.read() + self._position = self.raw.content_length + return out + elif size <= self.bytes_buffered: + # Fast path: the data to read is fully buffered. + return self._read_from_buffer(size) + else: + out = self._read_from_buffer() + out += self.raw.read(size-len(out)) + self._position += len(out) + return out def readline(self, limit=-1) -> bytes: """Read up to and including the next newline. @@ -231,7 +250,8 @@ def readline(self, limit=-1) -> bytes: raise NotImplementedError("limits other than -1 not implemented yet") line = io.BytesIO() - while not (self._eof and len(self._buffer) == 0): + while not (self.raw.eof and self.bytes_buffered == 0): + # while we are not in eof or buffer is not empty line_part = self._buffer.readline(self._line_terminator) line.write(line_part) self._position += len(line_part) @@ -267,15 +287,14 @@ def seek(self, offset: int, whence: int = smart_open.constants.WHENCE_START): raise ValueError('invalid whence %i, expected one of %r' % (whence, smart_open.constants.WHENCE_CHOICES)) - # Convert relative offset to absolute, since self._raw_reader + # Convert relative offset to absolute, since self.raw # doesn't know our current position. if whence == constants.WHENCE_CURRENT: whence = constants.WHENCE_START offset += self._position - self._position = self._raw_reader.seek(offset, whence) + self._position = self.raw.seek(offset, whence) self._buffer.empty() - self._eof = self._position == self._raw_reader._content_length logger.debug('current_pos: %r', self._position) return self._position @@ -292,11 +311,10 @@ def _read_from_buffer(self, size: int = -1) -> bytes: def _fill_buffer(self, size: int = -1) -> None: """Fills the buffer with either the default buffer size or size.""" size = max(size, self._buffer._chunk_size) - while len(self._buffer) < size and not self._eof: - bytes_read = self._buffer.fill(self._raw_reader) + while self.bytes_buffered < size and not self.raw.eof: + bytes_read = self._buffer.fill(self.raw) if bytes_read == 0: logger.debug("%s: reached EOF while filling buffer", self) - self._eof = True def __str__(self): return "smart_open.lakefs.Reader(%r, %r, %r)" % ( From c7860f92aea1bbedc9d9075233c42833557a6c92 Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Sun, 12 Mar 2023 22:32:39 +0100 Subject: [PATCH 07/23] Add raw writer --- smart_open/lakefs.py | 91 ++++++++++++++++++++++++++++++--- smart_open/tests/test_lakefs.py | 50 +++++++++++++++--- 2 files changed, 127 insertions(+), 14 deletions(-) diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py index db6cedd5..59eddebd 100644 --- a/smart_open/lakefs.py +++ b/smart_open/lakefs.py @@ -5,6 +5,7 @@ import functools try: + import lakefs_client from lakefs_client import client, apis, models except ImportError: MISSING_DEPS = True @@ -70,11 +71,45 @@ def open( key, mode, client=None, + commit_message=None, buffer_size=DEFAULT_BUFFER_SIZE, - client_kwargs=None, - writebuffer=None, ): - pass + """Open a lakefs object for reading or writing. + + Parameters + ---------- + repo: str + The name of the repository this object resides in. + ref: str + The name of the branch or commit. + key: str + The path to the object for a given repo and branch. + mode: str + The mode for opening the object. Must be either "rb" or "wb". + client: lakefs_client.client.LakeFSClient + The lakefs client to use. + commit_message: str + The message to include in the commit. + buffer_size: int, optional + The buffer size to use when performing I/O. For reading only. + """ + if not client: + raise ValueError('you must specify the client to connect to lakefs') + + if mode == smart_open.constants.READ_BINARY: + return Reader( + client, + repo, + ref, + key, + buffer_size=buffer_size, + line_terminator=smart_open.constants.BINARY_NEWLINE, + ) + elif mode == smart_open.constants.WRITE_BINARY: + raw_writer = _RawWriter(client, repo, ref, key, commit_message) + return io.BufferedWriter(raw_writer, buffer_size) + else: + raise NotImplementedError(f'Lakefs support for mode {mode} not implemented') class _RawReader(io.RawIOBase): @@ -85,12 +120,12 @@ def __init__( client: client.LakeFSClient, repo: str, ref: str, - path: str, + key: str, ): self._client = client self._repo = repo self._ref = ref - self._path = path + self._path = key self._position = 0 def seekable(self) -> bool: @@ -160,6 +195,46 @@ def readinto(self, __buffer: bytes) -> int | None: __buffer[: len(data)] = data return len(data) +class _RawWriter(io.RawIOBase): + def __init__( + self, + client: client.LakeFSClient, + repo: str, + ref: str, + key: str, + commit_message: str | None + ): + self._client = client + self._repo = repo + self._ref = ref + self._path = key + if commit_message: + self._message = commit_message + else: + self._message = f'Update {self._path}.' + + def writable(self) -> bool: + return True + + def write(self, __b: bytes) -> int | None: + objects: apis.ObjectsApi = self._client.objects + commits: apis.CommitsApi = self._client.commits + stream = io.BytesIO(__b) + stream.name = self._path + try: + object_stats = objects.upload_object(self.repo.id, self._ref, self._path, content=stream) + message = models.CommitCreation(self._message) + _ = commits.commit(self.repo.id, self._ref, message) + except lakefs_client.ApiException as e: + raise Exception("Error uploading object: %s\n" % e) from e + + return object_stats.size_bytes + + @functools.cached_property + def repo(self) -> models.Repository: + repositories_api: apis.RepositoriesApi = self._client.repositories + return repositories_api.get_repository(self._repo) + class Reader(io.BufferedIOBase): """Reads bytes from a lakefs object. @@ -171,14 +246,14 @@ def __init__( client: client.LakeFSClient, repo: str, ref: str, - path: str, + key: str, buffer_size=DEFAULT_BUFFER_SIZE, line_terminator=smart_open.constants.BINARY_NEWLINE, ): self._repo = repo self._ref = ref - self._path = path - self.raw = _RawReader(client, repo, ref, path) + self._path = key + self.raw = _RawReader(client, repo, ref, key) self._position = 0 self._buffer = bytebuffer.ByteBuffer(buffer_size) self._line_terminator = line_terminator diff --git a/smart_open/tests/test_lakefs.py b/smart_open/tests/test_lakefs.py index c8f5533b..f518a4ed 100644 --- a/smart_open/tests/test_lakefs.py +++ b/smart_open/tests/test_lakefs.py @@ -8,7 +8,7 @@ import lakefs_client from lakefs_client import client, models, apis import logging -from smart_open.lakefs import Reader +from smart_open.lakefs import open, Reader """It needs docker compose to run lakefs locally: @@ -83,6 +83,10 @@ def lakefs(): subprocess.Popen(shlex.split("docker-compose -f - down"), stdin=compose.stdout) compose.stdout.close() +@pytest.fixture(scope="module") +def repo(lakefs) -> models.Repository: + repositories_api: apis.RepositoriesApi = lakefs.repositories + return repositories_api.list_repositories().results[0] @pytest.mark.parametrize( "uri, parsed", @@ -101,11 +105,6 @@ def test_parse_uri(uri, parsed): class TestReader: - @pytest.fixture(scope="module") - def repo(self, lakefs) -> models.Repository: - repositories_api: apis.RepositoriesApi = lakefs.repositories - return repositories_api.list_repositories().results[0] - @pytest.fixture(scope="module") def put_to_repo( self, @@ -193,6 +192,19 @@ def test_seek_current(self, lakefs, repo, file): assert fin.seek(1, whence=constants.WHENCE_CURRENT) == 6 assert fin.read(6) == u'wořld'.encode('utf-8') + def test_seek_current_io(self, lakefs, repo, file): + from smart_open import constants + from smart_open.lakefs import _RawReader + import io + + path, _ = file + # fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) + raw = _RawReader(client=lakefs, repo=repo.id, ref=repo.default_branch, key=path) + fin = io.BufferedReader(raw=raw) + assert fin.read(5) == b'hello' + assert fin.seek(1, constants.WHENCE_CURRENT) == 6 + assert fin.read(6) == u'wořld'.encode('utf-8') + def test_seek_end(self, lakefs, repo, file): from smart_open import constants @@ -284,3 +296,29 @@ def test_read_empty_file(self, lakefs, repo, put_to_repo): _ = put_to_repo(path, content) with Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path, buffer_size=8) as fin: assert fin.read() == b'' + + def test_open(self, lakefs, repo, file): + path, content = file + with open(repo.id, repo.default_branch, path, "rb", client=lakefs) as fin: + assert fin.read(100) == content + +class TestWriter: + def commits(self, lakefs, repo): + refs: apis.RefsApi = lakefs.refs + commit_list = refs.log_commits(repo.id, repo.default_branch) + return commit_list.results + + def test_write(self, lakefs, repo): + content = u"ветер по морю гуляет...".encode('utf8') + with open(repo.id, repo.default_branch, "write.txt", "wb", lakefs) as fout: + assert fout.write(content) == len(content) + + with open(repo.id, repo.default_branch, "write.txt", "rb", lakefs) as fin: + assert fin.read() == content + + def test_commit(self, lakefs, repo): + content = u"ветер по морю гуляет...".encode('utf8') + message = "Modify file." + with open(repo.id, repo.default_branch, "write.txt", "wb", lakefs, message) as fout: + assert fout.write(content) == len(content) + assert self.commits(lakefs, repo)[0].message == message From cac52606bd3473bab1cbd9e3916340b82a2cea28 Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Mon, 13 Mar 2023 00:02:26 +0100 Subject: [PATCH 08/23] Add commit to writer --- smart_open/lakefs.py | 85 +++++++-------- smart_open/tests/test_lakefs.py | 182 ++++++++++++++++++++------------ 2 files changed, 150 insertions(+), 117 deletions(-) diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py index 59eddebd..7ae5b295 100644 --- a/smart_open/lakefs.py +++ b/smart_open/lakefs.py @@ -20,7 +20,6 @@ "lakefs:///REPO/main/file.bz2", ) -"""Default buffer size is 256MB.""" DEFAULT_BUFFER_SIZE = 4 * 1024**2 logger = logging.getLogger(__name__) @@ -66,13 +65,13 @@ def open_uri(uri: str, mode: str, transport_params: dict) -> typing.IO: def open( - repo, - ref, - key, - mode, - client=None, - commit_message=None, - buffer_size=DEFAULT_BUFFER_SIZE, + repo: str, + ref: str, + key: str, + mode: str, + client: str | None = None, + commit_message: str | None = None, + buffer_size: int = DEFAULT_BUFFER_SIZE, ): """Open a lakefs object for reading or writing. @@ -89,27 +88,21 @@ def open( client: lakefs_client.client.LakeFSClient The lakefs client to use. commit_message: str - The message to include in the commit. + Only when writing. The message to include in the commit. buffer_size: int, optional - The buffer size to use when performing I/O. For reading only. + The buffer size to use when performing I/O. """ if not client: - raise ValueError('you must specify the client to connect to lakefs') + raise ValueError("you must specify the client to connect to lakefs") if mode == smart_open.constants.READ_BINARY: - return Reader( - client, - repo, - ref, - key, - buffer_size=buffer_size, - line_terminator=smart_open.constants.BINARY_NEWLINE, - ) + raw = _RawReader(client, repo, ref, key) + return io.BufferedReader(raw, buffer_size) elif mode == smart_open.constants.WRITE_BINARY: raw_writer = _RawWriter(client, repo, ref, key, commit_message) return io.BufferedWriter(raw_writer, buffer_size) else: - raise NotImplementedError(f'Lakefs support for mode {mode} not implemented') + raise NotImplementedError(f"Lakefs support for mode {mode} not implemented") class _RawReader(io.RawIOBase): @@ -146,7 +139,7 @@ def content_length(self) -> int: def eof(self) -> bool: return self._position == self.content_length - def seek(self, offset: int, whence: int = constants.WHENCE_START) -> int: + def seek(self, __offset: int, __whence: int = constants.WHENCE_START) -> int: """Seek to the specified position. :param int offset: The byte offset. @@ -155,17 +148,17 @@ def seek(self, offset: int, whence: int = constants.WHENCE_START) -> int: :returns: The position after seeking. :rtype: int """ - if whence not in constants.WHENCE_CHOICES: + if __whence not in constants.WHENCE_CHOICES: raise ValueError( "invalid whence, expected one of %r" % constants.WHENCE_CHOICES ) - if whence == constants.WHENCE_START: - start = max(0, offset) - elif whence == constants.WHENCE_CURRENT: - start = max(0, self._position + offset) - elif whence == constants.WHENCE_END: - start = max(0, self.content_length + offset) + if __whence == constants.WHENCE_START: + start = max(0, __offset) + elif __whence == constants.WHENCE_CURRENT: + start = max(0, self._position + __offset) + elif __whence == constants.WHENCE_END: + start = max(0, self.content_length + __offset) self._position = min(start, self.content_length) @@ -183,18 +176,17 @@ def readinto(self, __buffer: bytes) -> int | None: return 0 size = len(__buffer) start_range = self._position - end_range = max(self.content_length, (start_range + size)) + end_range = min(self.content_length, (start_range + size)) - 1 range = f"bytes={start_range}-{end_range}" objects: apis.ObjectsApi = self._client.objects - data = objects.get_object( - self._repo, self._ref, self._path, range=range - ).read() + data = objects.get_object(self._repo, self._ref, self._path, range=range).read() if not data: return 0 self._position += len(data) __buffer[: len(data)] = data return len(data) + class _RawWriter(io.RawIOBase): def __init__( self, @@ -202,7 +194,7 @@ def __init__( repo: str, ref: str, key: str, - commit_message: str | None + commit_message: str | None, ): self._client = client self._repo = repo @@ -211,7 +203,7 @@ def __init__( if commit_message: self._message = commit_message else: - self._message = f'Update {self._path}.' + self._message = f"Update {self._path}." def writable(self) -> bool: return True @@ -222,25 +214,22 @@ def write(self, __b: bytes) -> int | None: stream = io.BytesIO(__b) stream.name = self._path try: - object_stats = objects.upload_object(self.repo.id, self._ref, self._path, content=stream) + object_stats = objects.upload_object( + self._repo, self._ref, self._path, content=stream + ) message = models.CommitCreation(self._message) - _ = commits.commit(self.repo.id, self._ref, message) + _ = commits.commit(self._repo, self._ref, message) except lakefs_client.ApiException as e: raise Exception("Error uploading object: %s\n" % e) from e - return object_stats.size_bytes - @functools.cached_property - def repo(self) -> models.Repository: - repositories_api: apis.RepositoriesApi = self._client.repositories - return repositories_api.get_repository(self._repo) - class Reader(io.BufferedIOBase): """Reads bytes from a lakefs object. Implements the io.BufferedIOBase interface of the standard library. """ + def __init__( self, client: client.LakeFSClient, @@ -309,7 +298,7 @@ def read1(self, size: int = -1): return self._read_from_buffer(size) else: out = self._read_from_buffer() - out += self.raw.read(size-len(out)) + out += self.raw.read(size - len(out)) self._position += len(out) return out @@ -357,10 +346,12 @@ def seek(self, offset: int, whence: int = smart_open.constants.WHENCE_START): :returns: the position after seeking. :r """ - logger.debug('seeking to offset: %r whence: %r', offset, whence) + logger.debug("seeking to offset: %r whence: %r", offset, whence) if whence not in smart_open.constants.WHENCE_CHOICES: - raise ValueError('invalid whence %i, expected one of %r' % (whence, - smart_open.constants.WHENCE_CHOICES)) + raise ValueError( + "invalid whence %i, expected one of %r" + % (whence, smart_open.constants.WHENCE_CHOICES) + ) # Convert relative offset to absolute, since self.raw # doesn't know our current position. @@ -370,7 +361,7 @@ def seek(self, offset: int, whence: int = smart_open.constants.WHENCE_START): self._position = self.raw.seek(offset, whence) self._buffer.empty() - logger.debug('current_pos: %r', self._position) + logger.debug("current_pos: %r", self._position) return self._position def tell(self): diff --git a/smart_open/tests/test_lakefs.py b/smart_open/tests/test_lakefs.py index f518a4ed..765494e1 100644 --- a/smart_open/tests/test_lakefs.py +++ b/smart_open/tests/test_lakefs.py @@ -8,7 +8,7 @@ import lakefs_client from lakefs_client import client, models, apis import logging -from smart_open.lakefs import open, Reader +from smart_open.lakefs import open """It needs docker compose to run lakefs locally: @@ -83,35 +83,44 @@ def lakefs(): subprocess.Popen(shlex.split("docker-compose -f - down"), stdin=compose.stdout) compose.stdout.close() + @pytest.fixture(scope="module") def repo(lakefs) -> models.Repository: repositories_api: apis.RepositoriesApi = lakefs.repositories return repositories_api.list_repositories().results[0] + @pytest.mark.parametrize( "uri, parsed", [ - ("lakefs://REPO/REF/file", dict(scheme='lakefs', repo='REPO', ref='REF', key='file')), - ("lakefs://REPO/REF/1/file", dict(scheme='lakefs', repo='REPO', ref='REF', key='1/file')), + ( + "lakefs://REPO/REF/file", + dict(scheme="lakefs", repo="REPO", ref="REF", key="file"), + ), + ( + "lakefs://REPO/REF/1/file", + dict(scheme="lakefs", repo="REPO", ref="REF", key="1/file"), + ), pytest.param( - "lakefs://REPO/REF/1/file", dict(scheme='lakefs', repo='REPO', ref='REF/1', key='file'), - marks=pytest.mark.xfail), - ] - ) + "lakefs://REPO/REF/1/file", + dict(scheme="lakefs", repo="REPO", ref="REF/1", key="file"), + marks=pytest.mark.xfail, + ), + ], +) def test_parse_uri(uri, parsed): from smart_open.lakefs import parse_uri + assert parsed == parse_uri(uri) class TestReader: - @pytest.fixture(scope="module") def put_to_repo( - self, - lakefs, - repo: models.Repository, - ) -> typing.Callable: - + self, + lakefs, + repo: models.Repository, + ) -> typing.Callable: def _put_to_repo( path: str, content: bytes, @@ -137,40 +146,47 @@ def file(self, put_to_repo) -> typing.IO: content = "hello wořld\nhow are you?".encode("utf8") return put_to_repo(path, content) - def test_iter(self, lakefs, repo, file): path, content = file - fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) + fin = open(repo.id, repo.default_branch, path, "rb", lakefs) output = [line.rstrip(b"\n") for line in fin] assert output == content.split(b"\n") def test_iter_context_manager(self, lakefs, repo, file): path, content = file - with Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) as fin: + with open( + mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path + ) as fin: output = [line.rstrip(b"\n") for line in fin] assert output == content.split(b"\n") def test_read(self, lakefs, repo, file): path, content = file - fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) + fin = open( + mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path + ) assert content[:6] == fin.read(6) - assert content[6:6+8] == fin.read1(8) - assert content[6+8:] == fin.read() + assert content[6 : 6 + 8] == fin.read1(8) + assert content[6 + 8 :] == fin.read() def test_readinto(self, lakefs, repo, file): path, content = file - fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) + fin = open( + mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path + ) b = bytearray(6) assert len(b) == fin.readinto(b) assert content[:6] == b assert len(b) == fin.readinto1(b) - assert content[6:6+6] == b + assert content[6 : 6 + 6] == b def test_seek_beginning(self, lakefs, repo, file): path, content = file - fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) + fin = open( + mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path + ) assert content[:6] == fin.read(6) - assert content[6:6+8] == fin.read(8) + assert content[6 : 6 + 8] == fin.read(8) fin.seek(0) assert content == fin.read() fin.seek(0) @@ -178,55 +194,52 @@ def test_seek_beginning(self, lakefs, repo, file): def test_seek_start(self, lakefs, repo, file): path, _ = file - fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) + fin = open( + mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path + ) assert fin.seek(6) == 6 assert fin.tell() == 6 - assert fin.read(6) == u'wořld'.encode('utf-8') + assert fin.read(6) == "wořld".encode("utf-8") def test_seek_current(self, lakefs, repo, file): from smart_open import constants path, _ = file - fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) - assert fin.read(5) == b'hello' - assert fin.seek(1, whence=constants.WHENCE_CURRENT) == 6 - assert fin.read(6) == u'wořld'.encode('utf-8') - - def test_seek_current_io(self, lakefs, repo, file): - from smart_open import constants - from smart_open.lakefs import _RawReader - import io - - path, _ = file - # fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) - raw = _RawReader(client=lakefs, repo=repo.id, ref=repo.default_branch, key=path) - fin = io.BufferedReader(raw=raw) - assert fin.read(5) == b'hello' + fin = open( + mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path + ) + assert fin.read(5) == b"hello" assert fin.seek(1, constants.WHENCE_CURRENT) == 6 - assert fin.read(6) == u'wořld'.encode('utf-8') + assert fin.read(6) == "wořld".encode("utf-8") def test_seek_end(self, lakefs, repo, file): from smart_open import constants path, content = file - fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) - assert fin.seek(-4, whence=constants.WHENCE_END) == len(content) - 4 - assert fin.read() == b'you?' + fin = open( + mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path + ) + assert fin.seek(-4, constants.WHENCE_END) == len(content) - 4 + assert fin.read() == b"you?" def test_seek_past_end(self, lakefs, repo, file): path, content = file - fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) + fin = open( + mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path + ) assert fin.seek(60) == len(content) def test_detect_eof(self, lakefs, repo, file): from smart_open import constants path, content = file - fin = Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) + fin = open( + mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path + ) fin.read() eof = fin.tell() assert eof == len(content) - fin.seek(0, whence=constants.WHENCE_END) + fin.seek(0, constants.WHENCE_END) assert fin.tell() == eof fin.seek(eof) assert fin.tell() == eof @@ -235,10 +248,12 @@ def test_read_gzip(self, lakefs, repo, put_to_repo): from io import BytesIO import gzip - expected = u'раcцветали яблони и груши, поплыли туманы над рекой...'.encode('utf-8') + expected = "раcцветали яблони и груши, поплыли туманы над рекой...".encode( + "utf-8" + ) buf = BytesIO() buf.close = lambda: None # keep buffer open so that we can .getvalue() - with gzip.GzipFile(fileobj=buf, mode='w') as zipfile: + with gzip.GzipFile(fileobj=buf, mode="w") as zipfile: zipfile.write(expected) path = "zip/file.zip" _ = put_to_repo(path, buf.getvalue()) @@ -246,7 +261,9 @@ def test_read_gzip(self, lakefs, repo, put_to_repo): # # Make sure we're reading things correctly. # - with Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) as fin: + with open( + mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path + ) as fin: assert fin.read() == buf.getvalue() # @@ -256,52 +273,75 @@ def test_read_gzip(self, lakefs, repo, put_to_repo): with gzip.GzipFile(fileobj=sanity_buf) as zipfile: assert zipfile.read() == expected - logger.debug('starting actual test') - with Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) as fin: + logger.debug("starting actual test") + with open( + mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path + ) as fin: with gzip.GzipFile(fileobj=fin) as zipfile: assert zipfile.read() == expected def test_readline(self, lakefs, repo, put_to_repo): - content = b'englishman\nin\nnew\nyork\n' + content = b"englishman\nin\nnew\nyork\n" path = "many_lines.txt" _ = put_to_repo(path, content) - with Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) as fin: + with open( + mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path + ) as fin: fin.readline() - assert fin.tell() == content.index(b'\n')+1 + assert fin.tell() == content.index(b"\n") + 1 fin.seek(0) - assert list(fin) == [b'englishman\n', b'in\n', b'new\n', b'york\n'] + assert list(fin) == [b"englishman\n", b"in\n", b"new\n", b"york\n"] assert fin.tell() == len(content) def test_readline_tiny_buffer(self, lakefs, repo, put_to_repo): - content = b'englishman\nin\nnew\nyork\n' + content = b"englishman\nin\nnew\nyork\n" path = "many_lines.txt" _ = put_to_repo(path, content) - with Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path, buffer_size=8) as fin: - assert list(fin) == [b'englishman\n', b'in\n', b'new\n', b'york\n'] + with open( + mode="rb", + client=lakefs, + repo=repo.id, + ref=repo.default_branch, + key=path, + buffer_size=8, + ) as fin: + assert list(fin) == [b"englishman\n", b"in\n", b"new\n", b"york\n"] assert fin.tell() == len(content) def test_read0_does_not_return_data(self, lakefs, repo, file): path, _ = file - with Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) as fin: - assert fin.read(0) == b'' + with open( + mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path + ) as fin: + assert fin.read(0) == b"" def test_read_past_end(self, lakefs, repo, file): path, content = file - with Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path) as fin: + with open( + mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path + ) as fin: assert fin.read(100) == content def test_read_empty_file(self, lakefs, repo, put_to_repo): - content = b'' + content = b"" path = "empty_file.txt" _ = put_to_repo(path, content) - with Reader(client=lakefs, repo=repo.id, ref=repo.default_branch, path=path, buffer_size=8) as fin: - assert fin.read() == b'' + with open( + mode="rb", + client=lakefs, + repo=repo.id, + ref=repo.default_branch, + key=path, + buffer_size=8, + ) as fin: + assert fin.read() == b"" def test_open(self, lakefs, repo, file): path, content = file with open(repo.id, repo.default_branch, path, "rb", client=lakefs) as fin: assert fin.read(100) == content + class TestWriter: def commits(self, lakefs, repo): refs: apis.RefsApi = lakefs.refs @@ -309,16 +349,18 @@ def commits(self, lakefs, repo): return commit_list.results def test_write(self, lakefs, repo): - content = u"ветер по морю гуляет...".encode('utf8') - with open(repo.id, repo.default_branch, "write.txt", "wb", lakefs) as fout: + content = "ветер по морю гуляет...".encode("utf8") + path = "write/1.txt" + with open(repo.id, repo.default_branch, path, "wb", lakefs) as fout: assert fout.write(content) == len(content) - with open(repo.id, repo.default_branch, "write.txt", "rb", lakefs) as fin: + with open(repo.id, repo.default_branch, path, "rb", lakefs) as fin: assert fin.read() == content def test_commit(self, lakefs, repo): - content = u"ветер по морю гуляет...".encode('utf8') + content = "ветер по морю гуляет...".encode("utf8") + path = "write/2.txt" message = "Modify file." - with open(repo.id, repo.default_branch, "write.txt", "wb", lakefs, message) as fout: + with open(repo.id, repo.default_branch, path, "wb", lakefs, message) as fout: assert fout.write(content) == len(content) assert self.commits(lakefs, repo)[0].message == message From 0787a35150da22f506865fdf0c598d5c2d68d732 Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Mon, 13 Mar 2023 00:02:41 +0100 Subject: [PATCH 09/23] Register transport --- smart_open/transport.py | 1 + 1 file changed, 1 insertion(+) diff --git a/smart_open/transport.py b/smart_open/transport.py index 086ea2b0..30edebf8 100644 --- a/smart_open/transport.py +++ b/smart_open/transport.py @@ -104,6 +104,7 @@ def get_transport(scheme): register_transport("smart_open.s3") register_transport("smart_open.ssh") register_transport("smart_open.webhdfs") +register_transport("smart_open.lakefs") SUPPORTED_SCHEMES = tuple(sorted(_REGISTRY.keys())) """The transport schemes that the local installation of ``smart_open`` supports.""" From 6a0c3407822d75f99bf07ac306c3943ee4a0d950 Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Mon, 13 Mar 2023 00:23:40 +0100 Subject: [PATCH 10/23] Update setup.py --- setup.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 57ad9e66..cdbe40a9 100644 --- a/setup.py +++ b/setup.py @@ -41,8 +41,9 @@ def read(fname): azure_deps = ['azure-storage-blob', 'azure-common', 'azure-core'] http_deps = ['requests'] ssh_deps = ['paramiko'] +lakefs_deps = ['lakefs_client'] -all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps +all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps + lakefs_deps tests_require = all_deps + [ 'moto[server]', 'responses', @@ -65,7 +66,7 @@ def read(fname): url='https://github.com/piskvorky/smart_open', download_url='http://pypi.python.org/pypi/smart_open', - keywords='file streaming, s3, hdfs, gcs, azure blob storage', + keywords='file streaming, s3, hdfs, gcs, azure blob storage, lakefs', license='MIT', platforms='any', @@ -80,6 +81,7 @@ def read(fname): 'http': http_deps, 'webhdfs': http_deps, 'ssh': ssh_deps, + 'lakefs': lakefs_deps, }, python_requires=">=3.6,<4.0", From 776bb97b1e9e21a45c2bb955cf5292dbbf2d2676 Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Mon, 13 Mar 2023 12:56:10 +0100 Subject: [PATCH 11/23] Remove bufferediobase --- smart_open/lakefs.py | 197 ++++--------------------------------------- 1 file changed, 15 insertions(+), 182 deletions(-) diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py index 7ae5b295..e79271ba 100644 --- a/smart_open/lakefs.py +++ b/smart_open/lakefs.py @@ -10,8 +10,7 @@ except ImportError: MISSING_DEPS = True -from smart_open import bytebuffer, constants -import smart_open.utils +from smart_open import constants, utils SCHEME = "lakefs" @@ -33,7 +32,7 @@ def parse_uri(uri_as_string): both for path prefixes and for full paths. In similar fashion, lakefs:/// identifies the repository at a ref expression, and lakefs:// identifes a repo. """ - sr = smart_open.utils.safe_urlsplit(uri_as_string) + sr = utils.safe_urlsplit(uri_as_string) assert sr.scheme == SCHEME repo = sr.netloc _pattern = r"^/(?P[^/]+)/(?P.+)" @@ -58,7 +57,7 @@ def open_uri(uri: str, mode: str, transport_params: dict) -> typing.IO: :rtype: file-like """ parsed_uri = parse_uri(uri) - kwargs = smart_open.utils.check_kwargs(open, transport_params) + kwargs = utils.check_kwargs(open, transport_params) return open( parsed_uri["repo"], parsed_uri["ref"], parsed_uri["key"], mode, **kwargs ) @@ -95,10 +94,10 @@ def open( if not client: raise ValueError("you must specify the client to connect to lakefs") - if mode == smart_open.constants.READ_BINARY: + if mode == constants.READ_BINARY: raw = _RawReader(client, repo, ref, key) return io.BufferedReader(raw, buffer_size) - elif mode == smart_open.constants.WRITE_BINARY: + elif mode == constants.WRITE_BINARY: raw_writer = _RawWriter(client, repo, ref, key, commit_message) return io.BufferedWriter(raw_writer, buffer_size) else: @@ -106,7 +105,11 @@ def open( class _RawReader(io.RawIOBase): - """Read a lakeFS object.""" + """Read a lakeFS object. + + Provides low-level access to the underlying lakefs api. + High level primites are implementedu using io.BufferedReader. + """ def __init__( self, @@ -188,6 +191,11 @@ def readinto(self, __buffer: bytes) -> int | None: class _RawWriter(io.RawIOBase): + """Write a lakefs object. + + Provides low-level access to the underlying lakefs api. + High level primites are implementedu using io.BufferedReader. + """ def __init__( self, client: client.LakeFSClient, @@ -222,178 +230,3 @@ def write(self, __b: bytes) -> int | None: except lakefs_client.ApiException as e: raise Exception("Error uploading object: %s\n" % e) from e return object_stats.size_bytes - - -class Reader(io.BufferedIOBase): - """Reads bytes from a lakefs object. - - Implements the io.BufferedIOBase interface of the standard library. - """ - - def __init__( - self, - client: client.LakeFSClient, - repo: str, - ref: str, - key: str, - buffer_size=DEFAULT_BUFFER_SIZE, - line_terminator=smart_open.constants.BINARY_NEWLINE, - ): - self._repo = repo - self._ref = ref - self._path = key - self.raw = _RawReader(client, repo, ref, key) - self._position = 0 - self._buffer = bytebuffer.ByteBuffer(buffer_size) - self._line_terminator = line_terminator - - @property - def bytes_buffered(self) -> int: - return len(self._buffer) - - def close(self) -> None: - """Flush and close this stream.""" - self._buffer.empty() - - def readable(self) -> bool: - """Return True if the stream can be read from.""" - return True - - def read(self, size: int = -1) -> bytes: - """Read and return up to size bytes. - - :param int size: - - :returns: read bytes - :rtype: bytes - """ - if size == 0: - return b"" - elif size < 0: - out = self._read_from_buffer() + self.raw.read() - self._position = self.raw.content_length - return out - elif size <= self.bytes_buffered: - # Fast path: the data to read is fully buffered. - return self._read_from_buffer(size) - if not self.raw.eof: - self._fill_buffer(size) - return self._read_from_buffer(size) - - def read1(self, size: int = -1): - """Read and return up to size bytes. - - with at most one call to the underlying raw stream readinto(). - This can be useful if you are implementing your own buffering - on top of a BufferedIOBase object. - """ - if size == 0: - return b"" - elif size < 0: - out = self._read_from_buffer() + self.raw.read() - self._position = self.raw.content_length - return out - elif size <= self.bytes_buffered: - # Fast path: the data to read is fully buffered. - return self._read_from_buffer(size) - else: - out = self._read_from_buffer() - out += self.raw.read(size - len(out)) - self._position += len(out) - return out - - def readline(self, limit=-1) -> bytes: - """Read up to and including the next newline. - - :param int limit: - - :returns: bytes read - :rtype: bytes - """ - if limit != -1: - raise NotImplementedError("limits other than -1 not implemented yet") - - line = io.BytesIO() - while not (self.raw.eof and self.bytes_buffered == 0): - # while we are not in eof or buffer is not empty - line_part = self._buffer.readline(self._line_terminator) - line.write(line_part) - self._position += len(line_part) - if line_part.endswith(self._line_terminator): - break - else: - self._fill_buffer() - return line.getvalue() - - def seekable(self): - """If the stream supports random access or not.""" - return True - - def detach(self): - """Unsupported.""" - raise io.UnsupportedOperation - - def truncate(self, size=None): - """Unsupported.""" - raise io.UnsupportedOperation - - def seek(self, offset: int, whence: int = smart_open.constants.WHENCE_START): - """Seek to the specified position. - - :param int offset: The offset in bytes. - :param int whence: Where the offset is from. - - :returns: the position after seeking. - :r - """ - logger.debug("seeking to offset: %r whence: %r", offset, whence) - if whence not in smart_open.constants.WHENCE_CHOICES: - raise ValueError( - "invalid whence %i, expected one of %r" - % (whence, smart_open.constants.WHENCE_CHOICES) - ) - - # Convert relative offset to absolute, since self.raw - # doesn't know our current position. - if whence == constants.WHENCE_CURRENT: - whence = constants.WHENCE_START - offset += self._position - - self._position = self.raw.seek(offset, whence) - self._buffer.empty() - logger.debug("current_pos: %r", self._position) - return self._position - - def tell(self): - """Return the current stream position.""" - return self._position - - def _read_from_buffer(self, size: int = -1) -> bytes: - """Reads from buffer and updates position.""" - part = self._buffer.read(size) - self._position += len(part) - return part - - def _fill_buffer(self, size: int = -1) -> None: - """Fills the buffer with either the default buffer size or size.""" - size = max(size, self._buffer._chunk_size) - while self.bytes_buffered < size and not self.raw.eof: - bytes_read = self._buffer.fill(self.raw) - if bytes_read == 0: - logger.debug("%s: reached EOF while filling buffer", self) - - def __str__(self): - return "smart_open.lakefs.Reader(%r, %r, %r)" % ( - self._repo, - self._ref, - self._path, - ) - - def __repr__(self): - return ( - "smart_open.lakefs.Reader(" - "repo=%r, " - "ref=%r, " - "path=%r, " - "buffer_size=%r" - ) % (self._repo, self._ref, self._path, self._buffer._chunk_size) From dcfbbdee4d8effb4aed40f2a15accc51abdbf602 Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Fri, 17 Mar 2023 15:49:23 +0100 Subject: [PATCH 12/23] Add name to raw reader and writer --- smart_open/lakefs.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py index e79271ba..58abe015 100644 --- a/smart_open/lakefs.py +++ b/smart_open/lakefs.py @@ -123,6 +123,7 @@ def __init__( self._ref = ref self._path = key self._position = 0 + self.name = key def seekable(self) -> bool: return True @@ -212,6 +213,7 @@ def __init__( self._message = commit_message else: self._message = f"Update {self._path}." + self.name = key def writable(self) -> bool: return True From 31ba8aa3c249ea72db8d7d39d804370ab46b3c58 Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Fri, 17 Mar 2023 15:49:47 +0100 Subject: [PATCH 13/23] Fix test to use local compose file --- smart_open/tests/docker-compose.yml | 31 +++++++++++++++++++++++++++++ smart_open/tests/test_lakefs.py | 21 +++++++++---------- 2 files changed, 40 insertions(+), 12 deletions(-) create mode 100644 smart_open/tests/docker-compose.yml diff --git a/smart_open/tests/docker-compose.yml b/smart_open/tests/docker-compose.yml new file mode 100644 index 00000000..368a03cd --- /dev/null +++ b/smart_open/tests/docker-compose.yml @@ -0,0 +1,31 @@ +version: '3' +services: + lakefs: + image: "treeverse/lakefs:0.89.0" + ports: + - "8000:8000" + depends_on: + - "postgres" + environment: + - LAKEFS_AUTH_ENCRYPT_SECRET_KEY=${LAKEFS_AUTH_ENCRYPT_SECRET_KEY:-some random secret string} + - LAKEFS_DATABASE_TYPE=${LAKEFS_DATABASE_TYPE:-postgres} + - LAKEFS_DATABASE_POSTGRES_CONNECTION_STRING=${LAKEFS_DATABASE_POSTGRES_CONNECTION_STRING:-postgres://lakefs:lakefs@postgres/postgres?sslmode=disable} + - LAKEFS_BLOCKSTORE_TYPE=${LAKEFS_BLOCKSTORE_TYPE:-local} + - LAKEFS_BLOCKSTORE_LOCAL_PATH=${LAKEFS_BLOCKSTORE_LOCAL_PATH:-/home/lakefs} + - LAKEFS_GATEWAYS_S3_DOMAIN_NAME=${LAKEFS_GATEWAYS_S3_DOMAIN_NAME:-s3.local.lakefs.io:8000} + - LAKEFS_BLOCKSTORE_S3_CREDENTIALS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-} + - LAKEFS_BLOCKSTORE_S3_CREDENTIALS_ACCESS_SECRET_KEY=${AWS_SECRET_ACCESS_KEY:-} + - LAKEFS_LOGGING_LEVEL=${LAKEFS_LOGGING_LEVEL:-INFO} + - LAKEFS_STATS_ENABLED + - LAKEFS_BLOCKSTORE_S3_ENDPOINT + - LAKEFS_BLOCKSTORE_S3_FORCE_PATH_STYLE + - LAKEFS_COMMITTED_LOCAL_CACHE_DIR=${LAKEFS_COMMITTED_LOCAL_CACHE_DIR:-/home/lakefs/.local_tier} + entrypoint: ["/app/wait-for", "postgres:5432", "--", "/app/lakefs", "run"] + postgres: + image: "postgres" + command: "-c log_min_messages=FATAL" + environment: + POSTGRES_USER: lakefs + POSTGRES_PASSWORD: lakefs + # logging: + # driver: none diff --git a/smart_open/tests/test_lakefs.py b/smart_open/tests/test_lakefs.py index 765494e1..f4b37a36 100644 --- a/smart_open/tests/test_lakefs.py +++ b/smart_open/tests/test_lakefs.py @@ -23,15 +23,13 @@ @pytest.fixture(scope="module") def lakefs(): + import os import shlex import subprocess from urllib3.exceptions import MaxRetryError - compose = subprocess.Popen( - shlex.split("curl https://compose.lakefs.io"), stdout=subprocess.PIPE - ) - subprocess.Popen(shlex.split("docker-compose -f - up -d"), stdin=compose.stdout) - compose.stdout.close() + cwd = os.path.dirname(os.path.realpath(__file__)) + subprocess.Popen(shlex.split("docker compose up -d"), cwd=cwd) configuration = lakefs_client.Configuration(_LAKEFS_HOST) lfs_client = client.LakeFSClient(configuration) @@ -77,11 +75,7 @@ def lakefs(): yield lfs_client - compose = subprocess.Popen( - shlex.split("curl https://compose.lakefs.io"), stdout=subprocess.PIPE - ) - subprocess.Popen(shlex.split("docker-compose -f - down"), stdin=compose.stdout) - compose.stdout.close() + subprocess.Popen(shlex.split("docker compose down"), cwd=cwd) @pytest.fixture(scope="module") @@ -337,9 +331,12 @@ def test_read_empty_file(self, lakefs, repo, put_to_repo): assert fin.read() == b"" def test_open(self, lakefs, repo, file): + from smart_open import open path, content = file - with open(repo.id, repo.default_branch, path, "rb", client=lakefs) as fin: - assert fin.read(100) == content + transport_params = {'client': lakefs} + uri=f"lakefs://{repo.id}/{repo.default_branch}/{path}" + with open(uri, transport_params=transport_params) as fin: + assert fin.read() == content.decode() class TestWriter: From 216f196f04a766650280cb00bb58987754a257df Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Mon, 27 Nov 2023 18:14:14 +0100 Subject: [PATCH 14/23] Fix imports and format --- smart_open/lakefs.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py index 58abe015..5daf41cf 100644 --- a/smart_open/lakefs.py +++ b/smart_open/lakefs.py @@ -1,12 +1,12 @@ -import typing +import functools import io -import re import logging -import functools +import re +import typing try: import lakefs_client - from lakefs_client import client, apis, models + from lakefs_client import apis, client, models except ImportError: MISSING_DEPS = True @@ -197,6 +197,7 @@ class _RawWriter(io.RawIOBase): Provides low-level access to the underlying lakefs api. High level primites are implementedu using io.BufferedReader. """ + def __init__( self, client: client.LakeFSClient, From ff3998f3a5010942d3f18a455414598f6131a936 Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Tue, 28 Nov 2023 12:11:54 +0100 Subject: [PATCH 15/23] Add dataclass for parsed uri --- smart_open/lakefs.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py index 5daf41cf..f4495c8f 100644 --- a/smart_open/lakefs.py +++ b/smart_open/lakefs.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +import dataclasses import functools import io import logging @@ -24,7 +27,15 @@ logger = logging.getLogger(__name__) -def parse_uri(uri_as_string): +@dataclasses.dataclass +class ParsedURI: + scheme: str + repo: str + ref: str + key: str + + +def parse_uri(uri_as_string: str) -> ParsedURI: """lakefs protocol URIs. lakeFS uses a specific format for path URIs. The URI lakefs://// @@ -58,9 +69,7 @@ def open_uri(uri: str, mode: str, transport_params: dict) -> typing.IO: """ parsed_uri = parse_uri(uri) kwargs = utils.check_kwargs(open, transport_params) - return open( - parsed_uri["repo"], parsed_uri["ref"], parsed_uri["key"], mode, **kwargs - ) + return open(parsed_uri.repo, parsed_uri.ref, parsed_uri.key, mode, **kwargs) def open( From 1a27d0d5d50d852a6d52bf667efd2e97d1677a19 Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Tue, 28 Nov 2023 12:24:30 +0100 Subject: [PATCH 16/23] Fix parse_uri to raise errors on bad URI --- smart_open/lakefs.py | 17 +++++++++-------- smart_open/tests/test_lakefs.py | 4 +++- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py index f4495c8f..be90e03f 100644 --- a/smart_open/lakefs.py +++ b/smart_open/lakefs.py @@ -44,17 +44,18 @@ def parse_uri(uri_as_string: str) -> ParsedURI: identifies the repository at a ref expression, and lakefs:// identifes a repo. """ sr = utils.safe_urlsplit(uri_as_string) - assert sr.scheme == SCHEME - repo = sr.netloc + if sr.scheme != SCHEME: + raise ValueError(f"Scheme is not `lakefs` in {uri_as_string}") _pattern = r"^/(?P[^/]+)/(?P.+)" _match = re.fullmatch(_pattern, sr.path) - if _match: - ref = _match.group("ref") - key = _match.group("key") + if _match is None: + raise ValueError( + f"Missing `branch/commit` and `path` in {uri_as_string}." + "The URI should have the format of `lakefs:////`" + ) else: - ref = None - key = None - return dict(scheme=SCHEME, repo=repo, ref=ref, key=key) + ref, key = _match.groups() + return ParsedURI(scheme=sr.scheme, repo=sr.netloc, ref=ref, key=key) def open_uri(uri: str, mode: str, transport_params: dict) -> typing.IO: diff --git a/smart_open/tests/test_lakefs.py b/smart_open/tests/test_lakefs.py index f4b37a36..a4e8b4f8 100644 --- a/smart_open/tests/test_lakefs.py +++ b/smart_open/tests/test_lakefs.py @@ -103,9 +103,11 @@ def repo(lakefs) -> models.Repository: ], ) def test_parse_uri(uri, parsed): + from dataclasses import asdict + from smart_open.lakefs import parse_uri - assert parsed == parse_uri(uri) + assert parsed == asdict(parse_uri(uri)) class TestReader: From b895737cd2c05c8e4b20ef482d04e09bbee3a537 Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Tue, 28 Nov 2023 12:49:07 +0100 Subject: [PATCH 17/23] Fix module and variable name clashing --- smart_open/lakefs.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py index be90e03f..41874a1d 100644 --- a/smart_open/lakefs.py +++ b/smart_open/lakefs.py @@ -8,8 +8,8 @@ import typing try: - import lakefs_client - from lakefs_client import apis, client, models + from lakefs_client import apis, configuration, models + from lakefs_client import client as lfs_client except ImportError: MISSING_DEPS = True @@ -63,7 +63,7 @@ def open_uri(uri: str, mode: str, transport_params: dict) -> typing.IO: :param str uri: The URI to open :param str mode: Either "rb" or "wb". - :param dict transport_params: Any additional parameters to pass to the `open` function (see below). + :param dict transport_params: Any additional parameters to pass to `open`. :returns: file-like object. :rtype: file-like @@ -78,7 +78,7 @@ def open( ref: str, key: str, mode: str, - client: str | None = None, + client: lfs_client.LakeFSClient | None = None, commit_message: str | None = None, buffer_size: int = DEFAULT_BUFFER_SIZE, ): @@ -123,7 +123,7 @@ class _RawReader(io.RawIOBase): def __init__( self, - client: client.LakeFSClient, + client: lfs_client.LakeFSClient, repo: str, ref: str, key: str, @@ -205,12 +205,12 @@ class _RawWriter(io.RawIOBase): """Write a lakefs object. Provides low-level access to the underlying lakefs api. - High level primites are implementedu using io.BufferedReader. + High level primitives are implemented using io.BufferedReader. """ def __init__( self, - client: client.LakeFSClient, + client: lfs_client.LakeFSClient, repo: str, ref: str, key: str, From 1c5d4ac4634e03c35d0f4401cfa1bd0ce3577f99 Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Tue, 28 Nov 2023 13:04:25 +0100 Subject: [PATCH 18/23] Fix read tests --- smart_open/tests/test_lakefs.py | 130 ++++++++++++-------------------- 1 file changed, 48 insertions(+), 82 deletions(-) diff --git a/smart_open/tests/test_lakefs.py b/smart_open/tests/test_lakefs.py index a4e8b4f8..06ed620c 100644 --- a/smart_open/tests/test_lakefs.py +++ b/smart_open/tests/test_lakefs.py @@ -3,13 +3,16 @@ # This code is distributed under the terms and conditions # from the MIT License (MIT). # -import typing -import pytest -import lakefs_client -from lakefs_client import client, models, apis +from __future__ import annotations + import logging -from smart_open.lakefs import open +from typing import Callable, Generator +import pytest +from lakefs_client import apis, configuration, exceptions, models +from lakefs_client import client as lfs_client + +from smart_open.lakefs import open """It needs docker compose to run lakefs locally: https://docs.lakefs.io/quickstart/run.html @@ -22,25 +25,26 @@ @pytest.fixture(scope="module") -def lakefs(): +def lakefs() -> Generator[lfs_client.LakeFSClient, None, None]: import os import shlex import subprocess + from urllib3.exceptions import MaxRetryError cwd = os.path.dirname(os.path.realpath(__file__)) subprocess.Popen(shlex.split("docker compose up -d"), cwd=cwd) - configuration = lakefs_client.Configuration(_LAKEFS_HOST) - lfs_client = client.LakeFSClient(configuration) + conf = configuration.Configuration(_LAKEFS_HOST) + client = lfs_client.LakeFSClient(conf) - healthcheck: apis.HealthCheckApi = lfs_client.healthcheck + healthcheck: apis.HealthCheckApi = client.healthcheck api_available = False while not api_available: try: healthcheck.health_check() api_available = True - except (lakefs_client.ApiException, MaxRetryError): + except (exceptions.ApiException, MaxRetryError): continue comm_prefs = models.CommPrefsInput( @@ -50,30 +54,30 @@ def lakefs(): ) username = models.Setup(username="admin") try: - config: apis.ConfigApi = lfs_client.config + config: apis.ConfigApi = client.config _ = config.setup_comm_prefs(comm_prefs) credentials: models.CredentialsWithSecret = config.setup(username) - except lakefs_client.ApiException as e: - raise Exception( - "Error setting up lakefs: %s\n" % e - ) from lakefs_client.ApiException - configuration = lakefs_client.Configuration( + except exceptions.ApiException as e: + raise Exception("Error setting up lakefs: %s\n" % e) from e + conf = configuration.Configuration( host=_LAKEFS_HOST, username=credentials.access_key_id, password=credentials.secret_access_key, ) lfs_client = client.LakeFSClient(configuration) - repositories_api: apis.RepositoriesApi = lfs_client.repositories + client = lfs_client.LakeFSClient(conf) + + repositories_api: apis.RepositoriesApi = client.repositories new_repo = models.RepositoryCreation( name="repo", storage_namespace="local:///home/lakefs/", default_branch="main" ) try: _: models.Repository = repositories_api.create_repository(new_repo) - except lakefs_client.ApiException as e: + except exceptions.ApiException as e: raise Exception("Error creating repository: %s\n" % e) from e - yield lfs_client + yield client subprocess.Popen(shlex.split("docker compose down"), cwd=cwd) @@ -116,12 +120,12 @@ def put_to_repo( self, lakefs, repo: models.Repository, - ) -> typing.Callable: + ) -> Callable: def _put_to_repo( path: str, content: bytes, branch: str | None = None, - ) -> typing.IO: + ) -> tuple[str, bytes]: from io import BytesIO objects: apis.ObjectsApi = lakefs.objects @@ -130,14 +134,14 @@ def _put_to_repo( stream.name = path try: _ = objects.upload_object(repo.id, _branch, path, content=stream) - except lakefs_client.ApiException as e: + except exceptions.ApiException as e: raise Exception("Error uploading object: %s\n" % e) from e return path, content return _put_to_repo @pytest.fixture(scope="module") - def file(self, put_to_repo) -> typing.IO: + def file(self, put_to_repo) -> tuple[str, bytes]: path = "test/file.txt" content = "hello wořld\nhow are you?".encode("utf8") return put_to_repo(path, content) @@ -150,26 +154,20 @@ def test_iter(self, lakefs, repo, file): def test_iter_context_manager(self, lakefs, repo, file): path, content = file - with open( - mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path - ) as fin: + with open(repo.id, repo.default_branch, path, "rb", lakefs) as fin: output = [line.rstrip(b"\n") for line in fin] assert output == content.split(b"\n") def test_read(self, lakefs, repo, file): path, content = file - fin = open( - mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path - ) + fin = open(repo.id, repo.default_branch, path, "rb", lakefs) assert content[:6] == fin.read(6) assert content[6 : 6 + 8] == fin.read1(8) assert content[6 + 8 :] == fin.read() def test_readinto(self, lakefs, repo, file): path, content = file - fin = open( - mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path - ) + fin = open(repo.id, repo.default_branch, path, "rb", lakefs) b = bytearray(6) assert len(b) == fin.readinto(b) assert content[:6] == b @@ -178,9 +176,7 @@ def test_readinto(self, lakefs, repo, file): def test_seek_beginning(self, lakefs, repo, file): path, content = file - fin = open( - mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path - ) + fin = open(repo.id, repo.default_branch, path, "rb", lakefs) assert content[:6] == fin.read(6) assert content[6 : 6 + 8] == fin.read(8) fin.seek(0) @@ -190,9 +186,7 @@ def test_seek_beginning(self, lakefs, repo, file): def test_seek_start(self, lakefs, repo, file): path, _ = file - fin = open( - mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path - ) + fin = open(repo.id, repo.default_branch, path, "rb", lakefs) assert fin.seek(6) == 6 assert fin.tell() == 6 assert fin.read(6) == "wořld".encode("utf-8") @@ -201,9 +195,7 @@ def test_seek_current(self, lakefs, repo, file): from smart_open import constants path, _ = file - fin = open( - mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path - ) + fin = open(repo.id, repo.default_branch, path, "rb", lakefs) assert fin.read(5) == b"hello" assert fin.seek(1, constants.WHENCE_CURRENT) == 6 assert fin.read(6) == "wořld".encode("utf-8") @@ -212,26 +204,20 @@ def test_seek_end(self, lakefs, repo, file): from smart_open import constants path, content = file - fin = open( - mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path - ) + fin = open(repo.id, repo.default_branch, path, "rb", lakefs) assert fin.seek(-4, constants.WHENCE_END) == len(content) - 4 assert fin.read() == b"you?" def test_seek_past_end(self, lakefs, repo, file): path, content = file - fin = open( - mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path - ) + fin = open(repo.id, repo.default_branch, path, "rb", lakefs) assert fin.seek(60) == len(content) def test_detect_eof(self, lakefs, repo, file): from smart_open import constants path, content = file - fin = open( - mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path - ) + fin = open(repo.id, repo.default_branch, path, "rb", lakefs) fin.read() eof = fin.tell() assert eof == len(content) @@ -241,8 +227,8 @@ def test_detect_eof(self, lakefs, repo, file): assert fin.tell() == eof def test_read_gzip(self, lakefs, repo, put_to_repo): - from io import BytesIO import gzip + from io import BytesIO expected = "раcцветали яблони и груши, поплыли туманы над рекой...".encode( "utf-8" @@ -257,9 +243,7 @@ def test_read_gzip(self, lakefs, repo, put_to_repo): # # Make sure we're reading things correctly. # - with open( - mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path - ) as fin: + with open(repo.id, repo.default_branch, path, "rb", lakefs) as fin: assert fin.read() == buf.getvalue() # @@ -269,10 +253,7 @@ def test_read_gzip(self, lakefs, repo, put_to_repo): with gzip.GzipFile(fileobj=sanity_buf) as zipfile: assert zipfile.read() == expected - logger.debug("starting actual test") - with open( - mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path - ) as fin: + with open(repo.id, repo.default_branch, path, "rb", lakefs) as fin: with gzip.GzipFile(fileobj=fin) as zipfile: assert zipfile.read() == expected @@ -280,9 +261,7 @@ def test_readline(self, lakefs, repo, put_to_repo): content = b"englishman\nin\nnew\nyork\n" path = "many_lines.txt" _ = put_to_repo(path, content) - with open( - mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path - ) as fin: + with open(repo.id, repo.default_branch, path, "rb", lakefs) as fin: fin.readline() assert fin.tell() == content.index(b"\n") + 1 fin.seek(0) @@ -294,28 +273,19 @@ def test_readline_tiny_buffer(self, lakefs, repo, put_to_repo): path = "many_lines.txt" _ = put_to_repo(path, content) with open( - mode="rb", - client=lakefs, - repo=repo.id, - ref=repo.default_branch, - key=path, - buffer_size=8, + repo.id, repo.default_branch, path, "rb", lakefs, buffer_size=8 ) as fin: assert list(fin) == [b"englishman\n", b"in\n", b"new\n", b"york\n"] assert fin.tell() == len(content) def test_read0_does_not_return_data(self, lakefs, repo, file): path, _ = file - with open( - mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path - ) as fin: + with open(repo.id, repo.default_branch, path, "rb", lakefs) as fin: assert fin.read(0) == b"" def test_read_past_end(self, lakefs, repo, file): path, content = file - with open( - mode="rb", client=lakefs, repo=repo.id, ref=repo.default_branch, key=path - ) as fin: + with open(repo.id, repo.default_branch, path, "rb", lakefs) as fin: assert fin.read(100) == content def test_read_empty_file(self, lakefs, repo, put_to_repo): @@ -323,20 +293,16 @@ def test_read_empty_file(self, lakefs, repo, put_to_repo): path = "empty_file.txt" _ = put_to_repo(path, content) with open( - mode="rb", - client=lakefs, - repo=repo.id, - ref=repo.default_branch, - key=path, - buffer_size=8, + repo.id, repo.default_branch, path, "rb", lakefs, buffer_size=8 ) as fin: assert fin.read() == b"" - def test_open(self, lakefs, repo, file): + def test_open_with_transport_params(self, lakefs, repo, file): from smart_open import open + path, content = file - transport_params = {'client': lakefs} - uri=f"lakefs://{repo.id}/{repo.default_branch}/{path}" + transport_params = {"client": lakefs} + uri = f"lakefs://{repo.id}/{repo.default_branch}/{path}" with open(uri, transport_params=transport_params) as fin: assert fin.read() == content.decode() From 9cc62d595ab2e36e2509be0db22bb8882be9e04d Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Tue, 28 Nov 2023 13:07:28 +0100 Subject: [PATCH 19/23] Add envar usage to create lfs_client --- smart_open/lakefs.py | 17 +++++++++++++++-- smart_open/tests/test_lakefs.py | 12 +++++++++++- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py index 41874a1d..0ba6b8ef 100644 --- a/smart_open/lakefs.py +++ b/smart_open/lakefs.py @@ -4,6 +4,7 @@ import functools import io import logging +import os import re import typing @@ -101,8 +102,20 @@ def open( buffer_size: int, optional The buffer size to use when performing I/O. """ - if not client: - raise ValueError("you must specify the client to connect to lakefs") + if client is None: + try: + conf = configuration.Configuration( + host=os.environ["LAKECTL_SERVER_ENDPOINT_URL"], + username=os.environ["LAKECTL_CREDENTIALS_ACCESS_KEY_ID"], + password=os.environ["LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY"], + ) + client = lfs_client.LakeFSClient(conf) + except KeyError as e: + raise ValueError( + "Missing lakectl credentials. Please set " + "LAKECTL_SERVER_ENDPOINT_URL, LAKECTL_CREDENTIALS_ACCESS_KEY_ID, " + "and LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY" + ) from e if mode == constants.READ_BINARY: raw = _RawReader(client, repo, ref, key) diff --git a/smart_open/tests/test_lakefs.py b/smart_open/tests/test_lakefs.py index 06ed620c..ca280d05 100644 --- a/smart_open/tests/test_lakefs.py +++ b/smart_open/tests/test_lakefs.py @@ -64,7 +64,9 @@ def lakefs() -> Generator[lfs_client.LakeFSClient, None, None]: username=credentials.access_key_id, password=credentials.secret_access_key, ) - lfs_client = client.LakeFSClient(configuration) + os.environ["LAKECTL_SERVER_ENDPOINT_URL"] = _LAKEFS_HOST + os.environ["LAKECTL_CREDENTIALS_ACCESS_KEY_ID"] = credentials.access_key_id + os.environ["LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY"] = credentials.secret_access_key client = lfs_client.LakeFSClient(conf) @@ -306,6 +308,14 @@ def test_open_with_transport_params(self, lakefs, repo, file): with open(uri, transport_params=transport_params) as fin: assert fin.read() == content.decode() + def test_open_with_envvar_credentials(self, lakefs, repo, file): + from smart_open import open + + path, content = file + uri = f"lakefs://{repo.id}/{repo.default_branch}/{path}" + with open(uri) as fin: + assert fin.read() == content.decode() + class TestWriter: def commits(self, lakefs, repo): From 76df04819e893285adb482b2f32740c8d0a762d9 Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Tue, 28 Nov 2023 13:08:55 +0100 Subject: [PATCH 20/23] Fix typing --- smart_open/lakefs.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py index 0ba6b8ef..2e33e357 100644 --- a/smart_open/lakefs.py +++ b/smart_open/lakefs.py @@ -6,7 +6,9 @@ import logging import os import re -import typing +from typing import IO + +from typing_extensions import Buffer try: from lakefs_client import apis, configuration, models @@ -59,7 +61,7 @@ def parse_uri(uri_as_string: str) -> ParsedURI: return ParsedURI(scheme=sr.scheme, repo=sr.netloc, ref=ref, key=key) -def open_uri(uri: str, mode: str, transport_params: dict) -> typing.IO: +def open_uri(uri: str, mode: str, transport_params: dict) -> IO: """Return a file-like object pointing to the URI. :param str uri: The URI to open @@ -191,7 +193,7 @@ def seek(self, __offset: int, __whence: int = constants.WHENCE_START) -> int: return self._position - def readinto(self, __buffer: bytes) -> int | None: + def readinto(self, __buffer: Buffer) -> int | None: """Read bytes into a pre-allocated bytes-like object __buffer. :param int size: number of bytes to read. @@ -242,7 +244,7 @@ def __init__( def writable(self) -> bool: return True - def write(self, __b: bytes) -> int | None: + def write(self, __b: Buffer) -> int | None: objects: apis.ObjectsApi = self._client.objects commits: apis.CommitsApi = self._client.commits stream = io.BytesIO(__b) From 321ab0fb587942c3fdec9885bd3e622a6c68a0ce Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Tue, 28 Nov 2023 13:10:11 +0100 Subject: [PATCH 21/23] Remove try-exept for write operation --- smart_open/lakefs.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py index 2e33e357..209d070f 100644 --- a/smart_open/lakefs.py +++ b/smart_open/lakefs.py @@ -249,12 +249,9 @@ def write(self, __b: Buffer) -> int | None: commits: apis.CommitsApi = self._client.commits stream = io.BytesIO(__b) stream.name = self._path - try: - object_stats = objects.upload_object( - self._repo, self._ref, self._path, content=stream - ) - message = models.CommitCreation(self._message) - _ = commits.commit(self._repo, self._ref, message) - except lakefs_client.ApiException as e: - raise Exception("Error uploading object: %s\n" % e) from e + object_stats = objects.upload_object( + self._repo, self._ref, self._path, content=stream + ) + message = models.CommitCreation(self._message) + _ = commits.commit(self._repo, self._ref, message) return object_stats.size_bytes From b648aec907e29bd21d27ef7c3f6800eb8ea8594f Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Tue, 28 Nov 2023 16:30:00 +0100 Subject: [PATCH 22/23] Remove buffer typing_extension --- smart_open/lakefs.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py index 209d070f..ecd1ff8e 100644 --- a/smart_open/lakefs.py +++ b/smart_open/lakefs.py @@ -8,8 +8,6 @@ import re from typing import IO -from typing_extensions import Buffer - try: from lakefs_client import apis, configuration, models from lakefs_client import client as lfs_client @@ -193,7 +191,7 @@ def seek(self, __offset: int, __whence: int = constants.WHENCE_START) -> int: return self._position - def readinto(self, __buffer: Buffer) -> int | None: + def readinto(self, __buffer) -> int | None: """Read bytes into a pre-allocated bytes-like object __buffer. :param int size: number of bytes to read. @@ -244,7 +242,7 @@ def __init__( def writable(self) -> bool: return True - def write(self, __b: Buffer) -> int | None: + def write(self, __b) -> int | None: objects: apis.ObjectsApi = self._client.objects commits: apis.CommitsApi = self._client.commits stream = io.BytesIO(__b) From 8491f096fc2a2fda343f770a199c2c63d469f1f1 Mon Sep 17 00:00:00 2001 From: Alvaro Alonso <112358.fn@gmail.com> Date: Wed, 29 Nov 2023 10:12:17 +0100 Subject: [PATCH 23/23] Fix typos and linting errors --- smart_open/lakefs.py | 4 ++-- smart_open/tests/test_lakefs.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/smart_open/lakefs.py b/smart_open/lakefs.py index ecd1ff8e..d0843054 100644 --- a/smart_open/lakefs.py +++ b/smart_open/lakefs.py @@ -42,7 +42,7 @@ def parse_uri(uri_as_string: str) -> ParsedURI: lakeFS uses a specific format for path URIs. The URI lakefs://// is a path to objects in the given repo and ref expression under key. This is used both for path prefixes and for full paths. In similar fashion, lakefs:/// - identifies the repository at a ref expression, and lakefs:// identifes a repo. + identifies the repository at a ref expression, and lakefs:// identifies a repo. """ sr = utils.safe_urlsplit(uri_as_string) if sr.scheme != SCHEME: @@ -131,7 +131,7 @@ class _RawReader(io.RawIOBase): """Read a lakeFS object. Provides low-level access to the underlying lakefs api. - High level primites are implementedu using io.BufferedReader. + High level primitives are implemented using io.BufferedReader. """ def __init__( diff --git a/smart_open/tests/test_lakefs.py b/smart_open/tests/test_lakefs.py index ca280d05..e9dec002 100644 --- a/smart_open/tests/test_lakefs.py +++ b/smart_open/tests/test_lakefs.py @@ -164,8 +164,8 @@ def test_read(self, lakefs, repo, file): path, content = file fin = open(repo.id, repo.default_branch, path, "rb", lakefs) assert content[:6] == fin.read(6) - assert content[6 : 6 + 8] == fin.read1(8) - assert content[6 + 8 :] == fin.read() + assert content[6:6 + 8] == fin.read1(8) + assert content[6 + 8:] == fin.read() def test_readinto(self, lakefs, repo, file): path, content = file @@ -174,13 +174,13 @@ def test_readinto(self, lakefs, repo, file): assert len(b) == fin.readinto(b) assert content[:6] == b assert len(b) == fin.readinto1(b) - assert content[6 : 6 + 6] == b + assert content[6:6 + 6] == b def test_seek_beginning(self, lakefs, repo, file): path, content = file fin = open(repo.id, repo.default_branch, path, "rb", lakefs) assert content[:6] == fin.read(6) - assert content[6 : 6 + 8] == fin.read(8) + assert content[6:6 + 8] == fin.read(8) fin.seek(0) assert content == fin.read() fin.seek(0)