censoror.py
import argparse
import glob
import os
import re
import sys

import spacy
from spacy.matcher import Matcher
from google.cloud import language_v1

# Define and parse command-line arguments for the script.
def parse_arguments():
    parser = argparse.ArgumentParser(description="Censor sensitive information from text files.")
    parser.add_argument('--input', nargs='+', help='Glob pattern for input text files.')
    parser.add_argument('--names', action='store_true', help='Flag to censor names.')
    parser.add_argument('--dates', action='store_true', help='Flag to censor dates.')
    parser.add_argument('--phones', action='store_true', help='Flag to censor phone numbers.')
    parser.add_argument('--address', action='store_true', help='Flag to censor addresses.')
    parser.add_argument('--output', type=str, help='Directory to store censored files.')
    parser.add_argument('--stats', type=str, help='File or location to write the statistics.')
    return parser.parse_args()
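
# Example invocation (illustrative only; the glob pattern, output directory, and stats
# destination are assumptions, adjust them to your own layout):
#   python censoror.py --input '*.txt' \
#       --names --dates --phones --address \
#       --output 'censored_files/' \
#       --stats stderr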

# Set up spaCy Matcher patterns for the information types selected by the censor flags.
def create_matcher(nlp, censor_flags):
    matcher = Matcher(nlp.vocab)
    # Enhance phone number detection with multiple token-shape patterns.
    if censor_flags['phones']:
        phone_patterns = [
            [{"SHAPE": "ddd"}, {"ORTH": "-", "OP": "?"}, {"SHAPE": "ddd"}, {"ORTH": "-", "OP": "?"}, {"SHAPE": "dddd"}],
            [{"SHAPE": "(ddd)"}, {"ORTH": "-", "OP": "?"}, {"SHAPE": "ddd"}, {"ORTH": "-", "OP": "?"}, {"SHAPE": "dddd"}],
            [{"SHAPE": "ddd"}, {"ORTH": ".", "OP": "?"}, {"SHAPE": "ddd"}, {"ORTH": ".", "OP": "?"}, {"SHAPE": "dddd"}],
            [{"SHAPE": "+"}, {"SHAPE": "dd"}, {"ORTH": " ", "OP": "?"}, {"SHAPE": "dddddddddd", "OP": "?"}],
            [{"SHAPE": "dddddddddd"}],  # 10 consecutive digits
        ]
        for pattern in phone_patterns:
            matcher.add("PHONE_NUMBER", [pattern])
    # Matcher pattern for names (tokens spaCy's NER tags as PERSON).
    if censor_flags['names']:
        name_pattern = [[{"ENT_TYPE": "PERSON"}]]
        matcher.add("NAMES", name_pattern)
    # Matcher pattern for dates (tokens spaCy's NER tags as DATE).
    if censor_flags['dates']:
        date_pattern = [[{"ENT_TYPE": "DATE"}]]
        matcher.add("DATES", date_pattern)
    # if censor_flags['address']:
    #     address_pattern = [[{"ENT_TYPE": {"IN": ["GPE"]}}]]
    #     matcher.add("ADDRESS", address_pattern)
    # Email addresses are censored under the 'address' flag for simplicity, as directed
    # by the professor in class.
    if censor_flags.get('address', False):
        email_pattern = [{"TEXT": {"REGEX": "[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]+"}}]
        matcher.add("EMAIL", [email_pattern])
    return matcher

def preprocess_text_for_phones(text, stats):
    # Regex pattern to match phone numbers (simplified).
    phone_regex = r'\+?\d[\d\s\-\(\)]{10,14}\d'
    # Find all matches in the text.
    matches = re.finditer(phone_regex, text)
    # Replace each found phone number with censorship marks.
    phone_count = 0
    for match in matches:
        start, end = match.span()
        text = text[:start] + "█" * (end - start) + text[end:]
        phone_count += 1
    stats['PHONES'] += phone_count
    return text, stats
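
# Illustrative example (assumed input): given the text
#   "Call me at +1 (352) 555-0199 tomorrow."
# the regex above matches "+1 (352) 555-0199" and replaces all 17 matched characters
# with "█", incrementing stats['PHONES'] by 1.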

def preprocess_text_for_dates(text, stats):
    # Find and censor dates in the text.
    date_count = 0
    date_regex = r'\b(?:\d{1,2}[-/.]\d{1,2}[-/.]\d{2,4})\b'
    matches = re.finditer(date_regex, text)
    for match in matches:
        start, end = match.span()
        text = text[:start] + "█" * (end - start) + text[end:]
        date_count += 1
    stats['DATES'] += date_count
    return text, stats

def byte_offset_to_char_position(text, byte_offset):
    # Google NLP reports offsets in UTF-8 bytes. Slice the encoded text at the byte
    # offset and decode it back; the length of the decoded string is the character
    # position corresponding to that byte offset.
    char_position = len(text.encode('utf-8')[:byte_offset].decode('utf-8'))
    return char_position
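
# Worked example (illustrative): in "café x" the character "é" occupies two UTF-8 bytes,
# so byte_offset_to_char_position("café x", 6) slices the first 6 bytes ("café "),
# decodes them back to 5 characters, and returns 5.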

# Use the Google Cloud Natural Language API to censor addresses, locations, and names.
def censor_text_with_google_nlp(text, censor_flags, stats):
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "files/dataengineering-project1-ddca4f2d3131.json"
    client = language_v1.LanguageServiceClient()
    document = language_v1.Document(content=text, type_=language_v1.Document.Type.PLAIN_TEXT)
    response = client.analyze_entities(document=document, encoding_type='UTF8')
    censored_text = list(text)
    # First pass: censor ADDRESS entities.
    for entity in response.entities:
        entity_type = language_v1.Entity.Type(entity.type_).name
        if entity_type == "ADDRESS" and censor_flags['address']:
            for mention in entity.mentions:
                # Ignore common-noun mentions.
                if mention.type == language_v1.EntityMention.Type.COMMON:
                    continue
                # Censor the mention. The API reports UTF-8 byte offsets, so convert both
                # ends to character positions (the end offset adds the mention's byte length).
                start_char_pos = byte_offset_to_char_position(text, mention.text.begin_offset)
                end_char_pos = byte_offset_to_char_position(text, mention.text.begin_offset + len(mention.text.content.encode('utf-8')))
                for i in range(start_char_pos, end_char_pos):
                    if i < len(censored_text):  # Ensure index is within bounds
                        censored_text[i] = "█"
                # Update stats
                stats['ADDRESS'] += 1
    # Second pass: re-analyze the partially censored text and censor PERSON and LOCATION
    # entities. Censoring addresses first avoids overlaps and reduces false negatives.
    partially_censored_text = "".join(censored_text)
    document = language_v1.Document(content=partially_censored_text, type_=language_v1.Document.Type.PLAIN_TEXT)
    response = client.analyze_entities(document=document, encoding_type='UTF8')
    for entity in response.entities:
        entity_type = language_v1.Entity.Type(entity.type_).name
        if (entity_type == "PERSON" and censor_flags['names']) or \
           (entity_type == "LOCATION" and censor_flags['address']):
            for mention in entity.mentions:
                if mention.type == language_v1.EntityMention.Type.COMMON:
                    continue
                start_char_pos = byte_offset_to_char_position(partially_censored_text, mention.text.begin_offset)
                end_char_pos = byte_offset_to_char_position(partially_censored_text, mention.text.begin_offset + len(mention.text.content.encode('utf-8')))
                for i in range(start_char_pos, end_char_pos):
                    if i < len(censored_text):  # Ensure index is within bounds
                        censored_text[i] = "█"
                # Update stats
                if entity_type == "PERSON":
                    stats['NAMES'] += 1
                elif entity_type == "LOCATION":
                    stats['ADDRESS'] += 1
    return "".join(censored_text), stats

# Applies the censorship to the identified spans within the text.
def apply_censoring(span, censored_text):
    # Replace each character in the span with a block symbol to censor it.
    for i in range(span.start_char, span.end_char):
        censored_text[i] = "█"

# Main function to censor text based on the specified flags.
def censor_text(text, nlp, matcher, censor_flags):
    # Initialize stats to keep track of what gets censored.
    stats = {'NAMES': 0, 'DATES': 0, 'PHONES': 0, 'ADDRESS': 0, 'EMAIL': 0}
    # Apply regex-based preprocessing for phones and dates for precision.
    if censor_flags['phones']:
        text, stats = preprocess_text_for_phones(text, stats)
    if censor_flags['dates']:
        text, stats = preprocess_text_for_dates(text, stats)
    # Use Google NLP for address and name censoring.
    if censor_flags['names'] or censor_flags['address']:
        text, stats = censor_text_with_google_nlp(text, censor_flags, stats)
    # spaCy pass: run the matcher over the document (names, dates, phone numbers, emails).
    doc = nlp(text)
    censored_text = list(text)
    matches = matcher(doc)
    for match_id, start, end in matches:
        span = doc[start:end]  # The matched span
        apply_censoring(span, censored_text)
        # Only labels present in stats are counted here (e.g. NAMES, DATES, EMAIL);
        # phone numbers were already counted in the regex preprocessing pass.
        if nlp.vocab.strings[match_id] in stats:
            stats[nlp.vocab.strings[match_id]] += 1
    return "".join(censored_text), stats

# Processes each file according to the input and output specifications.
def process_file(file_path, output_dir, nlp, matcher, censor_flags):
    # Open and read the content of the input file.
    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()
    # Apply censorship to the text and get the file-specific stats.
    censored_text, file_stats = censor_text(text, nlp, matcher, censor_flags)
    # Create the output path and write the censored text to a new file.
    output_path = os.path.join(output_dir, f"{os.path.basename(file_path)}.censored")
    with open(output_path, 'w', encoding='utf-8') as file:
        file.write(censored_text)
    # Return the stats for this file.
    return file_stats

# Outputs the censorship stats to the specified destination.
def output_stats(stats, stats_destination):
    # Format the stats as a string.
    stats_str = "\n".join(f"{key}: {value}" for key, value in stats.items())
    # Print or write the stats as specified by the user.
    if stats_destination in ['stderr', 'stdout']:
        print(stats_str, file=sys.stderr if stats_destination == 'stderr' else sys.stdout)
    else:
        # When writing to a file, ensure that it is created.
        with open(stats_destination, 'w') as stats_file:
            stats_file.write(stats_str)

def main():
    # Parse command-line arguments.
    args = parse_arguments()
    # Load the spaCy model and create the matcher with the specified flags.
    nlp = spacy.load("en_core_web_md")
    censor_flags = {
        'names': args.names,
        'dates': args.dates,
        'phones': args.phones,
        'address': args.address
    }
    matcher = create_matcher(nlp, censor_flags)
    # Ensure the output directory exists.
    if not os.path.exists(args.output):
        os.makedirs(args.output)
    total_stats = {'NAMES': 0, 'DATES': 0, 'PHONES': 0, 'ADDRESS': 0, 'EMAIL': 0}
    for input_pattern in args.input:
        for file_path in glob.glob(input_pattern):
            file_stats = process_file(file_path, args.output, nlp, matcher, censor_flags)
            # Aggregate stats across files.
            for key in total_stats.keys():
                total_stats[key] += file_stats.get(key, 0)
    # Output the aggregated stats to the specified destination.
    output_stats(total_stats, args.stats)


if __name__ == "__main__":
    main()