Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add I/O buffering to speed up I/O operations on raw JSON #300

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions th2_data_services/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,13 +870,14 @@ def from_cache_file(cls, filename, pickle_version: int = o.DEFAULT_PICKLE_VERSIO
return data_obj

@classmethod
def from_json(cls, filename, buffer_limit=250, gzip=False) -> "Data[dict]":
def from_json(cls, filename, buffer_limit=250, gzip=False, bytes_buffer_size = 16*1024*1024) -> "Data[dict]":
"""Creates Data object from json-lines file with provided name.

Args:
            filename: Name or path to the JSON-lines file.
buffer_limit: If limit is 0 buffer will not be used. Number of messages in buffer before parsing.
gzip: Set to true if file is json file compressed using gzip.
            bytes_buffer_size: Size in bytes of the io.BufferedReader buffer used to speed up file I/O.

Returns:
Data: Data object.
Expand All @@ -889,7 +890,7 @@ def from_json(cls, filename, buffer_limit=250, gzip=False) -> "Data[dict]":
if gzip:
data = cls(iter_json_gzip_file(filename, buffer_limit))
else:
data = cls(iter_json_file(filename, buffer_limit))
data = cls(iter_json_file(filename, buffer_limit, bytes_buffer_size))
data.update_metadata({"source_file": filename})
return data

Expand Down
13 changes: 7 additions & 6 deletions th2_data_services/utils/_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import io
from typing import Generator
import gzip

Expand All @@ -21,14 +21,15 @@
from th2_data_services.utils.decode_error_handler import UNICODE_REPLACE_HANDLER


def iter_json_file(filename, buffer_limit=250):
def iter_json_file(filename, buffer_limit=250, bytes_buffer_size=16*1024*1024):
"""Returns the function that returns generators."""

def iter_json_file_logic():
"""Generator that reads and yields decoded JSON objects from a file."""
json_processor = BufferedJSONProcessor(buffer_limit)

with open(filename, "r") as data:
with open(filename, "rb") as data:
buffered_reader = io.BufferedReader(data, buffer_size=bytes_buffer_size)
while True:
try:
v = data.readline()
Expand All @@ -38,7 +39,7 @@ def iter_json_file_logic():
yield from json_processor.decode(v)
except ValueError:
print(len(json_processor.buffer))
print(f"Error string: {v}")
print(f"Error string: {v.decode('utf-8')}")
raise
yield from json_processor.fin()

Expand Down Expand Up @@ -122,8 +123,8 @@ def from_buffer(self) -> Generator:
Generator[dict]
"""
try:
for i in json.loads("[" + ",".join(self.buffer) + "]"):
yield i
for obj in self.buffer:
yield json.loads(obj)
except JSONDecodeError as e:
raise ValueError(
f"json.decoder.JSONDecodeError: Invalid json received.\n" f"{e}\n" f"{self.buffer}"
Expand Down
Loading