From 71922496b916a082da69cf7189ca5c1bfa638f34 Mon Sep 17 00:00:00 2001 From: Cristi Burca Date: Mon, 11 Jun 2018 16:51:26 +0100 Subject: [PATCH 1/3] Run 2to3 utility --- .../python/pymongo_hadoop/__init__.py | 12 ++++++------ .../language_support/python/pymongo_hadoop/input.py | 12 ++++++------ .../language_support/python/pymongo_hadoop/mapper.py | 4 ++-- .../python/pymongo_hadoop/reducer.py | 6 +++--- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/streaming/language_support/python/pymongo_hadoop/__init__.py b/streaming/language_support/python/pymongo_hadoop/__init__.py index 7920a603..24264393 100644 --- a/streaming/language_support/python/pymongo_hadoop/__init__.py +++ b/streaming/language_support/python/pymongo_hadoop/__init__.py @@ -1,10 +1,10 @@ import sys -from input import BSONInput, KeyValueBSONInput -from output import BSONOutput, KeyValueBSONOutput -from reducer import BSONReducer, BSONReducerInput -from reducer import KeyValueBSONReducer, KeyValueBSONReducerInput -from mapper import BSONMapper, KeyValueBSONMapper +from .input import BSONInput, KeyValueBSONInput +from .output import BSONOutput, KeyValueBSONOutput +from .reducer import BSONReducer, BSONReducerInput +from .reducer import KeyValueBSONReducer, KeyValueBSONReducerInput +from .mapper import BSONMapper, KeyValueBSONMapper __all__ = ['BSONInput', 'BSONOutput', 'KeyValueBSONOutput', 'KeyValueBSONInput', @@ -13,5 +13,5 @@ def dump_bits(bits): for bit in bits: - print >> sys.stderr, "\t * Bit: %s Ord: %d" % (hex(ord(bit)), ord(bit)) + print("\t * Bit: %s Ord: %d" % (hex(ord(bit)), ord(bit)), file=sys.stderr) diff --git a/streaming/language_support/python/pymongo_hadoop/input.py b/streaming/language_support/python/pymongo_hadoop/input.py index f8516c7c..88912c73 100644 --- a/streaming/language_support/python/pymongo_hadoop/input.py +++ b/streaming/language_support/python/pymongo_hadoop/input.py @@ -30,7 +30,7 @@ def _read(self): raise InvalidBSON("Bad EOO in BSON Data") doc = BSON(data).decode(codec_options=STREAMING_CODEC_OPTIONS) return doc - except struct.error, e: + except struct.error as e: #print >> sys.stderr, "Parsing Length record failed: %s" % e self.eof = True raise StopIteration(e) @@ -38,8 +38,8 @@ def _read(self): def read(self): try: return self._read() - except StopIteration, e: - print >> sys.stderr, "Iteration Failure: %s" % e + except StopIteration as e: + print("Iteration Failure: %s" % e, file=sys.stderr) return None def _reads(self): @@ -56,8 +56,8 @@ class KeyValueBSONInput(BSONInput): def read(self): try: doc = self._read() - except StopIteration, e: - print >> sys.stderr, "Key/Value Input iteration failed/stopped: %s" % e + except StopIteration as e: + print("Key/Value Input iteration failed/stopped: %s" % e, file=sys.stderr) return None if '_id' in doc: return doc['_id'], doc @@ -66,7 +66,7 @@ def read(self): def reads(self): it = self._reads() - n = it.next + n = it.__next__ while 1: doc = n() if '_id' in doc: diff --git a/streaming/language_support/python/pymongo_hadoop/mapper.py b/streaming/language_support/python/pymongo_hadoop/mapper.py index 90753d2a..5d6e1374 100644 --- a/streaming/language_support/python/pymongo_hadoop/mapper.py +++ b/streaming/language_support/python/pymongo_hadoop/mapper.py @@ -1,5 +1,5 @@ -from input import BSONInput, KeyValueBSONInput -from output import BSONOutput, KeyValueBSONOutput +from .input import BSONInput, KeyValueBSONInput +from .output import BSONOutput, KeyValueBSONOutput class BSONMapper(object): """Wraps BSONInput to allow writing mapper functions diff --git a/streaming/language_support/python/pymongo_hadoop/reducer.py b/streaming/language_support/python/pymongo_hadoop/reducer.py index b11ffb57..be69b6e5 100644 --- a/streaming/language_support/python/pymongo_hadoop/reducer.py +++ b/streaming/language_support/python/pymongo_hadoop/reducer.py @@ -3,8 +3,8 @@ https://github.com/klbostee/dumbo """ from itertools import groupby -from input import BSONInput, KeyValueBSONInput -from output import BSONOutput, KeyValueBSONOutput +from .input import BSONInput, KeyValueBSONInput +from .output import BSONOutput, KeyValueBSONOutput import sys @@ -55,7 +55,7 @@ def __init__(self, factory=None, input_fh=None): def default_reducer(data): - print >> sys.stderr, "*** Invoking default reducer function, this is unoptimized for your data and may be very slow." + print("*** Invoking default reducer function, this is unoptimized for your data and may be very slow.", file=sys.stderr) class BSONReducerInput(BSONInput): """Wrapper to 'roll up' the reduce data down to just From ca1ff5c3a01e58fa25c26ad733b5d56e514a8520 Mon Sep 17 00:00:00 2001 From: Cristi Burca Date: Mon, 11 Jun 2018 17:07:22 +0100 Subject: [PATCH 2/3] Use the binary stdin and stdout streams --- streaming/language_support/python/pymongo_hadoop/input.py | 2 +- streaming/language_support/python/pymongo_hadoop/output.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/language_support/python/pymongo_hadoop/input.py b/streaming/language_support/python/pymongo_hadoop/input.py index 88912c73..000b00d6 100644 --- a/streaming/language_support/python/pymongo_hadoop/input.py +++ b/streaming/language_support/python/pymongo_hadoop/input.py @@ -14,7 +14,7 @@ class BSONInput(object): https://github.com/klbostee/typedbytes """ - def __init__(self, fh=sys.stdin, unicode_errors='strict'): + def __init__(self, fh=sys.stdin.buffer, unicode_errors='strict'): self.fh = fh self.unicode_errors = unicode_errors self.eof = False diff --git a/streaming/language_support/python/pymongo_hadoop/output.py b/streaming/language_support/python/pymongo_hadoop/output.py index fbb9b0d8..d3c9f86b 100644 --- a/streaming/language_support/python/pymongo_hadoop/output.py +++ b/streaming/language_support/python/pymongo_hadoop/output.py @@ -10,7 +10,7 @@ class BSONOutput(object): https://github.com/klbostee/typedbytes """ - def __init__(self, fh=sys.stdout, unicode_errors='strict'): + def __init__(self, fh=sys.stdout.buffer, unicode_errors='strict'): self.fh = fh self.unicode_errors = unicode_errors From 6975b5fc09445ef3ae9003b202d42e63d4d98d12 Mon Sep 17 00:00:00 2001 From: Cristi Burca Date: Tue, 12 Jun 2018 09:52:06 +0100 Subject: [PATCH 3/3] fix byte comparison --- streaming/language_support/python/pymongo_hadoop/input.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/language_support/python/pymongo_hadoop/input.py b/streaming/language_support/python/pymongo_hadoop/input.py index 000b00d6..a7770afe 100644 --- a/streaming/language_support/python/pymongo_hadoop/input.py +++ b/streaming/language_support/python/pymongo_hadoop/input.py @@ -26,7 +26,7 @@ def _read(self): data = size_bits + self.fh.read(size) if len(data) != size + 4: raise struct.error("Unable to cleanly read expected BSON Chunk; EOF, underful buffer or invalid object size.") - if data[size + 4 - 1] != "\x00": + if data[size + 4 - 1] != 0: raise InvalidBSON("Bad EOO in BSON Data") doc = BSON(data).decode(codec_options=STREAMING_CODEC_OPTIONS) return doc