diff --git a/dls_backup.py b/dls_backup.py index 4640847..1d387a1 100755 --- a/dls_backup.py +++ b/dls_backup.py @@ -1,10 +1,8 @@ #!/usr/bin/env python3 -# coding: utf8 # a wrapper script to launch dls_backup_bl with correct module name from dls_backup_bl import BackupBeamline -if __name__ == '__main__': +if __name__ == "__main__": BackupBeamline().main() - diff --git a/dls_backup_gui.py b/dls_backup_gui.py index c5c5292..d323970 100755 --- a/dls_backup_gui.py +++ b/dls_backup_gui.py @@ -1,9 +1,8 @@ #!/usr/bin/env python3 -# coding: utf8 # a wrapper script to launch dls_backup_gui with correct module name from dls_backup_gui import dls_backup_gui -if __name__ == '__main__': +if __name__ == "__main__": dls_backup_gui.main() diff --git a/pyproject.toml b/pyproject.toml index 8761e2f..42a00a9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,15 +13,16 @@ classifiers = [ ] description = "A backup tool for beamlines for pmacs, terminal servers, zebras" dependencies = [ - "pyqt5", - "pexpect", - "dls-pmacanalyse >=2.1.0", - "numpy", + "attrs>=19.1.0", "cothread", + "dls-pmacanalyse >=3.0.0", + "epicscorelibs", "gitpython", - "attrs>=19.1.0", - "urllib3==1.26.20", # pinning because we use deprecated DEFAULT_CIPHERS + "numpy", + "pyqt5", + "pexpect", "requests", + "urllib3==1.26.20", # pinning because we use deprecated DEFAULT_CIPHERS ] # Add project dependencies here, e.g. ["click", "numpy"] dynamic = ["version"] license.file = "LICENSE" @@ -39,6 +40,7 @@ dev = [ "pre-commit", "pytest", "pytest-cov", + "types-requests", "ruff", "tox-direct", "types-mock", @@ -112,6 +114,9 @@ lint.select = [ "UP", # pyupgrade - https://docs.astral.sh/ruff/rules/#pyupgrade-up "SLF", # self - https://docs.astral.sh/ruff/settings/#lintflake8-self ] +lint.ignore = [ + "E501", # allow long lines where ruff does not reformat +] [tool.ruff.lint.per-file-ignores] # By default, private member access is allowed in tests diff --git a/src/dls_backup_bl/backup.py b/src/dls_backup_bl/backup.py index a9baae1..6e58379 100644 --- a/src/dls_backup_bl/backup.py +++ b/src/dls_backup_bl/backup.py @@ -6,7 +6,6 @@ from logging import getLogger from multiprocessing.pool import ThreadPool from pathlib import Path -from typing import List from .brick import Brick from .config import BackupsConfig @@ -22,7 +21,7 @@ BACKUP ABORTED -The configuration file contains no devices for backup. +The configuration file contains no devices for backup. Please import the dls-pmac-analyse cfg file with --import-cfg and / or use dls-edit-backup.py to complete the device configuration. """ @@ -32,15 +31,15 @@ There is no backup area set up for this beamline. -Please import the dls-pmac-analyse cfg file with --import-cfg and / or +Please import the dls-pmac-analyse cfg file with --import-cfg and / or use dls-backup-gui.py to complete the device configuration. """ class Positions(Enum): - save = 'save' - restore = 'restore' - compare = 'compare' + save = "save" + restore = "restore" + compare = "compare" # noinspection PyBroadException @@ -53,9 +52,9 @@ def __init__(self): self.defaults: Defaults = None self.config: BackupsConfig = None - self.motor_controllers: List = None - self.terminal_servers: List = None - self.zebras: List = None + self.motor_controllers: list = None + self.terminal_servers: list = None + self.zebras: list = None self.email: str = None def setup_logging(self, level: str): @@ -72,21 +71,24 @@ def setup_logging(self, level: str): """ # basic config sets up the debugging log file - msg_f = '%(asctime)s %(levelname)-8s %(message)s (%(name)s)' - date_f = '%y-%m-%d %H:%M:%S' + msg_f = "%(asctime)s %(levelname)-8s %(message)s (%(name)s)" + date_f = "%y-%m-%d %H:%M:%S" logging.basicConfig( - level=logging.DEBUG, format=msg_f, datefmt=date_f, - filename=str(self.defaults.log_file), filemode='w' + level=logging.DEBUG, + format=msg_f, + datefmt=date_f, + filename=str(self.defaults.log_file), + filemode="w", ) # critical log file for emails and record of activity critical = logging.FileHandler( - filename=str(self.defaults.critical_log_file), mode='w' + filename=str(self.defaults.critical_log_file), mode="w" ) critical.setLevel(logging.CRITICAL) # console log file for immediate feedback - numeric_level = getattr(logging, level.upper(), None) + numeric_level = getattr(logging, level.upper(), 0) # suppress verbose logging in dependent libraries if numeric_level > logging.DEBUG: @@ -97,8 +99,8 @@ def setup_logging(self, level: str): console = logging.StreamHandler() # set a format which is simpler for console use formatter = logging.Formatter( - '%(asctime)s %(levelname)-10s %(message)s (%(name)s)', - datefmt='%y-%m-%d %H:%M:%S' + "%(asctime)s %(levelname)-10s %(message)s (%(name)s)", + datefmt="%y-%m-%d %H:%M:%S", ) # tell the handler to use this format console.setFormatter(formatter) @@ -112,57 +114,105 @@ def setup_logging(self, level: str): def parse_args(self): # Setup an argument Parser parser = argparse.ArgumentParser( - description='Backup PMAC & GeoBrick motor controllers, terminal ' - 'servers, and Zebra boxes. ' - 'RECOMMENDATION: run this program from a ' - 'workstation on the beamline to be backed up and ' - 'provide NO arguments except --email (see below for ' - 'defaults).', - usage="%(prog)s [options]") - parser.add_argument('-i', '--import-cfg', action="store", - help="import brick configuration from a " - "dls-pmac-analyse configuration file.") - parser.add_argument('-b', '--beamline', action="store", - help="Name of the beamline to backup. " - "The format is 'i16' or 'b07'. Defaults to " - "the current beamline") - parser.add_argument('--domain', action="store", - help='When BLXXY is not appropriate, use domain for' - ' the backup folder name. e.g. --domain ME01D') - parser.add_argument('--dir', action="store", - help="Directory to save backups to. Defaults to" - "/dls_sw/motion/Backups/BLXXY") - parser.add_argument('-j', action="store", dest="json_file", - help="JSON file of devices to be backed up. " - "Defaults to DIR/BLXXY-backup.json") - parser.add_argument('-r', '--retries', action="store", type=int, - default=4, - help="Number of times to attempt backup. " - "Defaults to 4") - parser.add_argument('-t', '--threads', action="store", type=int, - default=Defaults.threads, - help="Number of processor threads to use (Number " - "of simultaneous backups). Defaults to" - "10") - parser.add_argument('-e', '--email', action="store", - help="Email address to send backup reports to.") - parser.add_argument('-l', '--log-level', action="store", - default='info', - help="Set logging to error, warning, info, debug") - parser.add_argument('-d', '--devices', action="store", nargs='+', - help="only backup the listed named device") - parser.add_argument('-p', '--positions', action="store", - # todo make this neat, using Positions Enum - type=str, choices=['save', 'restore', 'compare'], - help="save and restore motor positions") - parser.add_argument('--folder', action="store_true", - help="report the motion backup folder that the " - "tool will use.") + description="Backup PMAC & GeoBrick motor controllers, terminal " + "servers, and Zebra boxes. " + "RECOMMENDATION: run this program from a " + "workstation on the beamline to be backed up and " + "provide NO arguments except --email (see below for " + "defaults).", + usage="%(prog)s [options]", + ) + parser.add_argument( + "-i", + "--import-cfg", + action="store", + help="import brick configuration from a " + "dls-pmac-analyse configuration file.", + ) + parser.add_argument( + "-b", + "--beamline", + action="store", + help="Name of the beamline to backup. " + "The format is 'i16' or 'b07'. Defaults to " + "the current beamline", + ) + parser.add_argument( + "--domain", + action="store", + help="When BLXXY is not appropriate, use domain for" + " the backup folder name. e.g. --domain ME01D", + ) + parser.add_argument( + "--dir", + action="store", + help="Directory to save backups to. Defaults to" + "/dls_sw/motion/Backups/BLXXY", + ) + parser.add_argument( + "-j", + action="store", + dest="json_file", + help="JSON file of devices to be backed up. " + "Defaults to DIR/BLXXY-backup.json", + ) + parser.add_argument( + "-r", + "--retries", + action="store", + type=int, + default=4, + help="Number of times to attempt backup. " "Defaults to 4", + ) + parser.add_argument( + "-t", + "--threads", + action="store", + type=int, + default=Defaults.threads, + help="Number of processor threads to use (Number " + "of simultaneous backups). Defaults to" + "10", + ) + parser.add_argument( + "-e", + "--email", + action="store", + help="Email address to send backup reports to.", + ) + parser.add_argument( + "-l", + "--log-level", + action="store", + default="info", + help="Set logging to error, warning, info, debug", + ) + parser.add_argument( + "-d", + "--devices", + action="store", + nargs="+", + help="only backup the listed named device", + ) + parser.add_argument( + "-p", + "--positions", + action="store", + # todo make this neat, using Positions Enum + type=str, + choices=["save", "restore", "compare"], + help="save and restore motor positions", + ) + parser.add_argument( + "--folder", + action="store_true", + help="report the motion backup folder that the " "tool will use.", + ) # Parse the command line arguments self.args = parser.parse_args() - def do_geobricks(self, pmacs: List[str] = None): + def do_geobricks(self, pmacs: list[str] | None = None): count = 0 # Go through every motor controller listed in JSON file for motor_controller in self.config.motion_controllers: @@ -175,17 +225,14 @@ def do_geobricks(self, pmacs: List[str] = None): uses_ts = int(port) != 1025 # Add a backup job to the pool - args = ( - controller, server, port, uses_ts, self.defaults - ) + args = (controller, server, port, uses_ts, self.defaults) - if not pmacs or any([(i in controller) for i in pmacs]): + if not pmacs or any((i in controller) for i in pmacs): count += 1 b = Brick(*args) - if self.args.positions == 'save' or \ - self.args.positions == 'compare': + if self.args.positions == "save" or self.args.positions == "compare": func = b.backup_positions - elif self.args.positions == 'restore': + elif self.args.positions == "restore": func = b.restore_positions else: func = b.backup_controller @@ -193,23 +240,21 @@ def do_geobricks(self, pmacs: List[str] = None): self.thread_pool.apply_async(func) return count - def do_t_servers(self, servers: List[str] = None): + def do_t_servers(self, servers: list[str] | None = None): count = 0 # Go through every terminal server listed in JSON file for terminal_server in self.config.terminal_servers: # Pull out the server details server = terminal_server.server - args = ( - server, terminal_server.ts_type, self.defaults - ) + args = (server, terminal_server.ts_type, self.defaults) # allows substring match of any devices entry against this server - if not servers or any([(i in server) for i in servers]): + if not servers or any((i in server) for i in servers): count += 1 # Add a backup job to the pool self.thread_pool.apply_async(backup_terminal_server, args) return count - def do_zebras(self, zebras: str = None): + def do_zebras(self, zebras: str | None = None): count = 0 # Go through every zebra listed in JSON file for z in self.config.zebras: @@ -219,7 +264,7 @@ def do_zebras(self, zebras: str = None): args = (name, self.defaults) # allows substring match of any devices entry against this server - if not zebras or any([(i in name) for i in zebras]): + if not zebras or any((i in name) for i in zebras): count += 1 # call zebra backup in main thread since it uses cothread backup_zebra(*args) @@ -242,16 +287,12 @@ def send_email(self): return try: - e_from = "From: {}\r\n".format(self.defaults.diamond_sender) - e_to = "To: {}\r\n".format(self.email) - e_subject = "Subject: {} Backup Report\r\n\r\n".format( - self.defaults.beamline - ) + e_from = f"From: {self.defaults.diamond_sender}\r\n" + e_to = f"To: {self.email}\r\n" + e_subject = f"Subject: {self.defaults.beamline} Backup Report\r\n\r\n" msg = e_from + e_to + e_subject + e_text mail_server = smtplib.SMTP(self.defaults.diamond_smtp) - mail_server.sendmail( - self.defaults.diamond_sender, self.email, msg - ) + mail_server.sendmail(self.defaults.diamond_sender, self.email, msg) mail_server.quit() log.critical("Sent Email report") except BaseException: @@ -260,19 +301,24 @@ def send_email(self): log.debug(msg, exc_info=True) def check_restore(self): - print("\nAre you sure? This will restore the most recent commit " - "and\noverwrite the motor positions on all pmacs (Y/N)") + print( + "\nAre you sure? This will restore the most recent commit " + "and\noverwrite the motor positions on all pmacs (Y/N)" + ) reply = input() - if reply[0].lower() != 'y': + if reply[0].lower() != "y": exit(0) restore_positions(self.defaults) def do_backups(self): - if self.args.positions == 'restore': + if self.args.positions == "restore": self.check_restore() else: - log.info("START OF BACKUP for beamline %s to %s", - self.defaults.beamline, self.defaults.backup_folder) + log.info( + "START OF BACKUP for beamline %s to %s", + self.defaults.beamline, + self.defaults.backup_folder, + ) # Initiate a thread pool with the desired number of threads self.thread_pool = ThreadPool(self.args.threads) @@ -290,8 +336,7 @@ def do_backups(self): # finish up self.sort_log() if total == 0: - log.critical("Nothing was backed up " - "(incorrect --devices argument?)") + log.critical("Nothing was backed up " "(incorrect --devices argument?)") if not self.args.positions: commit_changes(self.defaults, do_positions=False) @@ -308,7 +353,8 @@ def do_backups(self): print("The following command reviews the position files") print( f"more {self.defaults.motion_folder}/*" - f"{self.defaults.positions_suffix}") + f"{self.defaults.positions_suffix}" + ) def cancel(self, sig, frame): log.critical("Cancelled by the user") @@ -323,8 +369,11 @@ def main(self): signal.signal(signal.SIGINT, self.cancel) self.defaults = Defaults( - self.args.beamline, self.args.dir, self.args.json_file, - self.args.retries, domain=self.args.domain + self.args.beamline, + self.args.dir, + self.args.json_file, + self.args.retries, + domain=self.args.domain, ) if self.args.folder: diff --git a/src/dls_backup_bl/brick.py b/src/dls_backup_bl/brick.py index dd141fa..a32acc0 100644 --- a/src/dls_backup_bl/brick.py +++ b/src/dls_backup_bl/brick.py @@ -4,9 +4,9 @@ import telnetlib from logging import getLogger +from dls_pmacanalyse.errors import PmacReadError from dls_pmacanalyse.globalconfig import GlobalConfig from dls_pmacanalyse.pmac import Pmac -from dls_pmacanalyse.errors import PmacReadError from dls_pmaclib.dls_pmacremote import ( PmacEthernetInterface, PmacTelnetInterface, @@ -44,7 +44,7 @@ def __del__(self): self.pti.disconnect() def _check_connection(self): - for attempt_num in range(self.defaults.retries): + for _attempt_num in range(self.defaults.retries): try: t = telnetlib.Telnet() t.open(self.server, self.port, timeout=2) @@ -137,8 +137,8 @@ def backup_positions(self): def restore_positions(self): log.info(f"Sending motor positions for {self.desc}.") - - positionSFList = self.getPositionSF(self.controller,self.defaults) + + positionSFList = self.getPositionSF(self.controller, self.defaults) for attempt_num in range(self.defaults.retries): try: @@ -151,28 +151,26 @@ def restore_positions(self): # Mx62 in some cases cannot be written directly to the controller as the maximum # acceptable value appears to be 2^35. Here the value of Mx62 is calculated as a factor # of 1/(ix08*23) is written to the pmac as an expression - for i,l in enumerate(lines): - newL = l.split('=') + for i, line in enumerate(lines): + newL = line.split("=") newL = [a.strip() for a in newL] - if '62' in newL[0]: + if "62" in newL[0]: # Determine axis number M variable is related to - if newL[0] == 'M' or 'm': - axisNo = int(int(newL[0][1:])/100) - newL[1] = int(newL[1])*(1/positionSFList[axisNo]) - newL[1] = f'{int(newL[1])}/{1/positionSFList[axisNo]}' - lines[i] = f'{newL[0]} = {newL[1]}\n' - - pmc = list( - [ - (n + 1, l[:-1]) - for n, l in enumerate(lines) - if re.search(self.restore_commands, l) - ] - ) + if newL[0] == "M" or "m": + axisNo = int(int(newL[0][1:]) / 100) + newL[1] = int(newL[1]) * (1 / positionSFList[axisNo]) + newL[1] = f"{int(newL[1])}/{1/positionSFList[axisNo]}" + lines[i] = f"{newL[0]} = {newL[1]}\n" + + pmc = [ + (n + 1, line[:-1]) + for n, line in enumerate(lines) + if re.search(self.restore_commands, line) + ] # send ctrl K to kill all axes (otherwise the servo loop # will fight the change of position) self.pti.sendCommand("\u000b") - for (success, line, cmd, response) in self.pti.sendSeries(pmc): + for success, _line, cmd, response in self.pti.sendSeries(pmc): if not success: log.critical( f"ERROR: command '{cmd}' failed for {self.desc} (" @@ -243,9 +241,8 @@ def backup_controller(self): get_i08 = {i: re.compile(rf"i{i:d}08 *= *(-?[0-9]+)") for i in range(1, 33)} @classmethod - def getPositionSF(cls,brick,defaults: Defaults): - - scaleFactors = [0]*33 + def getPositionSF(cls, brick, defaults: Defaults): + scaleFactors = [0] * 33 pmc_file = defaults.motion_folder / (brick + ".pmc") try: with pmc_file.open("r") as f: @@ -256,13 +253,12 @@ def getPositionSF(cls,brick,defaults: Defaults): ) pmc = "" - for axis in range(1,33): + for axis in range(1, 33): r = re.search(cls.get_i08[axis], pmc) - a = int(r[1]) - if r: + if r is not None: scaleFactors[axis] = int(r[1]) * 32 else: - scaleFactors[axis]=1024 + scaleFactors[axis] = 1024 return scaleFactors @classmethod @@ -275,7 +271,7 @@ def diff_to_counts(cls, brick: str, diff_output: str, defaults: Defaults): :param defaults: a Defaults structure with names of folders etc. :return: str: human readable list of count differences per axis """ - scaleFactors = cls.getPositionSF(brick,defaults) + scaleFactors = cls.getPositionSF(brick, defaults) output = "" old_plcs = { diff --git a/src/dls_backup_bl/config.py b/src/dls_backup_bl/config.py index dbd4091..3ee3e24 100644 --- a/src/dls_backup_bl/config.py +++ b/src/dls_backup_bl/config.py @@ -3,7 +3,6 @@ from enum import IntEnum from logging import getLogger from pathlib import Path -from typing import List import attr @@ -15,6 +14,7 @@ # represents the configuration in memory the class BackupsConfig is the # root of those representations and provides load and save methods + class JsonAbleDictionaryTuple: def __getitem__(self, item): return self.__dict__[item] @@ -53,9 +53,9 @@ class Zebra(JsonAbleDictionaryTuple): @attr.s(auto_attribs=True) class BackupsConfig(JsonAbleDictionaryTuple): - motion_controllers: List[MotorController] - terminal_servers: List[TerminalServer] - zebras: List[Zebra] + motion_controllers: list[MotorController] + terminal_servers: list[TerminalServer] + zebras: list[Zebra] @classmethod def my_types(cls): @@ -71,12 +71,9 @@ def from_json(json_file: Path): try: with json_file.open() as f: raw_items = json.loads(f.read()) - m = [MotorController(*i.values()) for i in - raw_items["motion_controllers"]] - t = [TerminalServer(*i.values()) for i in - raw_items["terminal_servers"]] - z = [Zebra(*i.values()) for i in - raw_items["zebras"]] + m = [MotorController(*i.values()) for i in raw_items["motion_controllers"]] + t = [TerminalServer(*i.values()) for i in raw_items["terminal_servers"]] + z = [Zebra(*i.values()) for i in raw_items["zebras"]] except Exception: msg = "JSON file missing or invalid" log.debug(msg, exc_info=True) @@ -99,14 +96,14 @@ def dumps(self): return json.dumps(self, cls=ComplexEncoder, sort_keys=True, indent=4) def count_devices(self): - return len(self.motion_controllers) + \ - len(self.terminal_servers) + \ - len(self.zebras) + return ( + len(self.motion_controllers) + len(self.terminal_servers) + len(self.zebras) + ) class ComplexEncoder(json.JSONEncoder): def default(self, obj): - if hasattr(obj, '__dict__'): + if hasattr(obj, "__dict__"): return obj.__dict__ else: return json.JSONEncoder.default(self, obj) diff --git a/src/dls_backup_bl/config_schema.json b/src/dls_backup_bl/config_schema.json index ee29852..83e99d1 100644 --- a/src/dls_backup_bl/config_schema.json +++ b/src/dls_backup_bl/config_schema.json @@ -27,4 +27,4 @@ } }, "required": ["motor_controllers", "terminal_servers", "zebras"] -} \ No newline at end of file +} diff --git a/src/dls_backup_bl/defaults.py b/src/dls_backup_bl/defaults.py index f282da1..9da0fd2 100644 --- a/src/dls_backup_bl/defaults.py +++ b/src/dls_backup_bl/defaults.py @@ -1,16 +1,17 @@ import shutil import tempfile -from pathlib import Path from os import environ +from pathlib import Path class Defaults: """ manage default values for paths and other settings """ + # public fixed defaults - diamond_smtp: str = 'outbox.rl.ac.uk' - diamond_sender: str = 'backup_bl@diamond.ac.uk' + diamond_smtp: str = "outbox.rl.ac.uk" + diamond_sender: str = "backup_bl@diamond.ac.uk" root_folder = Path("/dls_sw/work/motion/Backups/") positions_suffix = "_positions.pmc" positions_file = "positions_comparison.txt" @@ -25,29 +26,32 @@ class Defaults: _retries: int = 4 def __init__( - self, beamline: str = None, backup_folder: Path = None, - config_file: Path = None, retries: int = 0, - config_file_only: bool = False, - domain: str = None + self, + beamline: str | None = None, + backup_folder: Path | None = None, + config_file: Path | None = None, + retries: int = 0, + config_file_only: bool = False, + domain: str | None = None, ): """ - Create an object to hold important file paths. - Pass in command line parameters which override defaults: - - :param beamline: the name of the beamline in the form 'i16' - :param backup_folder: override the default location for backups - :param config_file: where to read config if not from default - :param retries: number of backup retries on failure - :param config_file_only: if this is true do not require a valid - beamline setting when config_file is supplied. this is for - use by the GUI - :param domain: override the beamline name to give no BLXXY folder name + Create an object to hold important file paths. + Pass in command line parameters which override defaults: + + :param beamline: the name of the beamline in the form 'i16' + :param backup_folder: override the default location for backups + :param config_file: where to read config if not from default + :param retries: number of backup retries on failure + :param config_file_only: if this is true do not require a valid + beamline setting when config_file is supplied. this is for + use by the GUI + :param domain: override the beamline name to give no BLXXY folder name """ self._retries = retries if int(retries) > 0 else Defaults._retries self.temp_dir: Path = Path(tempfile.mkdtemp()) if config_file_only and config_file is not None: - self._beamline = '' + self._beamline = "" elif domain: self._beamline = domain else: @@ -61,9 +65,7 @@ def __init__( if config_file: self._config_file = Path(config_file) else: - name = Path("{}-{}".format( - self._beamline, Defaults._config_file_suffix) - ) + name = Path(f"{self._beamline}-{Defaults._config_file_suffix}") self._config_file = self._backup_folder / name def __del__(self): @@ -84,21 +86,21 @@ def get_beamline(self, short_form): assert short_form is not None # special cases - if short_form == 'i02-2': - self._beamline = 'BL02I' + if short_form == "i02-2": + self._beamline = "BL02I" else: bl_letter = short_form[0].upper() - bl_nums = short_form[1:].split('-') + bl_nums = short_form[1:].split("-") if len(bl_nums) == 2: bl_letter = chr(ord(bl_letter) + 1) bl_no = int(bl_nums[0]) - self._beamline = \ - "BL{:02d}{}".format(bl_no, bl_letter) + self._beamline = f"BL{bl_no:02d}{bl_letter}" except (IndexError, AssertionError, ValueError, TypeError): print( "\n\nBeamline must be of the form i16 or i09-1." "\nCheck environment variable ${BEAMLINE} or use argument " - "--beamline (-b)") + "--beamline (-b)" + ) exit(1) def check_folders(self): diff --git a/src/dls_backup_bl/importjson.py b/src/dls_backup_bl/importjson.py index fe86207..dda04a7 100644 --- a/src/dls_backup_bl/importjson.py +++ b/src/dls_backup_bl/importjson.py @@ -1,9 +1,10 @@ from logging import getLogger from pathlib import Path -from dls_backup_bl.config import BackupsConfig, MotorController from dls_pmacanalyse import GlobalConfig +from dls_backup_bl.config import BackupsConfig, MotorController + log = getLogger(__name__) @@ -23,8 +24,6 @@ def import_json(cfg_file: Path, json_file): json_config.motion_controllers.pop(i) break json_config.motion_controllers.append(mc) - log.info("imported pmac {} at {}:{}".format( - pmac, details.host, details.port - )) + log.info(f"imported pmac {pmac} at {details.host}:{details.port}") json_config.save(json_file) diff --git a/src/dls_backup_bl/repository.py b/src/dls_backup_bl/repository.py index 260c101..0e94916 100644 --- a/src/dls_backup_bl/repository.py +++ b/src/dls_backup_bl/repository.py @@ -80,7 +80,7 @@ def commit_changes(defaults: Defaults, do_positions=False): # Link to beamline backup git repository in the motion area try: set_home() - + try: git_repo = Repo(defaults.backup_folder) except InvalidGitRepositoryError: @@ -134,7 +134,7 @@ def commit_changes(defaults: Defaults, do_positions=False): def restore_positions(defaults: Defaults): try: set_home() - + git_repo = Repo(defaults.backup_folder) cli = git_repo.git diff --git a/src/dls_backup_bl/tserver.py b/src/dls_backup_bl/tserver.py index c4b751f..19f9897 100644 --- a/src/dls_backup_bl/tserver.py +++ b/src/dls_backup_bl/tserver.py @@ -1,9 +1,9 @@ import hashlib +import os import re from logging import getLogger from pathlib import Path -import os import pexpect import requests @@ -11,9 +11,11 @@ log = getLogger(__name__) -requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += 'HIGH:!DH:!aNULL' +requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += "HIGH:!DH:!aNULL" # type: ignore try: - requests.packages.urllib3.contrib.pyopenssl.DEFAULT_SSL_CIPHER_LIST += 'HIGH:!DH:!aNULL' + requests.packages.urllib3.contrib.pyopenssl.DEFAULT_SSL_CIPHER_LIST += ( # type: ignore + "HIGH:!DH:!aNULL" + ) except AttributeError: # no pyopenssl support used / needed / available pass @@ -22,8 +24,14 @@ # todo make ts_type an enum # todo use Path instead of system class TsConfig: - def __init__(self, ts: str, backup_directory: Path, username: str = None, - password: str = None, ts_type: str = None): + def __init__( + self, + ts: str, + backup_directory: Path, + username: str | None = None, + password: str | None = None, + ts_type: str = "", + ): self.ts = ts self.path: Path = backup_directory self.desc = f"Terminal server {ts} type {ts_type}" @@ -37,8 +45,7 @@ def __init__(self, ts: str, backup_directory: Path, username: str = None, ) elif ts_type.lower() == "acs": self.success = self.get_acs_config( - username or "root", password or "tslinux", - "/mnt/flash/config.tgz" + username or "root", password or "tslinux", "/mnt/flash/config.tgz" ) elif ts_type.lower() == "acsold": self.success = self.get_acs_config( @@ -49,30 +56,28 @@ def __init__(self, ts: str, backup_directory: Path, username: str = None, @staticmethod def make_moxa_login(page: str, username: str, password: str): - match = re.search("(?:fake_challenge|FakeChallenge) value=([^>]*)>", - page) + match = re.search("(?:fake_challenge|FakeChallenge) value=([^>]*)>", page) if match is None: - raise ValueError( - "This web page that doesn't look like a moxa login screen" - ) + raise ValueError("This web page that doesn't look like a moxa login screen") fake_challenge = match.groups()[0] # do what the function SetPass() javascript does on the login screen - md = hashlib.md5(fake_challenge.encode('utf8')).hexdigest() + md = hashlib.md5(fake_challenge.encode("utf8")).hexdigest() p = "" for c in password: - p += "%x" % ord(c) + p += f"{ord(c):x}" md5_pass = "" for i in range(len(p)): m = int(p[i], 16) n = int(md[i], 16) md5_pass += "%x" % (m ^ n) - login_data = dict( - Username=username, - MD5Password=md5_pass, - Password='', - FakeChallenge=fake_challenge) + login_data = { + "Username": username, + "MD5Password": md5_pass, + "Password": "", + "FakeChallenge": fake_challenge, + } return login_data def get_moxa_config(self, username, password): @@ -87,8 +92,8 @@ def get_moxa_config(self, username, password): session.post(url, data=login, verify=False) response = session.get(f"{url}/ConfExp.htm", verify=False) - m = re.search(r'csrf_token value=([^>]*)>', response.text) - data = {'csrf_token': m[1]} if m else {} + m = re.search(r"csrf_token value=([^>]*)>", response.text) + data = {"csrf_token": m[1]} if m else {} response = session.post(f"{url}/Config.txt", data=data, verify=False) @@ -99,31 +104,31 @@ def get_moxa_config(self, username, password): def get_acs_config(self, username, password, remote_path): tar = self.path / (self.ts + "_config.tar.gz") - child = pexpect.spawn( - 'scp %s@%s:%s %s' % (username, self.ts, remote_path, str(tar))) + child = pexpect.spawn(f"scp {username}@{self.ts}:{remote_path} {str(tar)}") i = child.expect( - ['Are you sure you want to continue connecting (yes/no)?', - 'Password:'], timeout=120) + ["Are you sure you want to continue connecting (yes/no)?", "Password:"], + timeout=120, + ) if i == 0: child.sendline("yes") - child.expect('Password:', timeout=120) + child.expect("Password:", timeout=120) child.sendline(password) i = child.expect([pexpect.EOF, "scp: [^ ]* No such file or directory"]) try: os.chmod(str(tar), 0o664) - except Exception as e: - msg= "Warning: Permissions for ACS Terminal server backup file could not be changed." + except Exception: + msg = "Warning: Permissions for ACS Terminal server backup file could not be changed." log.critical(msg) pass if i == 1: - log.error("Remote path %s doesn't exist on this ACS" % remote_path) + log.error(f"Remote path {remote_path} doesn't exist on this ACS") return False else: return True def backup_terminal_server(server: str, ts_type: str, defaults: Defaults): - desc = "terminal server {} type {}".format(server, ts_type) + desc = f"terminal server {server} type {ts_type}" # If backup fails retry specified number of times before giving up for attempt_num in range(defaults.retries): @@ -131,19 +136,15 @@ def backup_terminal_server(server: str, ts_type: str, defaults: Defaults): try: t = TsConfig(server, defaults.ts_folder, None, None, ts_type) if t.success: - log.critical("SUCCESS backed up {}".format(desc)) + log.critical(f"SUCCESS backed up {desc}") else: - log.critical("ERROR failed to back up {}".format(desc)) + log.critical(f"ERROR failed to back up {desc}") except Exception: - msg = "ERROR: {} backup failed on attempt {} of {}".format( - desc, attempt_num + 1, defaults.retries) + msg = f"ERROR: {desc} backup failed on attempt {attempt_num + 1} of {defaults.retries}" log.debug(msg, exc_info=True) log.error(msg) continue break else: - msg = "ERROR: {} all {} attempts failed".format( - desc, defaults.retries - ) + msg = f"ERROR: {desc} all {defaults.retries} attempts failed" log.critical(msg) - diff --git a/src/dls_backup_bl/zebra.py b/src/dls_backup_bl/zebra.py index 34a782a..c51f6df 100644 --- a/src/dls_backup_bl/zebra.py +++ b/src/dls_backup_bl/zebra.py @@ -1,7 +1,7 @@ from logging import getLogger from time import sleep -from cothread.catools import caput, caget +from cothread.catools import caget, caput from dls_backup_bl.defaults import Defaults @@ -9,19 +9,20 @@ def backup_zebra(name: str, defaults: Defaults): - desc = "zebra {}".format(name) + desc = f"zebra {name}" - for AttemptNum in range(defaults.retries): + for _AttemptNum in range(defaults.retries): # noinspection PyBroadException try: - log.info('Backing up {}'.format(desc)) + log.info(f"Backing up {desc}") folder = defaults.zebra_folder / name # todo may need a (empty) temp path and then copy to zebra_folder - caput('%s:%s' % (str(name), 'CONFIG_FILE'), str(folder), datatype=999) - caput('%s:%s' % (str(name), 'CONFIG_WRITE.PROC'), 1, timeout=60, - wait=True) + caput("{}:{}".format(str(name), "CONFIG_FILE"), str(folder), datatype=999) + caput( + "{}:{}".format(str(name), "CONFIG_WRITE.PROC"), 1, timeout=60, wait=True + ) # Store button PV triggered successfully pv_name = f"{str(name)}:CONFIG_STATUS" log.info(f"checking status {pv_name}") @@ -38,18 +39,18 @@ def backup_zebra(name: str, defaults: Defaults): elif str(pv).startswith("Can't open '"): raise RuntimeWarning(pv) elif pv == "Done": - log.critical("SUCCESS backed up {}".format(desc)) + log.critical(f"SUCCESS backed up {desc}") except TimeoutError: - msg = "ERROR: Timeout connecting to {} check IOC".format(desc) + msg = f"ERROR: Timeout connecting to {desc} check IOC" log.error(msg) continue except BaseException: - msg = "ERROR: Problem backing up ".format(name) + msg = "ERROR: Problem backing up ".format() log.debug(msg, exc_info=True) log.error(msg) continue break else: - msg = "ERROR: {} all {} attempts failed".format(defaults.retries, desc) + msg = f"ERROR: {defaults.retries} all {desc} attempts failed" log.critical(msg) diff --git a/src/dls_backup_gui/backupeditor.py b/src/dls_backup_gui/backupeditor.py index 50edf1c..d964890 100644 --- a/src/dls_backup_gui/backupeditor.py +++ b/src/dls_backup_gui/backupeditor.py @@ -2,16 +2,25 @@ from logging import getLogger from pathlib import Path -from PyQt5.QtCore import Qt, QSize, QSettings, QRect -from PyQt5.QtGui import QStandardItemModel, QFont, QStandardItem +from PyQt5.QtCore import QRect, QSettings, QSize, Qt +from PyQt5.QtGui import QFont, QStandardItem, QStandardItemModel from PyQt5.QtWidgets import ( - QLabel, QWidget, QDesktopWidget, - QTabWidget, QTableView, QAbstractItemView, QPushButton, - QHBoxLayout, QToolBar, QStatusBar, - QVBoxLayout, QMessageBox + QAbstractItemView, + QDesktopWidget, + QHBoxLayout, + QLabel, + QMessageBox, + QPushButton, + QStatusBar, + QTableView, + QTabWidget, + QToolBar, + QVBoxLayout, + QWidget, ) from dls_backup_bl.config import BackupsConfig + from .entries import EntryPopup log = getLogger(__name__) @@ -35,14 +44,12 @@ def centre_window(self): # noinspection PyAttributeOutsideInit def initialise_ui(self): - # Set up the window and centre it on the screen - self.setWindowTitle('Backup Editor') + # Set up the window and centre it on the screen + self.setWindowTitle("Backup Editor") self.MinimumSize = QSize(750, 450) self.resize(self.MinimumSize) self.setMinimumSize(self.MinimumSize) - self.setWindowFlags( - Qt.WindowCloseButtonHint | - Qt.WindowMinimizeButtonHint) + self.setWindowFlags(Qt.WindowCloseButtonHint | Qt.WindowMinimizeButtonHint) # self.setWindowIcon(QIcon('icon.png')) self.centre_window() @@ -70,15 +77,15 @@ def initialise_ui(self): self.DeviceList.setSelectionBehavior(QAbstractItemView.SelectRows) # Create an Add Entry button - self.AddEntryButton = QPushButton('New', self) + self.AddEntryButton = QPushButton("New", self) self.AddEntryButton.setIconSize(QSize(24, 24)) # Create a Remove Entry button - self.RemoveEntryButton = QPushButton('Delete', self) + self.RemoveEntryButton = QPushButton("Delete", self) self.RemoveEntryButton.setIconSize(QSize(24, 24)) # Create an Edit Entry button - self.EditEntryButton = QPushButton('Edit', self) + self.EditEntryButton = QPushButton("Edit", self) self.EditEntryButton.setIconSize(QSize(24, 24)) # Create layout for the entry buttons @@ -90,15 +97,17 @@ def initialise_ui(self): # Add the table to the tab layout self.DeviceLayout = QHBoxLayout() self.DeviceLayout.addWidget(self.DeviceList) - # Set an initial state + # Set an initial state self.tab_widgets[0].setLayout(self.DeviceLayout) # Link the buttons to their actions self.Tabs.currentChanged.connect(self.tab_selected) self.AddEntryButton.clicked.connect( - partial(self.open_add_entry_dialog, edit_mode=False)) + partial(self.open_add_entry_dialog, edit_mode=False) + ) self.EditEntryButton.clicked.connect( - partial(self.open_add_entry_dialog, edit_mode=True)) + partial(self.open_add_entry_dialog, edit_mode=True) + ) self.RemoveEntryButton.clicked.connect(self.remove_entry) # Create a tool bar @@ -107,7 +116,7 @@ def initialise_ui(self): # Create a status bar self.StatusBar = QStatusBar(self) - # Create a file path label and set font + # Create a file path label and set font self.FilePathFont = QFont() self.FilePathFont.setBold(True) self.FilePathFont.setPointSize(12) @@ -155,9 +164,7 @@ def display_entries(self): for ColumnNum in range(0, self.NumColumns): self.CurrentColumnWidth = self.DeviceList.columnWidth(ColumnNum) - self.DeviceList.setColumnWidth( - ColumnNum, self.CurrentColumnWidth + 20 - ) + self.DeviceList.setColumnWidth(ColumnNum, self.CurrentColumnWidth + 20) self.DeviceList.selectRow(0) self.button_refresh() @@ -172,34 +179,35 @@ def button_refresh(self): def remove_entry(self): self.SelectedDeviceList = "" self.NumColumns = self.DeviceList.model().columnCount() - self.NumRows = int( - len(self.DeviceList.selectedIndexes()) / self.NumColumns) + self.NumRows = int(len(self.DeviceList.selectedIndexes()) / self.NumColumns) self.SelectedIndexes = self.DeviceList.selectedIndexes() for Row in range(0, self.NumRows): self.RowString = "" self.SelectedRow = self.SelectedIndexes[Row * self.NumColumns].row() for Column in range(0, self.NumColumns): - self.RowString = self.RowString + self.DeviceList.model().item( - self.SelectedRow, Column).text() + "\t" - self.SelectedDeviceList = self.SelectedDeviceList + "\n" + \ - self.RowString + self.RowString = ( + self.RowString + + self.DeviceList.model().item(self.SelectedRow, Column).text() + + "\t" + ) + self.SelectedDeviceList = self.SelectedDeviceList + "\n" + self.RowString # Find the number of rows before a removal - self.LastRow = (self.DeviceList.model().rowCount() - 1) + self.LastRow = self.DeviceList.model().rowCount() - 1 self.MsgBoxResponse = QMessageBox.question( - self, "Remove?", + self, + "Remove?", "Are you sure you want to remove:\n" + self.SelectedDeviceList, - QMessageBox.Yes, QMessageBox.No + QMessageBox.Yes, + QMessageBox.No, ) if self.MsgBoxResponse == QMessageBox.Yes: - self.SelectedDevice = str( - self.Tabs.tabText(self.Tabs.currentIndex())) + self.SelectedDevice = str(self.Tabs.tabText(self.Tabs.currentIndex())) self.SelectedIndexes.sort() self.LastSelectedRow = self.SelectedIndexes[-1].row() for Row in range((self.NumRows - 1), -1, -1): - self.SelectedRow = self.SelectedIndexes[ - Row * self.NumColumns].row() + self.SelectedRow = self.SelectedIndexes[Row * self.NumColumns].row() # print self.SelectedRow del self.config[self.SelectedDevice][self.SelectedRow] self.config.save(self.file) @@ -208,13 +216,12 @@ def remove_entry(self): # If the selected index was the last row in the list if self.LastSelectedRow == self.LastRow: # Select the new bottom of the list - self.NewSelectedRow = (self.DeviceList.model().rowCount() - 1) + self.NewSelectedRow = self.DeviceList.model().rowCount() - 1 else: # Otherwise select the same index in the list self.NewSelectedRow = self.LastSelectedRow # Create an index from this row and set it - self.NewIndex = self.DeviceList.model().index( - self.NewSelectedRow, 0) + self.NewIndex = self.DeviceList.model().index(self.NewSelectedRow, 0) self.DeviceList.setCurrentIndex(self.NewIndex) def open_add_entry_dialog(self, edit_mode): diff --git a/src/dls_backup_gui/dls_backup_gui.py b/src/dls_backup_gui/dls_backup_gui.py index db38d25..c66776f 100644 --- a/src/dls_backup_gui/dls_backup_gui.py +++ b/src/dls_backup_gui/dls_backup_gui.py @@ -7,6 +7,7 @@ from PyQt5.QtWidgets import QApplication, QMessageBox from dls_backup_bl.defaults import Defaults + from .backupeditor import BackupEditor log = getLogger(__name__) @@ -15,21 +16,37 @@ def parse_args(): # Setup an argument Parser parser = argparse.ArgumentParser( - description='Edit a dls-backup-bl beamline configuration file', - usage="%(prog)s [options]") - parser.add_argument('-b', '--beamline', action="store", - help="Name of the beamline to backup. " - "The format is 'i16' or 'b07'. Defaults to " - "the current beamline") - parser.add_argument('--domain', action="store", - help='When BLXXY is not appropriate, use domain for' - ' the backup folder name. e.g. --domain ME01D') - parser.add_argument('-j', action="store", dest="json_file", - help="JSON file of devices to be backed up. " - "Defaults to DIR/$(BEAMLINE)-backup.json") - parser.add_argument('-l', '--log-level', action="store", - default='info', - help="Set logging to error, warning, info, debug") + description="Edit a dls-backup-bl beamline configuration file", + usage="%(prog)s [options]", + ) + parser.add_argument( + "-b", + "--beamline", + action="store", + help="Name of the beamline to backup. " + "The format is 'i16' or 'b07'. Defaults to " + "the current beamline", + ) + parser.add_argument( + "--domain", + action="store", + help="When BLXXY is not appropriate, use domain for" + " the backup folder name. e.g. --domain ME01D", + ) + parser.add_argument( + "-j", + action="store", + dest="json_file", + help="JSON file of devices to be backed up. " + "Defaults to DIR/$(BEAMLINE)-backup.json", + ) + parser.add_argument( + "-l", + "--log-level", + action="store", + default="info", + help="Set logging to error, warning, info, debug", + ) # Parse the command line arguments return parser.parse_args() @@ -40,21 +57,24 @@ def main(): # console log file for immediate feedback numeric_level = getattr(logging, args.log_level.upper(), None) - logging.basicConfig( - level=numeric_level - ) + logging.basicConfig(level=numeric_level) - defaults = Defaults(beamline=args.beamline, config_file=args.json_file, - config_file_only=True, - domain=args.domain) + defaults = Defaults( + beamline=args.beamline, + config_file=args.json_file, + config_file_only=True, + domain=args.domain, + ) app = QApplication(sys.argv) if not defaults.config_file.exists(): go = QMessageBox.question( - None, "New Backup Area", + None, + "New Backup Area", f"There is no backup area for {defaults.beamline}\n" f"do you want to create one ?", - QMessageBox.Yes, QMessageBox.No + QMessageBox.Yes, + QMessageBox.No, ) else: go = True diff --git a/src/dls_backup_gui/entries.py b/src/dls_backup_gui/entries.py index 814c8c2..8424f41 100644 --- a/src/dls_backup_gui/entries.py +++ b/src/dls_backup_gui/entries.py @@ -1,18 +1,21 @@ #!/usr/bin/python -# -*- coding: utf-8 -*- # WHY DOES THIS NOT WORK?? # from .backupeditor import BackupEditor -from collections import OrderedDict from functools import partial from logging import getLogger from PyQt5.QtWidgets import ( - QDialog, QGridLayout, - QLineEdit, QLabel, QPushButton, - QHBoxLayout, QVBoxLayout, - QMessageBox) + QDialog, + QGridLayout, + QHBoxLayout, + QLabel, + QLineEdit, + QMessageBox, + QPushButton, + QVBoxLayout, +) log = getLogger(__name__) @@ -23,7 +26,7 @@ def __init__(self, EditMode, parent=None): self.parent = parent self.EditMode = EditMode - # Create the required layouts + # Create the required layouts self.GridLayout = QGridLayout() self.ButtonsLayout = QHBoxLayout() self.VerLayout = QVBoxLayout() @@ -42,31 +45,32 @@ def __init__(self, EditMode, parent=None): self.GridLayout.addWidget(temp, i, 1) # Create the cancel button and add it to the buttons layout - self.CancelButton = QPushButton('Cancel', self) + self.CancelButton = QPushButton("Cancel", self) self.CancelButton.clicked.connect(self.close) self.ButtonsLayout.addWidget(self.CancelButton) # Setup the add next button and add it to the button layout - self.AddNextButton = QPushButton('Add Next', self) + self.AddNextButton = QPushButton("Add Next", self) self.AddNextButton.setEnabled(False) - self.AddNextButton.clicked.connect( - partial(self.AddEditEntry, EditMode, True)) + self.AddNextButton.clicked.connect(partial(self.AddEditEntry, EditMode, True)) self.ButtonsLayout.addWidget(self.AddNextButton) if EditMode: self.AddNextButton.setVisible(False) - self.setWindowTitle('Edit Entry') + self.setWindowTitle("Edit Entry") for n in range(0, len(self.FieldsList)): self.LineEditList[n].setText( - self.parent.DeviceList.selectedIndexes()[n].data()) + self.parent.DeviceList.selectedIndexes()[n].data() + ) else: - self.setWindowTitle('Add Entry') + self.setWindowTitle("Add Entry") # Setup the finish button and add it to the button layout - self.AddFinishButton = QPushButton('Finished', self) + self.AddFinishButton = QPushButton("Finished", self) self.AddFinishButton.setEnabled(False) - self.AddFinishButton.clicked.connect(partial(self.AddEditEntry, - EditMode, False)) + self.AddFinishButton.clicked.connect( + partial(self.AddEditEntry, EditMode, False) + ) # self.Name.textChanged.connect(partial(self.TextChanged, EditMode)) self.ButtonsLayout.addWidget(self.AddFinishButton) @@ -80,9 +84,9 @@ def __init__(self, EditMode, parent=None): self.setLayout(self.VerLayout) def TextChanged(self, thing, obj): - for n, letter in enumerate(thing.text()): + for _n, letter in enumerate(thing.text()): UnicodeNum = letter.toUtf8() - Ordinal = ord(UnicodeNum) + ord(UnicodeNum) def ButtonVisibility(self): Present = False @@ -114,25 +118,20 @@ def ButtonVisibility(self): def AddEditEntry(self, EditMode, NextEntry): self.SelectedDevice = str( - self.parent.Tabs.tabText(self.parent.Tabs.currentIndex())) - self.RowNumber = self.parent.DeviceList.selectionModel( - ).currentIndex().row() - - values = [ - self.LineEditList[i].text() for i in - range(len(self.FieldsList)) - ] + self.parent.Tabs.tabText(self.parent.Tabs.currentIndex()) + ) + self.RowNumber = self.parent.DeviceList.selectionModel().currentIndex().row() + + values = [self.LineEditList[i].text() for i in range(len(self.FieldsList))] try: new_data = self.data_type(*values) except ValueError as e: QMessageBox.warning( - self, "Warning", - "Invalid field\n" + str(e), - QMessageBox.Ok) + self, "Warning", "Invalid field\n" + str(e), QMessageBox.Ok + ) else: if EditMode: - self.parent.config[self.SelectedDevice][ - self.RowNumber] = new_data + self.parent.config[self.SelectedDevice][self.RowNumber] = new_data else: self.parent.config[self.SelectedDevice].append(new_data) diff --git a/tests/test_brick.json b/tests/test_brick.json index 626d11e..7a5bcd4 100644 --- a/tests/test_brick.json +++ b/tests/test_brick.json @@ -29,4 +29,4 @@ ], "zebras": [ ] -} \ No newline at end of file +}