From 89776e5d1f93771f1f08282ad5cfce1e218c7303 Mon Sep 17 00:00:00 2001 From: Constantine Evans Date: Thu, 7 Mar 2024 16:06:01 +0000 Subject: [PATCH] Machine.block --- CHANGELOG.md | 4 +++ src/qslib/machine.py | 86 ++++++++++++++++++++++++-------------------- 2 files changed, 52 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fed47d4..61be70e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ SPDX-License-Identifier: EUPL-1.2 # Changelog +## Development + +- `Machine.block` command to directly control block temperatures. + ## Version 0.11.0 - Initial support for 384-well blocks, at least in data/file reading. diff --git a/src/qslib/machine.py b/src/qslib/machine.py index d718654..b564e5c 100644 --- a/src/qslib/machine.py +++ b/src/qslib/machine.py @@ -251,9 +251,7 @@ def run_command_to_ack(self, command: str | SCPICommand) -> str: raise ConnectionError(f"Not connected to {self.host}") loop = asyncio.get_event_loop() try: - return loop.run_until_complete( - self.connection.run_command(command, just_ack=True) - ) + return loop.run_until_complete(self.connection.run_command(command, just_ack=True)) except CommandError as e: e.__traceback__ = None raise e @@ -351,15 +349,11 @@ def list_files( ) -> list[str] | list[dict[str, Any]]: loop = asyncio.get_event_loop() return loop.run_until_complete( - self.connection.list_files( - path, leaf=leaf, verbose=verbose, recursive=recursive - ) + self.connection.list_files(path, leaf=leaf, verbose=verbose, recursive=recursive) ) @_ensure_connection(AccessLevel.Observer) - def read_file( - self, path: str, context: str | None = None, leaf: str = "FILE" - ) -> bytes: + def read_file(self, path: str, context: str | None = None, leaf: str = "FILE") -> bytes: """Read a file. Parameters @@ -375,9 +369,7 @@ def read_file( bytes returned file """ - return asyncio.get_event_loop().run_until_complete( - self.connection.read_file(path, context, leaf) - ) + return asyncio.get_event_loop().run_until_complete(self.connection.read_file(path, context, leaf)) @_ensure_connection(AccessLevel.Controller) def write_file(self, path: str, data: str | bytes) -> None: @@ -385,11 +377,7 @@ def write_file(self, path: str, data: str | bytes) -> None: data = data.encode() self.run_command_bytes( - b"FILE:WRITE " - + path.encode() - + b" \n" - + base64.encodebytes(data) - + b"\n" + b"FILE:WRITE " + path.encode() + b" \n" + base64.encodebytes(data) + b"\n" ) @_ensure_connection(AccessLevel.Observer) @@ -405,9 +393,7 @@ def list_runs_in_storage(self) -> list[str]: """ x = self.run_command("FILE:LIST? public_run_complete:") a = x.split("\n")[1:-1] - return [ - re.sub("^public_run_complete:", "", s)[:-4] for s in a if s.endswith(".eds") - ] + return [re.sub("^public_run_complete:", "", s)[:-4] for s in a if s.endswith(".eds")] @_ensure_connection(AccessLevel.Observer) def load_run_from_storage(self, path: str) -> "Experiment": # type: ignore @@ -418,9 +404,7 @@ def load_run_from_storage(self, path: str) -> "Experiment": # type: ignore return Experiment.from_machine_storage(self, path) @_ensure_connection(AccessLevel.Guest) - def save_run_from_storage( - self, machine_path: str, download_path: str | IO[bytes], overwrite: bool = False - ) -> None: + def save_run_from_storage(self, machine_path: str, download_path: str | IO[bytes], overwrite: bool = False) -> None: """Download a file from run storage on the machine. Parameters @@ -451,9 +435,7 @@ def save_run_from_storage( @_ensure_connection(AccessLevel.Observer) def _get_log_from_byte(self, name: str | bytes, byte: int) -> bytes: - logfuture: Future[ - tuple[bytes, bytes, Future[tuple[bytes, bytes, None]] | None] - ] = asyncio.Future() + logfuture: Future[tuple[bytes, bytes, Future[tuple[bytes, bytes, None]] | None]] = asyncio.Future() if self.connection is None: raise Exception if isinstance(name, bytes): @@ -489,9 +471,7 @@ def machine_status(self) -> MachineStatus: @_ensure_connection(AccessLevel.Observer) def get_running_protocol(self) -> Protocol: p = _unwrap_tags(self.run_command("PROT? ${Protocol}")) - pn, svs, rm = self.run_command( - "RET ${Protocol} ${SampleVolume} ${RunMode}" - ).split() + pn, svs, rm = self.run_command("RET ${Protocol} ${SampleVolume} ${RunMode}").split() p = f"PROT -volume={svs} -runmode={rm} {pn} " + p return Protocol.from_scpicommand(SCPICommand.from_string(p)) @@ -509,9 +489,7 @@ def set_access_level( " Change max_access level to continue." ) - self.run_command( - f"ACC -stealth={stealth} -exclusive={exclusive} {access_level}" - ) + self.run_command(f"ACC -stealth={stealth} -exclusive={exclusive} {access_level}") log.debug(f"Took access level {access_level} {exclusive=} {stealth=}") self._current_access_level = access_level @@ -560,6 +538,42 @@ def drawer_close(self, lower_cover: bool = True, check: bool = True) -> None: if lower_cover: self.cover_lower(check=check, ensure_drawer=False) + @property + @_ensure_connection(AccessLevel.Observer) + def block(self) -> tuple[bool, float]: + """Returns whether the block is currently temperature-controlled, and the current block temperature setting.""" + sbool, v = self.run_command("BLOCK?").split() + sbool = sbool.lower() + v = float(v) + + if sbool == "on": + return True, v + elif sbool == "off": + return False, v + else: + raise ValueError(f"Block status {sbool} {v} is not understood.") + + @block.setter + @_ensure_connection(AccessLevel.Controller) + def block(self, value: float | None | False | True | tuple[bool, float]): + """Set the block temperature control. + + If a float is given, it will be set to that temperature; None or False will + turn off the block temperature control, and True will turn it on at the current set temperature. A tuple can be given + to specify both the on/off status and the temperature.""" + if (value is None) or (value is False): + bcom = "OFF" + elif value is True: + bcom = "ON" + elif isinstance(value, tuple): + bcom = f"{'ON' if value[0] else 'OFF'} {float(value[1])}" + else: + try: + bcom = f"ON {float(value)}" + except ValueError: + raise ValueError(f"Block value {value} is not understood.") + self.run_command(f"BLOCK {bcom}") + @property def status(self) -> RunStatus: """Return the current status of the run.""" @@ -693,14 +707,10 @@ def at_access( log.debug(f"Took access level {access_level} {exclusive=} {stealth=}.") yield self self.set_access_level(fac, fex, fst) - log.debug( - f"Dropped access level {access_level}, returning to {fac} exclusive={fex} stealth={fst}." - ) + log.debug(f"Dropped access level {access_level}, returning to {fac} exclusive={fex} stealth={fst}.") @contextmanager - def ensured_connection( - self, access_level: AccessLevel = AccessLevel.Observer - ) -> Generator[Machine, None, None]: + def ensured_connection(self, access_level: AccessLevel = AccessLevel.Observer) -> Generator[Machine, None, None]: if self.automatic: was_connected = self.connected if not was_connected: