refactor: optimize triplestore upload process
Optimize the upload_sparql_updates function:
- Add connection pooling through a TriplestoreConnection singleton
- Implement hybrid JSON/Redis caching with a new CacheManager
- Speed up cache lookups by loading all data into memory at initialization
- Add graceful shutdown handling for cache persistence
- Add informative progress messages

The main optimizations focus on reducing database connection overhead
and improving cache performance for large-scale uploads.
arcangelo7 committed Jan 28, 2025
1 parent 491cc1e commit 648091e
Showing 4 changed files with 557 additions and 29 deletions.
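The core idea behind the caching change is to pay the cache-loading cost once: both the JSON file and the Redis set are read at startup, and every later membership check is an O(1) in-memory set lookup instead of a disk or network round trip. A minimal sketch of the pattern, assuming a local Redis is running on the default port (the file path, database number, key, and file names are illustrative; the actual implementation is cache_manager.py below):

import json
import os
import redis

# Load the JSON cache once at startup (illustrative path)
processed = set()
if os.path.exists('ts_upload_cache.json'):
    with open('ts_upload_cache.json', encoding='utf8') as f:
        processed = set(json.load(f))

# Merge in the Redis set with a single round trip
r = redis.Redis(db=4, decode_responses=True)
processed |= r.smembers('processed_files')

# Every later membership test is an in-memory set lookup
pending = [f for f in ['a.sparql', 'b.sparql'] if f not in processed]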
117 changes: 117 additions & 0 deletions oc_meta/run/upload/cache_manager.py
@@ -0,0 +1,117 @@
import json
import os
import signal
import atexit
import sys
from typing import Set
import redis
from redis.exceptions import ConnectionError as RedisConnectionError

class CacheManager:
    REDIS_DB = 4  # Default Redis database
    REDIS_KEY = "processed_files"  # Key for the Redis set

    def __init__(self, json_cache_file: str, redis_host: str = 'localhost', redis_port: int = 6379):
        self.json_cache_file = json_cache_file
        self._redis = None
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.processed_files: Set[str] = set()

        # Initialize the cache
        self._init_cache()

        # Register handlers for graceful shutdown
        self._register_shutdown_handlers()

    def _init_redis(self) -> None:
        """Initialize the Redis connection"""
        try:
            self._redis = redis.Redis(
                host=self.redis_host,
                port=self.redis_port,
                db=self.REDIS_DB,
                decode_responses=True  # Ensure returned values are decoded as strings
            )
            self._redis.ping()  # Verify the connection
        except RedisConnectionError:
            print("Warning: Redis is not available. Using the JSON cache only.")
            self._redis = None

    def _init_cache(self) -> None:
        """Initialize the cache from the JSON file and Redis"""
        self._init_redis()

        # Load from the JSON file
        if os.path.exists(self.json_cache_file):
            with open(self.json_cache_file, 'r', encoding='utf8') as f:
                self.processed_files.update(json.load(f))

        # If Redis is available, synchronize in both directions
        if self._redis:
            # Load the data that already exists in Redis
            existing_redis_files = self._redis.smembers(self.REDIS_KEY)
            # Push the files from the JSON cache to Redis
            if self.processed_files:
                self._redis.sadd(self.REDIS_KEY, *self.processed_files)
            # Update the local set with the data from Redis
            self.processed_files.update(existing_redis_files)

    def _save_to_json(self) -> None:
        """Save the cache to the JSON file"""
        with open(self.json_cache_file, 'w', encoding='utf8') as f:
            json.dump(list(self.processed_files), f)

    def _register_shutdown_handlers(self) -> None:
        """Register handlers that persist the cache when the process is interrupted"""
        atexit.register(self._cleanup)
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame) -> None:
        """Handle interrupt signals"""
        print(f"\nReceived interrupt signal {signum}")
        self._cleanup()
        sys.exit(0)

    def _cleanup(self) -> None:
        """Perform cleanup operations"""
        print("\nSaving cache to file...")
        if self._redis:
            # Refresh the local set with the most recent data from Redis
            self.processed_files.update(self._redis.smembers(self.REDIS_KEY))
        self._save_to_json()
        print("Cache saved.")

    def add(self, filename: str) -> None:
        """
        Add a file to the cache
        Args:
            filename (str): Name of the file to add
        """
        self.processed_files.add(filename)
        if self._redis:
            self._redis.sadd(self.REDIS_KEY, filename)

    def __contains__(self, filename: str) -> bool:
        """
        Check whether a file is in the cache
        Args:
            filename (str): Name of the file to check
        Returns:
            bool: True if the file is in the cache, False otherwise
        """
        return filename in self.processed_files

    def get_all(self) -> Set[str]:
        """
        Return all files in the cache
        Returns:
            Set[str]: Set of processed file names
        """
        if self._redis:
            self.processed_files.update(self._redis.smembers(self.REDIS_KEY))
        return self.processed_files
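A short usage sketch of the class above, assuming a local Redis on the default port (the file names are examples, not part of the commit):

from oc_meta.run.upload.cache_manager import CacheManager

cache = CacheManager('ts_upload_cache.json')  # falls back to JSON-only if Redis is unreachable

if 'batch_0001.sparql' not in cache:  # __contains__: in-memory set lookup
    cache.add('batch_0001.sparql')    # updates the local set and, when available, Redis

print(len(cache.get_all()))           # union of local and Redis entries

Because the shutdown handlers are registered in __init__, the merged set is also written back to the JSON file on normal exit, SIGINT, or SIGTERM.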
87 changes: 58 additions & 29 deletions oc_meta/run/upload/on_triplestore.py
@@ -1,42 +1,33 @@
 import argparse
 import json
 import os
 import time
 
 from oc_meta.run.split_insert_and_delete import process_sparql_file
-from SPARQLWrapper import POST, SPARQLWrapper
+from oc_meta.run.upload.cache_manager import CacheManager
+from oc_meta.run.upload.triplestore_connection import TriplestoreConnection
 from tqdm import tqdm
 
-CACHE_FILE = 'ts_upload_cache.json'
-FAILED_QUERIES_FILE = 'failed_queries.txt'
-DEFAULT_STOP_FILE = '.stop_upload'
 
-def load_cache():
-    if os.path.exists(CACHE_FILE):
-        with open(CACHE_FILE, 'r', encoding='utf8') as cache_file:
-            return set(json.load(cache_file))
-    return set()
 
-def save_cache(processed_files):
-    with open(CACHE_FILE, 'w', encoding='utf8') as cache_file:
-        json.dump(list(processed_files), cache_file)
 
-def save_failed_query_file(filename):
-    with open(FAILED_QUERIES_FILE, 'a', encoding='utf8') as failed_file:
+def save_failed_query_file(filename, failed_file):
+    with open(failed_file, 'a', encoding='utf8') as failed_file:
         failed_file.write(f"{filename}\n")
 
 def execute_sparql_update(endpoint, query):
     attempt = 0
     max_attempts = 3
     wait_time = 5  # Initial wait time in seconds
 
+    connection = TriplestoreConnection(endpoint)
+
     while attempt < max_attempts:
         try:
-            sparql = SPARQLWrapper(endpoint)
-            sparql.setMethod(POST)
-            sparql.setQuery(query)
-            response = sparql.queryAndConvert()
-            return True
+            success = connection.execute_update(query)
+            if success:
+                return True
+            raise Exception("Query execution failed")
         except Exception as e:
             attempt += 1
             if attempt < max_attempts:
@@ -88,30 +79,59 @@ def remove_stop_file(stop_file=DEFAULT_STOP_FILE):
         os.remove(stop_file)
         print(f"Existing stop file {stop_file} has been removed.")
 
-def upload_sparql_updates(endpoint, folder, batch_size, stop_file=DEFAULT_STOP_FILE):
+def upload_sparql_updates(
+    endpoint,
+    folder,
+    batch_size,
+    cache_file='ts_upload_cache.json',
+    failed_file='failed_queries.txt',
+    stop_file='.stop_upload'
+):
+    """
+    Upload SPARQL updates to the triplestore.
+    Args:
+        endpoint (str): URL of the SPARQL endpoint
+        folder (str): Folder containing the SPARQL files to process
+        batch_size (int): Number of triples to include in each batch
+        cache_file (str, optional): File used to cache processed files. Defaults to 'ts_upload_cache.json'
+        failed_file (str, optional): File used to record failed queries. Defaults to 'failed_queries.txt'
+        stop_file (str, optional): File whose presence stops the process. Defaults to '.stop_upload'
+    """
     if not os.path.exists(folder):
         return
-    processed_files = load_cache()
 
+    cache_manager = CacheManager(cache_file)
+    failed_files = []
 
+    # Scan the directory and filter out already-processed files
     all_files = [f for f in os.listdir(folder) if f.endswith('.sparql')]
-    files_to_process = [f for f in all_files if f not in processed_files]
+    files_to_process = [f for f in all_files if f not in cache_manager]
+    print(f"Found {len(files_to_process)} files to process out of {len(all_files)} total files")
 
     for file in tqdm(files_to_process, desc="Processing files"):
         if os.path.exists(stop_file):
             print(f"\nStop file {stop_file} detected. Interrupting the process...")
             break
 
         file_path = os.path.join(folder, file)
         queries = split_queries(file_path, batch_size)
 
+        if not queries:
+            save_failed_query_file(file, failed_file)
+            continue
+
+        all_queries_successful = True
+
         for query in queries:
             success = execute_sparql_update(endpoint, query)
             if not success:
-                save_failed_query_file(file)
+                save_failed_query_file(file, failed_file)
+                all_queries_successful = False
                 break
-        else:
-            processed_files.add(file)
-            save_cache(processed_files)
+
+        if all_queries_successful:
+            cache_manager.add(file)
 
     if failed_files:
         print("Files with failed queries:")
@@ -123,13 +143,22 @@ def main():
     parser.add_argument('endpoint', type=str, help='Endpoint URL of the triple store')
     parser.add_argument('folder', type=str, help='Path to the folder containing SPARQL update query files')
     parser.add_argument('--batch_size', type=int, default=10, help='Number of quadruples to include in a batch (default: 10)')
-    parser.add_argument('--stop_file', type=str, default=DEFAULT_STOP_FILE, help=f'Path to the stop file (default: {DEFAULT_STOP_FILE})')
+    parser.add_argument('--cache_file', type=str, default='ts_upload_cache.json', help='Path to cache file')
+    parser.add_argument('--failed_file', type=str, default='failed_queries.txt', help='Path to failed queries file')
+    parser.add_argument('--stop_file', type=str, default='.stop_upload', help='Path to stop file')
 
     args = parser.parse_args()
 
     remove_stop_file(args.stop_file)
 
-    upload_sparql_updates(args.endpoint, args.folder, args.batch_size, args.stop_file)
+    upload_sparql_updates(
+        args.endpoint,
+        args.folder,
+        args.batch_size,
+        args.cache_file,
+        args.failed_file,
+        args.stop_file
+    )
 
 if __name__ == "__main__":
     main()
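For reference, a direct call to the new function mirrors the CLI defaults; the endpoint URL and folder here are hypothetical:

from oc_meta.run.upload.on_triplestore import upload_sparql_updates

upload_sparql_updates(
    'http://localhost:8890/sparql',  # hypothetical SPARQL endpoint
    'sparql_updates/',               # hypothetical folder of .sparql files
    batch_size=10,
    cache_file='ts_upload_cache.json',
    failed_file='failed_queries.txt',
    stop_file='.stop_upload',
)

Creating the file named by stop_file (e.g. touch .stop_upload) interrupts the run between files, and the CacheManager shutdown handlers persist the progress made so far.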
46 changes: 46 additions & 0 deletions oc_meta/run/upload/triplestore_connection.py
@@ -0,0 +1,46 @@
from SPARQLWrapper import SPARQLWrapper, POST
from typing import Optional

class TriplestoreConnection:
    _instance: Optional['TriplestoreConnection'] = None
    _sparql: Optional[SPARQLWrapper] = None

    def __new__(cls, endpoint_url: Optional[str] = None):
        if cls._instance is None:
            cls._instance = super(TriplestoreConnection, cls).__new__(cls)
            if endpoint_url:
                cls._instance._init_connection(endpoint_url)
        elif endpoint_url:
            # If a new URL is provided and the instance already exists, update the connection
            cls._instance._init_connection(endpoint_url)
        return cls._instance

    def _init_connection(self, endpoint_url: str) -> None:
        """Initialize the connection to the triplestore"""
        self._sparql = SPARQLWrapper(endpoint_url)
        self._sparql.setMethod(POST)

    @property
    def sparql(self) -> SPARQLWrapper:
        """Return the SPARQLWrapper instance"""
        if self._sparql is None:
            raise RuntimeError("Connection not initialized. Provide endpoint_url when creating instance.")
        return self._sparql

    def execute_update(self, query: str) -> bool:
        """
        Execute an update query on the triplestore
        Args:
            query (str): SPARQL query to execute
        Returns:
            bool: True if the execution succeeds, False otherwise
        """
        try:
            self.sparql.setQuery(query)
            self.sparql.queryAndConvert()
            return True
        except Exception as e:
            print(f"Error executing query: {e}")
            return False
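A sketch of the singleton behavior introduced here (the endpoint URL and query are illustrative):

from oc_meta.run.upload.triplestore_connection import TriplestoreConnection

conn_a = TriplestoreConnection('http://localhost:8890/sparql')
conn_b = TriplestoreConnection()  # no URL: reuses the existing instance
assert conn_a is conn_b and conn_a.sparql is conn_b.sparql

ok = conn_b.execute_update('INSERT DATA { <urn:ex:s> <urn:ex:p> "o" . }')  # returns False on failure

Note that passing an endpoint URL to an already-existing instance re-initializes the underlying SPARQLWrapper, while calling TriplestoreConnection() with no argument reuses it as-is.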