evaluate.py
import re
import json
from nltk.translate.bleu_score import corpus_bleu
from nltk.translate.meteor_score import meteor_score
from rouge_score import rouge_scorer
from tqdm import tqdm
import numpy as np
from transformers import BertTokenizer, LlamaTokenizer, BertTokenizerFast
from rdkit import DataStructs
from rdkit import Chem
import selfies as sf
from rdkit.Chem import MACCSkeys
from rdkit.Chem import AllChem
from sklearn import metrics
from Levenshtein import distance as lev
import nltk
nltk.download('wordnet')
nltk.download('punkt')
nltk.download('omw-1.4')
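
# Predictions are read as a JSON Lines file: one object per line with at
# least "prediction" and "target" string fields (see evaluate_task below).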
def evaluate_caption(predictions, targets, tokenizer, text_trunc_length=512):
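    """Compute BLEU-2/4, METEOR, and ROUGE-1/2/L between predicted and
    reference texts, tokenizing both with the given HuggingFace tokenizer."""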
    meteor_scores = []
    references = []
    hypotheses = []
    for gt, out in tqdm(zip(targets, predictions)):
        gt_tokens = tokenizer.tokenize(gt, truncation=True, max_length=text_trunc_length)
        out_tokens = tokenizer.tokenize(out, truncation=True, max_length=text_trunc_length)
        references.append([gt_tokens])
        hypotheses.append(out_tokens)
        try:
            mscore = meteor_score([gt_tokens], out_tokens)
            meteor_scores.append(mscore)
        except Exception:
            continue

    bleu2 = corpus_bleu(references, hypotheses, weights=(.5, .5))
    bleu4 = corpus_bleu(references, hypotheses, weights=(.25, .25, .25, .25))
    bleu2 *= 100
    bleu4 *= 100
    print('BLEU-2 score:', bleu2)
    print('BLEU-4 score:', bleu4)

    _meteor_score = np.mean(meteor_scores)
    _meteor_score *= 100
    print('Average Meteor score:', _meteor_score)

    scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'])
    rouge_scores = []
    for gt, out in tqdm(zip(targets, predictions)):
        rs = scorer.score(out, gt)
        rouge_scores.append(rs)

    print('ROUGE score:')
    rouge_1 = np.mean([rs['rouge1'].fmeasure for rs in rouge_scores]) * 100
    rouge_2 = np.mean([rs['rouge2'].fmeasure for rs in rouge_scores]) * 100
    rouge_l = np.mean([rs['rougeL'].fmeasure for rs in rouge_scores]) * 100
    print('rouge1:', rouge_1)
    print('rouge2:', rouge_2)
    print('rougeL:', rouge_l)
    return bleu2, bleu4, rouge_1, rouge_2, rouge_l, _meteor_score


def evaluate_naming_description(data, tokenizer, path, task, text_trunc_length=512):
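    """Evaluate molecule naming / description predictions using the caption metrics above."""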
    pred = [line['prediction'].strip() for line in data]
    target = [line['target'].strip() for line in data]
    bleu2, bleu4, rouge_1, rouge_2, rouge_l, meteor = \
        evaluate_caption(pred, target, tokenizer, text_trunc_length)
    return bleu2, bleu4, rouge_1, rouge_2, rouge_l, meteor


def evaluate_reaction(data, path, task):
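    """Evaluate reaction prediction (forward / retrosynthesis) outputs: SMILES are
    canonicalized with RDKit, then scored with character-level BLEU, Levenshtein
    distance, InChI exact match, and MACCS/RDK/Morgan fingerprint similarities."""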
    pred_list, test_list = [], []
    for d in data:
        curr_pred = d['prediction'].replace(" ", "").strip()
        pred_mol = Chem.MolFromSmiles(curr_pred)
        curr_tgt = d['target'].replace(" ", "").strip()
        tgt_mol = Chem.MolFromSmiles(curr_tgt)
        canonical_pred = Chem.MolToSmiles(pred_mol, isomericSmiles=False, canonical=True) if pred_mol else None
        canonical_tgt = Chem.MolToSmiles(tgt_mol, isomericSmiles=False, canonical=True) if tgt_mol else None
        if canonical_tgt is None or canonical_pred is None:
            continue
        pred_list.append(canonical_pred)
        test_list.append(canonical_tgt)

    references_list = []
    hypotheses_list = []
    outputs_rdkit_mols = []
    levs = []
    num_exact = 0
    for pred, test in zip(pred_list, test_list):
        pred_tokens = [c for c in pred]
        test_tokens = [c for c in test]
        references_list.append([test_tokens])
        hypotheses_list.append(pred_tokens)
        try:
            m_out = Chem.MolFromSmiles(pred)
            m_gt = Chem.MolFromSmiles(test)
            if Chem.MolToInchi(m_out) == Chem.MolToInchi(m_gt):
                num_exact += 1
            outputs_rdkit_mols.append((m_gt, m_out))
        except Exception:
            continue
        levs.append(lev(pred, test))

    bleu_score = corpus_bleu(references_list, hypotheses_list)

    # Calculate fingerprint similarities between target and predicted molecules
    MACCS_sims, morgan_sims, RDK_sims = [], [], []
    morgan_r = 2
    for gt_m, ot_m in outputs_rdkit_mols:
        MACCS_sims.append(DataStructs.FingerprintSimilarity(MACCSkeys.GenMACCSKeys(gt_m),
                                                            MACCSkeys.GenMACCSKeys(ot_m)))
        RDK_sims.append(DataStructs.FingerprintSimilarity(Chem.RDKFingerprint(gt_m),
                                                          Chem.RDKFingerprint(ot_m)))
        morgan_sims.append(DataStructs.TanimotoSimilarity(AllChem.GetMorganFingerprint(gt_m, morgan_r),
                                                          AllChem.GetMorganFingerprint(ot_m, morgan_r)))

    results = {
        'BLEU': bleu_score,
        'Levenshtein': sum(levs) / len(levs),
        'Exact Match': num_exact / len(test_list),
        'MACCS Similarity': np.mean(MACCS_sims),
        'RDK Similarity': np.mean(RDK_sims),
        'Morgan Similarity': np.mean(morgan_sims),
    }
    return results


def evaluate_property(data, path, task):
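    """Evaluate numeric property predictions with mean absolute error."""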
    pred_list = []
    tgt_list = []
    for d in data:
        # Parse both values before appending so the two lists stay aligned.
        try:
            pred_val = float(d['prediction'])
            tgt_val = float(d['target'])
        except (ValueError, TypeError):
            continue
        pred_list.append(pred_val)
        tgt_list.append(tgt_val)
    mae = metrics.mean_absolute_error(tgt_list, pred_list)
    return mae


def evaluate_task(task, file=None, tokenizer=None):
"""Main function to evaluate a specific task
Args:
task (str): Task to evaluate ('desc', 'forward', 'retro', 'property', 'naming')
file (str): File path to the predictions
tokenizer: Tokenizer to use for text-based tasks
"""
if file is None:
file = "/all_checkpoints/temp/lightning_logs/version_0/predictions.txt"
with open(file, "r") as f:
data = [json.loads(line.strip()) for line in f.readlines()]
if task in ["desc", "naming"]:
if tokenizer is None:
raise ValueError("Tokenizer required for description and naming tasks")
return evaluate_naming_description(data, tokenizer, file, task)
elif task in ["forward", "retro"]:
return evaluate_reaction(data, file, task)
elif "prop" in task:
return evaluate_property(data, file, task)
else:
raise ValueError(f"Unknown task: {task}")
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='Evaluate model predictions for various chemistry tasks')
    parser.add_argument('--task', type=str, required=True,
                        choices=['desc', 'forward', 'retro', 'property', 'naming'],
                        help='Task to evaluate')
    parser.add_argument('--path', type=str,
                        default="/hub_data5/jinyoungp/all_checkpoints/newLLaMo_epoch3_epoch3_ft_",
                        help='Path to the predictions file (JSON Lines)')
    args = parser.parse_args()

    # Initialize tokenizer if needed for the task
    tokenizer = None
    if args.task in ["desc", "naming"]:
        tokenizer = LlamaTokenizer.from_pretrained(
            "meta-llama/Llama-2-7b-chat-hf",
            use_fast=False
        )

    # Run evaluation
    results = evaluate_task(
        task=args.task,
        file=args.path,
        tokenizer=tokenizer
    )

    # Print results
    print(f"\nResults for {args.task} task:")
    if isinstance(results, dict):
        for metric, value in results.items():
            print(f"{metric}: {value:.4f}")
    else:
        print(results)
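
# Example invocation (hypothetical path shown for illustration only):
#   python evaluate.py --task retro --path /path/to/predictions.txt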