From ede16a7cc732bde1dca65644dc8f5cba32d6ff41 Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Thu, 12 Mar 2020 15:29:06 -0400 Subject: [PATCH 01/22] Change send_command_to_server to a private method Ideally, this is for a better security. Other class now cannot import this function. This will also increase the readability of the code, since we have to replace the lines calling this function to other method explicitly showing its function from its name. --- arc/job/ssh.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/arc/job/ssh.py b/arc/job/ssh.py index 1a44e0b8fc..eb22b48059 100644 --- a/arc/job/ssh.py +++ b/arc/job/ssh.py @@ -49,11 +49,19 @@ def __init__(self, server=''): self.key = servers[server]['key'] logging.getLogger("paramiko").setLevel(logging.WARNING) - def send_command_to_server(self, command, remote_path=''): + def _send_command_to_server(self, command: str, remote_path: str='') -> (list, list): """ - Send commands to server. `command` is either a sting or an array of string commands to send. - If remote_path is not an empty string, the command will be executed in the directory path it points to. - Returns lists of stdout, stderr corresponding to the commands sent. + Send commands to server. + + Args: + command (str or list): A string or an array of string commands to send. + remote_path (str, optional): The directory path at which the command will be executed. + + Returns: + list: A list of lines of standard output stream. + + Returns: + list: A list of lines of standard error stream. """ ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) From 8d39a8f2ef7c712d86c51414d437b75dccf44c6b Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Thu, 12 Mar 2020 15:39:40 -0400 Subject: [PATCH 02/22] Add list_dir() to SSHClient --- arc/job/job.py | 2 +- arc/job/ssh.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/arc/job/job.py b/arc/job/job.py index 5d2ba4d0a9..5f7da78108 100644 --- a/arc/job/job.py +++ b/arc/job/job.py @@ -1456,7 +1456,7 @@ def _get_additional_job_info(self): elif cluster_soft == 'slurm': if self.server != 'local': ssh = SSHClient(self.server) - response = ssh.send_command_to_server(command='ls -alF', remote_path=self.remote_path) + response = ssh.list_dir(remote_path=self.remote_path) else: response = execute_command('ls -alF {0}'.format(self.local_path)) files = list() diff --git a/arc/job/ssh.py b/arc/job/ssh.py index eb22b48059..2942bcb220 100644 --- a/arc/job/ssh.py +++ b/arc/job/ssh.py @@ -291,6 +291,17 @@ def get_last_modified_time(self, remote_file_path): ssh.close() return datetime.datetime.fromtimestamp(timestamp) + def list_dir(self, remote_path: str = '') -> list: + """ + List directory contents. + + Args: + mode (str): The mode change to be applied, can be either octal or symbolic. + remote_path (str, optional): The directory path at which the command will be executed. 
+ """ + command = f'ls -alF' + return self._send_command_to_server(command, remote_path)[0] + def write_file(sftp, remote_file_path, local_file_path='', file_string=''): """ From f8cbbeeae4386945801a91aac04b46aa7e1626b4 Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Thu, 12 Mar 2020 15:42:54 -0400 Subject: [PATCH 03/22] Add find_package() to SSHClient --- arc/job/ssh.py | 10 ++++++++++ arc/main.py | 21 +++++++-------------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/arc/job/ssh.py b/arc/job/ssh.py index 2942bcb220..fa0d7568d4 100644 --- a/arc/job/ssh.py +++ b/arc/job/ssh.py @@ -302,6 +302,16 @@ def list_dir(self, remote_path: str = '') -> list: command = f'ls -alF' return self._send_command_to_server(command, remote_path)[0] + def find_package(self, package_name: str) -> list: + """ + Find the path to the package. + + Args: + package_name (str): The name of the package to search for. + """ + command = f'. ~/.bashrc; which {package_name}' + return self._send_command_to_server(command)[0] + def write_file(sftp, remote_file_path, local_file_path='', file_string=''): """ diff --git a/arc/main.py b/arc/main.py index 686924d390..81a5ec93a5 100644 --- a/arc/main.py +++ b/arc/main.py @@ -827,12 +827,9 @@ def determine_ess_settings(self, diagnostics=False): logger.info('\nTrying {0}'.format(server)) ssh = SSHClient(server) - cmd = '. ~/.bashrc; which g03' - g03 = ssh.send_command_to_server(cmd)[0] - cmd = '. ~/.bashrc; which g09' - g09 = ssh.send_command_to_server(cmd)[0] - cmd = '. ~/.bashrc; which g16' - g16 = ssh.send_command_to_server(cmd)[0] + g03 = ssh.find_package('g03') + g09 = ssh.find_package('g09') + g16 = ssh.find_package('g16') if g03 or g09 or g16: if diagnostics: logger.info(f' Found Gaussian on {server}: g03={g03}, g09={g09}, g16={g16}') @@ -840,8 +837,7 @@ def determine_ess_settings(self, diagnostics=False): elif diagnostics: logger.info(f' Did NOT find Gaussian on {server}') - cmd = '. ~/.bashrc; which qchem' - qchem = ssh.send_command_to_server(cmd)[0] + qchem = ssh.find_package('qchem') if qchem: if diagnostics: logger.info(f' Found QChem on {server}') @@ -849,8 +845,7 @@ def determine_ess_settings(self, diagnostics=False): elif diagnostics: logger.info(f' Did NOT find QChem on {server}') - cmd = '. ~/.bashrc; which orca' - orca = ssh.send_command_to_server(cmd)[0] + orca = ssh.find_package('orca') if orca: if diagnostics: logger.info(f' Found Orca on {server}') @@ -858,8 +853,7 @@ def determine_ess_settings(self, diagnostics=False): elif diagnostics: logger.info(f' Did NOT find Orca on {server}') - cmd = '. ~/.bashrc; which terachem' - terachem = ssh.send_command_to_server(cmd)[0] + terachem = ssh.find_package('terachem') if terachem: if diagnostics: logging.info(f' Found TeraChem on {server}') @@ -867,8 +861,7 @@ def determine_ess_settings(self, diagnostics=False): elif diagnostics: logging.info(f' Did NOT find TeraChem on {server}') - cmd = '. .bashrc; which molpro' - molpro = ssh.send_command_to_server(cmd)[0] + molpro = ssh.find_package('molpro') if molpro: if diagnostics: logger.info(f' Found Molpro on {server}') From ae9d0d1638c5e71853ee4cfa2c080f7a79f24186 Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Thu, 12 Mar 2020 15:53:09 -0400 Subject: [PATCH 04/22] Add list_available_nodes() to SSHClient This commit also changed the list_available_nodes_command. Instead of 'sinfo', the new command allows nodes information output in a better format. 
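As a rough illustration (not part of the patch itself), the new Slurm command prints a header line followed by one line per node, and the parsing added below keeps only nodes whose state still accepts jobs. A minimal sketch, with made-up node names and states:

    # Hypothetical stdout of `sinfo -o "%n %t %O %E"`:
    stdout = ['HOSTNAMES STATE CPU_LOAD REASON',
              'node01 alloc 1.00 none',
              'node02 idle 0.05 none',
              'node03 drain 0.00 hardware failure']
    # Mirror the parsing in list_available_nodes(); the header line is skipped
    # naturally since 'STATE' is not one of the accepted states:
    nodes = [line.split()[0] for line in stdout
             if line.split()[1] in ['mix', 'alloc', 'idle']]
    # nodes == ['node01', 'node02']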
--- arc/job/ssh.py | 28 +++++++++++++++++++++++++++- arc/settings.py | 2 +- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/arc/job/ssh.py b/arc/job/ssh.py index fa0d7568d4..8f0376d59f 100644 --- a/arc/job/ssh.py +++ b/arc/job/ssh.py @@ -19,7 +19,8 @@ from arc.common import get_logger from arc.exceptions import InputError, ServerError -from arc.settings import servers, check_status_command, submit_command, submit_filename, delete_command +from arc.settings import check_status_command, delete_command, list_available_nodes_command, \ + servers, submit_command, submit_filename logger = get_logger() @@ -312,6 +313,31 @@ def find_package(self, package_name: str) -> list: command = f'. ~/.bashrc; which {package_name}' return self._send_command_to_server(command)[0] + def list_available_nodes(self) -> list: + """ + List available nodes on the server. + + Args: + mode (str): The mode change to be applied, can be either octal or symbolic. + + Returns: + list: lines of the node hostnames. + """ + cluster_soft = servers[self.server]['cluster_soft'] + cmd = list_available_nodes_command[cluster_soft] + stdout = self._send_command_to_server(command=cmd)[0] + if cluster_soft.lower() == 'oge': + # Stdout line example: + # long1@node01.cluster BIP 0/0/8 -NA- lx24-amd64 aAdu + nodes = [line.split()[0].split('@')[1] + for line in stdout if '0/0/8' in line] + elif cluster_soft.lower() == 'slurm': + # Stdout line example: + # node01 alloc 1.00 none + nodes = [line.split()[0] for line in stdout + if line.split()[1] in ['mix', 'alloc', 'idle']] + return nodes + def write_file(sftp, remote_file_path, local_file_path='', file_string=''): """ diff --git a/arc/settings.py b/arc/settings.py index a42a6157e5..1688245752 100644 --- a/arc/settings.py +++ b/arc/settings.py @@ -102,7 +102,7 @@ 'Slurm': '/usr/bin/scancel'} list_available_nodes_command = {'OGE': 'export SGE_ROOT=/opt/sge; /opt/sge/bin/lx24-amd64/qstat -f | grep "/8 " | grep "long" | grep -v "8/8"| grep -v "aAu"', - 'Slurm': 'sinfo'} + 'Slurm': 'sinfo -o "%n %t %O %E"'} submit_filename = {'OGE': 'submit.sh', 'Slurm': 'submit.sl'} From b9e2341e26146457dc8e64355d910f7b40f689a2 Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Thu, 12 Mar 2020 16:01:47 -0400 Subject: [PATCH 05/22] Minor: simplify read_remote_file() --- arc/job/ssh.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/arc/job/ssh.py b/arc/job/ssh.py index 8f0376d59f..d7985a72bb 100644 --- a/arc/job/ssh.py +++ b/arc/job/ssh.py @@ -155,14 +155,18 @@ def _download_file(self, remote_file_path, local_file_path): sftp.close() ssh.close() - def read_remote_file(self, remote_path, filename): + def read_remote_file(self, remote_file_path: str) -> list: """ - Read a remote file. `remote_path` is the remote path (required), a `filename` is also required. - Returns the file's content. + Read a remote file. + + Args: + remote_file_path (str): The remote path to be read. + + Returns: + list: A list of lines read from the file. 
""" sftp, ssh = self.connect() - full_path = os.path.join(remote_path, filename) - with sftp.open(full_path, 'r') as f_remote: + with sftp.open(remote_file_path, 'r') as f_remote: content = f_remote.readlines() sftp.close() ssh.close() From 7ef79c8bcc353442b0e42843652d9a3845d752d6 Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Thu, 12 Mar 2020 16:04:36 -0400 Subject: [PATCH 06/22] Clean up trsh_job_on_server and add slurm support --- arc/job/trsh.py | 84 ++++++++++++++++++++++++++----------------------- 1 file changed, 44 insertions(+), 40 deletions(-) diff --git a/arc/job/trsh.py b/arc/job/trsh.py index 6224332fab..880db5f1cf 100644 --- a/arc/job/trsh.py +++ b/arc/job/trsh.py @@ -25,7 +25,6 @@ from arc.settings import (delete_command, inconsistency_ab, inconsistency_az, - list_available_nodes_command, maximum_barrier, preserve_param_in_scan_stable, rotor_scan_resolution, @@ -1120,54 +1119,59 @@ def trsh_job_on_server(server: str, bool: Whether to re-run the job, `True` to rerun. """ server_nodes = server_nodes if server_nodes is not None else list() + cluster_soft = servers[server]['cluster_soft'] if job_server_status != 'done': logger.error(f'Job {job_name} has server status "{job_server_status}" on {server}.') # delete current server run - command = delete_command[servers[server]['cluster_soft']] + ' ' + str(job_id) if server == 'local': - execute_command(command) + cmd = delete_command[cluster_soft] + ' ' + str(job_id) + execute_command(cmd) return None, True else: ssh = SSHClient(server) - ssh.send_command_to_server(command) - - if servers[server]['cluster_soft'].lower() == 'oge': - logger.error('Troubleshooting by changing node.') - ssh = SSHClient(server) - # find available nodes - stdout = ssh.send_command_to_server(command=list_available_nodes_command[servers[server]['cluster_soft']])[0] - for line in stdout: - node = line.split()[0].split('.')[0].split('node')[1] - if servers[server]['cluster_soft'] == 'OGE' and '0/0/8' in line and node not in server_nodes: - server_nodes.append(node) - break - else: - logger.error(f'Could not find an available node on the server {server}') - # TODO: continue troubleshooting; if all else fails, put the job to sleep, - # and try again searching for a node - return None, False - - # modify the submit file - content = ssh.read_remote_file(remote_path=remote_path, - filename=submit_filename[servers[server]['cluster_soft']]) - for i, line in enumerate(content): - if '#$ -l h=node' in line: - content[i] = '#$ -l h=node{0}.cluster'.format(node) - break - else: - content.insert(7, '#$ -l h=node{0}.cluster'.format(node)) - content = ''.join(content) # convert list into a single string, not to upset paramiko - # resubmit - ssh.upload_file(remote_file_path=os.path.join(remote_path, - submit_filename[servers[server]['cluster_soft']]), file_string=content) - return node, True - - elif servers[server]['cluster_soft'].lower() == 'slurm': - # TODO: change node on Slurm - return None, True + ssh.delete_job(job_id) + + # find available node + logger.error('Troubleshooting by changing node.') + ssh = SSHClient(server) + nodes = ssh.list_available_nodes() + for node in nodes: + if node not in server_nodes: + server_nodes.append(node) + break + else: + logger.error(f'Could not find an available node on the server {server}') + # TODO: continue troubleshooting; if all else fails, put the job to sleep, + # and try again searching for a node + return None, False + + # modify the submit file + remote_submit_file = os.path.join(remote_path, 
+                                                  submit_filename[cluster_soft])
+    content = ssh.read_remote_file(remote_file_path=remote_submit_file)
+    if cluster_soft.lower() == 'oge':
+        node_assign = '#$ -l h='
+        insert_line_num = 7
+    elif cluster_soft.lower() == 'slurm':
+        node_assign = '#SBATCH -w, --nodelist='
+        insert_line_num = 5
+    else:
+        # Other software?
+        logger.debug(f'Unknown cluster software {cluster_soft} is encountered when '
+                     f'troubleshooting by changing node.')
+        return None, False
+    for i, line in enumerate(content):
+        if node_assign in line:
+            content[i] = node_assign + node
+            break
+    else:
+        content.insert(insert_line_num, node_assign + node)
+    content = ''.join(content)  # convert list into a single string, not to upset paramiko
 
-    return None, False
+    # resubmit
+    ssh.upload_file(remote_file_path=os.path.join(remote_path,
+                                                  submit_filename[cluster_soft]), file_string=content)
+    return node, True
 
 
 def scan_quality_check(label: str,

From 8eda7c52d49b905f3300a77d2cd2298353d30441 Mon Sep 17 00:00:00 2001
From: Xiaorui Dong
Date: Thu, 12 Mar 2020 16:13:31 -0400
Subject: [PATCH 07/22] Add change mode to SSHClient

---
 arc/job/job.py |  2 +-
 arc/job/ssh.py | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/arc/job/job.py b/arc/job/job.py
index 5f7da78108..ff31913f40 100644
--- a/arc/job/job.py
+++ b/arc/job/job.py
@@ -1249,7 +1249,7 @@ def _upload_input_file(self):
                                'got: {1}'.format(up_file['name'], up_file['source']))
             ssh.upload_file(remote_file_path=up_file['remote'], local_file_path=local_file_path)
             if up_file['make_x']:
-                ssh.send_command_to_server(command='chmod +x {0}'.format(up_file['name']), remote_path=self.remote_path)
+                ssh.change_mode(mode='+x', path=up_file['name'], remote_path=self.remote_path)
         self.initial_time = ssh.get_last_modified_time(
             remote_file_path=os.path.join(self.remote_path, submit_filename[servers[self.server]['cluster_soft']]))
 
diff --git a/arc/job/ssh.py b/arc/job/ssh.py
index d7985a72bb..072005e2c5 100644
--- a/arc/job/ssh.py
+++ b/arc/job/ssh.py
@@ -342,6 +342,25 @@ def list_available_nodes(self) -> list:
                      if line.split()[1] in ['mix', 'alloc', 'idle']]
         return nodes
 
+    def change_mode(self,
+                    mode: str,
+                    path: str,
+                    recursive: bool = False,
+                    remote_path: str = ''):
+        """
+        Change the mode of a file or a directory.
+
+        Args:
+            mode (str): The mode change to be applied, can be either octal or symbolic.
+            path (str): The path to the file or the directory to be changed.
+            recursive (bool, optional): Whether to recursively change the mode of all files
+                                        under a directory. ``True`` to change recursively.
+            remote_path (str, optional): The directory path at which the command will be executed.
+        """
+        recursive = '-R' if recursive else ''
+        command = f'chmod {recursive} {mode} {path}'
+        self._send_command_to_server(command, remote_path)
 
 def write_file(sftp, remote_file_path, local_file_path='', file_string=''):
     """

From f9864764c307951697743cfbf3dcc24243b8e4ac Mon Sep 17 00:00:00 2001
From: Xiaorui Dong
Date: Thu, 12 Mar 2020 16:15:18 -0400
Subject: [PATCH 08/22] Add functions to check if a file/dir exists on the remote

---
 arc/job/ssh.py | 30 ++++++++++++++++++++++++++++++
 1 file changed, 30 insertions(+)

diff --git a/arc/job/ssh.py b/arc/job/ssh.py
index 072005e2c5..bc3131a0c5 100644
--- a/arc/job/ssh.py
+++ b/arc/job/ssh.py
@@ -361,6 +361,36 @@ def change_mode(self,
         command = f'chmod {recursive} {mode} {path}'
         self._send_command_to_server(command, remote_path)
 
+    def _check_file_exists(self, remote_file_path: str) -> bool:
+        """
+        Check if a file exists on the remote server.
+
+        Args:
+            remote_file_path (str): The path to the file on the remote server.
+
+        Returns:
+            bool: Whether the file exists on the remote server. ``True`` if it exists.
+        """
+        command = f'[ -f "{remote_file_path}" ] && echo "File exists"'
+        stdout, _ = self._send_command_to_server(command, remote_path='')
+        if len(stdout):
+            return True
+
+    def _check_dir_exists(self,
+                          remote_dir_path: str) -> bool:
+        """
+        Check if a directory exists on the remote server.
+
+        Args:
+            remote_dir_path (str): The path to the directory on the remote server.
+
+        Returns:
+            bool: Whether the directory exists on the remote server. ``True`` if it exists.
+        """
+        command = f'[ -d "{remote_dir_path}" ] && echo "Dir exists"'
+        stdout, _ = self._send_command_to_server(command)
+        if len(stdout):
+            return True
 
 def write_file(sftp, remote_file_path, local_file_path='', file_string=''):
     """

From 45ad247e29804227a658787bbef77128788d021e Mon Sep 17 00:00:00 2001
From: Xiaorui Dong
Date: Thu, 12 Mar 2020 16:31:41 -0400
Subject: [PATCH 09/22] Improve upload_file()

This function is now restructured to follow the same frame as download_file().
Add _check_dir_exists() and _create_dir() to avoid cases where the upload
cannot be achieved due to directory issues (issue #346). Make sure the ssh and
sftp sessions are re-linked each time, which helps during debugging.
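For context, a minimal usage sketch under the behavior described above (the server name and paths are made-up examples, not part of the patch):

    ssh = SSHClient('server1')
    # The remote directory no longer needs to exist in advance: upload_file()
    # is expected to create it via _check_dir_exists()/_create_dir(), and to
    # retry on transient IOErrors before raising a ServerError.
    ssh.upload_file(remote_file_path='/home/user/runs/a123/submit.sl',
                    file_string='#!/bin/bash\n')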
--- arc/job/job.py | 2 - arc/job/ssh.py | 103 +++++++++++++++++++++++++++++++++++-------------- 2 files changed, 73 insertions(+), 32 deletions(-) diff --git a/arc/job/job.py b/arc/job/job.py index ff31913f40..9f79361d0e 100644 --- a/arc/job/job.py +++ b/arc/job/job.py @@ -1229,13 +1229,11 @@ def write_input_file(self): def _upload_submit_file(self): ssh = SSHClient(self.server) - ssh.send_command_to_server(command='mkdir -p {0}'.format(self.remote_path)) remote_file_path = os.path.join(self.remote_path, submit_filename[servers[self.server]['cluster_soft']]) ssh.upload_file(remote_file_path=remote_file_path, file_string=self.submit) def _upload_input_file(self): ssh = SSHClient(self.server) - ssh.send_command_to_server(command='mkdir -p {0}'.format(self.remote_path)) if self.input is not None: remote_file_path = os.path.join(self.remote_path, input_filename[self.software]) ssh.upload_file(remote_file_path=remote_file_path, file_string=self.input) diff --git a/arc/job/ssh.py b/arc/job/ssh.py index bc3131a0c5..e8b9415779 100644 --- a/arc/job/ssh.py +++ b/arc/job/ssh.py @@ -89,36 +89,88 @@ def _send_command_to_server(self, command: str, remote_path: str='') -> (list, l ssh.close() return stdout, stderr - def upload_file(self, remote_file_path, local_file_path='', file_string=''): + def upload_file(self, remote_file_path: str, local_file_path: str='', file_string: str=''): """ - Upload `local_file_path` or the contents of `file_string` to `remote_file_path`. - Either `file_string` or `local_file_path` must be given. + A modulator method of _upload_file(). Upload a local file or contents + from a string to the remote server. + + Args: + remote_file_path (str): The path to write into on the remote server. + local_file_path (str, optional): The local file path to be copied to the remote location. + file_string (str, optional): The file content to be copied and saved as the remote file. + + Raises: + InputError: If both `local_file_path` or `file_string` are invalid, + or `local_file_path` does not exists. + ServerError: If the file cannot be uploaded with maximum times to try """ + if not local_file_path and not file_string: + raise InputError('Cannot not upload file to server. Either `file_string` or `local_file_path`' + ' must be specified') if local_file_path and not os.path.isfile(local_file_path): raise InputError(f'Cannot upload a non-existing file. ' f'Check why file in path {local_file_path} is missing.') - sftp, ssh = self.connect() + i, max_times_to_try = 1, 30 success = False sleep_time = 10 # seconds - while i < max_times_to_try: + while i < max_times_to_try and not success: try: - write_file(sftp, remote_file_path, local_file_path, file_string) + self._upload_file(remote_file_path, local_file_path, file_string) + except InputError: + raise InputError(f'Cannot upload the file when the directory of the path {remote_file_path} ' + f'does not exist.') except IOError: logger.error(f'Could not upload file {local_file_path} to {self.server}!') logger.error(f'ARC is sleeping for {sleep_time * i} seconds before re-trying, ' f'please check your connectivity.') logger.info('ZZZZZ..... ZZZZZ.....') time.sleep(sleep_time * i) # in seconds + i += 1 else: success = True - i = 1000 - i += 1 if not success: raise ServerError(f'Could not write file {remote_file_path} on {self.server}. ' f'Tried {max_times_to_try} times.') - sftp.close() - ssh.close() + + def _upload_file(self, remote_file_path:str, local_file_path: str='', file_string: str='', force_upload: bool=True): + """ + Upload a file. 
If `file_string` is given, write it as the content of the file. + Else, if `local_file_path` is given, copy it to `remote_file_path`. + + Args: + remote_file_path (str): The path to write into on the remote server. + local_file_path (str, optional): The local file path to be copied to the remote location. + file_string (str, optional): The file content to be copied and saved as the remote file. + force_upload (bool, optional): Whether upload the file if the directory of the file does not exists. + ``True`` for make the directory and upload. + """ + sftp, ssh = self.connect() + # If the directory does not exist, open cannot create a file based on the given path + remote_dir_path = os.path.dirname(remote_file_path) + if not self._check_dir_exists(remote_file_path): + if force_upload: + self._create_dir(remote_dir_path) + else: + logger.error(f'{remote_dir_path} does not exist on {self.server}. ' + f'Cannot upload file {remote_file_path}') + raise InputError(f'Remote file path {remote_file_path} is invalid, since its ' + 'directory does not exist.') + try: + if file_string: + with sftp.open(remote_file_path, 'w') as f_remote: + f_remote.write(file_string) + else: + sftp.put(localpath=local_file_path, + remotepath=remote_file_path) + except IOError: + logger.debug( + f'Got an IOError when trying to upload file {remote_file_path} from {self.server}') + raise IOError( + f'Got an IOError when trying to upload file {remote_file_path} from {self.server}') + finally: + sftp.close() + ssh.close() def download_file(self, remote_file_path, local_file_path): """ @@ -392,27 +444,18 @@ def _check_dir_exists(self, if len(stdout): return True -def write_file(sftp, remote_file_path, local_file_path='', file_string=''): - """ - Write a file. If `file_string` is given, write it as the content of the file. - Else, if `local_file_path` is given, copy it to `remote_file_path`. + def _create_dir(self, remote_path: str): + """ + Create a new directory on the server. - Args: - sftp (paramiko's SFTP): The SFTP object. - remote_file_path (str): The path to write into on the remote server. - local_file_path (str, optional): A local file path to be copied into the remote location. - file_string (str): The file content to be copied and saved as the remote file. - """ - with sftp.open(remote_file_path, 'w') as f_remote: - if file_string: - f_remote.write(file_string) - elif local_file_path: - # with open(local_file_path, 'r') as f_local: - # f_remote.write(f_local.readlines()) - sftp.put(localpath=local_file_path, remotepath=remote_file_path) - else: - raise ValueError('Could not upload file to server. Either `file_string` or `local_file_path`' - ' must be specified') + Args: + remote_path (str): The path to the directory to create on the remote server. 
+ """ + command = f'mkdir -p "{remote_path}"' + _, stderr = self._send_command_to_server(command) + if stderr: + raise ServerError( + f'Cannot create dir for the given path ({remote_path}).\nGot: {stderr}') def check_job_status_in_stdout(job_id, stdout, server): From 6dfc39559e29520cda95ee1ff8929d1555210dd0 Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Thu, 12 Mar 2020 16:35:19 -0400 Subject: [PATCH 10/22] Consistent usage of _send_command_to_server and minor style change --- arc/job/ssh.py | 151 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 116 insertions(+), 35 deletions(-) diff --git a/arc/job/ssh.py b/arc/job/ssh.py index e8b9415779..08abb280b6 100644 --- a/arc/job/ssh.py +++ b/arc/job/ssh.py @@ -172,18 +172,34 @@ def _upload_file(self, remote_file_path:str, local_file_path: str='', file_strin sftp.close() ssh.close() - def download_file(self, remote_file_path, local_file_path): + def download_file(self, remote_file_path: str, local_file_path: str): """ - Download a file from `remote_file_path` to `local_file_path`. + A modulator function of _download_file(). Download a file from the server. + + Args: + remote_file_path (str): The remote path to be downloaded from. + local_file_path (str): The local path to be downloaded to. + + Raises: + ServerError: If the file cannot be downloaded with maximum times to try """ i, max_times_to_try = 1, 30 success = False sleep_time = 10 # seconds - while i < 30: + + if not self._check_file_exists(remote_file_path): + # Check if a file exists + # This doesn't have a real impact now to avoid screwing up ESS trsh + # but introduce an opportunity for better troubleshooting. + # The current behavior is that if the remote path does not exist + # an empty file will be created at the local path + logger.debug( + f'{remote_file_path} does not exist on {self.server}.') + + while i < max_times_to_try and not success: self._download_file(remote_file_path, local_file_path) if os.path.isfile(local_file_path): success = True - i = 1000 else: logger.error(f'Could not download file {remote_file_path} from {self.server}!') logger.error(f'ARC is sleeping for {sleep_time * i} seconds before re-trying, ' @@ -195,9 +211,16 @@ def download_file(self, remote_file_path, local_file_path): raise ServerError(f'Could not download file {remote_file_path} from {self.server}. ' f'Tried {max_times_to_try} times.') - def _download_file(self, remote_file_path, local_file_path): + def _download_file(self, remote_file_path: str, local_file_path: str): """ - Download a file from `remote_file_path` to `local_file_path`. + Download a file from the server. + + Args: + remote_file_path (str): The remote path to be downloaded from. + local_file_path (str): The local path to be downloaded to. + + Raises: + IOError: Cannot download file via sftp. """ sftp, ssh = self.connect() try: @@ -224,9 +247,16 @@ def read_remote_file(self, remote_file_path: str) -> list: ssh.close() return content - def check_job_status(self, job_id): + def check_job_status(self, job_id: int) -> str: """ - A modulator method of _check_job_status() + A modulator method of _check_job_status(). Check job's status. + + Args: + job_id (int): The job's ID. 
+ + Returns: + str: Possible statuses: `before_submission`, `running`, `errored on node xx`, + `done`, and `connection error` """ i = 1 sleep_time = 1 # minutes @@ -242,48 +272,72 @@ def check_job_status(self, job_id): i += 1 return result - def _check_job_status(self, job_id): + def _check_job_status(self, job_id: int) -> str: """ - Possible statuses: `before_submission`, `running`, `errored on node xx`, `done` - Status line formats: - pharos: '540420 0.45326 xq1340b user_name r 10/26/2018 11:08:30 long1@node18.cluster' - rmg: '14428 debug xq1371m2 user_name R 50-04:04:46 1 node06' + Check job's status. + + Args: + job_id (int): The job's ID. + + Returns: + str: Possible statuses: `before_submission`, `running`, `errored on node xx`, + `done`, and `connection error` """ cmd = check_status_command[servers[self.server]['cluster_soft']] + ' -u $USER' - stdout, stderr = self.send_command_to_server(cmd) + stdout, stderr = self._send_command_to_server(cmd) + # Status line formats: + # OGE: '540420 0.45326 xq1340b user_name r 10/26/2018 11:08:30 long1@node18.cluster' + # SLURM: '14428 debug xq1371m2 user_name R 50-04:04:46 1 node06' if stderr: logger.info('\n\n') logger.error(f'Could not check status of job {job_id} due to {stderr}') return 'connection error' return check_job_status_in_stdout(job_id=job_id, stdout=stdout, server=self.server) - def delete_job(self, job_id): + def delete_job(self, job_id: int): """ - Deletes a running job + Deletes a running job. + + Args: + job_id (int): The job's ID. """ cmd = delete_command[servers[self.server]['cluster_soft']] + ' ' + str(job_id) - self.send_command_to_server(cmd) + self._send_command_to_server(cmd) - def check_running_jobs_ids(self): + def check_running_jobs_ids(self) -> list: """ - Return a list of ``int`` representing job IDs of all jobs submitted by the user on a server + Check all jobs submitted by the user on a server. + + Returns: + list: A list of job IDs """ running_jobs_ids = list() cmd = check_status_command[servers[self.server]['cluster_soft']] + ' -u $USER' - stdout = self.send_command_to_server(cmd)[0] + stdout = self._send_command_to_server(cmd)[0] for i, status_line in enumerate(stdout): if (servers[self.server]['cluster_soft'].lower() == 'slurm' and i > 0)\ or (servers[self.server]['cluster_soft'].lower() == 'oge' and i > 1): running_jobs_ids.append(int(status_line.split()[0])) return running_jobs_ids - def submit_job(self, remote_path): - """Submit a job""" + def submit_job(self, remote_path: str) -> (str, int): + """ + Submit a job to the server. + + Args: + remote_path (str): The remote path contains the input file and the submission script. + + Returns: + str: A string indicate the status of job submission. Either `errored` or `submitted`. + + Returns: + int: the job ID of the submitted job. 
+ """ job_status = '' job_id = 0 - cmd = submit_command[servers[self.server]['cluster_soft']] + ' '\ - + submit_filename[servers[self.server]['cluster_soft']] - stdout, stderr = self.send_command_to_server(cmd, remote_path) + cluster_soft = servers[self.server]['cluster_soft'] + cmd = submit_command[cluster_soft] + ' ' + submit_filename[cluster_soft] + stdout, stderr = self._send_command_to_server(cmd, remote_path) if len(stderr) > 0 or len(stdout) == 0: logger.warning(f'Got stderr when submitting job:\n{stderr}') job_status = 'errored' @@ -293,23 +347,34 @@ def submit_job(self, remote_path): f'settings, such as cpus and memory, in ARC/arc/settings.py') elif 'submitted' in stdout[0].lower(): job_status = 'running' - if servers[self.server]['cluster_soft'].lower() == 'oge': + if cluster_soft.lower() == 'oge': job_id = int(stdout[0].split()[2]) - elif servers[self.server]['cluster_soft'].lower() == 'slurm': + elif cluster_soft.lower() == 'slurm': job_id = int(stdout[0].split()[3]) else: raise ValueError(f'Unrecognized cluster software {servers[self.server]["cluster_soft"]}') return job_status, job_id def connect(self): - """A helper function for calling self.try_connecting until successful""" + """ + A modulator function for _connect(). Connect to the server. + + Raises: + ServerError: Cannot connect to the server with maximum times to try + + Returns: + paramiko.sftp_client.SFTPClient + + Returns: + paramiko.SSHClient + """ times_tried = 0 - max_times_to_try = 1440 # continue trying for 24 hrs... + max_times_to_try = 1440 # continue trying for 24 hrs (24 hr * 60 min/hr)... interval = 60 # wait 60 sec between trials while times_tried < max_times_to_try: times_tried += 1 try: - sftp, ssh = self.try_connecting() + sftp, ssh = self._connect() except Exception as e: if not times_tried % 10: logger.info(f'Tried connecting to {self.server} {times_tried} times with no success...' @@ -323,8 +388,16 @@ def connect(self): time.sleep(interval) raise ServerError(f'Could not connect to server {self.server} even after {times_tried} trials.') - def try_connecting(self): - """A helper function for connecting via paramiko, returns the `sftp` and `ssh` objects""" + def _connect(self): + """ + Connect via paramiko, and open a SSH session as well as a SFTP session. + + Returns: + paramiko.sftp_client.SFTPClient + + Returns: + paramiko.SSHClient + """ ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.load_system_host_keys(filename=self.key) @@ -337,8 +410,16 @@ def try_connecting(self): sftp = ssh.open_sftp() return sftp, ssh - def get_last_modified_time(self, remote_file_path): - """returns the last modified time of `remote_file_path` in a datetime format""" + def get_last_modified_time(self, remote_file_path: str): + """ + Get the last modified time of a remote file. + + Args: + remote_file_path (str): The remote file path to check. 
+ + Returns: + datetime.datetime: the last modified time of the file + """ sftp, ssh = self.connect() try: timestamp = sftp.stat(remote_file_path).st_mtime @@ -513,7 +594,7 @@ def delete_all_arc_jobs(server_list, jobs=None): print(f'\nDeleting {jobs_message} ARC jobs from {server}...') cmd = check_status_command[servers[server]['cluster_soft']] + ' -u $USER' ssh = SSHClient(server) - stdout = ssh.send_command_to_server(cmd)[0] + stdout = ssh._send_command_to_server(cmd)[0] for status_line in stdout: s = re.search(r' a\d+', status_line) if s is not None: From 0ac8a4fb893699291c166badf750876586916e37 Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Fri, 13 Mar 2020 12:55:09 -0400 Subject: [PATCH 11/22] Add two private attribute _ssh and _sftp Multiple ssh and sftp are opened and closed during using the SSHClient module can brought several problems: (1) increasing connection load and more sensitive to the stability of the connection. (2) sessions, for some reason are not synchronized immediately. (3) repetition of creating and closing SSH and SFTP blocks in most of the methods. Adding these two private attributes is more desirable. --- arc/job/ssh.py | 83 +++++++++++++++++++------------------------------- 1 file changed, 31 insertions(+), 52 deletions(-) diff --git a/arc/job/ssh.py b/arc/job/ssh.py index 08abb280b6..575c816174 100644 --- a/arc/job/ssh.py +++ b/arc/job/ssh.py @@ -48,11 +48,13 @@ def __init__(self, server=''): self.address = servers[server]['address'] self.un = servers[server]['un'] self.key = servers[server]['key'] + self._sftp = None + self._ssh = None logging.getLogger("paramiko").setLevel(logging.WARNING) def _send_command_to_server(self, command: str, remote_path: str='') -> (list, list): """ - Send commands to server. + A wapper for exec_command in paramiko. SSHClient. Send commands to the server. Args: command (str or list): A string or an array of string commands to send. @@ -64,29 +66,27 @@ def _send_command_to_server(self, command: str, remote_path: str='') -> (list, l Returns: list: A list of lines of standard error stream. """ - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.load_system_host_keys(filename=self.key) - try: - ssh.connect(hostname=self.address, username=self.un) - except: - return '', 'paramiko failed to connect' if isinstance(command, list): command = '; '.join(command) if remote_path != '': # execute command in remote_path directory. - # Since each `.exec_command()` is a single session, `cd` has to be added to all commands. - command = f'cd {remote_path}; {command}' + # Check remote path existence, otherwise the cmd will be invalid + # and even yield different behaviors. + # Make sure to change directory back after the command is executed + if self._check_dir_exists(remote_path): + command = f'cd "{remote_path}"; {command}; cd ' + else: + raise InputError( + f'Cannot execute command at given remote_path({remote_path})') try: - _, stdout, stderr = ssh.exec_command(command) + _, stdout, stderr = self._ssh.exec_command(command) except: # SSHException: Timeout opening channel. 
try: # try again - _, stdout, stderr = ssh.exec_command(command) + _, stdout, stderr = self._ssh.exec_command(command) except: return '', 'ssh timed-out after two trials' stdout = stdout.readlines() stderr = stderr.readlines() - ssh.close() return stdout, stderr def upload_file(self, remote_file_path: str, local_file_path: str='', file_string: str=''): @@ -110,16 +110,18 @@ def upload_file(self, remote_file_path: str, local_file_path: str='', file_strin if local_file_path and not os.path.isfile(local_file_path): raise InputError(f'Cannot upload a non-existing file. ' f'Check why file in path {local_file_path} is missing.') + # If the directory does not exist, _upload_file cannot create a file based on the given path + remote_dir_path = os.path.dirname(remote_file_path) + if not self._check_dir_exists(remote_dir_path): + self._create_dir(remote_dir_path) i, max_times_to_try = 1, 30 success = False sleep_time = 10 # seconds while i < max_times_to_try and not success: try: - self._upload_file(remote_file_path, local_file_path, file_string) - except InputError: - raise InputError(f'Cannot upload the file when the directory of the path {remote_file_path} ' - f'does not exist.') + self._upload_file(remote_file_path, + local_file_path, file_string) except IOError: logger.error(f'Could not upload file {local_file_path} to {self.server}!') logger.error(f'ARC is sleeping for {sleep_time * i} seconds before re-trying, ' @@ -133,7 +135,7 @@ def upload_file(self, remote_file_path: str, local_file_path: str='', file_strin raise ServerError(f'Could not write file {remote_file_path} on {self.server}. ' f'Tried {max_times_to_try} times.') - def _upload_file(self, remote_file_path:str, local_file_path: str='', file_string: str='', force_upload: bool=True): + def _upload_file(self, remote_file_path: str, local_file_path: str = '', file_string: str = ''): """ Upload a file. If `file_string` is given, write it as the content of the file. Else, if `local_file_path` is given, copy it to `remote_file_path`. @@ -142,35 +144,19 @@ def _upload_file(self, remote_file_path:str, local_file_path: str='', file_strin remote_file_path (str): The path to write into on the remote server. local_file_path (str, optional): The local file path to be copied to the remote location. file_string (str, optional): The file content to be copied and saved as the remote file. - force_upload (bool, optional): Whether upload the file if the directory of the file does not exists. - ``True`` for make the directory and upload. """ - sftp, ssh = self.connect() - # If the directory does not exist, open cannot create a file based on the given path - remote_dir_path = os.path.dirname(remote_file_path) - if not self._check_dir_exists(remote_file_path): - if force_upload: - self._create_dir(remote_dir_path) - else: - logger.error(f'{remote_dir_path} does not exist on {self.server}. 
' - f'Cannot upload file {remote_file_path}') - raise InputError(f'Remote file path {remote_file_path} is invalid, since its ' - 'directory does not exist.') try: if file_string: - with sftp.open(remote_file_path, 'w') as f_remote: + with self._sftp.open(remote_file_path, 'w') as f_remote: f_remote.write(file_string) else: - sftp.put(localpath=local_file_path, - remotepath=remote_file_path) + self._sftp.put(localpath=local_file_path, + remotepath=remote_file_path) except IOError: logger.debug( f'Got an IOError when trying to upload file {remote_file_path} from {self.server}') raise IOError( f'Got an IOError when trying to upload file {remote_file_path} from {self.server}') - finally: - sftp.close() - ssh.close() def download_file(self, remote_file_path: str, local_file_path: str): """ @@ -222,13 +208,12 @@ def _download_file(self, remote_file_path: str, local_file_path: str): Raises: IOError: Cannot download file via sftp. """ - sftp, ssh = self.connect() try: - sftp.get(remotepath=remote_file_path, localpath=local_file_path) + self._sftp.get(remotepath=remote_file_path, + localpath=local_file_path) except IOError: - logger.debug(f'Got an IOError when trying to download file {remote_file_path} from {self.server}') - sftp.close() - ssh.close() + logger.debug( + f'Got an IOError when trying to download file {remote_file_path} from {self.server}') def read_remote_file(self, remote_file_path: str) -> list: """ @@ -240,11 +225,8 @@ def read_remote_file(self, remote_file_path: str) -> list: Returns: list: A list of lines read from the file. """ - sftp, ssh = self.connect() - with sftp.open(remote_file_path, 'r') as f_remote: + with self._sftp.open(remote_file_path, 'r') as f_remote: content = f_remote.readlines() - sftp.close() - ssh.close() return content def check_job_status(self, job_id: int) -> str: @@ -374,7 +356,7 @@ def connect(self): while times_tried < max_times_to_try: times_tried += 1 try: - sftp, ssh = self._connect() + self._sftp, self._ssh = self._connect() except Exception as e: if not times_tried % 10: logger.info(f'Tried connecting to {self.server} {times_tried} times with no success...' @@ -384,7 +366,7 @@ def connect(self): f'\nGot: {e}') else: logger.debug(f'Successfully connected to {self.server} at the {times_tried} trial.') - return sftp, ssh + return time.sleep(interval) raise ServerError(f'Could not connect to server {self.server} even after {times_tried} trials.') @@ -420,13 +402,10 @@ def get_last_modified_time(self, remote_file_path: str): Returns: datetime.datetime: the last modified time of the file """ - sftp, ssh = self.connect() try: - timestamp = sftp.stat(remote_file_path).st_mtime + timestamp = self._sftp.stat(remote_file_path).st_mtime except IOError: return None - sftp.close() - ssh.close() return datetime.datetime.fromtimestamp(timestamp) def list_dir(self, remote_path: str = '') -> list: From abc68fa7e9680c80275b65dcac722188fa38de8c Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Fri, 13 Mar 2020 13:09:36 -0400 Subject: [PATCH 12/22] Convert SSHClient into a Context Manager like Class Converting ARC SSHClient into a Context Manager provides benefits of better handling close() function for paramiko SSHClient and SFTPClient. There are two ways of using it. Either `ssh = SSHClient; ssh.mehod(); ssh.close()`, or more commonly `with SSHClient() as ssh; ssh.method(); Using the second syntax not only results in a shorter script but also has better error handling, making sure the clients are closed, even errors are raised. 
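To make the two styles concrete, a sketch (the server name and the method called are placeholders only):

    # Style 1: explicit connect/close.
    ssh = SSHClient('server1')
    ssh.connect()
    ssh.check_running_jobs_ids()
    ssh.close()

    # Style 2: context manager. __enter__() connects and __exit__() closes,
    # so the clients are closed even if an exception is raised in the block.
    with SSHClient('server1') as ssh:
        ssh.check_running_jobs_ids()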
--- arc/job/ssh.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/arc/job/ssh.py b/arc/job/ssh.py index 575c816174..4886c6f436 100644 --- a/arc/job/ssh.py +++ b/arc/job/ssh.py @@ -52,6 +52,13 @@ def __init__(self, server=''): self._ssh = None logging.getLogger("paramiko").setLevel(logging.WARNING) + def __enter__(self): + self.connect() + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + self.close() + def _send_command_to_server(self, command: str, remote_path: str='') -> (list, list): """ A wapper for exec_command in paramiko. SSHClient. Send commands to the server. @@ -392,6 +399,16 @@ def _connect(self): sftp = ssh.open_sftp() return sftp, ssh + def close(self): + """ + Close the connection to paramiko SSHClient and SFTPClient + """ + if self._sftp is not None: + self._sftp.close() + if self._ssh is not None: + self._ssh.close() + + def get_last_modified_time(self, remote_file_path: str): """ Get the last modified time of a remote file. From 5d16b1b6f95ed5d2098fec0899441404b04f5e53 Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Fri, 13 Mar 2020 13:23:50 -0400 Subject: [PATCH 13/22] Change the usage of SSHClient to `with ... as ...` --- arc/job/job.py | 225 +++++++++++++++++++++++------------------------ arc/job/ssh.py | 28 +++--- arc/job/trsh.py | 12 +-- arc/main.py | 86 +++++++++--------- arc/scheduler.py | 4 +- 5 files changed, 175 insertions(+), 180 deletions(-) diff --git a/arc/job/job.py b/arc/job/job.py index 9f79361d0e..f9cf2d78be 100644 --- a/arc/job/job.py +++ b/arc/job/job.py @@ -1228,38 +1228,38 @@ def write_input_file(self): self._upload_check_file(local_check_file_path=self.checkfile) def _upload_submit_file(self): - ssh = SSHClient(self.server) remote_file_path = os.path.join(self.remote_path, submit_filename[servers[self.server]['cluster_soft']]) - ssh.upload_file(remote_file_path=remote_file_path, file_string=self.submit) + with SSHClient(self.server) as ssh: + ssh.upload_file(remote_file_path=remote_file_path, file_string=self.submit) def _upload_input_file(self): - ssh = SSHClient(self.server) - if self.input is not None: - remote_file_path = os.path.join(self.remote_path, input_filename[self.software]) - ssh.upload_file(remote_file_path=remote_file_path, file_string=self.input) - for up_file in self.additional_files_to_upload: - if up_file['source'] == 'path': - local_file_path = up_file['local'] - elif up_file['source'] == 'input_files': - local_file_path = input_files[up_file['local']] - else: - raise JobError('Unclear file source for {0}. Should either be "path" of "input_files", ' - 'got: {1}'.format(up_file['name'], up_file['source'])) - ssh.upload_file(remote_file_path=up_file['remote'], local_file_path=local_file_path) - if up_file['make_x']: - ssh.change_mode(mode='+x', path=up_file['name'], remote_path=self.remote_path) - self.initial_time = ssh.get_last_modified_time( - remote_file_path=os.path.join(self.remote_path, submit_filename[servers[self.server]['cluster_soft']])) + with SSHClient(self.server) as ssh: + if self.input is not None: + remote_file_path = os.path.join(self.remote_path, input_filename[self.software]) + ssh.upload_file(remote_file_path=remote_file_path, file_string=self.input) + for up_file in self.additional_files_to_upload: + if up_file['source'] == 'path': + local_file_path = up_file['local'] + elif up_file['source'] == 'input_files': + local_file_path = input_files[up_file['local']] + else: + raise JobError(f'Unclear file source for {up_file["name"]}. 
Should either be "path" or ' + f'"input_files", got: {up_file["source"]}') + ssh.upload_file(remote_file_path=up_file['remote'], local_file_path=local_file_path) + if up_file['make_x']: + ssh.change_mode(mode='+x', path=up_file['name'], remote_path=self.remote_path) + self.initial_time = ssh.get_last_modified_time( + remote_file_path=os.path.join(self.remote_path, submit_filename[servers[self.server]['cluster_soft']])) def _upload_check_file(self, local_check_file_path=None): if self.server != 'local': - ssh = SSHClient(self.server) remote_check_file_path = os.path.join(self.remote_path, 'check.chk') - local_check_file_path = os.path.join(self.local_path, 'check.chk') if remote_check_file_path is None\ + local_check_file_path = os.path.join(self.local_path, 'check.chk') if local_check_file_path is None\ else local_check_file_path if os.path.isfile(local_check_file_path) and self.software.lower() == 'gaussian': - ssh.upload_file(remote_file_path=remote_check_file_path, local_file_path=local_check_file_path) - logger.debug('uploading checkpoint file for {0}'.format(self.job_name)) + with SSHClient(self.server) as ssh: + ssh.upload_file(remote_file_path=remote_check_file_path, local_file_path=local_check_file_path) + logger.debug(f'uploading checkpoint file for {self.job_name}') else: # running locally, just copy the check file to the job folder new_check_file_path = os.path.join(self.local_path, 'check.chk') @@ -1269,67 +1269,67 @@ def _download_output_file(self): """ Download ESS output, orbitals check file, and the Gaussian check file, if relevant. """ - ssh = SSHClient(self.server) - - # download output file - remote_file_path = os.path.join(self.remote_path, output_filename[self.software]) - ssh.download_file(remote_file_path=remote_file_path, local_file_path=self.local_path_to_output_file) - if not os.path.isfile(self.local_path_to_output_file): - raise JobError('output file for {0} was not downloaded properly'.format(self.job_name)) - self.final_time = ssh.get_last_modified_time(remote_file_path=remote_file_path) - - # download orbitals FChk file - if self.job_type == 'orbitals': - remote_file_path = os.path.join(self.remote_path, 'input.FChk') - ssh.download_file(remote_file_path=remote_file_path, local_file_path=self.local_path_to_orbitals_file) - if not os.path.isfile(self.local_path_to_orbitals_file): - logger.warning('Orbitals FChk file for {0} was not downloaded properly ' - '(this is not the Gaussian formatted check file...)'.format(self.job_name)) - - # download Gaussian check file - if self.software.lower() == 'gaussian': - remote_check_file_path = os.path.join(self.remote_path, 'check.chk') - ssh.download_file(remote_file_path=remote_check_file_path, local_file_path=self.local_path_to_check_file) - if not os.path.isfile(self.local_path_to_check_file): - logger.warning('Gaussian check file for {0} was not downloaded properly'.format(self.job_name)) - - # download Orca .hess hessian file generated by frequency calculations - # Hessian is useful when the user would like to project rotors - if self.software.lower() == 'orca' and self.job_type == 'freq': - remote_hess_file_path = os.path.join(self.remote_path, 'input.hess') - ssh.download_file(remote_file_path=remote_hess_file_path, local_file_path=self.local_path_to_hess_file) - if not os.path.isfile(self.local_path_to_hess_file): - logger.warning(f'Orca hessian file for {self.job_name} was not downloaded properly') - - # download Lennard_Jones data file - if self.software.lower() == 'onedmin': - remote_lj_file_path = 
os.path.join(self.remote_path, 'lj.dat') - ssh.download_file(remote_file_path=remote_lj_file_path, local_file_path=self.local_path_to_lj_file) - if not os.path.isfile(self.local_path_to_lj_file): - logger.warning('Lennard-Jones data file for {0} was not downloaded properly'.format(self.job_name)) - - # download molpro log file (in addition to the output file) - if self.software.lower() == 'molpro': - remote_log_file_path = os.path.join(self.remote_path, 'input.log') - local_log_file_path = os.path.join(self.local_path, 'output.log') - ssh.download_file(remote_file_path=remote_log_file_path, local_file_path=local_log_file_path) - if not os.path.isfile(local_log_file_path): - logger.warning('Could not download Molpro log file for {0} ' - '(this is not the output file)'.format(self.job_name)) - - # download terachem files (in addition to the output file) - if self.software.lower() == 'terachem': - base_path = os.path.join(self.remote_path, 'scr') - filenames = ['results.dat', 'output.molden', 'charge.xls', 'charge_mull.xls', 'optlog.xls', 'optim.xyz', - 'Frequencies.dat', 'I_matrix.dat', 'Mass.weighted.modes.dat', 'moments_of_inertia.dat', - 'output.basis', 'output.geometry', 'output.molden', 'Reduced.mass.dat', 'results.dat'] - for filename in filenames: - remote_log_file_path = os.path.join(base_path, filename) - local_log_file_path = os.path.join(self.local_path, 'scr', filename) + with SSHClient(self.server) as ssh: + + # download output file + remote_file_path = os.path.join(self.remote_path, output_filename[self.software]) + ssh.download_file(remote_file_path=remote_file_path, local_file_path=self.local_path_to_output_file) + if not os.path.isfile(self.local_path_to_output_file): + raise JobError(f'output file for {self.job_name} was not downloaded properly') + self.final_time = ssh.get_last_modified_time(remote_file_path=remote_file_path) + + # download orbitals FChk file + if self.job_type == 'orbitals': + remote_file_path = os.path.join(self.remote_path, 'input.FChk') + ssh.download_file(remote_file_path=remote_file_path, local_file_path=self.local_path_to_orbitals_file) + if not os.path.isfile(self.local_path_to_orbitals_file): + logger.warning(f'Orbitals FChk file for {self.job_name} was not downloaded properly ' + f'(this is not the Gaussian formatted check file...)') + + # download Gaussian check file + if self.software.lower() == 'gaussian': + remote_check_file_path = os.path.join(self.remote_path, 'check.chk') + ssh.download_file(remote_file_path=remote_check_file_path, local_file_path=self.local_path_to_check_file) + if not os.path.isfile(self.local_path_to_check_file): + logger.warning(f'Gaussian check file for {self.job_name} was not downloaded properly') + + # download Orca .hess hessian file generated by frequency calculations + # Hessian is useful when the user would like to project rotors + if self.software.lower() == 'orca' and self.job_type == 'freq': + remote_hess_file_path = os.path.join(self.remote_path, 'input.hess') + ssh.download_file(remote_file_path=remote_hess_file_path, local_file_path=self.local_path_to_hess_file) + if not os.path.isfile(self.local_path_to_hess_file): + logger.warning(f'Orca hessian file for {self.job_name} was not downloaded properly') + + # download Lennard_Jones data file + if self.software.lower() == 'onedmin': + remote_lj_file_path = os.path.join(self.remote_path, 'lj.dat') + ssh.download_file(remote_file_path=remote_lj_file_path, local_file_path=self.local_path_to_lj_file) + if not os.path.isfile(self.local_path_to_lj_file): + 
logger.warning(f'Lennard-Jones data file for {self.job_name} was not downloaded properly') + + # download molpro log file (in addition to the output file) + if self.software.lower() == 'molpro': + remote_log_file_path = os.path.join(self.remote_path, 'input.log') + local_log_file_path = os.path.join(self.local_path, 'output.log') ssh.download_file(remote_file_path=remote_log_file_path, local_file_path=local_log_file_path) - xyz_path = os.path.join(base_path, 'optim.xyz') - if os.path.isfile(xyz_path): - self.local_path_to_xyz = xyz_path + if not os.path.isfile(local_log_file_path): + logger.warning(f'Could not download Molpro log file for {self.job_name} ' \ + f'(this is not the output file)') + + # download terachem files (in addition to the output file) + if self.software.lower() == 'terachem': + base_path = os.path.join(self.remote_path, 'scr') + filenames = ['results.dat', 'output.molden', 'charge.xls', 'charge_mull.xls', 'optlog.xls', 'optim.xyz', + 'Frequencies.dat', 'I_matrix.dat', 'Mass.weighted.modes.dat', 'moments_of_inertia.dat', + 'output.basis', 'output.geometry', 'output.molden', 'Reduced.mass.dat', 'results.dat'] + for filename in filenames: + remote_log_file_path = os.path.join(base_path, filename) + local_log_file_path = os.path.join(self.local_path, 'scr', filename) + ssh.download_file(remote_file_path=remote_log_file_path, local_file_path=local_log_file_path) + xyz_path = os.path.join(base_path, 'optim.xyz') + if os.path.isfile(xyz_path): + self.local_path_to_xyz = xyz_path def run(self): """ @@ -1350,15 +1350,9 @@ def run(self): logger.debug('writing input file...') self.write_input_file() if self.server != 'local': - ssh = SSHClient(self.server) logger.debug('submitting job...') # submit_job returns job server status and job server id - try: - self.job_status[0], self.job_id = ssh.submit_job(remote_path=self.remote_path) - except IndexError: - # if the connection broke, the files might not have been uploaded correctly - self.write_submit_script() - self.write_input_file() + with SSHClient(self.server) as ssh: self.job_status[0], self.job_id = ssh.submit_job(remote_path=self.remote_path) else: # running locally @@ -1370,9 +1364,9 @@ def delete(self): """ logger.debug('Deleting job {name} for {label}'.format(name=self.job_name, label=self.species_name)) if self.server != 'local': - ssh = SSHClient(self.server) - logger.debug('deleting job on {0}...'.format(self.server)) - ssh.delete_job(self.job_id) + logger.debug(f'deleting job on {self.server}...') + with SSHClient(self.server) as ssh: + ssh.delete_job(self.job_id) else: logger.debug('deleting job locally...') delete_job(job_id=self.job_id) @@ -1419,7 +1413,6 @@ def _get_additional_job_info(self): """ Download the additional information of stdout and stderr from the server. 
""" - ssh = None lines1, lines2 = list(), list() content = '' cluster_soft = servers[self.server]['cluster_soft'].lower() @@ -1427,21 +1420,20 @@ def _get_additional_job_info(self): local_file_path1 = os.path.join(self.local_path, 'out.txt') local_file_path2 = os.path.join(self.local_path, 'err.txt') if self.server != 'local': - ssh = SSHClient(self.server) remote_file_path = os.path.join(self.remote_path, 'out.txt') - try: - ssh.download_file(remote_file_path=remote_file_path, local_file_path=local_file_path1) - except (TypeError, IOError) as e: - logger.warning('Got the following error when trying to download out.txt for {0}:'.format( - self.job_name)) - logger.warning(e) - remote_file_path = os.path.join(self.remote_path, 'err.txt') - try: - ssh.download_file(remote_file_path=remote_file_path, local_file_path=local_file_path2) - except (TypeError, IOError) as e: - logger.warning('Got the following error when trying to download err.txt for {0}:'.format( - self.job_name)) - logger.warning(e) + with SSHClient(self.server) as ssh: + try: + ssh.download_file(remote_file_path=remote_file_path, + local_file_path=local_file_path1) + except (TypeError, IOError) as e: + logger.warning(f'Got the following error when trying to download out.txt for {self.job_name}:') + logger.warning(e) + remote_file_path = os.path.join(self.remote_path, 'err.txt') + try: + ssh.download_file(remote_file_path=remote_file_path, local_file_path=local_file_path2) + except (TypeError, IOError) as e: + logger.warning(f'Got the following error when trying to download err.txt for {self.job_name}:') + logger.warning(e) if os.path.isfile(local_file_path1): with open(local_file_path1, 'r') as f: lines1 = f.readlines() @@ -1453,8 +1445,8 @@ def _get_additional_job_info(self): content += ''.join([line for line in lines2]) elif cluster_soft == 'slurm': if self.server != 'local': - ssh = SSHClient(self.server) - response = ssh.list_dir(remote_path=self.remote_path) + with SSHClient(self.server) as ssh: + response = ssh.list_dir(remote_path=self.remote_path) else: response = execute_command('ls -alF {0}'.format(self.local_path)) files = list() @@ -1466,11 +1458,12 @@ def _get_additional_job_info(self): if self.server != 'local': remote_file_path = os.path.join(self.remote_path, file_name) try: - ssh.download_file(remote_file_path=remote_file_path, local_file_path=local_file_path) + with SSHClient(self.server) as ssh: + ssh.download_file(remote_file_path=remote_file_path, + local_file_path=local_file_path) except (TypeError, IOError) as e: - logger.warning('Got the following error when trying to download {0} for {1}:'.format( - file_name, self.job_name)) - logger.warning(e) + logger.warning(f'Got the following error when trying to download {file_name} ' \ + f'for {self.job_name}: {e}') if os.path.isfile(local_file_path): with open(local_file_path, 'r') as f: lines1 = f.readlines() @@ -1483,8 +1476,8 @@ def _check_job_server_status(self): Possible statuses: `initializing`, `running`, `errored on node xx`, `done`. 
""" if self.server != 'local': - ssh = SSHClient(self.server) - return ssh.check_job_status(self.job_id) + with SSHClient(self.server) as ssh: + return ssh.check_job_status(self.job_id) else: return check_job_status(self.job_id) diff --git a/arc/job/ssh.py b/arc/job/ssh.py index 4886c6f436..f6c881e9eb 100644 --- a/arc/job/ssh.py +++ b/arc/job/ssh.py @@ -589,19 +589,19 @@ def delete_all_arc_jobs(server_list, jobs=None): jobs_message = f'{len(jobs)}' if jobs is not None else 'all' print(f'\nDeleting {jobs_message} ARC jobs from {server}...') cmd = check_status_command[servers[server]['cluster_soft']] + ' -u $USER' - ssh = SSHClient(server) - stdout = ssh._send_command_to_server(cmd)[0] - for status_line in stdout: - s = re.search(r' a\d+', status_line) - if s is not None: - job_id = s.group()[1:] - if job_id in jobs or jobs is None: - if servers[server]['cluster_soft'].lower() == 'slurm': - server_job_id = status_line.split()[0] - ssh.delete_job(server_job_id) - print(f'deleted job {job_id} ({server_job_id} on server)') - elif servers[server]['cluster_soft'].lower() == 'oge': - ssh.delete_job(job_id) - print(f'deleted job {job_id}') + with SSHClient(server) as ssh: + stdout = ssh._send_command_to_server(cmd)[0] + for status_line in stdout: + s = re.search(r' a\d+', status_line) + if s is not None: + job_id = s.group()[1:] + if job_id in jobs or jobs is None: + if servers[server]['cluster_soft'].lower() == 'slurm': + server_job_id = status_line.split()[0] + ssh.delete_job(server_job_id) + print(f'deleted job {job_id} ({server_job_id} on server)') + elif servers[server]['cluster_soft'].lower() == 'oge': + ssh.delete_job(job_id) + print(f'deleted job {job_id}') if server_list: print('\ndone.') diff --git a/arc/job/trsh.py b/arc/job/trsh.py index 880db5f1cf..df71da3d65 100644 --- a/arc/job/trsh.py +++ b/arc/job/trsh.py @@ -1129,8 +1129,8 @@ def trsh_job_on_server(server: str, execute_command(cmd) return None, True else: - ssh = SSHClient(server) - ssh.delete_job(job_id) + with SSHClient(server) as ssh: + ssh.delete_job(job_id) # find available node logger.error('Troubleshooting by changing node.') @@ -1148,7 +1148,8 @@ def trsh_job_on_server(server: str, # modify the submit file remote_submit_file = os.path.join(remote_path, submit_filename[cluster_soft]) - content = ssh.read_remote_file(remote_file_path=remote_submit_file) + with SSHClient(server) as ssh: + content = ssh.read_remote_file(remote_file_path=remote_submit_file) if cluster_soft.lower() == 'oge': node_assign = '#$ -l h=' insert_line_num = 7 @@ -1169,8 +1170,9 @@ def trsh_job_on_server(server: str, content = ''.join(content) # convert list into a single string, not to upset paramiko # resubmit - ssh.upload_file(remote_file_path=os.path.join(remote_path, - submit_filename[cluster_soft]), file_string=content) + with SSHClient(server) as ssh: + ssh.upload_file(remote_file_path=os.path.join(remote_path, + submit_filename[cluster_soft]), file_string=content) return node, True diff --git a/arc/main.py b/arc/main.py index 81a5ec93a5..417fdd5ead 100644 --- a/arc/main.py +++ b/arc/main.py @@ -825,49 +825,49 @@ def determine_ess_settings(self, diagnostics=False): continue if diagnostics: logger.info('\nTrying {0}'.format(server)) - ssh = SSHClient(server) - - g03 = ssh.find_package('g03') - g09 = ssh.find_package('g09') - g16 = ssh.find_package('g16') - if g03 or g09 or g16: - if diagnostics: - logger.info(f' Found Gaussian on {server}: g03={g03}, g09={g09}, g16={g16}') - self.ess_settings['gaussian'].append(server) - elif diagnostics: - 
logger.info(f' Did NOT find Gaussian on {server}') - - qchem = ssh.find_package('qchem') - if qchem: - if diagnostics: - logger.info(f' Found QChem on {server}') - self.ess_settings['qchem'].append(server) - elif diagnostics: - logger.info(f' Did NOT find QChem on {server}') - - orca = ssh.find_package('orca') - if orca: - if diagnostics: - logger.info(f' Found Orca on {server}') - self.ess_settings['orca'].append(server) - elif diagnostics: - logger.info(f' Did NOT find Orca on {server}') - - terachem = ssh.find_package('terachem') - if terachem: - if diagnostics: - logging.info(f' Found TeraChem on {server}') - self.ess_settings['terachem'].append(server) - elif diagnostics: - logging.info(f' Did NOT find TeraChem on {server}') - - molpro = ssh.find_package('molpro') - if molpro: - if diagnostics: - logger.info(f' Found Molpro on {server}') - self.ess_settings['molpro'].append(server) - elif diagnostics: - logger.info(f' Did NOT find Molpro on {server}') + with SSHClient(server) as ssh: + + g03 = ssh.find_package('g03') + g09 = ssh.find_package('g09') + g16 = ssh.find_package('g16') + if g03 or g09 or g16: + if diagnostics: + logger.info(f' Found Gaussian on {server}: g03={g03}, g09={g09}, g16={g16}') + self.ess_settings['gaussian'].append(server) + elif diagnostics: + logger.info(f' Did NOT find Gaussian on {server}') + + qchem = ssh.find_package('qchem') + if qchem: + if diagnostics: + logger.info(f' Found QChem on {server}') + self.ess_settings['qchem'].append(server) + elif diagnostics: + logger.info(f' Did NOT find QChem on {server}') + + orca = ssh.find_package('orca') + if orca: + if diagnostics: + logger.info(f' Found Orca on {server}') + self.ess_settings['orca'].append(server) + elif diagnostics: + logger.info(f' Did NOT find Orca on {server}') + + terachem = ssh.find_package('terachem') + if terachem: + if diagnostics: + logging.info(f' Found TeraChem on {server}') + self.ess_settings['terachem'].append(server) + elif diagnostics: + logging.info(f' Did NOT find TeraChem on {server}') + + molpro = ssh.find_package('molpro') + if molpro: + if diagnostics: + logger.info(f' Found Molpro on {server}') + self.ess_settings['molpro'].append(server) + elif diagnostics: + logger.info(f' Did NOT find Molpro on {server}') if diagnostics: logger.info('\n\n') if 'gaussian' in self.ess_settings.keys(): diff --git a/arc/scheduler.py b/arc/scheduler.py index 095056a9a2..84d790d0a7 100644 --- a/arc/scheduler.py +++ b/arc/scheduler.py @@ -2511,8 +2511,8 @@ def get_servers_jobs_ids(self): self.servers_jobs_ids = list() for server in self.servers: if server != 'local': - ssh = SSHClient(server) - self.servers_jobs_ids.extend(ssh.check_running_jobs_ids()) + with SSHClient(server) as ssh: + self.servers_jobs_ids.extend(ssh.check_running_jobs_ids()) else: self.servers_jobs_ids.extend(check_running_jobs_ids()) From 211446c66298efd686feb19572ba008f5c2cae3c Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Fri, 13 Mar 2020 13:33:32 -0400 Subject: [PATCH 14/22] Add a decorator to check the connections By using a decorator, we can check the ssh and sftp connections before running the function. If the connection is invalid, then the decorator will also try to reconnect, and only execute the function when successfully connected. For efficiency, since the connection will be checked when calling `_send_command_to_server()`, those methods which are based on this method need not to be double checked. 
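
For illustration only, a rough sketch of the intended call flow
(simplified from the diff below; `list_dir()` is just an example of a
method that relies on `_send_command_to_server()`):

    @check_connections
    def _send_command_to_server(self, command, remote_path=''):
        # the decorator verifies the connection (and reconnects if
        # needed) before the command is actually executed
        ...

    def list_dir(self, remote_path=''):
        # no explicit connection check needed here, since the call goes
        # through the decorated _send_command_to_server()
        return self._send_command_to_server('ls -alF', remote_path)[0]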
---
 arc/job/ssh.py | 34 ++++++++++++++++++++++++++++++++--
 1 file changed, 32 insertions(+), 2 deletions(-)

diff --git a/arc/job/ssh.py b/arc/job/ssh.py
index f6c881e9eb..14950f980b 100644
--- a/arc/job/ssh.py
+++ b/arc/job/ssh.py
@@ -26,6 +26,32 @@
 logger = get_logger()
 
 
+def check_connections(function: Callable[..., Any]) -> Callable[..., Any]:
+    """
+    A decorator designed for ``SSHClient`` to check the SSH connection before
+    calling a method. It first checks that ``self._ssh`` is available in the
+    SSHClient instance and then sends an ``ls`` command to make sure the
+    connection is still alive. If the connection is bad, this decorator
+    reconnects the SSH channel to avoid connection-related errors when
+    executing the method.
+    """
+    def decorator(*args, **kwargs) -> Any:
+        self = args[0]
+        if self._ssh is None:  # not sure if some status may cause False
+            self._sftp, self._ssh = self.connect()
+        # test connection, reference:
+        # https://stackoverflow.com/questions/
+        # 20147902/how-to-know-if-a-paramiko-ssh-channel-is-disconnected
+        # According to author, maybe no better way
+        try:
+            self._ssh.exec_command('ls')
+        except Exception as e:
+            logger.debug(f'The connection is no longer valid. {e}')
+            self.connect()
+        return function(*args, **kwargs)
+    return decorator
+
+
 class SSHClient(object):
     """
     This is a class for communicating with remote servers via SSH.
@@ -59,7 +85,8 @@ def __enter__(self):
     def __exit__(self, exc_type, exc_value, exc_traceback):
         self.close()
 
-    def _send_command_to_server(self, command: str, remote_path: str='') -> (list, list):
+    @check_connections
+    def _send_command_to_server(self, command: str, remote_path: str= '') -> (list, list):
         """
         A wapper for exec_command in paramiko. SSHClient. Send commands to the server.
 
@@ -142,6 +169,7 @@ def upload_file(self, remote_file_path: str, local_file_path: str='', file_strin
             raise ServerError(f'Could not write file {remote_file_path} on {self.server}. '
                               f'Tried {max_times_to_try} times.')
 
+    @check_connections
     def _upload_file(self, remote_file_path: str, local_file_path: str = '', file_string: str = ''):
         """
         Upload a file. If `file_string` is given, write it as the content of the file.
@@ -204,6 +232,7 @@ def download_file(self, remote_file_path: str, local_file_path: str):
             raise ServerError(f'Could not download file {remote_file_path} from {self.server}. '
                               f'Tried {max_times_to_try} times.')
 
+    @check_connections
     def _download_file(self, remote_file_path: str, local_file_path: str):
         """
         Download a file from the server.
@@ -222,6 +251,7 @@ def _download_file(self, remote_file_path: str, local_file_path: str):
             logger.debug(
                 f'Got an IOError when trying to download file {remote_file_path} from {self.server}')
 
+    @check_connections
     def read_remote_file(self, remote_file_path: str) -> list:
         """
         Read a remote file.
@@ -408,7 +438,7 @@ def close(self):
         if self._ssh is not None:
             self._ssh.close()
 
-
+    @check_connections
     def get_last_modified_time(self, remote_file_path: str):
         """
         Get the last modified time of a remote file.

From 2a443b46bd06cb0984ad791ad1e4497216d7205a Mon Sep 17 00:00:00 2001
From: Xiaorui Dong
Date: Fri, 13 Mar 2020 14:29:31 -0400
Subject: [PATCH 15/22] Simplify functions with hard-coded connection checks

The upload_file(), download_file() and check_job_status() methods are
simplified thanks to the decorator. Abnormal results are still logged,
but they are now far less likely to be caused by connection issues.
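
As a sketch of the resulting caller-side behavior (the server name and
file paths below are placeholders, not actual settings):

    with SSHClient('server1') as ssh:
        try:
            ssh.upload_file(remote_file_path='/remote/path/input.gjf',
                            local_file_path='/local/path/input.gjf')
        except ServerError:
            # the hard-coded sleep-and-retry loop is gone; a genuine
            # failure surfaces immediately and is handled by the caller
            raise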
--- arc/job/ssh.py | 110 ++++++------------------------------------------- 1 file changed, 12 insertions(+), 98 deletions(-) diff --git a/arc/job/ssh.py b/arc/job/ssh.py index 14950f980b..e29b9ff3b3 100644 --- a/arc/job/ssh.py +++ b/arc/job/ssh.py @@ -123,7 +123,7 @@ def _send_command_to_server(self, command: str, remote_path: str= '') -> (list, stderr = stderr.readlines() return stdout, stderr - def upload_file(self, remote_file_path: str, local_file_path: str='', file_string: str=''): + def upload_file(self, remote_file_path: str, local_file_path: str = '', file_string: str = ''): """ A modulator method of _upload_file(). Upload a local file or contents from a string to the remote server. @@ -149,37 +149,6 @@ def upload_file(self, remote_file_path: str, local_file_path: str='', file_strin if not self._check_dir_exists(remote_dir_path): self._create_dir(remote_dir_path) - i, max_times_to_try = 1, 30 - success = False - sleep_time = 10 # seconds - while i < max_times_to_try and not success: - try: - self._upload_file(remote_file_path, - local_file_path, file_string) - except IOError: - logger.error(f'Could not upload file {local_file_path} to {self.server}!') - logger.error(f'ARC is sleeping for {sleep_time * i} seconds before re-trying, ' - f'please check your connectivity.') - logger.info('ZZZZZ..... ZZZZZ.....') - time.sleep(sleep_time * i) # in seconds - i += 1 - else: - success = True - if not success: - raise ServerError(f'Could not write file {remote_file_path} on {self.server}. ' - f'Tried {max_times_to_try} times.') - - @check_connections - def _upload_file(self, remote_file_path: str, local_file_path: str = '', file_string: str = ''): - """ - Upload a file. If `file_string` is given, write it as the content of the file. - Else, if `local_file_path` is given, copy it to `remote_file_path`. - - Args: - remote_file_path (str): The path to write into on the remote server. - local_file_path (str, optional): The local file path to be copied to the remote location. - file_string (str, optional): The file content to be copied and saved as the remote file. - """ try: if file_string: with self._sftp.open(remote_file_path, 'w') as f_remote: @@ -188,12 +157,13 @@ def _upload_file(self, remote_file_path: str, local_file_path: str = '', file_st self._sftp.put(localpath=local_file_path, remotepath=remote_file_path) except IOError: - logger.debug( - f'Got an IOError when trying to upload file {remote_file_path} from {self.server}') - raise IOError( - f'Got an IOError when trying to upload file {remote_file_path} from {self.server}') - - def download_file(self, remote_file_path: str, local_file_path: str): + logger.deug(f'Could not upload file {local_file_path} to {self.server}!') + raise ServerError(f'Could not write file {remote_file_path} on {self.server}. ') + + def download_file(self, + remote_file_path: str, + local_file_path: str, + ) -> None: """ A modulator function of _download_file(). Download a file from the server. 
@@ -204,10 +174,6 @@ def download_file(self, remote_file_path: str, local_file_path: str): Raises: ServerError: If the file cannot be downloaded with maximum times to try """ - i, max_times_to_try = 1, 30 - success = False - sleep_time = 10 # seconds - if not self._check_file_exists(remote_file_path): # Check if a file exists # This doesn't have a real impact now to avoid screwing up ESS trsh @@ -216,40 +182,13 @@ def download_file(self, remote_file_path: str, local_file_path: str): # an empty file will be created at the local path logger.debug( f'{remote_file_path} does not exist on {self.server}.') - - while i < max_times_to_try and not success: - self._download_file(remote_file_path, local_file_path) - if os.path.isfile(local_file_path): - success = True - else: - logger.error(f'Could not download file {remote_file_path} from {self.server}!') - logger.error(f'ARC is sleeping for {sleep_time * i} seconds before re-trying, ' - f'please check your connectivity.') - logger.info('ZZZZZ..... ZZZZZ.....') - time.sleep(sleep_time * i) # in seconds - i += 1 - if not success: - raise ServerError(f'Could not download file {remote_file_path} from {self.server}. ' - f'Tried {max_times_to_try} times.') - - @check_connections - def _download_file(self, remote_file_path: str, local_file_path: str): - """ - Download a file from the server. - - Args: - remote_file_path (str): The remote path to be downloaded from. - local_file_path (str): The local path to be downloaded to. - - Raises: - IOError: Cannot download file via sftp. - """ try: self._sftp.get(remotepath=remote_file_path, localpath=local_file_path) except IOError: logger.debug( f'Got an IOError when trying to download file {remote_file_path} from {self.server}') + raise ServerError(f'Could not download file {remote_file_path} from {self.server}. ') @check_connections def read_remote_file(self, remote_file_path: str) -> list: @@ -267,40 +206,15 @@ def read_remote_file(self, remote_file_path: str) -> list: return content def check_job_status(self, job_id: int) -> str: - """ - A modulator method of _check_job_status(). Check job's status. - - Args: - job_id (int): The job's ID. - - Returns: - str: Possible statuses: `before_submission`, `running`, `errored on node xx`, - `done`, and `connection error` - """ - i = 1 - sleep_time = 1 # minutes - while i < 30: - result = self._check_job_status(job_id) - if result == 'connection error': - logger.error(f'ARC is sleeping for {sleep_time * i} min before re-trying, ' - f'please check your connectivity.') - logger.info('ZZZZZ..... ZZZZZ.....') - time.sleep(sleep_time * i * 60) # in seconds - else: - i = 1000 - i += 1 - return result - - def _check_job_status(self, job_id: int) -> str: """ Check job's status. Args: job_id (int): The job's ID. 
- + Returns: str: Possible statuses: `before_submission`, `running`, `errored on node xx`, - `done`, and `connection error` + `done`, and `errored: ...` """ cmd = check_status_command[servers[self.server]['cluster_soft']] + ' -u $USER' stdout, stderr = self._send_command_to_server(cmd) @@ -310,7 +224,7 @@ def _check_job_status(self, job_id: int) -> str: if stderr: logger.info('\n\n') logger.error(f'Could not check status of job {job_id} due to {stderr}') - return 'connection error' + return f'errored: {stderr}' return check_job_status_in_stdout(job_id=job_id, stdout=stdout, server=self.server) def delete_job(self, job_id: int): From 9f31eaf7ef9bc825ad2838d69416b515737d5e39 Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Sat, 14 Mar 2020 12:18:19 -0400 Subject: [PATCH 16/22] Clean up the type hints and comments in SSHClient --- arc/job/ssh.py | 126 +++++++++++++++++++++++++++---------------------- 1 file changed, 69 insertions(+), 57 deletions(-) diff --git a/arc/job/ssh.py b/arc/job/ssh.py index e29b9ff3b3..0fa4742d4e 100644 --- a/arc/job/ssh.py +++ b/arc/job/ssh.py @@ -14,13 +14,18 @@ import os import re import time +from typing import Any, Callable, List, Optional, Tuple, Union import paramiko from arc.common import get_logger from arc.exceptions import InputError, ServerError -from arc.settings import check_status_command, delete_command, list_available_nodes_command, \ - servers, submit_command, submit_filename +from arc.settings import (check_status_command, + delete_command, + list_available_nodes_command, + servers, + submit_command, + submit_filename) logger = get_logger() @@ -64,8 +69,10 @@ class SSHClient(object): address (str): The server's address. un (str): The username to use on the server. key (str): A path to a file containing the RSA SSH private key to the server. + _ssh (paramiko.SSHClient): A high-level representation of a session with an SSH server. + _sftp (paramiko.sftp_client.SFTPClient): SFTP client used to perform remote file operations. """ - def __init__(self, server=''): + def __init__(self, server: str = '') -> None: if server == '': raise ValueError('A server name must be specified') if server not in servers.keys(): @@ -78,27 +85,29 @@ def __init__(self, server=''): self._ssh = None logging.getLogger("paramiko").setLevel(logging.WARNING) - def __enter__(self): + def __enter__(self) -> 'SSHClient': self.connect() return self - def __exit__(self, exc_type, exc_value, exc_traceback): + def __exit__(self, exc_type, exc_value, exc_traceback) -> None: self.close() @check_connections - def _send_command_to_server(self, command: str, remote_path: str= '') -> (list, list): + def _send_command_to_server(self, + command: Union[str, list], + remote_path: Optional[str] = '', + ) -> Tuple[list, list]: """ - A wapper for exec_command in paramiko. SSHClient. Send commands to the server. + A wrapper for exec_command in paramiko.SSHClient. Send commands to the server. Args: - command (str or list): A string or an array of string commands to send. - remote_path (str, optional): The directory path at which the command will be executed. - - Returns: - list: A list of lines of standard output stream. + command (Union[str, list]): A string or an array of string commands to send. + remote_path (Optional[str]): The directory path at which the command will be executed. Returns: - list: A list of lines of standard error stream. + Tuple[list, list]: + - A list of lines of standard output stream. + - A list of lines of the standard error stream. 
""" if isinstance(command, list): command = '; '.join(command) @@ -123,15 +132,18 @@ def _send_command_to_server(self, command: str, remote_path: str= '') -> (list, stderr = stderr.readlines() return stdout, stderr - def upload_file(self, remote_file_path: str, local_file_path: str = '', file_string: str = ''): + def upload_file(self, + remote_file_path: str, + local_file_path: Optional[str] = '', + file_string: Optional[str] = '', + ) -> None: """ - A modulator method of _upload_file(). Upload a local file or contents - from a string to the remote server. + Upload a local file or contents from a string to the remote server. Args: remote_file_path (str): The path to write into on the remote server. - local_file_path (str, optional): The local file path to be copied to the remote location. - file_string (str, optional): The file content to be copied and saved as the remote file. + local_file_path (Optional[str]): The local file path to be copied to the remote location. + file_string (Optional[str]): The file content to be copied and saved as the remote file. Raises: InputError: If both `local_file_path` or `file_string` are invalid, @@ -157,15 +169,15 @@ def upload_file(self, remote_file_path: str, local_file_path: str = '', file_str self._sftp.put(localpath=local_file_path, remotepath=remote_file_path) except IOError: - logger.deug(f'Could not upload file {local_file_path} to {self.server}!') + logger.debug(f'Could not upload file {local_file_path} to {self.server}!') raise ServerError(f'Could not write file {remote_file_path} on {self.server}. ') - + def download_file(self, remote_file_path: str, local_file_path: str, ) -> None: """ - A modulator function of _download_file(). Download a file from the server. + Download a file from the server. Args: remote_file_path (str): The remote path to be downloaded from. @@ -227,7 +239,7 @@ def check_job_status(self, job_id: int) -> str: return f'errored: {stderr}' return check_job_status_in_stdout(job_id=job_id, stdout=stdout, server=self.server) - def delete_job(self, job_id: int): + def delete_job(self, job_id: int) -> None: """ Deletes a running job. @@ -253,18 +265,19 @@ def check_running_jobs_ids(self) -> list: running_jobs_ids.append(int(status_line.split()[0])) return running_jobs_ids - def submit_job(self, remote_path: str) -> (str, int): + def submit_job(self, remote_path: str) -> Tuple[str, int]: """ Submit a job to the server. Args: - remote_path (str): The remote path contains the input file and the submission script. + remote_path (str): The remote path contains the input file + and the submission script. Returns: - str: A string indicate the status of job submission. Either `errored` or `submitted`. - - Returns: - int: the job ID of the submitted job. + Tuple[str, int]: + - A string indicate the status of job submission. + Either `errored` or `submitted`. + - The job ID of the submitted job. """ job_status = '' job_id = 0 @@ -285,21 +298,15 @@ def submit_job(self, remote_path: str) -> (str, int): elif cluster_soft.lower() == 'slurm': job_id = int(stdout[0].split()[3]) else: - raise ValueError(f'Unrecognized cluster software {servers[self.server]["cluster_soft"]}') + raise ValueError(f'Unrecognized cluster software: {cluster_soft}') return job_status, job_id - def connect(self): + def connect(self) -> None: """ A modulator function for _connect(). Connect to the server. 
Raises: ServerError: Cannot connect to the server with maximum times to try - - Returns: - paramiko.sftp_client.SFTPClient - - Returns: - paramiko.SSHClient """ times_tried = 0 max_times_to_try = 1440 # continue trying for 24 hrs (24 hr * 60 min/hr)... @@ -321,15 +328,14 @@ def connect(self): time.sleep(interval) raise ServerError(f'Could not connect to server {self.server} even after {times_tried} trials.') - def _connect(self): + def _connect(self) -> Tuple[paramiko.sftp_client.SFTPClient, paramiko.SSHClient]: """ Connect via paramiko, and open a SSH session as well as a SFTP session. Returns: - paramiko.sftp_client.SFTPClient - - Returns: - paramiko.SSHClient + Tuple[paramiko.sftp_client.SFTPClient, paramiko.SSHClient]: + - An SFTP client used to perform remote file operations. + - A high-level representation of a session with an SSH server. """ ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) @@ -343,7 +349,7 @@ def _connect(self): sftp = ssh.open_sftp() return sftp, ssh - def close(self): + def close(self) -> None: """ Close the connection to paramiko SSHClient and SFTPClient """ @@ -353,7 +359,9 @@ def close(self): self._ssh.close() @check_connections - def get_last_modified_time(self, remote_file_path: str): + def get_last_modified_time(self, + remote_file_path: str, + ) -> Optional[datetime.datetime]: """ Get the last modified time of a remote file. @@ -374,7 +382,6 @@ def list_dir(self, remote_path: str = '') -> list: List directory contents. Args: - mode (str): The mode change to be applied, can be either octal or symbolic. remote_path (str, optional): The directory path at which the command will be executed. """ command = f'ls -alF' @@ -394,9 +401,6 @@ def list_available_nodes(self) -> list: """ List available nodes on the server. - Args: - mode (str): The mode change to be applied, can be either octal or symbolic. - Returns: list: lines of the node hostnames. """ @@ -418,23 +422,26 @@ def list_available_nodes(self) -> list: def change_mode(self, mode: str, path: str, - recursive: bool = False, - remote_path: str = ''): + recursive: Optional[bool] = False, + remote_path: Optional[str] = '', + ) -> None: """ Change the mode to a file or a directory. Args: mode (str): The mode change to be applied, can be either octal or symbolic. path (str): The path to the file or the directory to be changed. - recursive (bool, optional): Whether to recursively change the mode to all files + recursive (Optional[bool]): Whether to recursively change the mode to all files under a directory.``True`` for recursively change. - remote_path (str, optional): The directory path at which the command will be executed. + remote_path (Optional[str]): The directory path at which the command will be executed. """ recursive = '-R' if recursive else '' command = f'chmod {recursive} {mode} {path}' self._send_command_to_server(command, remote_path) - def _check_file_exists(self, remote_file_path: str) -> bool: + def _check_file_exists(self, + remote_file_path: str, + ) -> bool: """ Check if a file exists on the remote server. @@ -450,7 +457,8 @@ def _check_file_exists(self, remote_file_path: str) -> bool: return True def _check_dir_exists(self, - remote_dir_path: str) -> bool: + remote_dir_path: str, + ) -> bool: """ Check if a directory exists on the remote server. @@ -465,7 +473,7 @@ def _check_dir_exists(self, if len(stdout): return True - def _create_dir(self, remote_path: str): + def _create_dir(self, remote_path: str) -> None: """ Create a new directory on the server. 
@@ -479,13 +487,16 @@ def _create_dir(self, remote_path: str): f'Cannot create dir for the given path ({remote_path}).\nGot: {stderr}') -def check_job_status_in_stdout(job_id, stdout, server): +def check_job_status_in_stdout(job_id: int, + stdout: Union[list, str], + server: str, + ) -> str: """ A helper function for checking job status. Args: job_id (int): the job ID recognized by the server. - stdout (list, str): The output of a queue status check. + stdout (Union[list, str]): The output of a queue status check. server (str): The server name. Returns: @@ -516,7 +527,8 @@ def check_job_status_in_stdout(job_id, stdout, server): raise ValueError(f'Unknown cluster software {servers[server]["cluster_soft"]}') -def delete_all_arc_jobs(server_list, jobs=None): + +def delete_all_arc_jobs(server_list: list, jobs: Optional[List[str]] = None) -> None: """ Delete all ARC-spawned jobs (with job name starting with `a` and a digit) from :list:servers (`servers` could also be a string of one server name) From ff219c1da6ddfe03ded54c383c251350c356949b Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Sat, 14 Mar 2020 14:23:53 -0400 Subject: [PATCH 17/22] Logging exception from _ssh.exec_command for future debug --- arc/job/ssh.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/arc/job/ssh.py b/arc/job/ssh.py index 0fa4742d4e..8ddf8b83e7 100644 --- a/arc/job/ssh.py +++ b/arc/job/ssh.py @@ -123,11 +123,13 @@ def _send_command_to_server(self, f'Cannot execute command at given remote_path({remote_path})') try: _, stdout, stderr = self._ssh.exec_command(command) - except: # SSHException: Timeout opening channel. + except Exception as e: # SSHException: Timeout opening channel. + logger.debug(f'ssh timed-out in the first trial. Got:{e}') try: # try again _, stdout, stderr = self._ssh.exec_command(command) - except: - return '', 'ssh timed-out after two trials' + except Exception as e: + logger.debug(f'ssh timed-out after two trials. Got:{e}') + return ['',], ['ssh timed-out after two trials',] stdout = stdout.readlines() stderr = stderr.readlines() return stdout, stderr From 5d617cef28729b5715409b1bbadb6a11f4174db8 Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Sat, 14 Mar 2020 14:36:23 -0400 Subject: [PATCH 18/22] Increase banner time out to mitigate banner related issue --- arc/job/ssh.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/arc/job/ssh.py b/arc/job/ssh.py index 8ddf8b83e7..0cce0e945e 100644 --- a/arc/job/ssh.py +++ b/arc/job/ssh.py @@ -343,11 +343,14 @@ def _connect(self) -> Tuple[paramiko.sftp_client.SFTPClient, paramiko.SSHClient] ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.load_system_host_keys(filename=self.key) try: - ssh.connect(hostname=self.address, username=self.un) + # If the server accepts the connection but the SSH daemon doesn't respond in + # 15 seconds (default in paramiko) due to network congestion, faulty switches, + # etc..., common solution is to enlarging the timeout variable. 
+ ssh.connect(hostname=self.address, username=self.un, banner_timeout=200) except: # This sometimes gives "SSHException: Error reading SSH protocol banner[Error 104] Connection reset by peer" # Try again: - ssh.connect(hostname=self.address, username=self.un) + ssh.connect(hostname=self.address, username=self.un, banner_timeout=200) sftp = ssh.open_sftp() return sftp, ssh From 86974504ae1ee94c36e93383b44a5909c03a92ab Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Wed, 18 Mar 2020 11:53:51 -0400 Subject: [PATCH 19/22] BugFix: submit script not able to copy files with weird names By adding `""`, now the bash can recognize uncommon file/folder names, e.g. names with '[]' or names with ' ' (space). Be sure to use `"` instead of `'`, since the their behaviors are different. --- arc/job/submit.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/arc/job/submit.py b/arc/job/submit.py index 22479bd0d9..f2cacbe63b 100644 --- a/arc/job/submit.py +++ b/arc/job/submit.py @@ -41,12 +41,12 @@ cd $WorkDir . $g09root/g09/bsd/g09.profile -cp $SubmitDir/input.gjf . -cp $SubmitDir/check.chk . +cp "$SubmitDir/input.gjf" . +cp "$SubmitDir/check.chk" . g09 < input.gjf > input.log formchk check.chk check.fchk -cp * $SubmitDir/ +cp * "$SubmitDir/" rm -rf $GAUSS_SCRDIR rm -rf $WorkDir @@ -86,10 +86,10 @@ mkdir -p $WorkDir cd $WorkDir -cp $SubmitDir/input.inp . +cp "$SubmitDir/input.inp" . ${ORCA_DIR}/orca input.inp > input.log -cp * $SubmitDir/ +cp * "$SubmitDir/" rm -rf $WorkDir @@ -129,12 +129,12 @@ cd $WorkDir . $g16root/g16/bsd/g16.profile -cp $SubmitDir/input.gjf . -cp $SubmitDir/check.chk . +cp "$SubmitDir/input.gjf" . +cp "$SubmitDir/check.chk" . g16 < input.gjf > input.log formchk check.chk check.fchk -cp * $SubmitDir/ +cp * "$SubmitDir/" rm -rf $GAUSS_SCRDIR rm -rf $WorkDir @@ -165,12 +165,12 @@ mkdir -p $sdir cd $sdir -cp $SubmitDir/input.in . +cp "$SubmitDir/input.in" . molpro -n {cpus} -d $sdir input.in -cp input.* $SubmitDir/ -cp geometry*.* $SubmitDir/ +cp input.* "$SubmitDir/" +cp geometry*.* "$SubmitDir/" rm -rf $sdir @@ -225,10 +225,10 @@ mkdir -p $WorkDir cd $WorkDir -cp $SubmitDir/input.inp . +cp "$SubmitDir/input.inp" . ${ORCA_DIR}/orca input.inp > input.log -cp * $SubmitDir/ +cp * "$SubmitDir/" rm -rf $WorkDir @@ -409,10 +409,10 @@ mkdir -p $WorkDir cd $WorkDir -cp $SubmitDir/input.in . +cp "$SubmitDir/input.in" . /opt/orca/orca input.in > input.log -cp * $SubmitDir/ +cp * "$SubmitDir/" rm -rf $WorkDir From 761e12b515680866d3c450a4073e8f5eac8dbdfc Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Tue, 28 Apr 2020 15:28:53 -0400 Subject: [PATCH 20/22] Add delete_jobs() to SSHClient and clean delete_all_arc_jobs --- arc/job/ssh.py | 41 +++++++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/arc/job/ssh.py b/arc/job/ssh.py index 0cce0e945e..502e1d3538 100644 --- a/arc/job/ssh.py +++ b/arc/job/ssh.py @@ -12,7 +12,6 @@ import datetime import logging import os -import re import time from typing import Any, Callable, List, Optional, Tuple, Union @@ -241,16 +240,34 @@ def check_job_status(self, job_id: int) -> str: return f'errored: {stderr}' return check_job_status_in_stdout(job_id=job_id, stdout=stdout, server=self.server) - def delete_job(self, job_id: int) -> None: + def delete_job(self, job_id: Union[int, str]) -> None: """ Deletes a running job. Args: - job_id (int): The job's ID. + job_id (Union[int, str]): The job's ID. 
""" cmd = delete_command[servers[self.server]['cluster_soft']] + ' ' + str(job_id) self._send_command_to_server(cmd) + def delete_jobs(self, + jobs: Optional[List[Union[str, int]]] = None + ) -> None: + """ + Delete all of the jobs on a specific server. + + Args: + jobs (Optional[List[str, int]]): Specific ARC job IDs to delete. + """ + jobs_message = f'{len(jobs)}' if jobs is not None else 'all' + print(f'\nDeleting {jobs_message} ARC jobs from {self.server}...') + + running_job_ids = self.check_running_jobs_ids() + for job_id in running_job_ids: + if jobs is None or str(job_id) in jobs: + self.delete_job(job_id) + print(f'deleted job {job_id}') + def check_running_jobs_ids(self) -> list: """ Check all jobs submitted by the user on a server. @@ -532,7 +549,6 @@ def check_job_status_in_stdout(job_id: int, raise ValueError(f'Unknown cluster software {servers[server]["cluster_soft"]}') - def delete_all_arc_jobs(server_list: list, jobs: Optional[List[str]] = None) -> None: """ Delete all ARC-spawned jobs (with job name starting with `a` and a digit) from :list:servers @@ -547,22 +563,7 @@ def delete_all_arc_jobs(server_list: list, jobs: Optional[List[str]] = None) -> if isinstance(server_list, str): server_list = [server_list] for server in server_list: - jobs_message = f'{len(jobs)}' if jobs is not None else 'all' - print(f'\nDeleting {jobs_message} ARC jobs from {server}...') - cmd = check_status_command[servers[server]['cluster_soft']] + ' -u $USER' with SSHClient(server) as ssh: - stdout = ssh._send_command_to_server(cmd)[0] - for status_line in stdout: - s = re.search(r' a\d+', status_line) - if s is not None: - job_id = s.group()[1:] - if job_id in jobs or jobs is None: - if servers[server]['cluster_soft'].lower() == 'slurm': - server_job_id = status_line.split()[0] - ssh.delete_job(server_job_id) - print(f'deleted job {job_id} ({server_job_id} on server)') - elif servers[server]['cluster_soft'].lower() == 'oge': - ssh.delete_job(job_id) - print(f'deleted job {job_id}') + ssh.delete_jobs(jobs) if server_list: print('\ndone.') From d2d350035a1c12ea957dc38696e5c1515c7bb9e3 Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Tue, 28 Apr 2020 15:38:45 -0400 Subject: [PATCH 21/22] Minor: Modify the formatting in job.py --- arc/job/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arc/job/job.py b/arc/job/job.py index f9cf2d78be..965e5cf38c 100644 --- a/arc/job/job.py +++ b/arc/job/job.py @@ -1593,7 +1593,7 @@ def deduce_software(self): elif 'terachem' in esss: self.software = 'terachem' elif self.job_type in ['conformer', 'opt', 'freq', 'optfreq', 'sp', - 'directed_scan']: + 'directed_scan']: if self.method == 'hf': if 'gaussian' in esss: self.software = 'gaussian' From 9b09e4144dd384c7a16aaea0fbd120f89144d6ac Mon Sep 17 00:00:00 2001 From: Xiaorui Dong Date: Wed, 29 Apr 2020 20:59:51 -0400 Subject: [PATCH 22/22] BugFix: Correctly set the Molpro output name --- arc/job/job.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/arc/job/job.py b/arc/job/job.py index 965e5cf38c..fc1420e976 100644 --- a/arc/job/job.py +++ b/arc/job/job.py @@ -1310,10 +1310,9 @@ def _download_output_file(self): # download molpro log file (in addition to the output file) if self.software.lower() == 'molpro': - remote_log_file_path = os.path.join(self.remote_path, 'input.log') - local_log_file_path = os.path.join(self.local_path, 'output.log') - ssh.download_file(remote_file_path=remote_log_file_path, local_file_path=local_log_file_path) - if not 
os.path.isfile(local_log_file_path): + remote_log_file_path = os.path.join(self.remote_path, output_filename[self.software]) + ssh.download_file(remote_file_path=remote_log_file_path, local_file_path=self.local_path_to_output_file) + if not os.path.isfile(self.local_path_to_output_file): logger.warning(f'Could not download Molpro log file for {self.job_name} ' \ f'(this is not the output file)')
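
Taken together, the series moves call sites to the context-manager form
of SSHClient sketched below (`server1`, the paths and `submit_script`
are placeholders for illustration only):

    with SSHClient('server1') as ssh:
        ssh.upload_file(remote_file_path='/remote/path/submit.sh',
                        file_string=submit_script)
        job_status, job_id = ssh.submit_job(remote_path='/remote/path')
    # the SSH and SFTP channels are closed automatically on exit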