-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdataset_reader.py
145 lines (124 loc) · 5.95 KB
/
dataset_reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import pdb
from overrides import overrides
from allennlp.data import Instance
from allennlp.data.dataset_readers import DatasetReader
from allennlp.data.fields import SpanField, ListField, TextField, MetadataField, ArrayField, SequenceLabelField, LabelField
from allennlp.data.fields import LabelField, TextField
from allennlp.data.tokenizers import Token, Tokenizer, WhitespaceTokenizer
from parameters import Params
import random
from tqdm import tqdm
from tokenizer import CustomTokenizer
import numpy as np
import glob
import re
import math
random.seed(42)
class LivedoorCorpusReader(DatasetReader):
def __init__(
self,
config,
max_tokens: int = None,
**kwargs
):
super().__init__(**kwargs)
self.custom_tokenizer_class = CustomTokenizer(config=config)
self.token_indexers = self.custom_tokenizer_class.token_indexer_returner()
self.max_tokens = max_tokens
self.config = config
self.class2id = {}
self.train_mention_ids, self.dev_mention_ids, self.test_mention_ids, self.mention_id2data = \
self.mention_ids_returner()
@overrides
def _read(self, train_dev_test_flag: str) -> list:
'''
:param train_dev_test_flag: 'train', 'dev', 'test'
:return: list of instances
'''
mention_ids, instances = list(), list()
if train_dev_test_flag == 'train':
mention_ids += self.train_mention_ids
# Because Iterator(shuffle=True) has bug, we forcefully shuffle train dataset here.
random.shuffle(mention_ids)
elif train_dev_test_flag == 'dev':
mention_ids += self.dev_mention_ids
elif train_dev_test_flag == 'test':
mention_ids += self.test_mention_ids
for idx, mention_uniq_id in tqdm(enumerate(mention_ids)):
instances.append(self.text_to_instance(mention_uniq_id,
data=self.mention_id2data[mention_uniq_id]))
return instances
@overrides
def text_to_instance(self, mention_uniq_id, data=None) -> Instance:
if mention_uniq_id == None:
tokenized = [Token('[CLS]')]
tokenized += [Token(split_token) for split_token in self.custom_tokenizer_class.tokenize(
txt=data['title'])][:self.config.max_title_length]
tokenized += [Token('[SEP]')]
context_field = TextField(tokenized, self.token_indexers)
fields = {"context": context_field}
else:
tokenized = [Token('[CLS]')]
tokenized += [Token(split_token) for split_token in self.custom_tokenizer_class.tokenize(
txt=data['title'])][:self.config.max_title_length]
tokenized += [Token('[unused0]')]
tokenized += [Token(split_token) for split_token in self.custom_tokenizer_class.tokenize(
txt=data['caption'])][:self.config.max_caption_length]
tokenized += [Token('[SEP]')]
context_field = TextField(tokenized, self.token_indexers)
fields = {"context": context_field}
fields['label'] = LabelField(data['class'])
fields['mention_uniq_id'] = ArrayField(np.array(mention_uniq_id))
return Instance(fields)
def mention_ids_returner(self):
mention_id2data = {}
train_mention_ids, dev_mention_ids, test_mention_ids = [], [], []
dataset_each_class_dirs_path = glob.glob(self.config.dataset_dir+'**/')
for each_class_dir_path in dataset_each_class_dirs_path:
label_class = re.search(r'(.+)\/', each_class_dir_path.replace(self.config.dataset_dir,'')).group(1)
if label_class not in self.class2id:
self.class2id.update({label_class: len(self.class2id)})
file_paths = self._each_class_dir_path2_txt_paths(each_class_dir_path)
# train : dev : test = 7 : 1 : 2
data_num_in_one_label = len(file_paths)
if self.config.debug:
data_num_in_one_label = data_num_in_one_label // 8
data_frac = data_num_in_one_label // 10
train_tmp_ids = [i for i in range(0, data_frac * 7)]
dev_tmp_ids = [j for j in range(data_frac * 7, data_frac * 8)]
test_tmp_ids = [k for k in range(data_frac * 8, data_num_in_one_label)]
for idx, file_path in enumerate(file_paths):
data = self._one_file_reader(label_class, file_path)
tmp_idx_for_all_data = len(mention_id2data)
mention_id2data.update({tmp_idx_for_all_data: data})
if idx in train_tmp_ids:
train_mention_ids.append(tmp_idx_for_all_data)
elif idx in dev_tmp_ids:
dev_mention_ids.append(tmp_idx_for_all_data)
elif idx in test_tmp_ids:
test_mention_ids.append(tmp_idx_for_all_data)
else:
if self.config.debug:
continue
else:
print('Error')
exit()
return train_mention_ids, dev_mention_ids, test_mention_ids, mention_id2data
def _each_class_dir_path2_txt_paths(self, each_class_dir_path):
return [path for path in glob.glob(each_class_dir_path+'*') if 'LICENSE' not in path]
def _one_file_reader(self, label_class, filepath):
data = {'class': label_class}
caption = ''
with open(filepath, 'r') as f:
for idx, line in enumerate(f):
if idx == 2:
data.update({'title': line.strip()})
if idx > 2 and idx < 10 and line.strip() != '':
caption += line.strip()
data.update({'caption': caption})
return data
if __name__ == '__main__':
params = Params()
config = params.opts
dsr = LivedoorCorpusReader(config=config)
dsr._read('train')