-
Notifications
You must be signed in to change notification settings - Fork 0
/
jisc_alto2txt_wrangler.py
446 lines (341 loc) · 14.7 KB
/
jisc_alto2txt_wrangler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
"""
JISC alto2txt Wrangler
A command line tool for replacing 4-character title codes with 7-digit
NLP codes in the metadata XML files generated by executing alto2txt on the
output produced by JISC Wrangler.
Only the XML content of the _metadata.xml files produced by alto2txt is
modified. Specifically, the value of the "id" attribute associated with the
"publication" element is changed from a 4-character title code to a 7-digit
NLP code.
File structure and names are unchanged (i.e. duplicated in the output), to
ensure that paths and files quoted in the metadata XML remain valid.
"""
import argparse
import csv
import logging
import os
import sys
import xml.etree.ElementTree as ET
from datetime import datetime
from pathlib import Path
from shutil import copy
from typing import Dict, Union
from tqdm import tqdm # type: ignore
from jisc_wrangler import constants, logutils, utils
def main():
    """Run the JISC alto2txt Wrangler command line tool end to end.

    Parses the command line arguments, initialises logging and directories,
    processes the files under the input directory and finally compares the
    input and output file counts.

    Any exception raised along the way is logged with its traceback, echoed
    to the console, and the process then exits with status 1 so that calling
    shells and pipelines can detect the failure.
    """
    try:
        # Prepare for execution.
        args = parse_args()
        initialise(args)

        # Process all of the files under the input directory.
        process_inputs(args)

        # Check that all of the input files were processed.
        validate(args)
    except Exception as e:  # pylint: disable=broad-exception-caught
        logging.exception(str(e))
        print(f"ERROR: {str(e)}")
        # A bare sys.exit() would report success (status 0) after an error.
        sys.exit(1)
def process_inputs(args: argparse.Namespace) -> None:
    """Process all of the files under the input directory.

    For each metadata XML file found under ``args.input_dir`` the publication
    id is replaced (see ``replace_publication_id``). Unless this is a dry run,
    the modified XML and its companion plaintext file are written under
    ``args.output_dir`` at the same relative path. Files whose publication id
    cannot be replaced are skipped with a warning.

    Args:
        args (argparse.Namespace): Runtime parameters.

    Raises:
        RuntimeError: If the plaintext file corresponding to a metadata file
            cannot be found.
    """
    # Read the title code lookup file.
    lookup = read_title_code_lookup_file()

    # Get the input metadata file full paths.
    metadata_files = utils.list_files(args.input_dir, constants.METADATA_XML_SUFFIX)
    logging.info("Found %s metadata files.", len(metadata_files))

    # Print the number of files to be processed (and a progress bar).
    print(f"Processing {len(metadata_files)} metadata files")

    failure_count = 0
    for file in tqdm(metadata_files):
        logging.debug("Processing file %s", file)

        # Read the metadata XML file.
        xml_tree = ET.parse(file)

        # Replace the 4 character title code with the 7 character NLP code
        # in the metadata XML.
        try:
            title_code, nlp = replace_publication_id(xml_tree, lookup)
        except ValueError:
            title_code, nlp = (None, None)

        # If the publication code replacement failed, skip this file.
        if title_code is None or nlp is None:
            logging.warning("Skipping file %s & associated plaintext file", file)
            continue

        # Construct the output file path by swapping the leading input
        # directory for the output directory (first occurrence only).
        output_file = file.replace(args.input_dir, args.output_dir, 1)

        # Write the modified XML tree to the output file.
        if not args.dry_run:
            output_dir = os.path.dirname(output_file)
            if not os.path.isdir(output_dir):
                Path(output_dir).mkdir(parents=True, exist_ok=True)
                logging.info("Created subdirectory at %s", output_dir)

            try:
                with open(output_file, "wb") as open_f:
                    xml_tree.write(open_f)
            except TypeError:
                # Serialisation failed part-way: count the failure and remove
                # the incomplete output file, then carry on with the run.
                failure_count += 1
                msg = f"TypeError when writing XML ElementTree to {output_file}"
                logging.error(
                    "TypeError when writing XML ElementTree to %s", output_file
                )
                os.remove(output_file)
                print(msg + ". File was removed. Continuing...")

        # Find the corresponding plaintext file.
        plaintext_path = Path(
            file.replace(constants.METADATA_XML_SUFFIX, constants.PLAINTEXT_EXTENSION)
        )
        if not plaintext_path.is_file():
            msg = f"Failed to find plaintext file at: {plaintext_path}"
            raise RuntimeError(msg)

        # Construct the output plaintext file path.
        output_plaintext_file = str(plaintext_path).replace(
            args.input_dir, args.output_dir, 1
        )

        # Copy the plain text file to the output directory.
        if not args.dry_run:
            copy(str(plaintext_path), output_plaintext_file)

    if failure_count > 0:
        print(f"{failure_count} failures requiring manual intervention.")
