diff --git a/arc/job/job.py b/arc/job/job.py index 5d2ba4d0a9..fc1420e976 100644 --- a/arc/job/job.py +++ b/arc/job/job.py @@ -1228,40 +1228,38 @@ def write_input_file(self): self._upload_check_file(local_check_file_path=self.checkfile) def _upload_submit_file(self): - ssh = SSHClient(self.server) - ssh.send_command_to_server(command='mkdir -p {0}'.format(self.remote_path)) remote_file_path = os.path.join(self.remote_path, submit_filename[servers[self.server]['cluster_soft']]) - ssh.upload_file(remote_file_path=remote_file_path, file_string=self.submit) + with SSHClient(self.server) as ssh: + ssh.upload_file(remote_file_path=remote_file_path, file_string=self.submit) def _upload_input_file(self): - ssh = SSHClient(self.server) - ssh.send_command_to_server(command='mkdir -p {0}'.format(self.remote_path)) - if self.input is not None: - remote_file_path = os.path.join(self.remote_path, input_filename[self.software]) - ssh.upload_file(remote_file_path=remote_file_path, file_string=self.input) - for up_file in self.additional_files_to_upload: - if up_file['source'] == 'path': - local_file_path = up_file['local'] - elif up_file['source'] == 'input_files': - local_file_path = input_files[up_file['local']] - else: - raise JobError('Unclear file source for {0}. Should either be "path" of "input_files", ' - 'got: {1}'.format(up_file['name'], up_file['source'])) - ssh.upload_file(remote_file_path=up_file['remote'], local_file_path=local_file_path) - if up_file['make_x']: - ssh.send_command_to_server(command='chmod +x {0}'.format(up_file['name']), remote_path=self.remote_path) - self.initial_time = ssh.get_last_modified_time( - remote_file_path=os.path.join(self.remote_path, submit_filename[servers[self.server]['cluster_soft']])) + with SSHClient(self.server) as ssh: + if self.input is not None: + remote_file_path = os.path.join(self.remote_path, input_filename[self.software]) + ssh.upload_file(remote_file_path=remote_file_path, file_string=self.input) + for up_file in self.additional_files_to_upload: + if up_file['source'] == 'path': + local_file_path = up_file['local'] + elif up_file['source'] == 'input_files': + local_file_path = input_files[up_file['local']] + else: + raise JobError(f'Unclear file source for {up_file["name"]}. 
Should either be "path" or ' + f'"input_files", got: {up_file["source"]}') + ssh.upload_file(remote_file_path=up_file['remote'], local_file_path=local_file_path) + if up_file['make_x']: + ssh.change_mode(mode='+x', path=up_file['name'], remote_path=self.remote_path) + self.initial_time = ssh.get_last_modified_time( + remote_file_path=os.path.join(self.remote_path, submit_filename[servers[self.server]['cluster_soft']])) def _upload_check_file(self, local_check_file_path=None): if self.server != 'local': - ssh = SSHClient(self.server) remote_check_file_path = os.path.join(self.remote_path, 'check.chk') - local_check_file_path = os.path.join(self.local_path, 'check.chk') if remote_check_file_path is None\ + local_check_file_path = os.path.join(self.local_path, 'check.chk') if local_check_file_path is None\ else local_check_file_path if os.path.isfile(local_check_file_path) and self.software.lower() == 'gaussian': - ssh.upload_file(remote_file_path=remote_check_file_path, local_file_path=local_check_file_path) - logger.debug('uploading checkpoint file for {0}'.format(self.job_name)) + with SSHClient(self.server) as ssh: + ssh.upload_file(remote_file_path=remote_check_file_path, local_file_path=local_check_file_path) + logger.debug(f'uploading checkpoint file for {self.job_name}') else: # running locally, just copy the check file to the job folder new_check_file_path = os.path.join(self.local_path, 'check.chk') @@ -1271,67 +1269,66 @@ def _download_output_file(self): """ Download ESS output, orbitals check file, and the Gaussian check file, if relevant. """ - ssh = SSHClient(self.server) - - # download output file - remote_file_path = os.path.join(self.remote_path, output_filename[self.software]) - ssh.download_file(remote_file_path=remote_file_path, local_file_path=self.local_path_to_output_file) - if not os.path.isfile(self.local_path_to_output_file): - raise JobError('output file for {0} was not downloaded properly'.format(self.job_name)) - self.final_time = ssh.get_last_modified_time(remote_file_path=remote_file_path) - - # download orbitals FChk file - if self.job_type == 'orbitals': - remote_file_path = os.path.join(self.remote_path, 'input.FChk') - ssh.download_file(remote_file_path=remote_file_path, local_file_path=self.local_path_to_orbitals_file) - if not os.path.isfile(self.local_path_to_orbitals_file): - logger.warning('Orbitals FChk file for {0} was not downloaded properly ' - '(this is not the Gaussian formatted check file...)'.format(self.job_name)) - - # download Gaussian check file - if self.software.lower() == 'gaussian': - remote_check_file_path = os.path.join(self.remote_path, 'check.chk') - ssh.download_file(remote_file_path=remote_check_file_path, local_file_path=self.local_path_to_check_file) - if not os.path.isfile(self.local_path_to_check_file): - logger.warning('Gaussian check file for {0} was not downloaded properly'.format(self.job_name)) - - # download Orca .hess hessian file generated by frequency calculations - # Hessian is useful when the user would like to project rotors - if self.software.lower() == 'orca' and self.job_type == 'freq': - remote_hess_file_path = os.path.join(self.remote_path, 'input.hess') - ssh.download_file(remote_file_path=remote_hess_file_path, local_file_path=self.local_path_to_hess_file) - if not os.path.isfile(self.local_path_to_hess_file): - logger.warning(f'Orca hessian file for {self.job_name} was not downloaded properly') - - # download Lennard_Jones data file - if self.software.lower() == 'onedmin': - remote_lj_file_path = 
os.path.join(self.remote_path, 'lj.dat') - ssh.download_file(remote_file_path=remote_lj_file_path, local_file_path=self.local_path_to_lj_file) - if not os.path.isfile(self.local_path_to_lj_file): - logger.warning('Lennard-Jones data file for {0} was not downloaded properly'.format(self.job_name)) - - # download molpro log file (in addition to the output file) - if self.software.lower() == 'molpro': - remote_log_file_path = os.path.join(self.remote_path, 'input.log') - local_log_file_path = os.path.join(self.local_path, 'output.log') - ssh.download_file(remote_file_path=remote_log_file_path, local_file_path=local_log_file_path) - if not os.path.isfile(local_log_file_path): - logger.warning('Could not download Molpro log file for {0} ' - '(this is not the output file)'.format(self.job_name)) - - # download terachem files (in addition to the output file) - if self.software.lower() == 'terachem': - base_path = os.path.join(self.remote_path, 'scr') - filenames = ['results.dat', 'output.molden', 'charge.xls', 'charge_mull.xls', 'optlog.xls', 'optim.xyz', - 'Frequencies.dat', 'I_matrix.dat', 'Mass.weighted.modes.dat', 'moments_of_inertia.dat', - 'output.basis', 'output.geometry', 'output.molden', 'Reduced.mass.dat', 'results.dat'] - for filename in filenames: - remote_log_file_path = os.path.join(base_path, filename) - local_log_file_path = os.path.join(self.local_path, 'scr', filename) - ssh.download_file(remote_file_path=remote_log_file_path, local_file_path=local_log_file_path) - xyz_path = os.path.join(base_path, 'optim.xyz') - if os.path.isfile(xyz_path): - self.local_path_to_xyz = xyz_path + with SSHClient(self.server) as ssh: + + # download output file + remote_file_path = os.path.join(self.remote_path, output_filename[self.software]) + ssh.download_file(remote_file_path=remote_file_path, local_file_path=self.local_path_to_output_file) + if not os.path.isfile(self.local_path_to_output_file): + raise JobError(f'output file for {self.job_name} was not downloaded properly') + self.final_time = ssh.get_last_modified_time(remote_file_path=remote_file_path) + + # download orbitals FChk file + if self.job_type == 'orbitals': + remote_file_path = os.path.join(self.remote_path, 'input.FChk') + ssh.download_file(remote_file_path=remote_file_path, local_file_path=self.local_path_to_orbitals_file) + if not os.path.isfile(self.local_path_to_orbitals_file): + logger.warning(f'Orbitals FChk file for {self.job_name} was not downloaded properly ' + f'(this is not the Gaussian formatted check file...)') + + # download Gaussian check file + if self.software.lower() == 'gaussian': + remote_check_file_path = os.path.join(self.remote_path, 'check.chk') + ssh.download_file(remote_file_path=remote_check_file_path, local_file_path=self.local_path_to_check_file) + if not os.path.isfile(self.local_path_to_check_file): + logger.warning(f'Gaussian check file for {self.job_name} was not downloaded properly') + + # download Orca .hess hessian file generated by frequency calculations + # Hessian is useful when the user would like to project rotors + if self.software.lower() == 'orca' and self.job_type == 'freq': + remote_hess_file_path = os.path.join(self.remote_path, 'input.hess') + ssh.download_file(remote_file_path=remote_hess_file_path, local_file_path=self.local_path_to_hess_file) + if not os.path.isfile(self.local_path_to_hess_file): + logger.warning(f'Orca hessian file for {self.job_name} was not downloaded properly') + + # download Lennard_Jones data file + if self.software.lower() == 'onedmin': + 
remote_lj_file_path = os.path.join(self.remote_path, 'lj.dat') + ssh.download_file(remote_file_path=remote_lj_file_path, local_file_path=self.local_path_to_lj_file) + if not os.path.isfile(self.local_path_to_lj_file): + logger.warning(f'Lennard-Jones data file for {self.job_name} was not downloaded properly') + + # download molpro log file (in addition to the output file) + if self.software.lower() == 'molpro': + remote_log_file_path = os.path.join(self.remote_path, output_filename[self.software]) + ssh.download_file(remote_file_path=remote_log_file_path, local_file_path=self.local_path_to_output_file) + if not os.path.isfile(self.local_path_to_output_file): + logger.warning(f'Could not download Molpro log file for {self.job_name} ' \ + f'(this is not the output file)') + + # download terachem files (in addition to the output file) + if self.software.lower() == 'terachem': + base_path = os.path.join(self.remote_path, 'scr') + filenames = ['results.dat', 'output.molden', 'charge.xls', 'charge_mull.xls', 'optlog.xls', 'optim.xyz', + 'Frequencies.dat', 'I_matrix.dat', 'Mass.weighted.modes.dat', 'moments_of_inertia.dat', + 'output.basis', 'output.geometry', 'output.molden', 'Reduced.mass.dat', 'results.dat'] + for filename in filenames: + remote_log_file_path = os.path.join(base_path, filename) + local_log_file_path = os.path.join(self.local_path, 'scr', filename) + ssh.download_file(remote_file_path=remote_log_file_path, local_file_path=local_log_file_path) + xyz_path = os.path.join(base_path, 'optim.xyz') + if os.path.isfile(xyz_path): + self.local_path_to_xyz = xyz_path def run(self): """ @@ -1352,15 +1349,9 @@ def run(self): logger.debug('writing input file...') self.write_input_file() if self.server != 'local': - ssh = SSHClient(self.server) logger.debug('submitting job...') # submit_job returns job server status and job server id - try: - self.job_status[0], self.job_id = ssh.submit_job(remote_path=self.remote_path) - except IndexError: - # if the connection broke, the files might not have been uploaded correctly - self.write_submit_script() - self.write_input_file() + with SSHClient(self.server) as ssh: self.job_status[0], self.job_id = ssh.submit_job(remote_path=self.remote_path) else: # running locally @@ -1372,9 +1363,9 @@ def delete(self): """ logger.debug('Deleting job {name} for {label}'.format(name=self.job_name, label=self.species_name)) if self.server != 'local': - ssh = SSHClient(self.server) - logger.debug('deleting job on {0}...'.format(self.server)) - ssh.delete_job(self.job_id) + logger.debug(f'deleting job on {self.server}...') + with SSHClient(self.server) as ssh: + ssh.delete_job(self.job_id) else: logger.debug('deleting job locally...') delete_job(job_id=self.job_id) @@ -1421,7 +1412,6 @@ def _get_additional_job_info(self): """ Download the additional information of stdout and stderr from the server. 
""" - ssh = None lines1, lines2 = list(), list() content = '' cluster_soft = servers[self.server]['cluster_soft'].lower() @@ -1429,21 +1419,20 @@ def _get_additional_job_info(self): local_file_path1 = os.path.join(self.local_path, 'out.txt') local_file_path2 = os.path.join(self.local_path, 'err.txt') if self.server != 'local': - ssh = SSHClient(self.server) remote_file_path = os.path.join(self.remote_path, 'out.txt') - try: - ssh.download_file(remote_file_path=remote_file_path, local_file_path=local_file_path1) - except (TypeError, IOError) as e: - logger.warning('Got the following error when trying to download out.txt for {0}:'.format( - self.job_name)) - logger.warning(e) - remote_file_path = os.path.join(self.remote_path, 'err.txt') - try: - ssh.download_file(remote_file_path=remote_file_path, local_file_path=local_file_path2) - except (TypeError, IOError) as e: - logger.warning('Got the following error when trying to download err.txt for {0}:'.format( - self.job_name)) - logger.warning(e) + with SSHClient(self.server) as ssh: + try: + ssh.download_file(remote_file_path=remote_file_path, + local_file_path=local_file_path1) + except (TypeError, IOError) as e: + logger.warning(f'Got the following error when trying to download out.txt for {self.job_name}:') + logger.warning(e) + remote_file_path = os.path.join(self.remote_path, 'err.txt') + try: + ssh.download_file(remote_file_path=remote_file_path, local_file_path=local_file_path2) + except (TypeError, IOError) as e: + logger.warning(f'Got the following error when trying to download err.txt for {self.job_name}:') + logger.warning(e) if os.path.isfile(local_file_path1): with open(local_file_path1, 'r') as f: lines1 = f.readlines() @@ -1455,8 +1444,8 @@ def _get_additional_job_info(self): content += ''.join([line for line in lines2]) elif cluster_soft == 'slurm': if self.server != 'local': - ssh = SSHClient(self.server) - response = ssh.send_command_to_server(command='ls -alF', remote_path=self.remote_path) + with SSHClient(self.server) as ssh: + response = ssh.list_dir(remote_path=self.remote_path) else: response = execute_command('ls -alF {0}'.format(self.local_path)) files = list() @@ -1468,11 +1457,12 @@ def _get_additional_job_info(self): if self.server != 'local': remote_file_path = os.path.join(self.remote_path, file_name) try: - ssh.download_file(remote_file_path=remote_file_path, local_file_path=local_file_path) + with SSHClient(self.server) as ssh: + ssh.download_file(remote_file_path=remote_file_path, + local_file_path=local_file_path) except (TypeError, IOError) as e: - logger.warning('Got the following error when trying to download {0} for {1}:'.format( - file_name, self.job_name)) - logger.warning(e) + logger.warning(f'Got the following error when trying to download {file_name} ' \ + f'for {self.job_name}: {e}') if os.path.isfile(local_file_path): with open(local_file_path, 'r') as f: lines1 = f.readlines() @@ -1485,8 +1475,8 @@ def _check_job_server_status(self): Possible statuses: `initializing`, `running`, `errored on node xx`, `done`. 
""" if self.server != 'local': - ssh = SSHClient(self.server) - return ssh.check_job_status(self.job_id) + with SSHClient(self.server) as ssh: + return ssh.check_job_status(self.job_id) else: return check_job_status(self.job_id) @@ -1602,7 +1592,7 @@ def deduce_software(self): elif 'terachem' in esss: self.software = 'terachem' elif self.job_type in ['conformer', 'opt', 'freq', 'optfreq', 'sp', - 'directed_scan']: + 'directed_scan']: if self.method == 'hf': if 'gaussian' in esss: self.software = 'gaussian' diff --git a/arc/job/ssh.py b/arc/job/ssh.py index 1a44e0b8fc..502e1d3538 100644 --- a/arc/job/ssh.py +++ b/arc/job/ssh.py @@ -12,19 +12,50 @@ import datetime import logging import os -import re import time +from typing import Any, Callable, List, Optional, Tuple, Union import paramiko from arc.common import get_logger from arc.exceptions import InputError, ServerError -from arc.settings import servers, check_status_command, submit_command, submit_filename, delete_command +from arc.settings import (check_status_command, + delete_command, + list_available_nodes_command, + servers, + submit_command, + submit_filename) logger = get_logger() +def check_connections(function: Callable[..., Any]) -> Callable[..., Any]: + """ + A decorator designned for ``SSHClient``to check SSH connections before + calling a method. It first checks if ``self._ssh`` is available in a + SSHClient instance and then checks if you can send ``ls`` and get response + to make sure your connection still alive. If connection is bad, this + decorator will reconnect the SSH channel, to avoid connection related + error when executing the method. + """ + def decorator(*args, **kwargs) -> Any: + self = args[0] + if self._ssh is None: # not sure if some status may cause False + self._sftp, self._ssh = self.connect() + # test connection, reference: + # https://stackoverflow.com/questions/ + # 20147902/how-to-know-if-a-paramiko-ssh-channel-is-disconnected + # According to author, maybe no better way + try: + self._ssh.exec_command('ls') + except Exception as e: + logger.debug(f'The connection is no longer valid. {e}') + self.connect() + return function(*args, **kwargs) + return decorator + + class SSHClient(object): """ This is a class for communicating with remote servers via SSH. @@ -37,8 +68,10 @@ class SSHClient(object): address (str): The server's address. un (str): The username to use on the server. key (str): A path to a file containing the RSA SSH private key to the server. + _ssh (paramiko.SSHClient): A high-level representation of a session with an SSH server. + _sftp (paramiko.sftp_client.SFTPClient): SFTP client used to perform remote file operations. """ - def __init__(self, server=''): + def __init__(self, server: str = '') -> None: if server == '': raise ValueError('A server name must be specified') if server not in servers.keys(): @@ -47,178 +80,229 @@ def __init__(self, server=''): self.address = servers[server]['address'] self.un = servers[server]['un'] self.key = servers[server]['key'] + self._sftp = None + self._ssh = None logging.getLogger("paramiko").setLevel(logging.WARNING) - def send_command_to_server(self, command, remote_path=''): + def __enter__(self) -> 'SSHClient': + self.connect() + return self + + def __exit__(self, exc_type, exc_value, exc_traceback) -> None: + self.close() + + @check_connections + def _send_command_to_server(self, + command: Union[str, list], + remote_path: Optional[str] = '', + ) -> Tuple[list, list]: """ - Send commands to server. 
`command` is either a sting or an array of string commands to send. - If remote_path is not an empty string, the command will be executed in the directory path it points to. - Returns lists of stdout, stderr corresponding to the commands sent. + A wrapper for exec_command in paramiko.SSHClient. Send commands to the server. + + Args: + command (Union[str, list]): A string or an array of string commands to send. + remote_path (Optional[str]): The directory path at which the command will be executed. + + Returns: + Tuple[list, list]: + - A list of lines of standard output stream. + - A list of lines of the standard error stream. """ - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh.load_system_host_keys(filename=self.key) - try: - ssh.connect(hostname=self.address, username=self.un) - except: - return '', 'paramiko failed to connect' if isinstance(command, list): command = '; '.join(command) if remote_path != '': # execute command in remote_path directory. - # Since each `.exec_command()` is a single session, `cd` has to be added to all commands. - command = f'cd {remote_path}; {command}' + # Check remote path existence, otherwise the cmd will be invalid + # and even yield different behaviors. + # Make sure to change directory back after the command is executed + if self._check_dir_exists(remote_path): + command = f'cd "{remote_path}"; {command}; cd ' + else: + raise InputError( + f'Cannot execute command at given remote_path({remote_path})') try: - _, stdout, stderr = ssh.exec_command(command) - except: # SSHException: Timeout opening channel. + _, stdout, stderr = self._ssh.exec_command(command) + except Exception as e: # SSHException: Timeout opening channel. + logger.debug(f'ssh timed-out in the first trial. Got:{e}') try: # try again - _, stdout, stderr = ssh.exec_command(command) - except: - return '', 'ssh timed-out after two trials' + _, stdout, stderr = self._ssh.exec_command(command) + except Exception as e: + logger.debug(f'ssh timed-out after two trials. Got:{e}') + return ['',], ['ssh timed-out after two trials',] stdout = stdout.readlines() stderr = stderr.readlines() - ssh.close() return stdout, stderr - def upload_file(self, remote_file_path, local_file_path='', file_string=''): + def upload_file(self, + remote_file_path: str, + local_file_path: Optional[str] = '', + file_string: Optional[str] = '', + ) -> None: """ - Upload `local_file_path` or the contents of `file_string` to `remote_file_path`. - Either `file_string` or `local_file_path` must be given. + Upload a local file or contents from a string to the remote server. + + Args: + remote_file_path (str): The path to write into on the remote server. + local_file_path (Optional[str]): The local file path to be copied to the remote location. + file_string (Optional[str]): The file content to be copied and saved as the remote file. + + Raises: + InputError: If both `local_file_path` or `file_string` are invalid, + or `local_file_path` does not exists. + ServerError: If the file cannot be uploaded with maximum times to try """ + if not local_file_path and not file_string: + raise InputError('Cannot not upload file to server. Either `file_string` or `local_file_path`' + ' must be specified') if local_file_path and not os.path.isfile(local_file_path): raise InputError(f'Cannot upload a non-existing file. 
' f'Check why file in path {local_file_path} is missing.') - sftp, ssh = self.connect() - i, max_times_to_try = 1, 30 - success = False - sleep_time = 10 # seconds - while i < max_times_to_try: - try: - write_file(sftp, remote_file_path, local_file_path, file_string) - except IOError: - logger.error(f'Could not upload file {local_file_path} to {self.server}!') - logger.error(f'ARC is sleeping for {sleep_time * i} seconds before re-trying, ' - f'please check your connectivity.') - logger.info('ZZZZZ..... ZZZZZ.....') - time.sleep(sleep_time * i) # in seconds - else: - success = True - i = 1000 - i += 1 - if not success: - raise ServerError(f'Could not write file {remote_file_path} on {self.server}. ' - f'Tried {max_times_to_try} times.') - sftp.close() - ssh.close() - - def download_file(self, remote_file_path, local_file_path): - """ - Download a file from `remote_file_path` to `local_file_path`. - """ - i, max_times_to_try = 1, 30 - success = False - sleep_time = 10 # seconds - while i < 30: - self._download_file(remote_file_path, local_file_path) - if os.path.isfile(local_file_path): - success = True - i = 1000 + # If the directory does not exist, _upload_file cannot create a file based on the given path + remote_dir_path = os.path.dirname(remote_file_path) + if not self._check_dir_exists(remote_dir_path): + self._create_dir(remote_dir_path) + + try: + if file_string: + with self._sftp.open(remote_file_path, 'w') as f_remote: + f_remote.write(file_string) else: - logger.error(f'Could not download file {remote_file_path} from {self.server}!') - logger.error(f'ARC is sleeping for {sleep_time * i} seconds before re-trying, ' - f'please check your connectivity.') - logger.info('ZZZZZ..... ZZZZZ.....') - time.sleep(sleep_time * i) # in seconds - i += 1 - if not success: - raise ServerError(f'Could not download file {remote_file_path} from {self.server}. ' - f'Tried {max_times_to_try} times.') + self._sftp.put(localpath=local_file_path, + remotepath=remote_file_path) + except IOError: + logger.debug(f'Could not upload file {local_file_path} to {self.server}!') + raise ServerError(f'Could not write file {remote_file_path} on {self.server}. ') - def _download_file(self, remote_file_path, local_file_path): + def download_file(self, + remote_file_path: str, + local_file_path: str, + ) -> None: """ - Download a file from `remote_file_path` to `local_file_path`. + Download a file from the server. + + Args: + remote_file_path (str): The remote path to be downloaded from. + local_file_path (str): The local path to be downloaded to. + + Raises: + ServerError: If the file cannot be downloaded with maximum times to try """ - sftp, ssh = self.connect() + if not self._check_file_exists(remote_file_path): + # Check if a file exists + # This doesn't have a real impact now to avoid screwing up ESS trsh + # but introduce an opportunity for better troubleshooting. 
+ # The current behavior is that if the remote path does not exist + # an empty file will be created at the local path + logger.debug( + f'{remote_file_path} does not exist on {self.server}.') try: - sftp.get(remotepath=remote_file_path, localpath=local_file_path) + self._sftp.get(remotepath=remote_file_path, + localpath=local_file_path) except IOError: - logger.debug(f'Got an IOError when trying to download file {remote_file_path} from {self.server}') - sftp.close() - ssh.close() + logger.debug( + f'Got an IOError when trying to download file {remote_file_path} from {self.server}') + raise ServerError(f'Could not download file {remote_file_path} from {self.server}. ') - def read_remote_file(self, remote_path, filename): + @check_connections + def read_remote_file(self, remote_file_path: str) -> list: """ - Read a remote file. `remote_path` is the remote path (required), a `filename` is also required. - Returns the file's content. + Read a remote file. + + Args: + remote_file_path (str): The remote path to be read. + + Returns: + list: A list of lines read from the file. """ - sftp, ssh = self.connect() - full_path = os.path.join(remote_path, filename) - with sftp.open(full_path, 'r') as f_remote: + with self._sftp.open(remote_file_path, 'r') as f_remote: content = f_remote.readlines() - sftp.close() - ssh.close() return content - def check_job_status(self, job_id): - """ - A modulator method of _check_job_status() + def check_job_status(self, job_id: int) -> str: """ - i = 1 - sleep_time = 1 # minutes - while i < 30: - result = self._check_job_status(job_id) - if result == 'connection error': - logger.error(f'ARC is sleeping for {sleep_time * i} min before re-trying, ' - f'please check your connectivity.') - logger.info('ZZZZZ..... ZZZZZ.....') - time.sleep(sleep_time * i * 60) # in seconds - else: - i = 1000 - i += 1 - return result + Check job's status. - def _check_job_status(self, job_id): - """ - Possible statuses: `before_submission`, `running`, `errored on node xx`, `done` - Status line formats: - pharos: '540420 0.45326 xq1340b user_name r 10/26/2018 11:08:30 long1@node18.cluster' - rmg: '14428 debug xq1371m2 user_name R 50-04:04:46 1 node06' + Args: + job_id (int): The job's ID. + + Returns: + str: Possible statuses: `before_submission`, `running`, `errored on node xx`, + `done`, and `errored: ...` """ cmd = check_status_command[servers[self.server]['cluster_soft']] + ' -u $USER' - stdout, stderr = self.send_command_to_server(cmd) + stdout, stderr = self._send_command_to_server(cmd) + # Status line formats: + # OGE: '540420 0.45326 xq1340b user_name r 10/26/2018 11:08:30 long1@node18.cluster' + # SLURM: '14428 debug xq1371m2 user_name R 50-04:04:46 1 node06' if stderr: logger.info('\n\n') logger.error(f'Could not check status of job {job_id} due to {stderr}') - return 'connection error' + return f'errored: {stderr}' return check_job_status_in_stdout(job_id=job_id, stdout=stdout, server=self.server) - def delete_job(self, job_id): + def delete_job(self, job_id: Union[int, str]) -> None: """ - Deletes a running job + Deletes a running job. + + Args: + job_id (Union[int, str]): The job's ID. """ cmd = delete_command[servers[self.server]['cluster_soft']] + ' ' + str(job_id) - self.send_command_to_server(cmd) + self._send_command_to_server(cmd) + + def delete_jobs(self, + jobs: Optional[List[Union[str, int]]] = None + ) -> None: + """ + Delete all of the jobs on a specific server. 
-    def check_running_jobs_ids(self):
+        Args:
+            jobs (Optional[List[Union[str, int]]]): Specific ARC job IDs to delete.
+        """
+        jobs_message = f'{len(jobs)}' if jobs is not None else 'all'
+        print(f'\nDeleting {jobs_message} ARC jobs from {self.server}...')
+
+        running_job_ids = self.check_running_jobs_ids()
+        for job_id in running_job_ids:
+            if jobs is None or str(job_id) in jobs:
+                self.delete_job(job_id)
+                print(f'deleted job {job_id}')
+
+    def check_running_jobs_ids(self) -> list:
         """
-        Return a list of ``int`` representing job IDs of all jobs submitted by the user on a server
+        Check all jobs submitted by the user on a server.
+
+        Returns:
+            list: A list of job IDs
         """
         running_jobs_ids = list()
         cmd = check_status_command[servers[self.server]['cluster_soft']] + ' -u $USER'
-        stdout = self.send_command_to_server(cmd)[0]
+        stdout = self._send_command_to_server(cmd)[0]
         for i, status_line in enumerate(stdout):
             if (servers[self.server]['cluster_soft'].lower() == 'slurm' and i > 0)\
                     or (servers[self.server]['cluster_soft'].lower() == 'oge' and i > 1):
                 running_jobs_ids.append(int(status_line.split()[0]))
         return running_jobs_ids

-    def submit_job(self, remote_path):
-        """Submit a job"""
+    def submit_job(self, remote_path: str) -> Tuple[str, int]:
+        """
+        Submit a job to the server.
+
+        Args:
+            remote_path (str): The remote path containing the input file
+                               and the submission script.
+
+        Returns:
+            Tuple[str, int]:
+                - A string indicating the status of the job submission.
+                  Either `errored` or `running`.
+                - The job ID of the submitted job.
+        """
         job_status = ''
         job_id = 0
-        cmd = submit_command[servers[self.server]['cluster_soft']] + ' '\
-            + submit_filename[servers[self.server]['cluster_soft']]
-        stdout, stderr = self.send_command_to_server(cmd, remote_path)
+        cluster_soft = servers[self.server]['cluster_soft']
+        cmd = submit_command[cluster_soft] + ' ' + submit_filename[cluster_soft]
+        stdout, stderr = self._send_command_to_server(cmd, remote_path)
         if len(stderr) > 0 or len(stdout) == 0:
             logger.warning(f'Got stderr when submitting job:\n{stderr}')
             job_status = 'errored'
@@ -228,23 +312,28 @@ def submit_job(self, remote_path):
                            f'settings, such as cpus and memory, in ARC/arc/settings.py')
         elif 'submitted' in stdout[0].lower():
             job_status = 'running'
-            if servers[self.server]['cluster_soft'].lower() == 'oge':
+            if cluster_soft.lower() == 'oge':
                 job_id = int(stdout[0].split()[2])
-            elif servers[self.server]['cluster_soft'].lower() == 'slurm':
+            elif cluster_soft.lower() == 'slurm':
                 job_id = int(stdout[0].split()[3])
             else:
-                raise ValueError(f'Unrecognized cluster software {servers[self.server]["cluster_soft"]}')
+                raise ValueError(f'Unrecognized cluster software: {cluster_soft}')
         return job_status, job_id

-    def connect(self):
-        """A helper function for calling self.try_connecting until successful"""
+    def connect(self) -> None:
+        """
+        A wrapper function for _connect(). Connect to the server.
+
+        Raises:
+            ServerError: If the connection cannot be established after the maximum number of attempts.
+        """
         times_tried = 0
-        max_times_to_try = 1440  # continue trying for 24 hrs...
+        max_times_to_try = 1440  # continue trying for 24 hrs (24 hr * 60 min/hr)...
         interval = 60  # wait 60 sec between trials
         while times_tried < max_times_to_try:
             times_tried += 1
             try:
-                sftp, ssh = self.try_connecting()
+                self._sftp, self._ssh = self._connect()
             except Exception as e:
                 if not times_tried % 10:
                     logger.info(f'Tried connecting to {self.server} {times_tried} times with no success...'
@@ -254,66 +343,182 @@ def connect(self):
                                 f'\nGot: {e}')
             else:
                 logger.debug(f'Successfully connected to {self.server} at the {times_tried} trial.')
-                return sftp, ssh
+                return
             time.sleep(interval)
         raise ServerError(f'Could not connect to server {self.server} even after {times_tried} trials.')

-    def try_connecting(self):
-        """A helper function for connecting via paramiko, returns the `sftp` and `ssh` objects"""
+    def _connect(self) -> Tuple[paramiko.sftp_client.SFTPClient, paramiko.SSHClient]:
+        """
+        Connect via paramiko, and open an SSH session as well as an SFTP session.
+
+        Returns:
+            Tuple[paramiko.sftp_client.SFTPClient, paramiko.SSHClient]:
+                - An SFTP client used to perform remote file operations.
+                - A high-level representation of a session with an SSH server.
+        """
         ssh = paramiko.SSHClient()
         ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
         ssh.load_system_host_keys(filename=self.key)
         try:
-            ssh.connect(hostname=self.address, username=self.un)
+            # If the server accepts the connection but the SSH daemon doesn't respond in
+            # 15 seconds (default in paramiko) due to network congestion, faulty switches,
+            # etc..., a common solution is to enlarge the timeout variable.
+            ssh.connect(hostname=self.address, username=self.un, banner_timeout=200)
         except:
             # This sometimes gives "SSHException: Error reading SSH protocol banner[Error 104] Connection reset by peer"
             # Try again:
-            ssh.connect(hostname=self.address, username=self.un)
+            ssh.connect(hostname=self.address, username=self.un, banner_timeout=200)
         sftp = ssh.open_sftp()
         return sftp, ssh

-    def get_last_modified_time(self, remote_file_path):
-        """returns the last modified time of `remote_file_path` in a datetime format"""
-        sftp, ssh = self.connect()
+    def close(self) -> None:
+        """
+        Close the connection to the paramiko SSHClient and SFTPClient.
+        """
+        if self._sftp is not None:
+            self._sftp.close()
+        if self._ssh is not None:
+            self._ssh.close()
+
+    @check_connections
+    def get_last_modified_time(self,
+                               remote_file_path: str,
+                               ) -> Optional[datetime.datetime]:
+        """
+        Get the last modified time of a remote file.
+
+        Args:
+            remote_file_path (str): The remote file path to check.
+
+        Returns:
+            datetime.datetime: The last modified time of the file.
+        """
         try:
-            timestamp = sftp.stat(remote_file_path).st_mtime
+            timestamp = self._sftp.stat(remote_file_path).st_mtime
         except IOError:
             return None
-        sftp.close()
-        ssh.close()
         return datetime.datetime.fromtimestamp(timestamp)

+    def list_dir(self, remote_path: str = '') -> list:
+        """
+        List directory contents.

-def write_file(sftp, remote_file_path, local_file_path='', file_string=''):
-    """
-    Write a file. If `file_string` is given, write it as the content of the file.
-    Else, if `local_file_path` is given, copy it to `remote_file_path`.
+        Args:
+            remote_path (str, optional): The directory path at which the command will be executed.
+        """
+        command = 'ls -alF'
+        return self._send_command_to_server(command, remote_path)[0]

-    Args:
-        sftp (paramiko's SFTP): The SFTP object.
-        remote_file_path (str): The path to write into on the remote server.
-        local_file_path (str, optional): A local file path to be copied into the remote location.
-        file_string (str): The file content to be copied and saved as the remote file.
-    """
-    with sftp.open(remote_file_path, 'w') as f_remote:
-        if file_string:
-            f_remote.write(file_string)
-        elif local_file_path:
-            # with open(local_file_path, 'r') as f_local:
-            #     f_remote.write(f_local.readlines())
-            sftp.put(localpath=local_file_path, remotepath=remote_file_path)
-        else:
-            raise ValueError('Could not upload file to server. Either `file_string` or `local_file_path`'
-                             ' must be specified')
+    def find_package(self, package_name: str) -> list:
+        """
+        Find the path to the package.
+        Args:
+            package_name (str): The name of the package to search for.
+        """
+        command = f'. ~/.bashrc; which {package_name}'
+        return self._send_command_to_server(command)[0]
+
+    def list_available_nodes(self) -> list:
+        """
+        List available nodes on the server.

-def check_job_status_in_stdout(job_id, stdout, server):
+        Returns:
+            list: Lines of the node hostnames.
+        """
+        cluster_soft = servers[self.server]['cluster_soft']
+        cmd = list_available_nodes_command[cluster_soft]
+        stdout = self._send_command_to_server(command=cmd)[0]
+        if cluster_soft.lower() == 'oge':
+            # Stdout line example:
+            # long1@node01.cluster BIP 0/0/8 -NA- lx24-amd64 aAdu
+            nodes = [line.split()[0].split('@')[1]
+                     for line in stdout if '0/0/8' in line]
+        elif cluster_soft.lower() == 'slurm':
+            # Stdout line example:
+            # node01 alloc 1.00 none
+            nodes = [line.split()[0] for line in stdout
+                     if line.split()[1] in ['mix', 'alloc', 'idle']]
+        return nodes
+
+    def change_mode(self,
+                    mode: str,
+                    path: str,
+                    recursive: Optional[bool] = False,
+                    remote_path: Optional[str] = '',
+                    ) -> None:
+        """
+        Change the mode of a file or a directory.
+
+        Args:
+            mode (str): The mode change to be applied, can be either octal or symbolic.
+            path (str): The path to the file or the directory to be changed.
+            recursive (Optional[bool]): Whether to recursively change the mode of all files
+                                        under a directory. ``True`` to change recursively.
+            remote_path (Optional[str]): The directory path at which the command will be executed.
+        """
+        recursive = '-R' if recursive else ''
+        command = f'chmod {recursive} {mode} {path}'
+        self._send_command_to_server(command, remote_path)
+
+    def _check_file_exists(self,
+                           remote_file_path: str,
+                           ) -> bool:
+        """
+        Check if a file exists on the remote server.
+
+        Args:
+            remote_file_path (str): The path to the file on the remote server.
+
+        Returns:
+            bool: Whether the file exists on the remote server. ``True`` if it exists.
+        """
+        command = f'[ -f "{remote_file_path}" ] && echo "File exists"'
+        stdout, _ = self._send_command_to_server(command, remote_path='')
+        if len(stdout):
+            return True
+
+    def _check_dir_exists(self,
+                          remote_dir_path: str,
+                          ) -> bool:
+        """
+        Check if a directory exists on the remote server.
+
+        Args:
+            remote_dir_path (str): The path to the directory on the remote server.
+
+        Returns:
+            bool: Whether the directory exists on the remote server. ``True`` if it exists.
+        """
+        command = f'[ -d "{remote_dir_path}" ] && echo "Dir exists"'
+        stdout, _ = self._send_command_to_server(command)
+        if len(stdout):
+            return True
+
+    def _create_dir(self, remote_path: str) -> None:
+        """
+        Create a new directory on the server.
+
+        Args:
+            remote_path (str): The path to the directory to create on the remote server.
+ """ + command = f'mkdir -p "{remote_path}"' + _, stderr = self._send_command_to_server(command) + if stderr: + raise ServerError( + f'Cannot create dir for the given path ({remote_path}).\nGot: {stderr}') + + +def check_job_status_in_stdout(job_id: int, + stdout: Union[list, str], + server: str, + ) -> str: """ A helper function for checking job status. Args: job_id (int): the job ID recognized by the server. - stdout (list, str): The output of a queue status check. + stdout (Union[list, str]): The output of a queue status check. server (str): The server name. Returns: @@ -344,7 +549,7 @@ def check_job_status_in_stdout(job_id, stdout, server): raise ValueError(f'Unknown cluster software {servers[server]["cluster_soft"]}') -def delete_all_arc_jobs(server_list, jobs=None): +def delete_all_arc_jobs(server_list: list, jobs: Optional[List[str]] = None) -> None: """ Delete all ARC-spawned jobs (with job name starting with `a` and a digit) from :list:servers (`servers` could also be a string of one server name) @@ -358,22 +563,7 @@ def delete_all_arc_jobs(server_list, jobs=None): if isinstance(server_list, str): server_list = [server_list] for server in server_list: - jobs_message = f'{len(jobs)}' if jobs is not None else 'all' - print(f'\nDeleting {jobs_message} ARC jobs from {server}...') - cmd = check_status_command[servers[server]['cluster_soft']] + ' -u $USER' - ssh = SSHClient(server) - stdout = ssh.send_command_to_server(cmd)[0] - for status_line in stdout: - s = re.search(r' a\d+', status_line) - if s is not None: - job_id = s.group()[1:] - if job_id in jobs or jobs is None: - if servers[server]['cluster_soft'].lower() == 'slurm': - server_job_id = status_line.split()[0] - ssh.delete_job(server_job_id) - print(f'deleted job {job_id} ({server_job_id} on server)') - elif servers[server]['cluster_soft'].lower() == 'oge': - ssh.delete_job(job_id) - print(f'deleted job {job_id}') + with SSHClient(server) as ssh: + ssh.delete_jobs(jobs) if server_list: print('\ndone.') diff --git a/arc/job/submit.py b/arc/job/submit.py index 22479bd0d9..f2cacbe63b 100644 --- a/arc/job/submit.py +++ b/arc/job/submit.py @@ -41,12 +41,12 @@ cd $WorkDir . $g09root/g09/bsd/g09.profile -cp $SubmitDir/input.gjf . -cp $SubmitDir/check.chk . +cp "$SubmitDir/input.gjf" . +cp "$SubmitDir/check.chk" . g09 < input.gjf > input.log formchk check.chk check.fchk -cp * $SubmitDir/ +cp * "$SubmitDir/" rm -rf $GAUSS_SCRDIR rm -rf $WorkDir @@ -86,10 +86,10 @@ mkdir -p $WorkDir cd $WorkDir -cp $SubmitDir/input.inp . +cp "$SubmitDir/input.inp" . ${ORCA_DIR}/orca input.inp > input.log -cp * $SubmitDir/ +cp * "$SubmitDir/" rm -rf $WorkDir @@ -129,12 +129,12 @@ cd $WorkDir . $g16root/g16/bsd/g16.profile -cp $SubmitDir/input.gjf . -cp $SubmitDir/check.chk . +cp "$SubmitDir/input.gjf" . +cp "$SubmitDir/check.chk" . g16 < input.gjf > input.log formchk check.chk check.fchk -cp * $SubmitDir/ +cp * "$SubmitDir/" rm -rf $GAUSS_SCRDIR rm -rf $WorkDir @@ -165,12 +165,12 @@ mkdir -p $sdir cd $sdir -cp $SubmitDir/input.in . +cp "$SubmitDir/input.in" . molpro -n {cpus} -d $sdir input.in -cp input.* $SubmitDir/ -cp geometry*.* $SubmitDir/ +cp input.* "$SubmitDir/" +cp geometry*.* "$SubmitDir/" rm -rf $sdir @@ -225,10 +225,10 @@ mkdir -p $WorkDir cd $WorkDir -cp $SubmitDir/input.inp . +cp "$SubmitDir/input.inp" . ${ORCA_DIR}/orca input.inp > input.log -cp * $SubmitDir/ +cp * "$SubmitDir/" rm -rf $WorkDir @@ -409,10 +409,10 @@ mkdir -p $WorkDir cd $WorkDir -cp $SubmitDir/input.in . +cp "$SubmitDir/input.in" . 
 /opt/orca/orca input.in > input.log

-cp * $SubmitDir/
+cp * "$SubmitDir/"

 rm -rf $WorkDir

diff --git a/arc/job/trsh.py b/arc/job/trsh.py
index 6224332fab..df71da3d65 100644
--- a/arc/job/trsh.py
+++ b/arc/job/trsh.py
@@ -25,7 +25,6 @@ from arc.settings import (delete_command,
                           inconsistency_ab,
                           inconsistency_az,
-                          list_available_nodes_command,
                           maximum_barrier,
                           preserve_param_in_scan_stable,
                           rotor_scan_resolution,
@@ -1120,54 +1119,61 @@ def trsh_job_on_server(server: str,
         bool: Whether to re-run the job, `True` to rerun.
     """
     server_nodes = server_nodes if server_nodes is not None else list()
+    cluster_soft = servers[server]['cluster_soft']
     if job_server_status != 'done':
         logger.error(f'Job {job_name} has server status "{job_server_status}" on {server}.')

         # delete current server run
-        command = delete_command[servers[server]['cluster_soft']] + ' ' + str(job_id)
         if server == 'local':
-            execute_command(command)
+            cmd = delete_command[cluster_soft] + ' ' + str(job_id)
+            execute_command(cmd)
             return None, True
         else:
-            ssh = SSHClient(server)
-            ssh.send_command_to_server(command)
-
-        if servers[server]['cluster_soft'].lower() == 'oge':
-            logger.error('Troubleshooting by changing node.')
-            ssh = SSHClient(server)
-            # find available nodes
-            stdout = ssh.send_command_to_server(command=list_available_nodes_command[servers[server]['cluster_soft']])[0]
-            for line in stdout:
-                node = line.split()[0].split('.')[0].split('node')[1]
-                if servers[server]['cluster_soft'] == 'OGE' and '0/0/8' in line and node not in server_nodes:
-                    server_nodes.append(node)
-                    break
-            else:
-                logger.error(f'Could not find an available node on the server {server}')
-                # TODO: continue troubleshooting; if all else fails, put the job to sleep,
-                #  and try again searching for a node
-                return None, False
-
-            # modify the submit file
-            content = ssh.read_remote_file(remote_path=remote_path,
-                                           filename=submit_filename[servers[server]['cluster_soft']])
-            for i, line in enumerate(content):
-                if '#$ -l h=node' in line:
-                    content[i] = '#$ -l h=node{0}.cluster'.format(node)
-                    break
-            else:
-                content.insert(7, '#$ -l h=node{0}.cluster'.format(node))
-            content = ''.join(content)  # convert list into a single string, not to upset paramiko
-            # resubmit
-            ssh.upload_file(remote_file_path=os.path.join(remote_path,
-                            submit_filename[servers[server]['cluster_soft']]), file_string=content)
-            return node, True
-
-        elif servers[server]['cluster_soft'].lower() == 'slurm':
-            # TODO: change node on Slurm
-            return None, True
+            with SSHClient(server) as ssh:
+                ssh.delete_job(job_id)
+
+        # find available node
+        logger.error('Troubleshooting by changing node.')
+        with SSHClient(server) as ssh:
+            nodes = ssh.list_available_nodes()
+        for node in nodes:
+            if node not in server_nodes:
+                server_nodes.append(node)
+                break
+        else:
+            logger.error(f'Could not find an available node on the server {server}')
+            # TODO: continue troubleshooting; if all else fails, put the job to sleep,
+            #  and try again searching for a node
+            return None, False
+
+        # modify the submit file
+        remote_submit_file = os.path.join(remote_path, submit_filename[cluster_soft])
+        with SSHClient(server) as ssh:
+            content = ssh.read_remote_file(remote_file_path=remote_submit_file)
+        if cluster_soft.lower() == 'oge':
+            node_assign = '#$ -l h='
+            insert_line_num = 7
+        elif cluster_soft.lower() == 'slurm':
+            node_assign = '#SBATCH --nodelist='
+            insert_line_num = 5
+        else:
+            # Other software?
+            logger.debug(f'Unknown cluster software {cluster_soft} is encountered when '
+                         f'troubleshooting by changing node.')
+            return None, False
+        for i, line in enumerate(content):
+            if node_assign in line:
+                content[i] = node_assign + node
+                break
+        else:
+            content.insert(insert_line_num, node_assign + node)
+        content = ''.join(content)  # convert list into a single string, not to upset paramiko

-    return None, False
+        # resubmit
+        with SSHClient(server) as ssh:
+            ssh.upload_file(remote_file_path=os.path.join(remote_path,
+                                                          submit_filename[cluster_soft]), file_string=content)
+        return node, True


 def scan_quality_check(label: str,
diff --git a/arc/main.py b/arc/main.py
index 686924d390..417fdd5ead 100644
--- a/arc/main.py
+++ b/arc/main.py
@@ -825,56 +825,49 @@ def determine_ess_settings(self, diagnostics=False):
                 continue
             if diagnostics:
                 logger.info('\nTrying {0}'.format(server))
-            ssh = SSHClient(server)
-
-            cmd = '. ~/.bashrc; which g03'
-            g03 = ssh.send_command_to_server(cmd)[0]
-            cmd = '. ~/.bashrc; which g09'
-            g09 = ssh.send_command_to_server(cmd)[0]
-            cmd = '. ~/.bashrc; which g16'
-            g16 = ssh.send_command_to_server(cmd)[0]
-            if g03 or g09 or g16:
-                if diagnostics:
-                    logger.info(f' Found Gaussian on {server}: g03={g03}, g09={g09}, g16={g16}')
-                self.ess_settings['gaussian'].append(server)
-            elif diagnostics:
-                logger.info(f' Did NOT find Gaussian on {server}')
-
-            cmd = '. ~/.bashrc; which qchem'
-            qchem = ssh.send_command_to_server(cmd)[0]
-            if qchem:
-                if diagnostics:
-                    logger.info(f' Found QChem on {server}')
-                self.ess_settings['qchem'].append(server)
-            elif diagnostics:
-                logger.info(f' Did NOT find QChem on {server}')
-
-            cmd = '. ~/.bashrc; which orca'
-            orca = ssh.send_command_to_server(cmd)[0]
-            if orca:
-                if diagnostics:
-                    logger.info(f' Found Orca on {server}')
-                self.ess_settings['orca'].append(server)
-            elif diagnostics:
-                logger.info(f' Did NOT find Orca on {server}')
-
-            cmd = '. ~/.bashrc; which terachem'
-            terachem = ssh.send_command_to_server(cmd)[0]
-            if terachem:
-                if diagnostics:
-                    logging.info(f' Found TeraChem on {server}')
-                self.ess_settings['terachem'].append(server)
-            elif diagnostics:
-                logging.info(f' Did NOT find TeraChem on {server}')
-
-            cmd = '. ~/
.bashrc; which molpro' - molpro = ssh.send_command_to_server(cmd)[0] - if molpro: - if diagnostics: - logger.info(f' Found Molpro on {server}') - self.ess_settings['molpro'].append(server) - elif diagnostics: - logger.info(f' Did NOT find Molpro on {server}') + with SSHClient(server) as ssh: + + g03 = ssh.find_package('g03') + g09 = ssh.find_package('g09') + g16 = ssh.find_package('g16') + if g03 or g09 or g16: + if diagnostics: + logger.info(f' Found Gaussian on {server}: g03={g03}, g09={g09}, g16={g16}') + self.ess_settings['gaussian'].append(server) + elif diagnostics: + logger.info(f' Did NOT find Gaussian on {server}') + + qchem = ssh.find_package('qchem') + if qchem: + if diagnostics: + logger.info(f' Found QChem on {server}') + self.ess_settings['qchem'].append(server) + elif diagnostics: + logger.info(f' Did NOT find QChem on {server}') + + orca = ssh.find_package('orca') + if orca: + if diagnostics: + logger.info(f' Found Orca on {server}') + self.ess_settings['orca'].append(server) + elif diagnostics: + logger.info(f' Did NOT find Orca on {server}') + + terachem = ssh.find_package('terachem') + if terachem: + if diagnostics: + logging.info(f' Found TeraChem on {server}') + self.ess_settings['terachem'].append(server) + elif diagnostics: + logging.info(f' Did NOT find TeraChem on {server}') + + molpro = ssh.find_package('molpro') + if molpro: + if diagnostics: + logger.info(f' Found Molpro on {server}') + self.ess_settings['molpro'].append(server) + elif diagnostics: + logger.info(f' Did NOT find Molpro on {server}') if diagnostics: logger.info('\n\n') if 'gaussian' in self.ess_settings.keys(): diff --git a/arc/scheduler.py b/arc/scheduler.py index 095056a9a2..84d790d0a7 100644 --- a/arc/scheduler.py +++ b/arc/scheduler.py @@ -2511,8 +2511,8 @@ def get_servers_jobs_ids(self): self.servers_jobs_ids = list() for server in self.servers: if server != 'local': - ssh = SSHClient(server) - self.servers_jobs_ids.extend(ssh.check_running_jobs_ids()) + with SSHClient(server) as ssh: + self.servers_jobs_ids.extend(ssh.check_running_jobs_ids()) else: self.servers_jobs_ids.extend(check_running_jobs_ids()) diff --git a/arc/settings.py b/arc/settings.py index a42a6157e5..1688245752 100644 --- a/arc/settings.py +++ b/arc/settings.py @@ -102,7 +102,7 @@ 'Slurm': '/usr/bin/scancel'} list_available_nodes_command = {'OGE': 'export SGE_ROOT=/opt/sge; /opt/sge/bin/lx24-amd64/qstat -f | grep "/8 " | grep "long" | grep -v "8/8"| grep -v "aAu"', - 'Slurm': 'sinfo'} + 'Slurm': 'sinfo -o "%n %t %O %E"'} submit_filename = {'OGE': 'submit.sh', 'Slurm': 'submit.sl'}
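
Example usage (illustrative only, not part of the patch): a minimal sketch of how the refactored SSHClient introduced above is intended to be used, assuming a server named 'server1' is configured in arc/settings.py with the Slurm defaults shown in this diff. The remote path, the submit-script content, and the output file name below are hypothetical placeholders.

import os

from arc.job.ssh import SSHClient

remote_path = '/home/user/runs/ARC_Projects/demo/spc1/opt_a42'  # hypothetical remote directory

with SSHClient('server1') as ssh:
    # __enter__() opens the SSH and SFTP sessions; __exit__() closes them.
    ssh.upload_file(remote_file_path=os.path.join(remote_path, 'submit.sl'),
                    file_string='#!/bin/bash\n# ... submit script content ...\n')
    job_status, job_id = ssh.submit_job(remote_path=remote_path)

# Each block opens a fresh connection; methods decorated with @check_connections
# also reconnect on their own if the channel went stale in between.
with SSHClient('server1') as ssh:
    print(ssh.check_job_status(job_id))
    ssh.download_file(remote_file_path=os.path.join(remote_path, 'output.out'),
                      local_file_path='output.out')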
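
A second illustrative sketch: how the Slurm branch of SSHClient.list_available_nodes() is expected to parse the output of the new `sinfo -o "%n %t %O %E"` command configured in arc/settings.py. The node names, states, and the header line below are made-up sample data, not real server output.

# Hypothetical readlines()-style stdout from _send_command_to_server():
stdout = ['HOSTNAMES STATE CPU_LOAD REASON\n',
          'node01 alloc 1.00 none\n',
          'node02 drain 0.01 bad disk\n',
          'node03 idle 0.05 none\n',
          'node04 mix 0.50 none\n']

# Same comprehension as in list_available_nodes(): keep nodes whose state
# (second column) is 'mix', 'alloc' or 'idle'; the header and drained nodes
# are filtered out.
nodes = [line.split()[0] for line in stdout
         if line.split()[1] in ['mix', 'alloc', 'idle']]
print(nodes)  # ['node01', 'node03', 'node04']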