#!/usr/bin/env python3
# coding=utf-8

"""
Usage: streamparser.py [FILE]

Consumes input from a file (first argument) or stdin, parsing and pretty printing the readings of lexical units found.
"""

__all__ = [
    'Knownness', 'known', 'unknown', 'biunknown', 'genunknown', 'LexicalUnit', 'SReading',
    'subreading_to_string', 'reading_to_string', 'mainpos', 'parse', 'parse_file',
]
__author__ = 'Sushain K. Cherivirala, Kevin Brubeck Unhammer'
__copyright__ = 'Copyright 2016--2018, Sushain K. Cherivirala, Kevin Brubeck Unhammer'
__credits__ = ['Sushain K. Cherivirala', 'Kevin Brubeck Unhammer']
__license__ = 'GPLv3+'
__status__ = 'Production'
__version__ = '5.0.2'

import fileinput
import functools
import itertools
import pprint
import re
import warnings
from collections import namedtuple

if False:
    from typing import Type, List, Tuple, Iterator, Iterable, Generator, Union  # noqa: F401


class Knownness:
    """Level of knowledge associated with a :class:`LexicalUnit`. \n
    Values: :class:`known`, :class:`unknown`, :class:`biunknown`, :class:`genunknown`
    """
    symbol = ''


class known(Knownness):  # noqa: N801
    pass


class unknown(Knownness):  # noqa: N801
    """Denoted by ``*``, analysis not available."""
    symbol = '*'


class biunknown(Knownness):  # noqa: N801
    """Denoted by ``@``, translation not available."""
    symbol = '@'


class genunknown(Knownness):  # noqa: N801
    """Denoted by ``#``, generated form not available."""
    symbol = '#'


def _symbol_to_knownness(symbol):  # type: (str) -> Type[Knownness]
    return {'*': unknown, '@': biunknown, '#': genunknown}.get(symbol, known)
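# Illustrative sketch of the symbol mapping: an unanalysed token arrives in the
# stream as e.g. '^*dogs/*dogs$', so the first character of its single reading
# selects the Knownness subclass.
#
#     >>> _symbol_to_knownness('*') is unknown
#     True
#     >>> _symbol_to_knownness('c') is known
#     True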
SReading = namedtuple('SReading', ['baseform', 'tags'])
SReading.__doc__ = """A single subreading of an analysis of a token.

Attributes:
    baseform (str): The base form (lemma, lexical form, citation form) of the reading.
    tags (List[str]): The morphological tags associated with the reading.
"""


def subreading_to_string(sub):  # type: (SReading) -> str
    return sub.baseform + ''.join('<' + t + '>' for t in sub.tags)  # type: ignore


def reading_to_string(reading):  # type: (List[SReading]) -> str
    return '+'.join(subreading_to_string(sub) for sub in reading)
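# Illustrative sketch: converting an analysis back to Apertium stream notation with
# the helpers above (the 'be ... not' analysis is just a made-up example).
#
#     >>> subreading_to_string(SReading(baseform='cat', tags=['n', 'pl']))
#     'cat<n><pl>'
#     >>> reading_to_string([SReading(baseform='be', tags=['vbser', 'pri']),
#     ...                    SReading(baseform='not', tags=['adv'])])
#     'be<vbser><pri>+not<adv>'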
def mainpos(reading, ltr=False):  # type: (List[SReading], bool) -> str
    """Return the first part-of-speech tag of a reading. If there are
    several subreadings, by default give the first tag of the last
    subreading. If ltr=True, give the first tag of the first
    subreading, see
    http://beta.visl.sdu.dk/cg3/single/#sub-stream-apertium for more
    information.
    """
    if ltr:
        return reading[0].tags[0]  # type: ignore
    else:
        return reading[-1].tags[0]  # type: ignore
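# Illustrative sketch: with a hypothetical two-subreading analysis, the default
# takes the first tag of the last subreading, while ltr=True takes it from the
# first subreading.
#
#     >>> reading = [SReading(baseform='be', tags=['vbser', 'pri']),
#     ...            SReading(baseform='not', tags=['adv'])]
#     >>> mainpos(reading)
#     'adv'
#     >>> mainpos(reading, ltr=True)
#     'vbser'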
def _parse_tags(tag_str):  # type: (str) -> List[str]
    """Parse a string of ``<tag>`` groups into a list of tag names, honouring backslash escapes."""
    in_tag = False
    tags = []
    buf = ''
    stream = (c for c in tag_str)

    for c in stream:
        if not in_tag and c == '<':
            in_tag = True
            continue
        elif c == '\\':
            buf += c
            buf += next(stream)
        elif c == '>':
            tags.append(buf)
            buf = ''
            in_tag = False
        else:
            buf += c

    if buf != '':
        tags.append(buf)

    return tags


def _parse_subreading(reading):  # type: (str) -> List[Tuple[str, str]]
    """Split a reading on ``+`` into (lemma, tag string) pairs, honouring backslash escapes."""
    in_lemma = True
    lemma = ''
    subs = []
    buf = ''
    stream = (c for c in reading)

    for c in stream:
        if c == '+':
            subs.append((lemma, buf))
            buf = ''
            lemma = ''
            in_lemma = True
            continue
        elif c == '\\':
            buf += c
            buf += next(stream)
        elif in_lemma and c == '<':
            in_lemma = False
            lemma = buf
            buf = ''
            buf += c
        else:
            buf += c

    if buf != '':
        if in_lemma:
            subs.append((lemma + buf, ''))
        else:
            subs.append((lemma, buf))

    return subs
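# Illustrative sketch of what the private helpers return for a '+'-joined reading
# (the analysis string is a made-up example).
#
#     >>> _parse_subreading('be<vbser><pri>+not<adv>')
#     [('be', '<vbser><pri>'), ('not', '<adv>')]
#     >>> _parse_tags('<vbser><pri>')
#     ['vbser', 'pri']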
class LexicalUnit:
    """A lexical unit consisting of a lemma and its readings.

    Attributes:
        lexical_unit (str): The lexical unit in Apertium stream format.
        wordform (str): The word form (surface form) of the lexical unit.
        wordbound_blank (str): The wordbound blank of the lexical unit.
        readings (List[List[:class:`SReading`]]): The analyses of the lexical unit with sublists containing all subreadings.
        knownness (:class:`Knownness`): The level of knowledge of the lexical unit.
    """

    def __init__(self, lexical_unit):  # type: (str) -> None
        self.lexical_unit = lexical_unit

        cohort = re.split(r'(?<!\\)/', lexical_unit)
        if ']]^' in cohort[0]:
            self.wordbound_blank, self.wordform = cohort[0].split(']]^', 1)
            self.wordbound_blank += ']]'
        else:
            self.wordbound_blank = ''
            self.wordform = cohort[0]
        readings = cohort[1:]

        if len(readings) == 1:
            self.knownness = _symbol_to_knownness(readings[0][:1])
        else:
            self.knownness = known

        self.readings = []  # type: List[List[SReading]]
        for reading in readings:
            if len(reading) < 1:
                warnings.warn('Empty readings for {}'.format(self.lexical_unit), RuntimeWarning)
            else:
                subreadings = []

                for subreading in _parse_subreading(reading):
                    baseform = subreading[0].lstrip('+')
                    tags = _parse_tags(subreading[1])
                    subreadings.append(SReading(baseform=baseform, tags=tags))

                self.readings.append(subreadings)

    def __repr__(self):  # type: () -> str
        return self.lexical_unit
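# Illustrative sketch: constructing a LexicalUnit from the text between '^' and '$'
# in the stream (the token is a made-up example).
#
#     >>> lu = LexicalUnit('cats/cat<n><pl>')
#     >>> lu.wordform
#     'cats'
#     >>> lu.readings
#     [[SReading(baseform='cat', tags=['n', 'pl'])]]
#     >>> lu.knownness is known
#     True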
@functools.singledispatch
def parse(stream, with_text=False):  # type: (Iterator[str], bool) -> Iterator[Union[Tuple[str, LexicalUnit], LexicalUnit]]
    """Generates lexical units from a character stream.

    Args:
        stream (Iterator[str]): A character stream containing lexical units, superblanks and other text.
        with_text (Optional[bool]): A boolean defining whether to output preceding text with each lexical unit.

    Yields:
        :class:`LexicalUnit`: The next lexical unit found in the character stream. (if `with_text` is False) \n
        *(str, LexicalUnit)* - The next lexical unit found in the character stream and the text that separated it from
        the prior unit, as a tuple. (if `with_text` is True)
    """
    buffer = ''
    text_buffer = ''
    in_lexical_unit = False
    in_superblank = False

    for char in stream:
        if in_superblank:
            if char == ']':
                in_superblank = False
                text_buffer += char
            elif char == '\\':
                text_buffer += char
                text_buffer += next(stream)
            else:
                text_buffer += char
        elif in_lexical_unit:
            if char == '$':
                if with_text:
                    yield (text_buffer, LexicalUnit(buffer))
                else:
                    yield LexicalUnit(buffer)
                buffer = ''
                text_buffer = ''
                in_lexical_unit = False
            elif char == '\\':
                buffer += char
                buffer += next(stream)
            else:
                buffer += char
        else:
            if char == '[':
                next_char = next(stream)

                if next_char == '[':
                    buffer += '[['
                    in_lexical_unit = True
                else:
                    in_superblank = True
                    text_buffer += char

                    if next_char == ']':
                        in_superblank = False
                        text_buffer += next_char
                    elif next_char == '\\':
                        text_buffer += next_char
                        text_buffer += next(stream)
                    else:
                        text_buffer += next_char
            elif char == '^':
                in_lexical_unit = True
            elif char == '\\':
                text_buffer += char
                text_buffer += next(stream)
            else:
                text_buffer += char


@parse.register(str)
def _parse_str(str, **kwargs):  # type: (str, dict) -> Iterator[Union[Tuple[str, LexicalUnit], LexicalUnit]]
    return parse(iter(str), **kwargs)
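# Illustrative sketch: parse() accepts a plain string via the str registration
# above, or any character iterator; with_text=True also yields the text preceding
# each unit (the input here is a made-up example).
#
#     >>> list(parse('[The ]^cats/cat<n><pl>$'))
#     [cats/cat<n><pl>]
#     >>> list(parse('[The ]^cats/cat<n><pl>$', with_text=True))
#     [('[The ]', cats/cat<n><pl>)]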
def parse_file(f, **kwargs):  # type: (Iterable, dict) -> Iterator[Union[Tuple[str, LexicalUnit], LexicalUnit]]
    """Generates lexical units from a file.

    Args:
        f (file): A file containing lexical units, superblanks and other text.

    Yields:
        :class:`LexicalUnit`: The next lexical unit found in the file.
    """
    return parse(itertools.chain.from_iterable(f), **kwargs)
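# Illustrative sketch: iterating over a file in Apertium stream format
# ('analysed.txt' is a hypothetical filename; units with empty readings would need
# extra handling).
#
#     >>> with open('analysed.txt') as f:
#     ...     for lu in parse_file(f):
#     ...         print(lu.wordform, reading_to_string(lu.readings[0]))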
def main():  # type: () -> None
    lexical_units = parse_file(fileinput.input())

    for lexical_unit in lexical_units:
        pprint.pprint(lexical_unit.readings, width=120)  # type: ignore


if __name__ == '__main__':
    main()
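# Illustrative invocation sketch: piping analyser output through this script
# pretty-prints each unit's readings (the echoed token is a made-up example).
#
#     $ echo '^cats/cat<n><pl>$' | python3 streamparser.py
#     [[SReading(baseform='cat', tags=['n', 'pl'])]]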