def validate(args: argparse.Namespace) -> None:
    """Warn when the input and output trees hold different numbers of files.

    Metadata files and plaintext files are counted separately under the input
    and output directories. A mismatch is logged and printed as a warning;
    the output counts are logged at the end in either case.

    Args:
        args (argparse.Namespace): Runtime parameters
    """
    # One entry per file kind: suffix to list, mismatch warning, summary format.
    checks = (
        (
            constants.METADATA_XML_SUFFIX,
            "unequal input & output metadata file counts.",
            "Processed %s metadata files.",
        ),
        (
            constants.PLAINTEXT_EXTENSION,
            "unequal input & output plaintext file counts.",
            "Processed %s plaintext files.",
        ),
    )

    summaries = []
    for suffix, mismatch_msg, summary_fmt in checks:
        n_inputs = len(utils.list_files(args.input_dir, suffix))
        n_outputs = len(utils.list_files(args.output_dir, suffix))
        if n_inputs != n_outputs:
            logging.warning(mismatch_msg)
            print(f"WARNING: {mismatch_msg}")
        summaries.append((summary_fmt, n_outputs))

    # Report the totals only after both comparisons have been made.
    for summary_fmt, n_outputs in summaries:
        logging.info(summary_fmt, n_outputs)
def replace_publication_id(xml_tree: ET.ElementTree, lookup: dict) -> tuple:
    """Replace a 4-character title code with a 7-digit NLP code in an XML tree.

    The XML tree structure is assumed to contain a "publication" element with
    "id" attribute, and sub-element "issue" which itself has a sub-element
    named "date". The tree is modified in place.

    Args:
        xml_tree (ElementTree): An XML ElementTree.
        lookup (dict): A dictionary for NLP code lookups.

    Raises:
        ValueError: Failed to find publication element in XML tree.
        ValueError: Failed to find title code attribute in XML tree.
        ValueError: Failed to standardise title code.
        ValueError: Failed to find issue/date element in XML tree.
        ValueError: Failed to get NLP for title code.
        ValueError: Failed to set publication element in XML tree.

    Returns:
        tuple: The 4-character title code & the 7-digit NLP code.
    """
    pub_elem = xml_tree.find(constants.PUPBLICATION_ELEMENT_NAME)
    if pub_elem is None:
        logging.warning("Failed to find publication element in XML tree.")
        raise ValueError("Failed to find publication element in XML tree.")

    # Element.get returns None for a missing attribute, whereas indexing
    # .attrib would raise a KeyError that callers catching ValueError miss.
    title_code = pub_elem.get(constants.PUBLICATION_ID_ATTRIBUTE_NAME)
    if title_code is None:
        logging.warning("Failed to find title code attribute in XML tree.")
        raise ValueError("Failed to find title code attribute in XML tree.")

    # Title codes absent from the lookup table may be in a non-standard form.
    if title_code not in lookup:
        standardised_title_code = standardise_title_code(title_code, xml_tree)
        if standardised_title_code is None:
            logging.warning("Failed to standardise title code.")
            raise ValueError("Failed to standardise title code.")
        title_code = standardised_title_code

    date_str = pub_elem.find(
        constants.ISSUE_ELEMENT_NAME + "/" + constants.DATE_ELEMENT_NAME
    )
    if date_str is None:
        logging.warning("Failed to find issue/date element in XML tree.")
        raise ValueError("Failed to find issue/date element in XML tree.")

    year, month, day = utils.parse_publicaton_date(str(date_str.text))

    nlp = title_code_to_nlp(title_code, year, month, day, lookup)
    if nlp is None:
        logging.warning("Failed to get NLP for title code %s", title_code)
        raise ValueError("Failed to get NLP for title code.")

    try:
        pub_elem.set(constants.PUBLICATION_ID_ATTRIBUTE_NAME, nlp)
    except Exception as e:  # pylint: disable=broad-exception-caught
        print("Failed to set publication element in XML tree.")
        print(f"ERROR: {str(e)}")
        # Chain the original exception so the root cause is not lost.
        raise ValueError("Failed to set publication element in XML tree.") from e

    return (title_code, nlp)
