From e6609589dde26fefbf552f941d3a54189ec6df5b Mon Sep 17 00:00:00 2001
From: arcangelo7
Date: Sun, 9 Jun 2024 11:58:02 +0200
Subject: [PATCH] no DISTINCT

---
 oc_meta/lib/finder.py                |  8 +--
 oc_meta/run/sparql_updates.py        | 62 -----------------------
 oc_meta/run/upload/on_triplestore.py | 73 ++++++++++++++++++++++++++++
 3 files changed, 77 insertions(+), 66 deletions(-)
 delete mode 100644 oc_meta/run/sparql_updates.py
 create mode 100644 oc_meta/run/upload/on_triplestore.py

diff --git a/oc_meta/lib/finder.py b/oc_meta/lib/finder.py
index 37911a68..3dac7166 100644
--- a/oc_meta/lib/finder.py
+++ b/oc_meta/lib/finder.py
@@ -725,7 +725,7 @@ def get_initial_subjects_from_identifiers(identifiers):
                 escaped_identifier = literal.replace('\\', '\\\\').replace('"', '\\"')
                 query = f'''
                 PREFIX bds: <http://www.bigdata.com/rdf/search#>
-                SELECT DISTINCT ?s WHERE {{
+                SELECT ?s WHERE {{
                     ?literal bds:search "{escaped_identifier}" ;
                         bds:matchAllTerms "true" ;
                         ^<{GraphEntity.iri_has_literal_value}> ?id.
@@ -751,7 +751,7 @@ def get_initial_subjects_from_identifiers(identifiers):
                 """)
             union_query = " UNION ".join(union_blocks)
             query = f'''
-                SELECT DISTINCT ?s WHERE {{
+                SELECT ?s WHERE {{
                     {union_query}
                 }}
             '''
@@ -766,7 +766,7 @@ def get_initial_subjects_from_identifiers(identifiers):
                 identifiers_values.append(f"(<{GraphEntity.DATACITE + scheme}> \"{escaped_literal}\")")
             identifiers_values_str = " ".join(identifiers_values)
             query = f'''
-                SELECT DISTINCT ?s WHERE {{
+                SELECT ?s WHERE {{
                     VALUES (?scheme ?literal) {{ {identifiers_values_str} }}
                     ?id <{GraphEntity.iri_uses_identifier_scheme}> ?scheme;
                         <{GraphEntity.iri_has_literal_value}> ?literal;
@@ -788,7 +788,7 @@ def get_initial_subjects_from_vvis(vvis):
             for volume, issue, venue_metaid in batch:
                 vvi_type = GraphEntity.iri_journal_issue if issue else GraphEntity.iri_journal_volume
                 query = f'''
-                    SELECT DISTINCT ?s WHERE {{
+                    SELECT ?s WHERE {{
                         ?s a <{vvi_type}>;
                             <{GraphEntity.iri_part_of}>+ <{self.base_iri}/{venue_metaid.replace("omid:", "")}>;
                             <{GraphEntity.iri_has_sequence_identifier}> "{issue if issue else volume}".
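
Note on the finder.py hunks above: dropping DISTINCT means the triplestore no
longer deduplicates ?s bindings server-side. If unique subjects are still needed,
the caller can collect bindings into a set instead. A minimal sketch of that
pattern with SPARQLWrapper, assuming a SPARQL JSON response; the endpoint URL and
query here are placeholders, not code from this patch:

    from SPARQLWrapper import SPARQLWrapper, JSON

    # Placeholder Blazegraph endpoint, for illustration only.
    sparql = SPARQLWrapper('http://localhost:9999/blazegraph/sparql')
    sparql.setReturnFormat(JSON)
    sparql.setQuery('SELECT ?s WHERE { ?s ?p ?o } LIMIT 100')  # no DISTINCT
    results = sparql.queryAndConvert()

    # A set restores the uniqueness DISTINCT used to provide, moving the
    # deduplication cost from the triplestore to the client.
    subjects = {b['s']['value'] for b in results['results']['bindings']}
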
diff --git a/oc_meta/run/sparql_updates.py b/oc_meta/run/sparql_updates.py
deleted file mode 100644
index 4187e710..00000000
--- a/oc_meta/run/sparql_updates.py
+++ /dev/null
@@ -1,62 +0,0 @@
-import argparse
-import os
-from SPARQLWrapper import SPARQLWrapper, POST
-from tqdm import tqdm
-
-def execute_sparql_update(endpoint, query):
-    try:
-        sparql = SPARQLWrapper(endpoint)
-        sparql.setMethod(POST)
-        sparql.setQuery(query)
-        response = sparql.queryAndConvert()
-        return response
-    except Exception as e:
-        print(f"Error: {e}")
-        return False
-
-def split_queries(content):
-    queries = []
-    current_query = []
-    in_query = False
-
-    lines = content.split('\n')
-    for line in lines:
-        stripped_line = line.strip()
-        if stripped_line.upper().startswith("INSERT DATA"):
-            if in_query:
-                queries.append("\n".join(current_query))
-                current_query = []
-            in_query = True
-        if in_query:
-            current_query.append(line)
-
-    if current_query:
-        queries.append("\n".join(current_query))
-
-    return queries
-
-def main():
-    parser = argparse.ArgumentParser(description='Execute SPARQL update queries on a triple store.')
-    parser.add_argument('endpoint', type=str, help='Endpoint URL of the triple store')
-    parser.add_argument('folder', type=str, help='Path to the folder containing SPARQL update query files')
-    parser.add_argument('--batch_size', type=int, default=10, help='Number of queries to include in a batch (default: 10)')
-
-    args = parser.parse_args()
-
-    files = [f for f in os.listdir(args.folder) if f.endswith('.txt')]
-
-    for file in tqdm(files, desc="Processing files"):
-        file_path = os.path.join(args.folder, file)
-        with open(file_path, 'r', encoding='utf8') as query_file:
-            content = query_file.read()
-            queries = split_queries(content) # Split queries manually but structured
-            print(len(queries))
-            # batch_size = args.batch_size
-            # for i in range(0, len(queries), batch_size):
-            #     batch_queries = ";\n".join(queries[i:i + batch_size]) + ";" # Join batch of queries
-            #     success = execute_sparql_update(args.endpoint, batch_queries)
-            #     if not success:
-            #         print(f"Failed to execute batch starting from query {i} in file {file}")
-
-if __name__ == "__main__":
-    main()
diff --git a/oc_meta/run/upload/on_triplestore.py b/oc_meta/run/upload/on_triplestore.py
new file mode 100644
index 00000000..6008a12e
--- /dev/null
+++ b/oc_meta/run/upload/on_triplestore.py
@@ -0,0 +1,73 @@
+import argparse
+import os
+import json
+from SPARQLWrapper import SPARQLWrapper, POST
+from tqdm import tqdm
+
+CACHE_FILE = 'ts_upload_cache.json'
+
+def load_cache():
+    if os.path.exists(CACHE_FILE):
+        with open(CACHE_FILE, 'r', encoding='utf8') as cache_file:
+            return set(json.load(cache_file))
+    return set()
+
+def save_cache(processed_files):
+    with open(CACHE_FILE, 'w', encoding='utf8') as cache_file:
+        json.dump(list(processed_files), cache_file)
+
+def execute_sparql_update(endpoint, query):
+    try:
+        sparql = SPARQLWrapper(endpoint)
+        sparql.setMethod(POST)
+        sparql.setQuery(query)
+        response = sparql.queryAndConvert()
+        return True
+    except Exception as e:
+        print(f"Error: {e}")
+        return False
+
+def split_queries(content):
+    content = content + " ;"
+    queries = content.split(". } } ;")
+    queries = [query.strip() + ". } }" for query in queries if query.strip()]
+    return queries
+
+def main():
+    parser = argparse.ArgumentParser(description='Execute SPARQL update queries on a triple store.')
+    parser.add_argument('endpoint', type=str, help='Endpoint URL of the triple store')
+    parser.add_argument('folder', type=str, help='Path to the folder containing SPARQL update query files')
+    parser.add_argument('--batch_size', type=int, default=10, help='Number of queries to include in a batch (default: 10)')
+
+    args = parser.parse_args()
+
+    processed_files = load_cache()
+    failed_files = []
+
+    all_files = [f for f in os.listdir(args.folder) if f.endswith('.txt')]
+    files_to_process = [f for f in all_files if f not in processed_files]
+
+    for file in tqdm(files_to_process, desc="Processing files"):
+        file_path = os.path.join(args.folder, file)
+        with open(file_path, 'r', encoding='utf8') as query_file:
+            content = query_file.read()
+            queries = split_queries(content)
+            batch_size = args.batch_size
+            for i in range(0, len(queries), batch_size):
+                batch_queries = ";\n".join(queries[i:i + batch_size])
+                success = execute_sparql_update(args.endpoint, batch_queries)
+                if not success:
+                    print(f"Failed to execute batch starting from query {i} in file {file}")
+                    failed_files.append(file)
+                    break
+            else:
+                processed_files.add(file)
+                save_cache(processed_files)
+
+    if failed_files:
+        print("Files with failed queries:")
+        for file in failed_files:
+            print(file)
+
+if __name__ == "__main__":
+    main()