From 389105b918b76aeb3763e71694531449a92ebced Mon Sep 17 00:00:00 2001 From: Zach Probst Date: Tue, 30 Jul 2024 13:04:23 -0700 Subject: [PATCH 1/3] Add log message for when a source yields no files --- nodestream/pipeline/extractors/files.py | 31 ++++++++++++ tests/unit/pipeline/extractors/test_files.py | 51 ++++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/nodestream/pipeline/extractors/files.py b/nodestream/pipeline/extractors/files.py index 2fccc778e..16998158b 100644 --- a/nodestream/pipeline/extractors/files.py +++ b/nodestream/pipeline/extractors/files.py @@ -8,6 +8,7 @@ from csv import DictReader from glob import glob from io import BufferedReader, IOBase, TextIOWrapper +from logging import getLogger from pathlib import Path from typing import ( Any, @@ -162,6 +163,16 @@ def get_files(self) -> AsyncIterator[ReadableFile]: """ raise NotImplementedError + def describe(self) -> str: + """Return a human-readable description of the file source. + + This method should return a human-readable description of the file source. + The description should be a string that describes the file source in a + way that is understandable to the user. The description should be + concise and informative. + """ + return str(self) + @SUPPORTED_FILE_FORMAT_REGISTRY.connect_baseclass class FileCodec(Pluggable, ABC): @@ -437,6 +448,12 @@ async def get_files(self) -> AsyncIterator[ReadableFile]: for path in self.paths: yield LocalFile(path) + def describe(self) -> str: + if len(self.paths) == 1: + return f"{self.paths[0]}" + else: + return f"{len(self.paths)} local files" + class RemoteFileSource(FileSource): """A class that represents a source of remote files to be read. @@ -458,6 +475,12 @@ async def get_files(self) -> AsyncIterator[ReadableFile]: for url in self.urls: yield RemoteFile(url, client, self.memory_spooling_max_size) + def describe(self) -> str: + if len(self.urls) == 1: + return f"{self.urls[0]}" + else: + return f"{len(self.urls)} remote files" + class UnifiedFileExtractor(Extractor): """A class that extracts records from files. @@ -471,6 +494,7 @@ class UnifiedFileExtractor(Extractor): def __init__(self, file_sources: Iterable[FileSource]) -> None: self.file_sources = file_sources + self.logger = getLogger(__name__) async def read_file(self, file: ReadableFile) -> Iterable[JsonLikeDocument]: intermediaries: List[AsyncContextManager[ReadableFile]] = [] @@ -516,10 +540,17 @@ async def read_file(self, file: ReadableFile) -> Iterable[JsonLikeDocument]: async def extract_records(self) -> AsyncGenerator[Any, Any]: for file_source in self.file_sources: + total_files_from_source = 0 async for file in file_source.get_files(): + total_files_from_source += 1 async for record in self.read_file(file): yield record + if total_files_from_source == 0: + self.logger.warning( + f"No files found for source: {file_source.describe()}" + ) + # DEPRECATED CODE BELOW ## # diff --git a/tests/unit/pipeline/extractors/test_files.py b/tests/unit/pipeline/extractors/test_files.py index 7c47032ae..bcdaeae03 100644 --- a/tests/unit/pipeline/extractors/test_files.py +++ b/tests/unit/pipeline/extractors/test_files.py @@ -14,6 +14,8 @@ FileExtractor, LocalFileSource, RemoteFileExtractor, + UnifiedFileExtractor, + RemoteFileSource, ) SIMPLE_RECORD = {"record": "value"} @@ -218,3 +220,52 @@ async def test_remote_file_extractor_extract_records(mocker, httpx_mock): subject = RemoteFileExtractor.from_file_data(urls=files) results = [r async for r in subject.extract_records()] assert_that(results, equal_to([SIMPLE_RECORD, SIMPLE_RECORD])) + + +@pytest.mark.asyncio +async def test_no_files_found_from_local_source(mocker): + subject = UnifiedFileExtractor([LocalFileSource([])]) + subject.logger = mocker.Mock() + results = [r async for r in subject.extract_records()] + assert_that(results, equal_to([])) + subject.logger.warning.assert_called_once_with( + "No files found for source: 0 local files" + ) + + +@pytest.mark.asyncio +async def test_no_files_found_from_remote_source(mocker): + subject = UnifiedFileExtractor([RemoteFileSource([], 10)]) + subject.logger = mocker.Mock() + results = [r async for r in subject.extract_records()] + assert_that(results, equal_to([])) + subject.logger.warning.assert_called_once_with( + "No files found for source: 0 remote files" + ) + + +def test_remote_file_source_single_file_description(): + url = "https://example.com/file.json" + subject = RemoteFileSource([url], 10) + assert_that(subject.describe(), equal_to(url)) + + +def test_remote_file_source_multiple_file_description(): + urls = ["https://example.com/file.json", "https://example.com/file2.json"] + subject = RemoteFileSource(urls, 10) + assert_that(subject.describe(), equal_to("2 remote files")) + + +def test_local_file_source_single_file_description(fixture_directory): + path = Path(f"{fixture_directory}/file.json") + subject = LocalFileSource([path]) + assert_that(subject.describe(), equal_to(str(path))) + + +def test_local_file_source_multiple_file_description(fixture_directory): + paths = [ + Path(f"{fixture_directory}/file.json"), + Path(f"{fixture_directory}/file2.json"), + ] + subject = LocalFileSource(paths) + assert_that(subject.describe(), equal_to("2 local files")) From 61e67991028db42d78fdaef62d74fcabcbd32685 Mon Sep 17 00:00:00 2001 From: Zach Probst Date: Tue, 30 Jul 2024 13:11:20 -0700 Subject: [PATCH 2/3] lint --- tests/unit/pipeline/extractors/test_files.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/pipeline/extractors/test_files.py b/tests/unit/pipeline/extractors/test_files.py index bcdaeae03..3886a6806 100644 --- a/tests/unit/pipeline/extractors/test_files.py +++ b/tests/unit/pipeline/extractors/test_files.py @@ -14,8 +14,8 @@ FileExtractor, LocalFileSource, RemoteFileExtractor, - UnifiedFileExtractor, RemoteFileSource, + UnifiedFileExtractor, ) SIMPLE_RECORD = {"record": "value"} From 9789989710488b5df3ba0fc790f3695b334c80c6 Mon Sep 17 00:00:00 2001 From: Zach Probst Date: Sat, 3 Aug 2024 17:20:13 -0700 Subject: [PATCH 3/3] Add default case --- tests/unit/pipeline/extractors/test_files.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/unit/pipeline/extractors/test_files.py b/tests/unit/pipeline/extractors/test_files.py index 3886a6806..77b8d8512 100644 --- a/tests/unit/pipeline/extractors/test_files.py +++ b/tests/unit/pipeline/extractors/test_files.py @@ -8,10 +8,11 @@ import pandas as pd import pytest import yaml -from hamcrest import assert_that, equal_to, has_length +from hamcrest import assert_that, contains_string, equal_to, has_length from nodestream.pipeline.extractors.files import ( FileExtractor, + FileSource, LocalFileSource, RemoteFileExtractor, RemoteFileSource, @@ -269,3 +270,12 @@ def test_local_file_source_multiple_file_description(fixture_directory): ] subject = LocalFileSource(paths) assert_that(subject.describe(), equal_to("2 local files")) + + +def test_file_source_default_description(): + class SomeFileSource(FileSource): + def get_files(self): + pass + + subject = SomeFileSource() + assert_that(subject.describe(), contains_string("SomeFileSource"))