def standardise_title_code(
    title_code: str, xml_tree: ET.ElementTree
) -> Union[str, None]:
    """Standardise a non-standard JISC title code.

    Handles non-standard title codes observed in the JISC source data.

    Args:
        title_code (str): A non-standard JISC title code.
        xml_tree (ElementTree): An XML ElementTree.

    Returns: the corresponding standard title code or None if no
        standardisation is available (including when the XML tree holds no
        usable input subdirectory path).
    """
    # Only title codes of the form NCBLXXXX or BL000X can be standardised.
    if not title_code.startswith(("NCBL", "BL000")):
        return None

    # Extract the correct title code from the input subdirectory path.
    sub_path_elem = xml_tree.find(constants.INPUT_SUB_PATH_ELEMENT_NAME)

    # Validate explicitly rather than with `assert`: assertions are stripped
    # under -O, and a missing/empty element simply means that no
    # standardisation is available for this file.
    if sub_path_elem is None or not sub_path_elem.text:
        logging.warning("Failed to find input subdirectory path in XML tree.")
        return None

    input_sub_path = sub_path_elem.text
    logging.info("Extracted title code from subdirectory path: %s", input_sub_path)

    # The title code is taken to be the first 4 characters of the path.
    return input_sub_path[0:4]
def title_code_to_nlp(
    title_code: str, year: str, month: str, day: str, lookup: dict
) -> Union[str, None]:
    """Look up the 7-digit NLP code for a title code on a publication date.

    Non-standard title codes are also supported, provided they appear in the
    lookup table.

    Args:
        title_code (str): A 4-character or JISC title code.
        year (str): A publication year in YYYY format.
        month (str): A publication month in MM format.
        day (str): A publication day in DD format.
        lookup (dict): A dictionary for NLP code lookups.

    Returns: the 7-digit NLP code for the title (and date) as a string,
        or None if the NLP code is not available.
    """
    candidates = lookup.get(title_code)
    if not candidates:
        logging.warning("Title code %s not found in lookup table.", title_code)
        return None

    published = datetime.strptime("-".join((day, month, year)), "%d-%m-%Y")

    # Each candidate is a (date range, NLP code) pair; the first candidate
    # whose range contains the publication date wins.
    for candidate in candidates:
        range_start, range_end = candidate[0][0], candidate[0][1]
        if utils.date_in_range(range_start, range_end, published):
            return candidate[1]

    logging.warning("Date out of range for title code %s in lookup table.", title_code)
    return None
def read_title_code_lookup_file() -> dict:
    """Load the title code lookup table from its CSV data file.

    Returns: dict: A dictionary keyed by title code. Each value is a list of
        pairs in which the first element is a date range (i.e. a pair of
        datetime objects) and the second element is the corresponding NLP
        code.
    """
    # Slurp every record first; the file is closed before any parsing starts.
    with open(constants.TITLE_CODE_LOOKUP_FILE, encoding="utf-8") as handle:
        reader = csv.reader(handle, delimiter=constants.TITLE_CODE_LOOKUP_DELIMITER)
        records = list(reader)

    table = {}  # type: Dict

    # The first record is the header row, so skip it.
    for record in records[1:]:
        date_range = (
            parse_lookup_date(record, start=True),
            parse_lookup_date(record, start=False),
        )

        # Left-pad the NLP code with zeros to 7 characters.
        nlp_code = record[constants.NLP_INDEX].strip().rjust(7, "0")
        code = record[constants.TITLE_INDEX].strip()

        # Start a new list the first time a title code is seen.
        table.setdefault(code, []).append((date_range, nlp_code))

    return table
