Skip to content

Commit

Permalink
Merge pull request #986 from biorack/untargeted_dev
Browse files Browse the repository at this point in the history
Handle errors in SFTP mirror with try/except and reattempt connection when reset by peer
  • Loading branch information
bkieft-usa authored Nov 15, 2024
2 parents 8fa75be + 4a83cb8 commit 7008476
Showing 1 changed file with 30 additions and 13 deletions.
43 changes: 30 additions & 13 deletions metatlas/untargeted/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ def download_fbmn_results(
if overwrite_fbmn==False and os.path.exists(graphml_filename) and os.path.exists(results_table_filename) and os.path.exists(gnps2_link_filename):
continue # Skip finished projects unless overwrite is forced
# Bail out conditions
logging.info(tab_print("Working on project %s:"%(project_name), 1))
logging.info(tab_print("Working on %s mode for project %s:"%(polarity,project_name), 1))
if row['%s_%s_status'%(tasktype,polarity_short)] == '12 not relevant':
logging.info(tab_print("Bailed out because FBMN status for %s mode is '12 not relevant'. Skipping download."%(polarity), 2))
continue
Expand Down Expand Up @@ -935,9 +935,10 @@ def mirror_mzmine_results_to_gnps2(

logging.info(tab_print("Mirroring MZmine results for %s to GNPS2..."%(project), 3))

# Suppress paramiko logs except for errors
# Suppress all paramiko logs
paramiko_logger = logging.getLogger("paramiko")
paramiko_logger.setLevel(logging.ERROR)
paramiko_logger.setLevel(logging.CRITICAL)
paramiko_logger.addHandler(logging.NullHandler())
paramiko_logger.propagate = False

project_directory = f"{project}_{polarity}"
Expand All @@ -952,8 +953,16 @@ def mirror_mzmine_results_to_gnps2(
transport.connect(username=remote_user, password=password)
sftp = paramiko.SFTPClient.from_transport(transport)
except paramiko.SSHException as e:
logging.error(f"Failed to connect to GNPS2: {e}")
return
logging.error(tab_print(f"Failed to connect to GNPS2: {e}", 4))
logging.error(tab_print("Attempting to connect again...", 5))
time.sleep(10)
try:
transport = paramiko.Transport((remote_host, remote_port))
transport.connect(username=remote_user, password=password)
sftp = paramiko.SFTPClient.from_transport(transport)
except paramiko.SSHException as e:
logging.error(tab_print(f"Failed to connect to GNPS2 again. Skipping mirror with error: {e}", 4))
return

try:
sftp.mkdir(remote_directory)
Expand Down Expand Up @@ -1015,9 +1024,10 @@ def mirror_raw_data_to_gnps2(

logging.info(tab_print(f"Mirroring raw data (mzML files) for {project} to GNPS2...", 2))

# Suppress paramiko logs except for errors
# Suppress all paramiko logs
paramiko_logger = logging.getLogger("paramiko")
paramiko_logger.setLevel(logging.ERROR)
paramiko_logger.setLevel(logging.CRITICAL)
paramiko_logger.addHandler(logging.NullHandler())
paramiko_logger.propagate = False

if raw_data_subdir is None: # This means we'll have to try to infer the locations of the mzML files from the project name
Expand Down Expand Up @@ -1064,8 +1074,16 @@ def mirror_raw_data_to_gnps2(
transport.connect(username=remote_user, password=password)
sftp = paramiko.SFTPClient.from_transport(transport)
except paramiko.SSHException as e:
logging.error(f"Failed to connect to GNPS2: {e}")
return
logging.error(tab_print(f"Failed to connect to GNPS2: {e}", 3))
logging.error(tab_print("Attempting to connect again...", 4))
time.sleep(10)
try:
transport = paramiko.Transport((remote_host, remote_port))
transport.connect(username=remote_user, password=password)
sftp = paramiko.SFTPClient.from_transport(transport)
except paramiko.SSHException as e:
logging.error(tab_print(f"Failed to connect to GNPS2 again. Skipping mirror with error: {e}", 3))
return

polarity_short = f"_{polarity[:3].upper()}_"
try:
Expand Down Expand Up @@ -1230,14 +1248,13 @@ def submit_fbmn_jobs(
if polarity_list is None:
logging.warning(tab_print("Warning! Project %s does not have a negative or a positive polarity directory. Skipping..."%(project_name), 2))
continue

logging.info(tab_print("Working on project %s:"%(project_name), 1))
for polarity in polarity_list:
polarity_short = polarity[:3]
pathname = os.path.join(row['output_dir'],'%s_%s'%(project_name,polarity))
fbmn_filename = os.path.join(pathname,'%s_%s_gnps2-fbmn-task.txt'%(project_name,polarity))

# Bail out conditions
logging.info(tab_print("Working on %s mode for project %s:"%(polarity,project_name), 1))
if row['%s_%s_status'%(tasktype,polarity_short)] == '09 error':
logging.warning(tab_print("Warning! Project %s mode has an error status. Attempting to resubmit..."%(polarity), 2))
os.remove(fbmn_filename) # Remove failed task ID file in order to submit again
Expand Down Expand Up @@ -1285,7 +1302,7 @@ def submit_fbmn_jobs(
mgf_filename = os.path.join(row['output_dir'],'%s_%s'%(project_name,polarity),'%s_%s.mgf'%(project_name,polarity))
mgf_lines = count_mgf_lines(mgf_filename)
if mgf_lines == 0:
logging.info(tab_print("Note! MGF file in %s mode has no MSMS data. Updating FBMN status to not relevant"%(polarity), 2))
logging.info(tab_print("Note! MGF file in %s mode has no MSMS data. Updating FBMN status to '12 not relevant'"%(polarity), 2))
df.loc[i,'%s_%s_status'%(tasktype,polarity_short)] = '12 not relevant'
continue
params = set_fbmn_parameters(description, quant_file, spectra_file, metadata_file, raw_data)
Expand Down Expand Up @@ -2111,5 +2128,5 @@ def update_new_untargeted_tasks(
lims_untargeted_list = []
lims_untargeted_df = pd.DataFrame(lims_untargeted_list)

logging.info(tab_print("Exported new project info for MZmine submission.", 1))
logging.info(tab_print("Exported info for %s new projects for MZmine submission."%(lims_untargeted_df.shape[0]), 1))
return lims_untargeted_df

0 comments on commit 7008476

Please sign in to comment.