From 8c21419bc95b22c32f5416aeb2f804d6bd16b087 Mon Sep 17 00:00:00 2001
From: arcangelo7
Date: Sun, 13 Oct 2024 14:21:25 +0200
Subject: [PATCH] merge: save modifications once at the end

---
 oc_meta/plugins/editor.py                     |   7 +-
 .../analyser/find_entities_without_prov.py    |  76 +++++
 oc_meta/run/merge/check_merged_brs_results.py | 266 ++++++++++++++++++
 ...results.py => check_merged_ids_results.py} |   0
 .../duplicated_entities_simultaneously.py     |  28 +-
 oc_meta/run/upload/on_triplestore.py          |   4 +-
 6 files changed, 359 insertions(+), 22 deletions(-)
 create mode 100644 oc_meta/run/analyser/find_entities_without_prov.py
 create mode 100644 oc_meta/run/merge/check_merged_brs_results.py
 rename oc_meta/run/merge/{check_results.py => check_merged_ids_results.py} (100%)

diff --git a/oc_meta/plugins/editor.py b/oc_meta/plugins/editor.py
index 9f0387a5..cb1c6a83 100644
--- a/oc_meta/plugins/editor.py
+++ b/oc_meta/plugins/editor.py
@@ -122,9 +122,7 @@ def delete(self, res: str, property: str = None, object: str = None) -> None:
         entity_to_purge.mark_as_to_be_deleted()
         self.save(g_set, supplier_prefix)
 
-    def merge(self, res: URIRef, other: URIRef) -> None:
-        supplier_prefix = self.__get_supplier_prefix(res)
-        g_set = GraphSet(self.base_iri, supplier_prefix=supplier_prefix, custom_counter_handler=self.counter_handler)
+    def merge(self, g_set: GraphSet, res: URIRef, other: URIRef) -> None:
         self.reader.import_entity_from_triplestore(g_set, self.endpoint, res, self.resp_agent, enable_validation=False)
         self.reader.import_entity_from_triplestore(g_set, self.endpoint, other, self.resp_agent, enable_validation=False)
         sparql = SPARQLWrapper(endpoint=self.endpoint)
@@ -156,7 +154,6 @@ def merge(self, res: URIRef, other: URIRef) -> None:
             res_as_entity.merge(other_as_entity, prefer_self=True)
         else:
             res_as_entity.merge(other_as_entity)
-        self.save(g_set, supplier_prefix)
 
     def sync_rdf_with_triplestore(self, res: str, source_uri: str = None) -> bool:
         supplier_prefix = self.__get_supplier_prefix(res)
@@ -183,7 +180,7 @@ def sync_rdf_with_triplestore(self, res: str, source_uri: str = None) -> bool:
             self.save(g_set, supplier_prefix)
         return False
 
-    def save(self, g_set: GraphSet, supplier_prefix: str):
+    def save(self, g_set: GraphSet, supplier_prefix: str = ""):
         provset = ProvSet(g_set, self.base_iri, wanted_label=False, supplier_prefix=supplier_prefix, custom_counter_handler=self.counter_handler)
         provset.generate_provenance()
         graph_storer = Storer(g_set, dir_split=self.dir_split, n_file_item=self.n_file_item, zip_output=self.zip_output_rdf)
diff --git a/oc_meta/run/analyser/find_entities_without_prov.py b/oc_meta/run/analyser/find_entities_without_prov.py
new file mode 100644
index 00000000..199c30f3
--- /dev/null
+++ b/oc_meta/run/analyser/find_entities_without_prov.py
@@ -0,0 +1,76 @@
+import argparse
+import os
+import zipfile
+from rdflib import Graph, URIRef, Dataset
+from tqdm import tqdm
+
+def count_data_files(input_folder):
+    count = 0
+    for root, dirs, files in os.walk(input_folder):
+        if "prov" in dirs:
+            dirs.remove("prov")
+        count += sum(1 for f in files if f.endswith('.zip'))
+    return count
+
+def check_provenance(input_folder, output_file):
+    total_files = count_data_files(input_folder)
+    entities_without_provenance = []
+
+    with tqdm(total=total_files, desc="Processing files") as pbar:
+        for root, dirs, files in os.walk(input_folder):
+            if "prov" in dirs:
+                dirs.remove("prov")
+
+            for file in files:
+                if file.endswith('.zip'):
+                    zip_path = os.path.join(root, file)
+                    entities_to_check = set()
+
+                    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
+                        for json_file in zip_ref.namelist():
+                            if json_file.endswith('.json'):
+                                with zip_ref.open(json_file) as f:
+                                    g = Graph()
+                                    g.parse(f, format='json-ld')
+
+                                    for s in g.subjects():
+                                        if isinstance(s, URIRef):
+                                            entities_to_check.add(str(s))
+
+                    # Build the path of the provenance file
+                    prov_folder = os.path.join(os.path.dirname(zip_path), 'prov')
+                    prov_file = os.path.join(prov_folder, 'se.zip')
+
+                    if os.path.exists(prov_file):
+                        with zipfile.ZipFile(prov_file, 'r') as prov_zip:
+                            ds = Dataset()
+                            for prov_json in prov_zip.namelist():
+                                if prov_json.endswith('.json'):
+                                    with prov_zip.open(prov_json) as prov_f:
+                                        ds.parse(prov_f, format='json-ld')
+
+                            for entity_id in entities_to_check:
+                                prov_graph_id = f"{entity_id}/prov/"
+                                if URIRef(prov_graph_id) not in ds.graphs():
+                                    entities_without_provenance.append(entity_id)
+                    else:
+                        entities_without_provenance.extend(entities_to_check)
+
+                    pbar.update(1)
+
+    # Save the entities without provenance to a file
+    with open(output_file, 'w') as f:
+        for entity in entities_without_provenance:
+            f.write(f"{entity}\n")
+
+def main():
+    parser = argparse.ArgumentParser(description="Check the provenance of RDF entities in JSON-LD ZIP files.")
+    parser.add_argument("input_folder", help="Input folder containing the JSON-LD ZIP files")
+    parser.add_argument("output_file", help="Output file for the entities without provenance")
+    args = parser.parse_args()
+
+    check_provenance(args.input_folder, args.output_file)
+    print(f"Entities without provenance saved to: {args.output_file}")
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/oc_meta/run/merge/check_merged_brs_results.py b/oc_meta/run/merge/check_merged_brs_results.py
new file mode 100644
index 00000000..7a8d853f
--- /dev/null
+++ b/oc_meta/run/merge/check_merged_brs_results.py
@@ -0,0 +1,266 @@
+import argparse
+import csv
+import os
+import random
+import time
+import zipfile
+from functools import partial
+from multiprocessing import Pool, cpu_count
+
+import yaml
+from oc_meta.plugins.editor import MetaEditor
+from rdflib import RDF, ConjunctiveGraph, Literal, URIRef
+from SPARQLWrapper import JSON, SPARQLWrapper
+from tqdm import tqdm
+
+DATACITE = "http://purl.org/spar/datacite/"
+FABIO = "http://purl.org/spar/fabio/"
+
+def read_csv(csv_file):
+    with open(csv_file, 'r') as f:
+        reader = csv.DictReader(f)
+        return list(reader)
+
+def sparql_query_with_retry(sparql, max_retries=3, initial_delay=1, backoff_factor=2):
+    for attempt in range(max_retries):
+        try:
+            return sparql.query().convert()
+        except Exception as e:
+            if attempt == max_retries - 1:
+                raise
+            delay = initial_delay * (backoff_factor ** attempt)
+            time.sleep(delay + random.uniform(0, 1))
+
+def check_entity_file(file_path, entity_uri, is_surviving):
+    with zipfile.ZipFile(file_path, 'r') as zip_ref:
+        for filename in zip_ref.namelist():
+            with zip_ref.open(filename) as file:
+                g = ConjunctiveGraph()
+                g.parse(file, format='json-ld')
+                entity = URIRef(entity_uri)
+
+                if (entity, None, None) not in g:
+                    if is_surviving:
+                        tqdm.write(f"Error in file {file_path}: Surviving entity {entity_uri} does not exist")
+                    return
+
+                if not is_surviving:
+                    tqdm.write(f"Error in file {file_path}: Merged entity {entity_uri} still exists")
+                    return
+
+                types = list(g.objects(entity, RDF.type))
+                if not types:
+                    tqdm.write(f"Error in file {file_path}: Entity {entity_uri} has no type")
+                elif len(types) > 2:
+                    tqdm.write(f"Error in file {file_path}: Entity {entity_uri} has more than two types")
+                elif URIRef(FABIO + "Expression") not in types:
+                    tqdm.write(f"Error in file {file_path}: Entity {entity_uri} is not a fabio:Expression")
+
+                identifiers = list(g.objects(entity, URIRef(DATACITE + "hasIdentifier")))
+                if not identifiers:
+                    tqdm.write(f"Error in file {file_path}: Entity {entity_uri} has no datacite:hasIdentifier")
+
+def check_entity_sparql(sparql_endpoint, entity_uri, is_surviving):
+    sparql = SPARQLWrapper(sparql_endpoint)
+    has_issues = False
+
+    # Query to check if the entity exists
+    exists_query = f"""
+    ASK {{
+        <{entity_uri}> ?p ?o .
+    }}
+    """
+    sparql.setQuery(exists_query)
+    sparql.setReturnFormat(JSON)
+    exists_results = sparql_query_with_retry(sparql)
+
+    if exists_results['boolean']:
+        if not is_surviving:
+            tqdm.write(f"Error in SPARQL: Merged entity {entity_uri} still exists")
+            has_issues = True
+    else:
+        if is_surviving:
+            tqdm.write(f"Error in SPARQL: Surviving entity {entity_uri} does not exist")
+            has_issues = True
+        return has_issues
+
+    if not is_surviving:
+        referenced_query = f"""
+        ASK {{
+            ?s ?p <{entity_uri}> .
+        }}
+        """
+        sparql.setQuery(referenced_query)
+        sparql.setReturnFormat(JSON)
+        referenced_results = sparql_query_with_retry(sparql)
+
+        if referenced_results['boolean']:
+            tqdm.write(f"Error in SPARQL: Merged entity {entity_uri} is still referenced by other entities")
+            has_issues = True
+
+    # Query to get entity types
+    types_query = f"""
+    SELECT ?type WHERE {{
+        <{entity_uri}> a ?type .
+    }}
+    """
+    sparql.setQuery(types_query)
+    sparql.setReturnFormat(JSON)
+    types_results = sparql_query_with_retry(sparql)
+
+    types = [result['type']['value'] for result in types_results['results']['bindings']]
+    if not types:
+        tqdm.write(f"Error in SPARQL: Entity {entity_uri} has no type")
+        has_issues = True
+    elif len(types) > 2:
+        tqdm.write(f"Error in SPARQL: Entity {entity_uri} has more than two types")
+        has_issues = True
+    elif FABIO + "Expression" not in types:
+        tqdm.write(f"Error in SPARQL: Entity {entity_uri} is not a fabio:Expression")
+        has_issues = True
+
+    # Query for identifiers
+    identifiers_query = f"""
+    SELECT ?identifier WHERE {{
+        <{entity_uri}> <{DATACITE}hasIdentifier> ?identifier .
+    }}
+    """
+    sparql.setQuery(identifiers_query)
+    sparql.setReturnFormat(JSON)
+    identifiers_results = sparql_query_with_retry(sparql)
+
+    identifiers = [result['identifier']['value'] for result in identifiers_results['results']['bindings']]
+    if not identifiers:
+        tqdm.write(f"Error in SPARQL: Entity {entity_uri} has no datacite:hasIdentifier")
+        has_issues = True
+
+    return has_issues
+
+def process_csv(args, csv_file):
+    csv_path, rdf_dir, meta_config_path, sparql_endpoint, query_output_dir = args
+    csv_path = os.path.join(csv_path, csv_file)
+    data = read_csv(csv_path)
+    tasks = []
+
+    meta_editor = MetaEditor(meta_config_path, "")
+
+    for row in data:
+        if 'Done' not in row or row['Done'] != 'True':
+            continue
+
+        surviving_entity = row['surviving_entity']
+        merged_entities = row['merged_entities'].split('; ')
+        all_entities = [surviving_entity] + merged_entities
+
+        for entity in all_entities:
+            file_path = meta_editor.find_file(rdf_dir, meta_editor.dir_split, meta_editor.n_file_item, entity, True)
+            tasks.append((entity, entity == surviving_entity, file_path, rdf_dir, meta_config_path, sparql_endpoint, query_output_dir, surviving_entity))
+
+    return tasks
+
+def process_entity(args):
+    entity, is_surviving, file_path, rdf_dir, meta_config_path, sparql_endpoint, query_output_dir, surviving_entity = args
+
+    if file_path is None:
+        tqdm.write(f"Error: Could not find file for entity {entity}")
+    else:
+        check_entity_file(file_path, entity, is_surviving)
+
+    # has_issues = check_entity_sparql(sparql_endpoint, entity, is_surviving)
+
+    # if has_issues and not is_surviving:
+    #     triples = get_entity_triples(sparql_endpoint, entity)
+    #     combined_query = generate_update_query(entity, surviving_entity, triples)
+
+    #     # Save combined DELETE DATA and INSERT DATA query
+    #     query_file_path = os.path.join(query_output_dir, f"update_{entity.split('/')[-1]}.sparql")
+    #     with open(query_file_path, 'w') as f:
+    #         f.write(combined_query)
+
+def get_entity_triples(sparql_endpoint, entity_uri):
+    sparql = SPARQLWrapper(sparql_endpoint)
+    query = f"""
+    SELECT ?g ?s ?p ?o
+    WHERE {{
+        GRAPH ?g {{
+            {{
+                <{entity_uri}> ?p ?o .
+                BIND(<{entity_uri}> AS ?s)
+            }}
+            UNION
+            {{
+                ?s ?p <{entity_uri}> .
+                BIND(<{entity_uri}> AS ?o)
+            }}
+        }}
+    }}
+    """
+    sparql.setQuery(query)
+    sparql.setReturnFormat(JSON)
+    results = sparql_query_with_retry(sparql)
+
+    triples = []
+    for result in results["results"]["bindings"]:
+        graph = result["g"]["value"]
+        subject = URIRef(result["s"]["value"])
+        predicate = URIRef(result["p"]["value"])
+
+        obj_data = result["o"]
+        if obj_data["type"] == "uri":
+            obj = URIRef(obj_data["value"])
+        else:
+            datatype = obj_data.get("datatype")
+            obj = Literal(obj_data["value"], datatype=URIRef(datatype) if datatype else None)
+
+        triples.append((graph, subject, predicate, obj))
+
+    return triples
+
+def generate_update_query(merged_entity, surviving_entity, triples):
+    delete_query = "DELETE DATA {\n"
+    insert_query = "INSERT DATA {\n"
+
+    for graph, subject, predicate, obj in triples:
+        if subject == URIRef(merged_entity):
+            delete_query += f"    GRAPH <{graph}> {{ <{subject}> <{predicate}> {obj.n3()} }}\n"
+        elif obj == URIRef(merged_entity):
+            delete_query += f"    GRAPH <{graph}> {{ <{subject}> <{predicate}> <{obj}> }}\n"
+            insert_query += f"    GRAPH <{graph}> {{ <{subject}> <{predicate}> <{surviving_entity}> }}\n"
+
+    delete_query += "}\n"
+    insert_query += "}\n"
+
+    combined_query = delete_query + "\n" + insert_query
+    return combined_query
+
+def main():
+    parser = argparse.ArgumentParser(description="Check merge process success on files and SPARQL endpoint for bibliographic resources")
+    parser.add_argument('csv_folder', type=str, help="Path to the folder containing CSV files")
+    parser.add_argument('rdf_dir', type=str, help="Path to the RDF directory")
+    parser.add_argument('--meta_config', type=str, required=True, help="Path to meta configuration file")
+    parser.add_argument('--query_output', type=str, required=True, help="Path to the folder where SPARQL queries will be saved")
+    args = parser.parse_args()
+
+    with open(args.meta_config, 'r') as config_file:
+        config = yaml.safe_load(config_file)
+
+    sparql_endpoint = config['triplestore_url']
+
+    os.makedirs(args.query_output, exist_ok=True)
+
+    csv_files = [f for f in os.listdir(args.csv_folder) if f.endswith('.csv')]
+
+    # Process CSV files in parallel
+    with Pool(processes=cpu_count()) as pool:
+        process_csv_partial = partial(process_csv, (args.csv_folder, args.rdf_dir, args.meta_config, sparql_endpoint, args.query_output))
+        all_tasks = list(tqdm(pool.imap(process_csv_partial, csv_files), total=len(csv_files), desc="Processing CSV files"))
+
+    # Flatten the list of lists into a single list
+    all_tasks = [task for sublist in all_tasks for task in sublist]
+
+    # Process entities in parallel
+    with Pool(processes=cpu_count()) as pool:
+        list(tqdm(pool.imap(process_entity, all_tasks), total=len(all_tasks), desc="Processing entities"))
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/oc_meta/run/merge/check_results.py b/oc_meta/run/merge/check_merged_ids_results.py
similarity index 100%
rename from oc_meta/run/merge/check_results.py
rename to oc_meta/run/merge/check_merged_ids_results.py
diff --git a/oc_meta/run/merge/duplicated_entities_simultaneously.py b/oc_meta/run/merge/duplicated_entities_simultaneously.py
index 1e0fa822..a53e0ede 100644
--- a/oc_meta/run/merge/duplicated_entities_simultaneously.py
+++ b/oc_meta/run/merge/duplicated_entities_simultaneously.py
@@ -1,10 +1,13 @@
 import argparse
+import concurrent.futures
 import csv
+import os
+from typing import List
+
 from oc_meta.plugins.editor import MetaEditor
+from oc_ocdm.graph import GraphSet
 from rdflib import URIRef
 from tqdm import tqdm
-import os
-import concurrent.futures
 
 
 def get_entity_type(entity_url):
@@ -16,8 +19,7 @@ def get_entity_type(entity_url):
         return None
     return None
 
-
-def read_csv(csv_file):
+def read_csv(csv_file) -> List[dict]:
     data = []
     with open(csv_file, mode='r', newline='', encoding='utf-8') as file:
         csv_reader = csv.DictReader(file)
@@ -27,7 +29,6 @@ def read_csv(csv_file):
             data.append(row)
     return data
 
-
 def write_csv(csv_file, data):
     fieldnames = data[0].keys()
     with open(csv_file, mode='w', newline='', encoding='utf-8') as file:
@@ -36,11 +37,12 @@ def write_csv(csv_file, data):
         for row in data:
             writer.writerow(row)
 
-
 def process_file(csv_file, meta_config, resp_agent, entity_types, stop_file_path):
     data = read_csv(csv_file)
     meta_editor = MetaEditor(meta_config, resp_agent, save_queries=True)
-    count = 0
+
+    # Create a single GraphSet for all operations
+    g_set = GraphSet(meta_editor.base_iri, custom_counter_handler=meta_editor.counter_handler)
 
     for row in data:
         if os.path.exists(stop_file_path):
@@ -54,22 +56,18 @@ def process_file(csv_file, meta_config, resp_agent, entity_types, stop_file_path):
         for merged_entity in merged_entities:
             merged_entity = merged_entity.strip()
             try:
-                meta_editor.merge(surviving_entity, URIRef(merged_entity))
+                meta_editor.merge(g_set, surviving_entity, URIRef(merged_entity))
             except ValueError:
                 continue
 
         row['Done'] = 'True'
-        count += 1
-        if count >= 10:
-            write_csv(csv_file, data)
-            count = 0
 
-    if count > 0:
-        write_csv(csv_file, data)
+    # Save the modifications only once at the end
+    meta_editor.save(g_set)
+
+    write_csv(csv_file, data)
 
     return csv_file
 
-
 def main():
     parser = argparse.ArgumentParser(description="Merge entities from CSV files in a folder.")
     parser.add_argument('csv_folder', type=str, help="Path to the folder containing CSV files")
diff --git a/oc_meta/run/upload/on_triplestore.py b/oc_meta/run/upload/on_triplestore.py
index f01d47e3..3b649fd3 100644
--- a/oc_meta/run/upload/on_triplestore.py
+++ b/oc_meta/run/upload/on_triplestore.py
@@ -83,12 +83,12 @@ def split_queries(file_path, batch_size):
     quads_to_add, quads_to_remove = process_sparql_file(file_path)
    return generate_sparql_queries(quads_to_add, quads_to_remove, batch_size)
 
-def remove_stop_file(stop_file):
+def remove_stop_file(stop_file=DEFAULT_STOP_FILE):
     if os.path.exists(stop_file):
         os.remove(stop_file)
         print(f"Existing stop file {stop_file} has been removed.")
 
-def upload_sparql_updates(endpoint, folder, batch_size, stop_file):
+def upload_sparql_updates(endpoint, folder, batch_size, stop_file=DEFAULT_STOP_FILE):
     if not os.path.exists(folder):
         return
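
Note: a minimal usage sketch of the merge-then-save-once flow this patch introduces, mirroring the pattern now used in duplicated_entities_simultaneously.py. The config path, responsible-agent URI, and entity URIs below are placeholders, not values from the patch.

# Sketch only: placeholder config path, agent, and entity URIs; adapt to your setup.
from oc_meta.plugins.editor import MetaEditor
from oc_ocdm.graph import GraphSet
from rdflib import URIRef

meta_editor = MetaEditor("meta_config.yaml", "https://orcid.org/0000-0000-0000-0000")

# A single GraphSet accumulates all merges instead of one GraphSet (and one save) per call.
g_set = GraphSet(meta_editor.base_iri, custom_counter_handler=meta_editor.counter_handler)

meta_editor.merge(g_set, URIRef("https://w3id.org/oc/meta/br/0601"), URIRef("https://w3id.org/oc/meta/br/0602"))
meta_editor.merge(g_set, URIRef("https://w3id.org/oc/meta/br/0601"), URIRef("https://w3id.org/oc/meta/br/0603"))

# Provenance generation and storage happen once, at the end.
meta_editor.save(g_set)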