def parse_lookup_date(row: list, start: bool) -> datetime:
    """Build a datetime from the day, month and year columns of a lookup row.

    Args:
        row (list): A row from the lookup table.
        start (bool): If True read the start-date columns, otherwise read
            the end-date columns.

    Returns:
        datetime: The parsed date.
    """
    # Pick the (day, month, year) column indices for the requested date.
    indices = (
        (
            constants.START_DAY_INDEX,
            constants.START_MONTH_INDEX,
            constants.START_YEAR_INDEX,
        )
        if start
        else (
            constants.END_DAY_INDEX,
            constants.END_MONTH_INDEX,
            constants.END_YEAR_INDEX,
        )
    )
    day_field, month_field, year_field = (row[index].strip() for index in indices)

    parts = (
        day_field.rjust(2, "0"),  # Pad the day to 2 characters.
        month_field[0:3],  # Truncate the month to 3 characters.
        year_field,
    )
    return datetime.strptime("-".join(parts), "%d-%b-%Y")
##
# Setup:
##
def parse_args() -> argparse.Namespace:
    """Read the runtime parameters from the command line.

    Returns: a Namespace object containing parsed command line arguments.
    """
    # Each entry is (argument name, keyword options for add_argument),
    # registered in the order listed.
    argument_specs = (
        (
            "input_dir",
            {"help": "Input directory containing JISC alto2txt output"},
        ),
        (
            "output_dir",
            {"help": "Output directory to which updated alto2txt output is written"},
        ),
        (
            "--working_dir",
            {
                "type": str,
                "default": ".",
                "help": "Working directory to which temporary & log files are written",
            },
        ),
        (
            "--dry-run",
            {
                "action": "store_true",
                "help": "Perform a dry run (don't copy any files)",
            },
        ),
        (
            "--debug",
            {
                "action": "store_true",
                "help": "Run in debug mode (verbose logging)",
            },
        ),
    )

    cli = argparse.ArgumentParser(
        description="Replace publication IDs in JISC alto2txt output"
    )
    for name, options in argument_specs:
        cli.add_argument(name, **options)

    return cli.parse_args()
def initialise(args: argparse.Namespace) -> None:
    """Announce the tool, then set up logging and the working directories.

    The input and output directory paths held on ``args`` are rewritten in
    place so that each gains a trailing path separator if it lacks one.

    Args:
        args (argparse.Namespace): Runtime parameters.
    """
    print(">>> This is JISC alto2txt Wrangler <<<")

    logutils.setup_logging(args, constants.NAME_LOGFILE_ALTO2TXT)
    setup_directories(args)

    # Joining with an empty final component appends the separator.
    for attr in ("input_dir", "output_dir"):
        setattr(args, attr, os.path.join(getattr(args, attr), ""))

    logging.info("Input directory: %s", args.input_dir)
    logging.info("Output directory: %s", args.output_dir)
def setup_directories(args: argparse.Namespace) -> None:
    """Validate the input directory and prepare an empty output directory.

    The output directory (and any missing parents) is created if it does not
    exist yet.

    Args:
        args (argparse.Namespace): Runtime parameters.

    Raises:
        ValueError: If the input directory is invalid.
        RuntimeError: If the output directory is not empty to start with.
    """
    # The input path must be an existing directory; a path that exists but
    # is a regular file is not a valid input directory.
    if not os.path.isdir(args.input_dir):
        raise ValueError("Please provide a valid input directory.")

    # Prepare the output directory.
    if not os.path.exists(args.output_dir):
        Path(args.output_dir).mkdir(parents=True, exist_ok=True)

    # Check the output directory is empty. Only files count (empty
    # subdirectories are tolerated); any() stops at the first file found
    # instead of listing the whole tree.
    if any(os.path.isfile(entry) for entry in Path(args.output_dir).rglob("*")):
        raise RuntimeError("Output directory must be initially empty.")
# Run the command line tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()